In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=d5c4abf552a049beba424eb22f891705c45e6574007cec6758e2728d1ace0b5f
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


Create the Spark session and read the data

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('random_forest')
    .getOrCreate()
)

# Read the CSV using the header row for column names and inferred column types.
df = spark.read.options(inferSchema=True, header=True).csv('affairs.csv')

# Show the dataset shape as (rows, columns), followed by the inferred schema.
print((df.count(), len(df.columns)))
df.printSchema()

Exploratory Data Analysis


In [None]:
# Peek at the first rows.
df.show(5)

# Summary statistics for the predictor columns.
summary_cols = ['summary', 'rate_marriage', 'age', 'yrs_married', 'children', 'religious']
df.describe().select(*summary_cols).show()

# Row counts per value of the label and of the marriage rating.
df.groupBy('affairs').count().show()
df.groupBy('rate_marriage').count().show()

# Cross-tabulate each of these predictors against the label, sorted ascending.
for predictor in ('rate_marriage', 'religious', 'children'):
    (
        df.groupBy(predictor, 'affairs')
        .count()
        .orderBy(predictor, 'affairs', 'count', ascending=True)
        .show()
    )

# Column means within each label value.
df.groupBy('affairs').mean().show()


Feature engineering

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine the five predictor columns into the single vector column 'features'.
df_assembler = VectorAssembler(
    inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'],
    outputCol="features",
)

# Make the cell safe to rerun: transform() raises when the output column already
# exists, so drop a 'features' column left over from a previous execution first.
# drop() is a no-op when the column is absent, so the first run is unaffected.
df = df_assembler.transform(df.drop("features"))
df.show(10, False)

# Keep only what the model needs: the assembled vector and the label.
model_df = df.select(['features', 'affairs'])
model_df.show(10, False)

