# Cell for testing

Use this cell to test temporary code.

In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils


test = sc.textFile("/home/denizhan/Downloads/test_dataset.txt")
test = test.map(parsePoint)

# Compute raw scores on the test set
svmPredictionAndLabels = test.map(lambda lp: (float(svmModel.predict(lp.features)), lp.label))

# Instantiate metrics object
svmMetrics = BinaryClassificationMetrics(svmPredictionAndLabels)

# Area under ROC curve
print("Area under ROC for SVMWithSGD = %s" % svmMetrics.areaUnderROC)

#metrics.rootMeanSquaredError

In [0]:
test.toDF().show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0036231884...|  0.0|
|[0.00253164556962...|  0.0|
|[0.0,0.0036231884...|  0.0|
|[0.03544303797468...|  0.0|
|[0.0,0.0036231884...|  0.0|
|[0.00253164556962...|  0.0|
|[0.00253164556962...|  0.0|
|[0.0,0.0,0.0,0.0,...|  0.0|
|[0.01518987341772...|  0.0|
|[0.00253164556962...|  0.0|
|[0.0,0.0036231884...|  0.0|
|[0.00253164556962...|  0.0|
|[0.0,0.0036231884...|  0.0|
|[0.00253164556962...|  0.0|
|[0.00253164556962...|  0.0|
|[0.01518987341772...|  0.0|
|[0.01012658227848...|  0.0|
|[0.00506329113924...|  0.0|
|[0.0,0.0036231884...|  0.0|
|[0.00253164556962...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [0]:
randomTestLabelsAndPredictions = randomTestLabelsAndPredictions.toDF()

randomTestLabelsAndPredictions = randomTestLabelsAndPredictions[["_2", "_1"]]

In [0]:
randomTestPredictionsAndLabels

DataFrame[_2: double, _1: double]

In [0]:
test.map(lambda lp: (float(svmModel.predict(lp.features)), lp.label))

PythonRDD[933] at RDD at PythonRDD.scala:48

In [0]:
svmPredictionAndLabels

PythonRDD[932] at RDD at PythonRDD.scala:48

# Likelihood Detection with PySpark on Ubuntu

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)
![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
![](https://assets.ubuntu.com/v1/57a889f6-ubuntu-logo112.png)

> *"Because Windows is an ass"* 

> -- Anonymous

In this notebook, I'll train two classifiers to predict whether a user has bought a product in a given session. I will be using Apache Spark local mode in this notebook.

# Define the Problem

"How likely is the user going to make a purchase with the information provided?"

## Objectives

### Classify:
- Remove bias features. Explain why.
- Use any classification algorithm, tune model.
- Evaluate model with accuracy and other parameters.

### Score (Probability):
- Score any session between 0-1 indicating probability.
- Use any algorithm, tune model.
- Evaluate model with Mean Square Error, show model success.

Explain and discuss evalutation results and metrics. Implement using Machine Learning library Spark MLLib.

---


# Gather and organize the available information

## Main assumptions made:
- We do NOT have access to whether the product has sold (obviously)
- We do NOT have access to current_sale_amount. 

## Descriptions of variables from observation

- current_total_landing_count == Total landed pages
- current_other_landing_count == Total pages not involving product/landing
- current_product_landing_count == Total landed product pages
- current_cart_landing_count == Total landed cart pages
- current_sale_amount == TOTAL SOLD
- current_is_sale == PRODUCT SOLD
- current_avg_cart_amount == Average cart money
- current_avg_visited_product_price == Average product price
- referrer == How was the site found
- last_1_day_session_count == How many times did the user enter in the last day
- last_7_day_session_count == How many times did the user enter in the last week
- date == What is the date during the session

In [0]:
# (RUN)

# Initialize Spark
import findspark
findspark.init('/usr/local/spark')

#from pyspark import SparkContext, SparkConf, SQLContext
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext

# Run Spark instance
sc = pyspark.SparkContext()

In [0]:
# (RUN)

# Import mllib algorithms
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

# Import sklearn
import pandas as pd


# Read from csv
#X = sc.textFile("/home/denizhan/Downloads/Likelihood_to_puchase_sample_data.csv")
X = pd.read_csv("/home/denizhan/Downloads/Likelihood_to_puchase_sample_data.csv")
whole = X.iloc[:]

# Seperate sales from fails to describe and analyze later on.
sale = X[X['current_is_sale']==1]
fail = X[X['current_is_sale']==0]

# Drop the sale state from X, load to y for training.
y = X.pop("current_is_sale")

In [0]:
from IPython.display import display

print("Whole dataset")
display(whole.head())
display(whole.describe()) # Describe all sessions

Whole dataset


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,referrer,last_1_day_session_count,last_7_day_session_count,date
0,1,1,1,0,0,0,0,0,0,google,0,0,2014-05-08 09:09:47
1,2,1,0,1,0,0,0,0,0,google,0,3,2014-05-09 18:33:46
2,3,1,1,0,0,0,0,0,0,google,0,1,2014-05-17 17:59:32
3,4,2,2,0,0,0,0,0,0,facebook,1,7,2014-05-18 15:33:13
4,5,1,0,1,0,0,0,69,0,other,13,13,2014-05-15 16:59:16


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,last_1_day_session_count,last_7_day_session_count
count,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0
mean,430717.5,4.141388,2.388968,1.41343,0.104562,0.759889,0.005728,133.3423,24.301161,15.525838,93.699272
std,248674.720233,8.583689,5.095713,3.648843,1.073234,14.755843,0.075464,85660.48,36.291586,107.294186,749.340184
min,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,215359.25,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,430717.5,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
75%,646075.75,4.0,2.0,1.0,0.0,0.0,0.0,0.0,49.0,3.0,5.0
max,861434.0,588.0,395.0,276.0,287.0,2925.0,1.0,79287580.0,547.0,1250.0,7006.0


In [0]:
print("Fail")
display(fail.head())
display(fail.describe()) # Describe the sessions which have not sold

Fail


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,referrer,last_1_day_session_count,last_7_day_session_count,date
0,1,1,1,0,0,0,0,0,0,google,0,0,2014-05-08 09:09:47
1,2,1,0,1,0,0,0,0,0,google,0,3,2014-05-09 18:33:46
2,3,1,1,0,0,0,0,0,0,google,0,1,2014-05-17 17:59:32
3,4,2,2,0,0,0,0,0,0,facebook,1,7,2014-05-18 15:33:13
4,5,1,0,1,0,0,0,69,0,other,13,13,2014-05-15 16:59:16


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,last_1_day_session_count,last_7_day_session_count
count,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0,856500.0
mean,430691.635119,4.024987,2.342592,1.37763,0.08198,0.0,0.0,133.5948,24.197647,15.507959,93.56177
std,248671.766492,8.132322,4.939385,3.509799,0.905087,0.0,0.0,85906.86,36.280884,107.233111,748.772775
min,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,215348.75,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,430680.5,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
75%,646012.25,4.0,2.0,1.0,0.0,0.0,0.0,0.0,49.0,3.0,5.0
max,861434.0,588.0,395.0,276.0,287.0,0.0,0.0,79287580.0,547.0,1250.0,7006.0


In [0]:
from IPython.display import display

print("Sale")
display(sale.head())
display(sale.describe()) # Describe the sessions which have sold

Sale


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,referrer,last_1_day_session_count,last_7_day_session_count,date
158,159,8,3,2,2,74,1,74,87,other,0,19,2014-05-17 18:49:40
365,366,25,13,5,6,56,1,55,47,other,2,2,2014-05-07 18:45:03
720,721,17,9,4,3,99,1,58,99,google,0,6,2014-05-18 20:05:07
768,769,55,15,11,21,308,1,155,84,other,8,11,2014-05-18 12:46:53
825,826,8,2,2,3,215,1,134,0,google,0,0,2014-05-17 15:59:08


Unnamed: 0.1,Unnamed: 0,current_total_landing_count,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_sale_amount,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,last_1_day_session_count,last_7_day_session_count
count,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0,4934.0
mean,435207.421159,24.347588,10.4394,7.628091,4.024524,132.670045,1.0,89.521889,42.270166,18.62951,117.568504
std,249171.498004,31.192766,15.269372,12.136184,6.59171,143.241857,0.0,96.529689,33.576671,117.386577,841.808283
min,159.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
25%,217184.25,6.0,1.0,1.0,1.0,59.0,1.0,30.25,11.0,0.0,0.0
50%,435455.0,14.0,5.0,3.0,2.0,99.0,1.0,64.5,42.5,1.0,2.0
75%,657047.0,30.0,13.0,9.0,4.75,165.0,1.0,116.0,68.0,4.0,7.0
max,861418.0,416.0,207.0,167.0,122.0,2925.0,1.0,1655.0,138.0,1225.0,6992.0


# Analysis of the general numerical data

---
### Observations

- `current_total_landing_count` mean is much higher than the failures by x6.
 - `current_other_landing_count` mean is higher by x5
 - `current_product_landing_count` mean is higher by x5.5
 - `current_cart_landing_count` mean is higher by a whopping x50!
- `current_sale_amount` mean is 0 if fail, above 0 if sale. Obvious bias.
- `current_avg_cart_amount` mean is interestingly lower by x0.5
- `current_avg_visited_product_price` mean is higher by x2
- `last_1_day_session_count` mean is higher only by x1.2
- `last_7_day_session_count` mean is similar: x1.25

`current_total_landing_count` is hierarchically the upper category which encompasses the other 3 landing counts. Hence, to prevent bias it is to be removed. **Check what the titanic dataset ahmed beysad(?) did for high category information.**

`current_sale_amount` mean is basically above 0 when a sale is made. Infinite bias, must be removed.


---
### Hypotheses

`current_cart_landing count` is more important than the other two landing subcategories, as it could be expected that those who are willing to buy are naturally going to ponder more on the cart page, returning again and again.


`current_avg_cart_amount` could be lower as the customer is most likely searching for a specific application, and can only afford so much.



`current_avg_visited_product_price` could be higher since, if our hypothesis that the customer is searching for a specific application is correct, they will more likely be searching through every product to find the correct one including those which are more expensive. It is very likely this is standard procedure for enterprises.

# Prepare features

Dummify string information and replace.

`referrer_other` dropped to avoid the dummy variable trap.

Index information dropped, assumed arbitrary.

`date` dropped until effective **day time parsing comes to fruition.**

`current_sale_amount` dropped.

In [0]:
from datetime import datetime
from datetime import date
import calendar

def date_string_to_weekday(date):
    datetime_object = datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
    
    return datetime_object.weekday() # 0 - 6 monday tuesday etc.

def date_string_to_time_of_day(date):
    datetime_object = datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
    
    hour = datetime_object.hour
    
    if 5 < hour < 12:
        return 0 # morning
    elif 12 < hour < 17:
        return 1 # afternoon
    elif 17 < hour < 21:
        return 2 # evening
    else:
        return 3 # night


def prep_features(X):
  # Referrers are dummified to get numerical information.
  referrer_dummies = pd.get_dummies(X['referrer'],prefix='referrer')
  X_refer = X.copy()
  X_refer = pd.concat([X,referrer_dummies],axis=1)
  # original refferer variable dropped since it is replaced.
  X_refer.drop('referrer',axis=1,inplace=True)
  # referrer_other dropped to avoid the dummy variable trap.
  X_refer.drop('referrer_other',axis=1,inplace=True)
  X = X_refer

  X["weekday"] = X["date"].apply(date_string_to_weekday)
  X["time_of_day"] = X["date"].apply(date_string_to_time_of_day)

  X.drop(X.columns[0],axis=1,inplace=True) # dropped as index information
  X.drop("date",axis=1,inplace=True) # dropped until effective day time parsing
  X.drop("current_sale_amount", 1, inplace=True)
  X.drop("last_1_day_session_count", 1, inplace=True)
  X.drop("current_total_landing_count", 1, inplace=True)
  return X

X = prep_features(X)

X.head()

sale = prep_features(sale)
fail = prep_features(fail)


# date = sale["date"].iloc[0]

# #datetime_object = datetime.strptime('Jun 1 2005  1:33PM', '%b %d %Y %I:%M%p')
# datetime_object = datetime.strptime(date, '%Y-%m-%d %H:%M:%S')

# datetime_object.#.isoweekday()



#X = X[['last_1_day_session_count', 'referrer_google']].copy()

Create dataset with equal number of sales and fails for testing purposes

In [0]:
#both = pd.concat([sale.iloc[:4934], fail.iloc[:100000]], ignore_index=True)
both = pd.concat([sale.iloc[:], fail.iloc[:]], ignore_index=True)

display(both.describe())
display(both.head(10))

Unnamed: 0,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,last_7_day_session_count,referrer_facebook,referrer_google,weekday,time_of_day
count,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0,861434.0
mean,2.388968,1.41343,0.104562,0.005728,133.3423,24.301161,93.699272,0.102881,0.287885,3.3792,1.44248
std,5.095713,3.648843,1.073234,0.075464,85660.48,36.291586,749.340184,0.303803,0.452778,1.971747,1.172067
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0
50%,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,4.0,1.0
75%,2.0,1.0,0.0,0.0,0.0,49.0,5.0,0.0,1.0,5.0,3.0
max,395.0,276.0,287.0,1.0,79287580.0,547.0,7006.0,1.0,1.0,6.0,3.0


Unnamed: 0,current_other_landing_count,current_product_landing_count,current_cart_landing_count,current_is_sale,current_avg_cart_amount,current_avg_visited_product_price,last_7_day_session_count,referrer_facebook,referrer_google,weekday,time_of_day
0,3,2,2,1,74,87,19,0,0,5,2
1,13,5,6,1,55,47,2,0,0,2,2
2,9,4,3,1,58,99,6,0,1,6,2
3,15,11,21,1,155,84,11,0,0,6,3
4,2,2,3,1,134,0,0,0,1,5,1
5,8,5,3,1,58,61,0,0,1,0,0
6,0,1,2,1,129,39,1,0,0,2,2
7,4,1,1,1,67,44,14,0,0,5,1
8,0,1,1,1,73,109,17,0,0,4,1
9,5,2,5,1,171,82,6,0,0,5,0


Seperate features and target of both

In [0]:
# Get index of sales as list
# Use list to get the X's of each sold item.

both_y = both.pop("current_is_sale")

Scale features

In [0]:
import numpy as np
from random import randint
from sklearn.preprocessing import MinMaxScaler

# (RUN) Scale
scaler = MinMaxScaler()
scaler.fit(X)
X = scaler.transform(X)
scaler.fit(both)
both = scaler.transform(both)
#validate = scaler.transform(validate)

Shuffle train and test dataframes

In [0]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    both, both_y, test_size=0.1, random_state=666, shuffle=True)

valid_set = (X_test, y_test)

In [0]:
display(X_train.shape)
display(y_train.shape)

(775290, 10)

(775290,)

In [0]:
spark_dataset = np.concatenate((y_train[:, None], X_train), axis=1)

In [0]:
test_dataset = np.concatenate((y_test[:, None], X_test), axis=1)

In [0]:
# from pyspark import SQLContext

# sqlCtx = SQLContext(sc)
# training = sqlCtx.createDataFrame(spark_dataset)

In [0]:
# rdd = sc.parallelize(spark_dataset)

In [0]:
# rdd.take(20)

In [0]:
np.savetxt('spark_dataset.txt', spark_dataset, delimiter=' ')

In [0]:
np.savetxt("test_dataset.txt", test_dataset, delimiter=' ')

In [0]:
# from pyspark.ml.classification import LogisticRegression

# spark = SparkSession(sc)

# # Load training data
# training = spark.read.format("libsvm").load("/home/denizhan/Downloads/spark_dataset.txt")

# lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# # Fit the model
# lrModel = lr.fit(training)

# # Print the coefficients and intercept for logistic regression
# print("Coefficients: " + str(lrModel.coefficients))
# print("Intercept: " + str(lrModel.intercept))

# # We can also use the multinomial family for binary classification
# mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# # Fit the model
# mlrModel = mlr.fit(training)

# # Print the coefficients and intercepts for logistic regression with multinomial family
# print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
# print("Multinomial intercepts: " + str(mlrModel.interceptVector))


## https://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#logistic-regression

Limited-memory BFGS (L-BFGS or LM-BFGS) is an optimization algorithm in the family of quasi-Newton methods that approximates the Broyden–Fletcher–Goldfarb–Shanno (BFGS) algorithm using a limited amount of computer memory. It is a popular algorithm for parameter estimation in machine learning.

Very fast thanks to its localizing optimization feature, parallelizing the logistic regression significantly.

In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("/home/denizhan/Downloads/spark_dataset.txt")
parsedData = data.map(parsePoint)

# Build the model
logModel = LogisticRegressionWithLBFGS.train(parsedData, initialWeights=[1000000000,1])

# Evaluating the model on training data
logLabelsAndPreds = parsedData.map(lambda p: (p.label, logModel.predict(p.features)))
trainErr = logLabelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
#logModel.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
#sameModel = LogisticRegressionModel.load(sc,
#                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")

Training Error = 0.009765378116575733


## https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html#threshold-tuning

In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics


test = sc.textFile("/home/denizhan/Downloads/test_dataset.txt")
test = test.map(parsePoint)

# Compute raw scores on the test set
logPredictionAndLabels = test.map(lambda lp: (float(logModel.predict(lp.features)), lp.label))

# Instantiate metrics object
logMetrics = BinaryClassificationMetrics(logPredictionAndLabels)

# Area under ROC curve
print("Area under ROC for LogisticRegressionWithLBFGS = %s" % logMetrics.areaUnderROC)

metrics = RegressionMetrics(logPredictionAndLabels)

# Root mean squared error
print("RMSE = %s" % metrics.rootMeanSquaredError)

Area under ROC for LogisticRegressionWithLBFGS = 0.5247364014034257
RMSE = 0.10043784162215795


In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(logPredictionAndLabels)

precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)
print("Accuracy = %s" % metrics.accuracy)

Summary Stats
Precision = 0.9899122399702823
Recall = 0.9899122399702823
F1 Score = 0.9899122399702823
Accuracy = 0.9899122399702823


## https://spark.apache.org/docs/2.3.0/mllib-linear-methods.html#linear-support-vector-machines-svms

In [0]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

data = sc.textFile("/home/denizhan/Downloads/spark_dataset.txt")
parsedData = data.map(parsePoint)

# Build the model
svmModel = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
svmLabelsAndPreds = parsedData.map(lambda p: (p.label, svmModel.predict(p.features)))
trainErr = svmLabelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
svmModel.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")

Training Error = 0.005728179132969599


In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils


test = sc.textFile("/home/denizhan/Downloads/test_dataset.txt")
test = test.map(parsePoint)

# Compute raw scores on the test set
svmPredictionAndLabels = test.map(lambda lp: (float(svmModel.predict(lp.features)), lp.label))

# Instantiate metrics object
svmMetrics = BinaryClassificationMetrics(svmPredictionAndLabels)

# Area under ROC curve
print("Area under ROC for SVMWithSGD = %s" % svmMetrics.areaUnderROC)

#metrics.rootMeanSquaredError

Area under ROC for SVMWithSGD = 0.5


In [0]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

data = sc.textFile("/home/denizhan/Downloads/spark_dataset.txt")
parsedData = data.map(parsePoint)

# Load and parse the data file into an RDD of LabeledPoint.
#data = MLUtils.loadLibSVMFile(sc, '/home/denizhan/Downloads/spark_dataset.txt')
# Split the data into training and test sets (30% held out for testing)
#(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
randomModel = RandomForest.trainClassifier(parsedData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
randomPredictions = randomModel.predict(parsedData.map(lambda x: x.features))
randomLabelsAndPredictions = parsedData.map(lambda lp: lp.label).zip(randomPredictions)
testErr = randomLabelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(test.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(randomModel.toDebugString())

# Save and load model
#randomModel.save(sc, "target/tmp/myRandomForestClassificationModel")
#sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")

Test Error = 0.05148356240713224
Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 4 <= 9.140767824497258E-4)
     If (feature 2 <= 0.0017421602787456446)
      If (feature 5 <= 3.5683699685983445E-4)
       If (feature 3 <= 6.306158071823054E-9)
        Predict: 0.0
       Else (feature 3 > 6.306158071823054E-9)
        Predict: 0.0
      Else (feature 5 > 3.5683699685983445E-4)
       If (feature 8 <= 0.75)
        Predict: 0.0
       Else (feature 8 > 0.75)
        Predict: 0.0
     Else (feature 2 > 0.0017421602787456446)
      If (feature 2 <= 0.005226480836236934)
       If (feature 7 <= 0.5)
        Predict: 0.0
       Else (feature 7 > 0.5)
        Predict: 0.0
      Else (feature 2 > 0.005226480836236934)
       If (feature 6 <= 0.5)
        Predict: 0.0
       Else (feature 6 > 0.5)
        Predict: 0.0
    Else (feature 4 > 9.140767824497258E-4)
     If (feature 2 <= 0.005226480836236934)
      If (feature 2 <= 0.001742

In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
from sklearn.metrics import roc_auc_score
# For scoring accuracy of classification


test = sc.textFile("/home/denizhan/Downloads/test_dataset.txt")
parsedTest = test.map(parsePoint)

# Compute raw scores on the test set
#test.map(lambda lp: lp.label).zip(randomPredictions)
randomTestPredictions = randomModel.predict(parsedTest.map(lambda x: x.features))
randomTestLabelsAndPredictions = parsedTest.map(lambda lp: lp.label).zip(randomTestPredictions)
#randomTestPredictionAndLabels = test.map(lambda lp: randomTestPredictions.zip(lp.label))

randomTestLabelsAndPredictions = randomTestLabelsAndPredictions.toDF()
randomTestPredictionsAndLabels = randomTestLabelsAndPredictions[["_2", "_1"]] # Switched around

roc_auc_score(np.array(randomTestPredictionsAndLabels.select("_2").collect()), np.array(randomTestLabelsAndPredictions.select("_1").collect()))

#metrics.rootMeanSquaredError

# ValueError: Only one class present in y_true. ROC AUC score is not defined in that case.
# Since we learn that there is only one class in y, we understand that the model predicts all zero,
# hence a poor model.

In [0]:
zeroAndLabels = np.concatenate((np.zeros(y_test.shape)[:,None], y_test[:, None]))

# For classification score
from sklearn.metrics import roc_auc_score
# For scoring accuracy of classification
from sklearn.metrics import accuracy_score
# For evaulating the Mean Squared Error of the model's scoring
from sklearn.metrics import mean_squared_error
from sklearn.metrics import average_precision_score

display(roc_auc_score(y_test[:, None], np.zeros(y_test.shape)[:,None]))
display(mean_squared_error(y_test[:, None], np.zeros(y_test.shape)[:,None]))
display(accuracy_score(y_test[:, None], np.zeros(y_test.shape)[:,None]))

Baseline with every prediction = 0

In [0]:
zeroAndLabels = sc.parallelize(np.concatenate((np.zeros(y_test.shape)[:,None], y_test[:, None])))
zeroAndLabels = zeroAndLabels.map(parsePoint)

# Instantiate metrics object
metrics = BinaryClassificationMetrics(zeroAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

# zeroAndLabels = test.map(lambda lp: (float(index=np.arange(len(data)), lp.label)))
# df.withColumn('c1', when(df.c1.isNotNull(), 1))

In [0]:
zero.shape

In [0]:
zeroAndLabels.toDF().show()

In [0]:
logPredictionAndLabels.distinct().count()

4

In [0]:
df = logPredictionAndLabels.toDF()

In [0]:
df.describe()

DataFrame[summary: string, _1: string, _2: string]

In [0]:
df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: double (nullable = true)



In [0]:
from pyspark.sql import functions as F

df.select(df._1, F.when(df._1 > 0.5, 1).when(df._2 > 0.5, 1).otherwise(0)).show()

display(df[(df._1 > 0.5) & (df._2 > 0.5)].count())

display(df[(df._1 > 0.5)].count())
    
display(df[(df._2 > 0.5)].count())

+---+-------------------------------------------------------------+
| _1|CASE WHEN (_1 > 0.5) THEN 1 WHEN (_2 > 0.5) THEN 1 ELSE 0 END|
+---+-------------------------------------------------------------+
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                                            0|
|0.0|                                           

27

424

499

# TO DO
- 


# DONE
- Make Dataframe with all zeros