In [1]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session for this notebook, labelled 'log_reg'.
session_builder = SparkSession.builder.appName('log_reg')
spark = session_builder.getOrCreate()

In [2]:
# Load the CSV, treating the first line as column names and letting Spark
# infer each column's type.
df = spark.read.csv(
    'Log_Reg_dataset.csv',
    header=True,
    inferSchema=True,
)

In [3]:
from pyspark.sql.functions import *

In [4]:
# Report the dataset's shape as a (row count, column count) tuple.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(20000, 6)


In [5]:
# Print the inferred schema: column names, types and nullability.
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)



In [6]:
# List the column names in order.
df.columns

['Country', 'Age', 'Repeat_Visitor', 'Platform', 'Web_pages_viewed', 'Status']

In [7]:
# Preview the first five rows to sanity-check the parsed values.
df.show(5)

+---------+---+--------------+--------+----------------+------+
|  Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|
+---------+---+--------------+--------+----------------+------+
|    India| 41|             1|   Yahoo|              21|     1|
|   Brazil| 28|             1|   Yahoo|               5|     0|
|   Brazil| 40|             0|  Google|               3|     0|
|Indonesia| 31|             1|    Bing|              15|     1|
| Malaysia| 32|             0|  Google|              15|     1|
+---------+---+--------------+--------+----------------+------+
only showing top 5 rows



In [10]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = df.describe()
summary_stats.show()

+-------+--------+-----------------+-----------------+--------+-----------------+------------------+
|summary| Country|              Age|   Repeat_Visitor|Platform| Web_pages_viewed|            Status|
+-------+--------+-----------------+-----------------+--------+-----------------+------------------+
|  count|   20000|            20000|            20000|   20000|            20000|             20000|
|   mean|    null|         28.53955|           0.5029|    null|           9.5533|               0.5|
| stddev|    null|7.888912950773227|0.500004090187782|    null|6.073903499824976|0.5000125004687693|
|    min|  Brazil|               17|                0|    Bing|                1|                 0|
|    max|Malaysia|              111|                1|   Yahoo|               29|                 1|
+-------+--------+-----------------+-----------------+--------+-----------------+------------------+



In [11]:
# Number of rows for each Country value.
rows_per_country = df.groupBy('Country').count()
rows_per_country.show()

+---------+-----+
|  Country|count|
+---------+-----+
| Malaysia| 1218|
|    India| 4018|
|Indonesia|12178|
|   Brazil| 2586|
+---------+-----+



In [13]:
# Number of rows for each Platform value.
rows_per_platform = df.groupBy('Platform').count()
rows_per_platform.show()

+--------+-----+
|Platform|count|
+--------+-----+
|   Yahoo| 9859|
|    Bing| 4360|
|  Google| 5781|
+--------+-----+



In [14]:
# Average of every numeric column, broken down by Country.
by_country = df.groupBy('Country')
by_country.mean().show()

+---------+------------------+-------------------+---------------------+--------------------+
|  Country|          avg(Age)|avg(Repeat_Visitor)|avg(Web_pages_viewed)|         avg(Status)|
+---------+------------------+-------------------+---------------------+--------------------+
| Malaysia|27.792282430213465| 0.5730706075533661|   11.192118226600986|  0.6568144499178982|
|    India|27.976854156296664| 0.5433051269288203|   10.727227476356397|  0.6212045793927327|
|Indonesia| 28.43159796354081| 0.5207751683363442|    9.985711939563148|  0.5422893742814913|
|   Brazil|30.274168600154677|  0.322892498066512|    4.921113689095128|0.038669760247486466|
+---------+------------------+-------------------+---------------------+--------------------+



In [15]:
# Average of every numeric column for each value of the Status label.
by_status = df.groupBy('Status')
by_status.mean().show()

+------+--------+-------------------+---------------------+-----------+
|Status|avg(Age)|avg(Repeat_Visitor)|avg(Web_pages_viewed)|avg(Status)|
+------+--------+-------------------+---------------------+-----------+
|     1| 26.5435|             0.7019|              14.5617|        1.0|
|     0| 30.5356|             0.3039|               4.5449|        0.0|
+------+--------+-------------------+---------------------+-----------+



In [16]:
from pyspark.ml.feature import StringIndexer

