In [38]:
import os
import sys
import requests
requests.packages.urllib3.disable_warnings()
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import pyspark
from pyspark import SQLContext
from pyspark.sql.functions import min, max, col, count, array
from pyspark.sql import SparkSession, functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler, VectorSizeHint, StringIndexer, ChiSqSelector
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize",10000)
spark.conf.set("spark.sql.shuffle.partitions",100)

In [73]:
df = spark.read.json('medium-sparkify-event-data.json')
df = df.withColumn('sessionId', col('sessionId').cast('int')) \
    .withColumn('ts', col('ts').cast('int')).dropDuplicates()
df = df.filter(df.userId != "").persist()
ac = df.columns
train_data, test_data = df.randomSplit([.9999, .0001], seed = 1234)

In [74]:
train_data, test_data = test_data.randomSplit([.8,.2],seed=1234)

In [75]:
train_data.show(1, vertical = True)

-RECORD 0-----------------------------
 artist        | null                 
 auth          | Logged In            
 firstName     | Allisson             
 gender        | F                    
 itemInSession | 121                  
 lastName      | Oneill               
 length        | null                 
 level         | paid                 
 location      | Mansfield, OH        
 method        | PUT                  
 page          | Add Friend           
 registration  | 1532260956000        
 sessionId     | 2757                 
 song          | null                 
 status        | 307                  
 ts            | 1039871736           
 userAgent     | Mozilla/5.0 (X11;... 
 userId        | 204                  
only showing top 1 row



In [76]:
col_names = []
for each in paid.columns:
    col_names.append(each + "Index")

In [93]:

indexer = StringIndexer(inputCols = paid.columns, outputCols = col_names, handleInvalid = 'skip')
vecAssembler = VectorAssembler(inputCols=col_names, outputCol = 'features')
css = ChiSqSelector(featuresCol = 'features', outputCol="selectedFeatures", labelCol = 'sessionId', fpr = 0.05)
lr = LogisticRegression(labelCol = "levelIndex", featuresCol = "selectedFeatures", predictionCol = 'prediction', \
                        rawPredictionCol = 'rawPrediction', maxIter = 10)
pipeline = Pipeline(stages = [indexer, vecAssembler, css, lr])

evaluator=BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='levelIndex')

In [94]:
tr_data = pipeline.fit(train_data).transform(train_data)
tr_data.show(1, vertical = True)


-RECORD 0----------------------------------
 artist             | Santigold            
 auth               | Logged In            
 firstName          | Kaelyn               
 gender             | F                    
 itemInSession      | 96                   
 lastName           | Parker               
 length             | 234.91873            
 level              | paid                 
 location           | Sioux Falls, SD      
 method             | PUT                  
 page               | NextSong             
 registration       | 1531320812000        
 sessionId          | 1421                 
 song               | Starstruck           
 status             | 200                  
 ts                 | -2009606264          
 userAgent          | Mozilla/5.0 (X11;... 
 userId             | 184                  
 lengthIndex        | 22.0                 
 firstNameIndex     | 32.0                 
 lastNameIndex      | 29.0                 
 userAgentIndex     | 24.0      

In [55]:
tt_data = pipeline.fit(test_data).transform(test_data)

In [87]:
tr_data.select('levelIndex', 'rawPrediction', 'prediction', 'probability').show(5)

+----------+--------------------+----------+--------------------+
|levelIndex|       rawPrediction|prediction|         probability|
+----------+--------------------+----------+--------------------+
|       0.0|[9.12961165811446...|       0.0|[0.99989160407478...|
|       0.0|[9.81440319670280...|       0.0|[0.99994534434400...|
|       1.0|[-10.679374866924...|       1.0|[2.30142289937649...|
|       0.0|[8.73824161721772...|       0.0|[0.99983969013188...|
|       1.0|[-11.145114341387...|       1.0|[1.44454830065698...|
+----------+--------------------+----------+--------------------+
only showing top 5 rows



In [86]:
print("The area under ROC for train set is {}".format(evaluator.evaluate(tr_data)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(tt_data)))

The area under ROC for train set is 1.0
The area under ROC for test set is 1.0


In [47]:
model = indexer.fit(paid)
indexed = model.transform(paid)
indexed.columns

Exception ignored in: <function JavaWrapper.__del__ at 0x7fa803688ae8>
Traceback (most recent call last):
  File "/home/duecer2/anaconda3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StringIndexer' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7fa803688ae8>
Traceback (most recent call last):
  File "/home/duecer2/anaconda3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StringIndexer' object has no attribute '_java_obj'
Exception ignored in: <function JavaWrapper.__del__ at 0x7fa803688ae8>
Traceback (most recent call last):
  File "/home/duecer2/anaconda3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeErr

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'lengthIndex',
 'firstNameIndex',
 'lastNameIndex',
 'userAgentIndex',
 'tsIndex',
 'itemInSessionIndex',
 'pageIndex',
 'methodIndex',
 'songIndex',
 'levelIndex',
 'genderIndex',
 'userIdIndex',
 'registrationIndex',
 'artistIndex',
 'sessionIdIndex',
 'locationIndex',
 'authIndex',
 'statusIndex']

In [34]:
label_stringIdx =  StringIndexer(inputCol="sessionIdIndex", outputCol="sessionId_label")
m1 = label_stringIdx.fit(indexed)
m1 = m1.transform(indexed)

In [None]:

pipeline = Pipeline(stages = [indexer, vecAssembler, sizeHint, kMeans])