+-------------+----+-----------+--------+---------+-------+-----------------------+
|rate_marriage|age |yrs_married|children|religious|affairs|features               |
+-------------+----+-----------+--------+---------+-------+-----------------------+
|5            |32.0|6.0        |1.0     |3        |0      |[5.0,32.0,6.0,1.0,3.0] |
|4            |22.0|2.5        |0.0     |2        |0      |[4.0,22.0,2.5,0.0,2.0] |
|3            |32.0|9.0        |3.0     |3        |1      |[3.0,32.0,9.0,3.0,3.0] |
|3            |27.0|13.0       |3.0     |1        |1      |[3.0,27.0,13.0,3.0,1.0]|
|4            |22.0|2.5        |0.0     |1        |1      |[4.0,22.0,2.5,0.0,1.0] |
|4            |37.0|16.5       |4.0     |3        |1      |[4.0,37.0,16.5,4.0,3.0]|
|5            |27.0|9.0        |1.0     |1        |1      |[5.0,27.0,9.0,1.0,1.0] |
|4            |27.0|9.0        |0.0     |2        |1      |[4.0,27.0,9.0,0.0,2.0] |
|5            |37.0|23.0       |5.5     |2        |1      |[5.0,37.0,23.0,5.

Splitting the dataset

In [None]:
# 75/25 train/test split. The seed is fixed so that rerunning the notebook yields
# the same partition (and therefore comparable metrics) instead of a fresh random one.
train_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

# Size of the training set and its label balance.
print(train_df.count())
train_df.groupBy('affairs').count().show()

4822
+-------+-----+
|affairs|count|
+-------+-----+
|      1| 1562|
|      0| 3260|
+-------+-----+



Build an RF Model

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Fit a 51-tree random forest on the training split, with 'affairs' as the label.
# The seed is set explicitly so the forest's random sampling is repeatable across runs.
rf_classifier = RandomForestClassifier(
    labelCol='affairs',
    numTrees=51,
    seed=42,
).fit(train_df)

# Score the held-out split; the output adds rawPrediction, probability and prediction.
rf_predictions = rf_classifier.transform(test_df)
rf_predictions.show()

# Distribution of predicted classes on the test split.
rf_predictions.groupBy('prediction').count().show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|affairs|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[1.0,17.5,0.5,0.0...|      0|[35.1697605685447...|[0.68960314840283...|       0.0|
|[1.0,22.0,2.5,0.0...|      1|[18.5316630555468...|[0.36336594226562...|       1.0|
|[1.0,22.0,2.5,1.0...|      0|[20.8589313508405...|[0.40899865393804...|       1.0|
|[1.0,27.0,2.5,0.0...|      1|[21.0618308428013...|[0.41297707534904...|       1.0|
|[1.0,27.0,2.5,0.0...|      1|[21.0618308428013...|[0.41297707534904...|       1.0|
|[1.0,27.0,6.0,1.0...|      1|[19.5695584432387...|[0.38371683222036...|       1.0|
|[1.0,27.0,6.0,1.0...|      0|[19.2482143618197...|[0.37741596787881...|       1.0|
|[1.0,27.0,6.0,1.0...|      1|[19.2125939870234...|[0.37671752915732...|       1.0|
|[1.0,27.0,6.0,3.0...|      0|[16.3067449836652...|[0.31974009771892...|    

Accuracy

In [None]:
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

# Accuracy of the test-set predictions against the 'affairs' label.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="affairs",
    metricName='accuracy',
)
rf_accuracy = accuracy_evaluator.evaluate(rf_predictions)

print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))

The accuracy of RF on test data is 72%


Weighted precision

In [None]:
# Weighted precision of the test-set predictions against the 'affairs' label.
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="affairs",
    metricName='weightedPrecision',
)
rf_precision = precision_evaluator.evaluate(rf_predictions)

print('The precision rate on test data is {0:.0%}'.format(rf_precision))

The precision rate on test data is 70%


AUC

In [None]:
# Evaluate the predictions with the binary evaluator's default metric
# (reported under the "AUC" heading of this section).
rf_auc = BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
print(rf_auc)

# A notebook cell only echoes its final expression, so the importances have to be
# printed explicitly; as a bare mid-cell expression they were silently discarded.
print(rf_classifier.featureImportances)

# Index -> column-name mapping for the assembled vector, to read the importances against.
df.schema["features"].metadata["ml_attr"]["attrs"]

0.7300516611446684


{'numeric': [{'idx': 0, 'name': 'rate_marriage'},
  {'idx': 1, 'name': 'age'},
  {'idx': 2, 'name': 'yrs_married'},
  {'idx': 3, 'name': 'children'},
  {'idx': 4, 'name': 'religious'}]}

# Random forest on Log_Reg_dataset.csv

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this part of the notebook.
spark = (
    SparkSession.builder
    .appName('random_forest')
    .getOrCreate()
)

# Read the second dataset using the header row and inferred column types.
df = spark.read.options(inferSchema=True, header=True).csv('Log_Reg_dataset.csv')

# Show the dataset shape as (rows, columns), followed by the inferred schema.
print((df.count(), len(df.columns)))
df.printSchema()

(20000, 6)
root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)



Exploratory Data Analysis

In [None]:
# Preview the first five rows of the freshly loaded dataset.
df.show(n=5)

+---------+---+--------------+--------+----------------+------+
|  Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|
+---------+---+--------------+--------+----------------+------+
|    India| 41|             1|   Yahoo|              21|     1|
|   Brazil| 28|             1|   Yahoo|               5|     0|
|   Brazil| 40|             0|  Google|               3|     0|
|Indonesia| 31|             1|    Bing|              15|     1|
| Malaysia| 32|             0|  Google|              15|     1|
+---------+---+--------------+--------+----------------+------+
only showing top 5 rows



In [None]:
# Peek at the first rows.
df.show(n=5)

# Summary statistics for the predictor columns.
summary_cols = ['summary', 'Country', 'Age', 'Repeat_Visitor', 'Platform', 'Web_pages_viewed']
df.describe().select(*summary_cols).show()

# Row counts per value of each categorical column.
for categorical in ('Platform', 'Country'):
    df.groupBy(categorical).count().show()

# Row counts per (Country, Platform) pair, sorted ascending.
(
    df.groupBy('Country', 'Platform')
    .count()
    .orderBy('Country', 'Platform', 'count', ascending=True)
    .show()
)

# Column means within each distinct number of pages viewed.
df.groupBy('Web_pages_viewed').mean().show()

+---------+---+--------------+--------+----------------+------+
|  Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|
+---------+---+--------------+--------+----------------+------+
|    India| 41|             1|   Yahoo|              21|     1|
|   Brazil| 28|             1|   Yahoo|               5|     0|
|   Brazil| 40|             0|  Google|               3|     0|
|Indonesia| 31|             1|    Bing|              15|     1|
| Malaysia| 32|             0|  Google|              15|     1|
+---------+---+--------------+--------+----------------+------+
only showing top 5 rows

+-------+--------+-----------------+-----------------+--------+-----------------+
|summary| Country|              Age|   Repeat_Visitor|Platform| Web_pages_viewed|
+-------+--------+-----------------+-----------------+--------+-----------------+
|  count|   20000|            20000|            20000|   20000|            20000|
|   mean|    null|         28.53955|           0.5029|    null|        

String indexing of the categorical columns

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [None]:
# Make the cell safe to rerun: the indexer rejects a DataFrame that already has its
# output column, so remove a 'Platform_Num' left by a previous execution first.
# drop() is a no-op when the column is absent, so the first run is unaffected.
df = df.drop("Platform_Num")

# Learn the Platform -> numeric index mapping and append it as 'Platform_Num'.
Platform_indexer = StringIndexer(
    inputCol="Platform",
    outputCol="Platform_Num",
).fit(df)
df = Platform_indexer.transform(df)
df.show(5, False)

+---------+---+--------------+--------+----------------+------+------------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_Num|
+---------+---+--------------+--------+----------------+------+------------+
|India    |41 |1             |Yahoo   |21              |1     |0.0         |
|Brazil   |28 |1             |Yahoo   |5               |0     |0.0         |
|Brazil   |40 |0             |Google  |3               |0     |1.0         |
|Indonesia|31 |1             |Bing    |15              |1     |2.0         |
|Malaysia |32 |0             |Google  |15              |1     |1.0         |
+---------+---+--------------+--------+----------------+------+------------+
only showing top 5 rows



In [None]:
# Make the cell safe to rerun: the indexer rejects a DataFrame that already has its
# output column, so remove a 'Country_Num' left by a previous execution first.
# drop() is a no-op when the column is absent, so the first run is unaffected.
df = df.drop("Country_Num")

# Learn the Country -> numeric index mapping and append it as 'Country_Num'.
country_indexer = StringIndexer(
    inputCol="Country",
    outputCol="Country_Num",
).fit(df)
df = country_indexer.transform(df)


In [None]:
# Check the first five rows now that both index columns have been added.
df.show(n=5)

+---------+---+--------------+--------+----------------+------+------------+-----------+
|  Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_Num|Country_Num|
+---------+---+--------------+--------+----------------+------+------------+-----------+
|    India| 41|             1|   Yahoo|              21|     1|         0.0|        1.0|
|   Brazil| 28|             1|   Yahoo|               5|     0|         0.0|        2.0|
|   Brazil| 40|             0|  Google|               3|     0|         1.0|        2.0|
|Indonesia| 31|             1|    Bing|              15|     1|         2.0|        0.0|
| Malaysia| 32|             0|  Google|              15|     1|         1.0|        3.0|
+---------+---+--------------+--------+----------------+------+------------+-----------+
only showing top 5 rows



Feature Engineering

In [None]:
# Combine the two indexed categoricals and the three numeric columns into the
# single vector column 'features'.
df_assembler = VectorAssembler(
    inputCols=['Platform_Num', 'Country_Num', 'Age', 'Repeat_Visitor', 'Web_pages_viewed'],
    outputCol="features",
)

# Make the cell safe to rerun: transform() raises when the output column already
# exists, so drop a 'features' column left over from a previous execution first.
# drop() is a no-op when the column is absent, so the first run is unaffected.
df = df_assembler.transform(df.drop("features"))
df.show(10, False)

# Keep only what the model needs: the assembled vector and the label.
model_df = df.select(['features', 'Status'])
model_df.show(10, False)

+---------+---+--------------+--------+----------------+------+------------+-----------+-----------------------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_Num|Country_Num|features               |
+---------+---+--------------+--------+----------------+------+------------+-----------+-----------------------+
|India    |41 |1             |Yahoo   |21              |1     |0.0         |1.0        |[0.0,1.0,41.0,1.0,21.0]|
|Brazil   |28 |1             |Yahoo   |5               |0     |0.0         |2.0        |[0.0,2.0,28.0,1.0,5.0] |
|Brazil   |40 |0             |Google  |3               |0     |1.0         |2.0        |[1.0,2.0,40.0,0.0,3.0] |
|Indonesia|31 |1             |Bing    |15              |1     |2.0         |0.0        |[2.0,0.0,31.0,1.0,15.0]|
|Malaysia |32 |0             |Google  |15              |1     |1.0         |3.0        |[1.0,3.0,32.0,0.0,15.0]|
|Brazil   |32 |0             |Google  |3               |0     |1.0         |2.0        |[1.0,2.0

Splitting the Dataset

In [None]:
# 75/25 train/test split. The seed is fixed so that rerunning the notebook yields
# the same partition (and therefore comparable metrics) instead of a fresh random one.
train_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

# Size of the training set and its label balance.
print(train_df.count())
train_df.groupBy('Status').count().show()

14821
+------+-----+
|Status|count|
+------+-----+
|     1| 7403|
|     0| 7418|
+------+-----+



Building the RF Model

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Fit a 51-tree random forest on the training split, with 'Status' as the label.
# The seed is set explicitly so the forest's random sampling is repeatable across runs.
rf_classifier = RandomForestClassifier(
    labelCol='Status',
    numTrees=51,
    seed=42,
).fit(train_df)

# Score the held-out split; the output adds rawPrediction, probability and prediction.
rf_predictions = rf_classifier.transform(test_df)
rf_predictions.show()

# Distribution of predicted classes on the test split.
rf_predictions.groupBy('prediction').count().show()

+--------------------+------+--------------------+--------------------+----------+
|            features|Status|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(5,[2,4],[17.0,1.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,1.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,2.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,2.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,3.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,3.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,4.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,5.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,[2,4],[17.0,5.0])|     0|[49.3331405364608...|[0.96731648110707...|       0.0|
|(5,

Accuracy

In [None]:
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

# Accuracy of the test-set predictions against the 'Status' label.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="Status",
    metricName='accuracy',
)
rf_accuracy = accuracy_evaluator.evaluate(rf_predictions)

print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))

The accuracy of RF on test data is 94%


Weighted precision

In [None]:
# Weighted precision of the test-set predictions against the 'Status' label.
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="Status",
    metricName='weightedPrecision',
)
rf_precision = precision_evaluator.evaluate(rf_predictions)

print('The precision rate on test data is {0:.0%}'.format(rf_precision))

The precision rate on test data is 94%


AUC

In [None]:
# Evaluate the predictions with the binary evaluator's default metric
# (reported under the "AUC" heading of this section).
rf_auc = BinaryClassificationEvaluator(labelCol='Status').evaluate(rf_predictions)
print(rf_auc)

# A notebook cell only echoes its final expression, so the importances have to be
# printed explicitly; as a bare mid-cell expression they were silently discarded.
print(rf_classifier.featureImportances)

# Index -> column-name mapping for the assembled vector, to read the importances against.
df.schema["features"].metadata["ml_attr"]["attrs"]

0.9823961211276675


{'numeric': [{'idx': 2, 'name': 'Age'},
  {'idx': 3, 'name': 'Repeat_Visitor'},
  {'idx': 4, 'name': 'Web_pages_viewed'}],
 'nominal': [{'vals': ['Yahoo', 'Google', 'Bing'],
   'idx': 0,
   'name': 'Platform_Num'},
  {'vals': ['Indonesia', 'India', 'Brazil', 'Malaysia'],
   'idx': 1,
   'name': 'Country_Num'}]}