In [19]:
# Index the Platform strings into a numeric PlatformNum column.
# search_engine_indexer holds the fitted model, which is then applied to df.
platform_indexer_stage = StringIndexer(
    inputCol="Platform",
    outputCol="PlatformNum",
)
search_engine_indexer = platform_indexer_stage.fit(df)
df = search_engine_indexer.transform(df)

In [20]:
# Preview three rows to confirm the new PlatformNum column was added.
df.show(3)

+-------+---+--------------+--------+----------------+------+-----------+
|Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|PlatformNum|
+-------+---+--------------+--------+----------------+------+-----------+
|  India| 41|             1|   Yahoo|              21|     1|        0.0|
| Brazil| 28|             1|   Yahoo|               5|     0|        0.0|
| Brazil| 40|             0|  Google|               3|     0|        1.0|
+-------+---+--------------+--------+----------------+------+-----------+
only showing top 3 rows



In [21]:
from pyspark.ml.feature import OneHotEncoder

In [23]:
# One-hot encode PlatformNum into the sparse Platform_Vector column.
# From Spark 3.0 OneHotEncoder is an Estimator (fit -> model -> transform),
# whereas in Spark 2.x it is a plain Transformer with no fit(). Fit only when
# the API offers it, so this cell runs on both major versions.
search_engine_encoder = OneHotEncoder(inputCol="PlatformNum", outputCol="Platform_Vector")
if hasattr(search_engine_encoder, "fit"):
    df = search_engine_encoder.fit(df).transform(df)
else:
    df = search_engine_encoder.transform(df)

In [24]:
# Preview three rows to confirm the new Platform_Vector column was added.
df.show(3)

+-------+---+--------------+--------+----------------+------+-----------+---------------+
|Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|PlatformNum|Platform_Vector|
+-------+---+--------------+--------+----------------+------+-----------+---------------+
|  India| 41|             1|   Yahoo|              21|     1|        0.0|  (2,[0],[1.0])|
| Brazil| 28|             1|   Yahoo|               5|     0|        0.0|  (2,[0],[1.0])|
| Brazil| 40|             0|  Google|               3|     0|        1.0|  (2,[1],[1.0])|
+-------+---+--------------+--------+----------------+------+-----------+---------------+
only showing top 3 rows



In [25]:
# Platform values ranked by row count, largest first (at most five shown).
platform_freq = df.groupBy('Platform').count()
platform_freq_desc = platform_freq.orderBy('count', ascending=False)
platform_freq_desc.show(5)

+--------+-----+
|Platform|count|
+--------+-----+
|   Yahoo| 9859|
|  Google| 5781|
|    Bing| 4360|
+--------+-----+



In [26]:
# Same ranking on the indexed column, to compare against the Platform counts.
platform_num_freq = df.groupBy('PlatformNum').count()
platform_num_freq_desc = platform_num_freq.orderBy('count', ascending=False)
platform_num_freq_desc.show(5)

+-----------+-----+
|PlatformNum|count|
+-----------+-----+
|        0.0| 9859|
|        1.0| 5781|
|        2.0| 4360|
+-----------+-----+



In [27]:
# Index the Country strings into a numeric Country_Num column.
# country_indexer holds the fitted model, which is then applied to df.
country_indexer_stage = StringIndexer(
    inputCol="Country",
    outputCol="Country_Num",
)
country_indexer = country_indexer_stage.fit(df)
df = country_indexer.transform(df)

In [28]:
# Show each country next to its assigned index for the first five rows.
country_index_preview = df.select('Country', 'Country_Num')
country_index_preview.show(5)

+---------+-----------+
|  Country|Country_Num|
+---------+-----------+
|    India|        1.0|
|   Brazil|        2.0|
|   Brazil|        2.0|
|Indonesia|        0.0|
| Malaysia|        3.0|
+---------+-----------+
only showing top 5 rows



In [29]:
# One-hot encode Country_Num into the sparse Country_Vector column.
# From Spark 3.0 OneHotEncoder is an Estimator (fit -> model -> transform),
# whereas in Spark 2.x it is a plain Transformer with no fit(). Fit only when
# the API offers it, so this cell runs on both major versions.
country_encoder = OneHotEncoder(inputCol="Country_Num", outputCol="Country_Vector")
if hasattr(country_encoder, "fit"):
    df = country_encoder.fit(df).transform(df)
else:
    df = country_encoder.transform(df)

In [30]:
# Compare the raw country, its index and its one-hot vector for five rows.
# The column is named 'Country_Num'; the previous spelling 'country_Num' only
# resolved because Spark SQL is case-insensitive by default
# (spark.sql.caseSensitive=false).
df.select(['Country', 'Country_Num', 'Country_Vector']).show(5)

+---------+-----------+--------------+
|  Country|country_Num|Country_Vector|
+---------+-----------+--------------+
|    India|        1.0| (3,[1],[1.0])|
|   Brazil|        2.0| (3,[2],[1.0])|
|   Brazil|        2.0| (3,[2],[1.0])|
|Indonesia|        0.0| (3,[0],[1.0])|
| Malaysia|        3.0|     (3,[],[])|
+---------+-----------+--------------+
only showing top 5 rows



In [32]:
# Country values ranked by row count, largest first (at most five shown).
country_freq = df.groupBy('Country').count()
country_freq_desc = country_freq.orderBy('count', ascending=False)
country_freq_desc.show(5)

+---------+-----+
|  Country|count|
+---------+-----+
|Indonesia|12178|
|    India| 4018|
|   Brazil| 2586|
| Malaysia| 1218|
+---------+-----+



In [33]:
from pyspark.ml.feature import VectorAssembler


In [36]:
# Combine the two one-hot vectors and the three numeric columns into a single
# 'features' vector column.
feature_columns = [
    'Platform_Vector',
    'Country_Vector',
    'Age',
    'Repeat_Visitor',
    'Web_pages_viewed',
]
df_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = df_assembler.transform(df)

In [37]:
# Print the schema again to confirm the derived columns, including 'features'.
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)
 |-- PlatformNum: double (nullable = false)
 |-- Platform_Vector: vector (nullable = true)
 |-- Country_Num: double (nullable = false)
 |-- Country_Vector: vector (nullable = true)
 |-- features: vector (nullable = true)



In [38]:
# Keep only the assembled feature vector and the Status label for modelling.
model_columns = ['features', 'Status']
model_df = df.select(model_columns)

In [39]:
from pyspark.ml.classification import LogisticRegression


In [40]:
# Split the data roughly 75/25 into training and test sets.
# A fixed seed keeps the split, and every metric computed from it, the same
# across reruns; without it each run samples different rows.
training_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

In [41]:
# Number of rows that landed in the training split.
training_df.count()

15040

In [42]:
# Fit a logistic regression on the training split with Status as the label.
# log_reg is the fitted model.
log_reg_estimator = LogisticRegression(labelCol='Status')
log_reg = log_reg_estimator.fit(training_df)

In [44]:
# Evaluate the fitted model on the training split and keep its predictions.
training_summary = log_reg.evaluate(training_df)
train_results = training_summary.predictions

In [46]:
# Show ten training rows whose actual Status is 1 and whose prediction is 1.
actual_positive_rows = train_results.filter(train_results['Status'] == 1)
true_positive_rows = actual_positive_rows.filter(train_results['prediction'] == 1)
true_positive_rows.select(['Status', 'prediction', 'probability']).show(10)

+------+----------+--------------------+
|Status|prediction|         probability|
+------+----------+--------------------+
|     1|       1.0|[0.30379965106063...|
|     1|       1.0|[0.17016883547648...|
|     1|       1.0|[0.17016883547648...|
|     1|       1.0|[0.17016883547648...|
|     1|       1.0|[0.08789651442783...|
|     1|       1.0|[0.08789651442783...|
|     1|       1.0|[0.08789651442783...|
|     1|       1.0|[0.08789651442783...|
|     1|       1.0|[0.04332410704866...|
|     1|       1.0|[0.04332410704866...|
+------+----------+--------------------+
only showing top 10 rows



In [47]:
# Count training rows where Status and prediction are both 1 (true positives).
positives_in_results = train_results.filter(train_results['Status'] == 1)
correct_preds = positives_in_results.filter(train_results['prediction'] == 1).count()

In [48]:
# Number of training rows whose actual Status is 1.
positive_training_rows = training_df.filter(training_df['Status'] == 1)
positive_training_rows.count()


7530

In [49]:
# Recall (true-positive rate) on the training set, not overall accuracy:
# correct_preds counts only rows with Status == 1 and prediction == 1, and the
# denominator is the number of actual positives (Status == 1).
float(correct_preds)/(training_df.filter(training_df['Status']==1).count())

0.9395750332005313