# ***Classification with Pyspark***

### Author: Salma OUARDI
 Dataset : [StumbleUpon Evergreen Classification](https://www.kaggle.com/c/stumbleupon/data)



In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 58.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=5a91e26c70f4ddb906f168b74248c459e854804ce5d36efdff0557603d8bd357
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
from pyspark import SparkContext
# getOrCreate() reuses an already-active context, so re-running this cell does not
# fail the way a second SparkContext() call does (only one context may be active).
sc = SparkContext.getOrCreate()

# Features

## **A function to extract the features**

*The features we extract here are all numeric. We exclude the first 4 columns* **(url, urlid, boilerplate, alchemy_category)**. We also exclude the last column, which contains the target **(1 is evergreen, while 0 is non-evergreen)**. Missing values, marked `?` in the data, are replaced with 0.

In [None]:
def extract_features_dt(record):
  """Return the numeric feature columns of one split StumbleUpon row as a float array.

  Columns 0-3 (url, urlid, boilerplate, alchemy_category) and the final label
  column are skipped. Every column in between is converted with float(), and the
  '?' missing-value marker becomes 0.0.
  """
  # Imported here because the notebook never imports numpy at top level;
  # referencing a global `np` raised NameError.
  import numpy as np

  # Stop at -1 (just before the label) instead of a hard-coded 25: the fixed stop
  # dropped the last numeric feature. This matches the fields[4:-1] slice used
  # for the category-augmented features later on.
  return np.array([0.0 if value == '?' else float(value) for value in record[4:-1]])

## **A function to extract the target or label**

*We extract the last column* **(1 for evergreen and 0 for non-evergreen)**.

In [None]:
def extract_label(record):
  """Parse the class label, which is stored as the final column of a split row."""
  label_text = record[len(record) - 1]
  return int(label_text)

## Change each record to LabeledPoint





> **LabeledPoint** *is a class that represents the features and labels of a data point.*



In [None]:
from pyspark.mllib.regression import LabeledPoint

In [None]:
# Raw, still-quoted lines of the header-less training TSV. Later cells parse these
# raw lines through the name `dataFile`, so bind it here as well.
lines = sc.textFile('train_noheader.tsv')
dataFile = lines
# Strip the double quotes around every field, then split each row into columns.
replace = lines.map(lambda row: row.replace("\"", ""))
split = replace.map(lambda row: row.split("\t"))
data = split.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt(r)))
# A fixed seed makes the 80/20 split reproducible across Restart & Run All.
(training, test) = data.randomSplit([0.8, 0.2], seed=42)
training.cache()
test.cache()

PythonRDD[3] at RDD at PythonRDD.scala:53

# Models

## train models

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree

iterations = 10

# NOTE(review): every model here is trained on the full `data` RDD, and the next
# cells evaluate on that same RDD, so the scores reported below are training-set
# scores. The `training`/`test` split from the loading cell is not used.

# logistic regression
logisticModel = LogisticRegressionWithLBFGS.train(data, iterations)

# SVM
svmModel = SVMWithSGD.train(data, iterations)

# naive bayes
# MLlib's multinomial NaiveBayes rejects negative feature values, and some raw
# features are negative (see the column means computed further down), so clamp
# them to 0. This also defines `nbData`, which was previously used before any
# cell created it.
nbData = data.map(lambda point: LabeledPoint(point.label, [max(0.0, value) for value in point.features]))
naiveBayesModel = NaiveBayes.train(nbData, 1.0) # 1.0 is the smoothing parameter

# decision tree
decisionTreeModel = DecisionTree.trainClassifier(data, numClasses = 2, categoricalFeaturesInfo = {},
                                                 impurity = 'entropy', maxDepth = 5)

## prediction

In [None]:
# Pair every point's true label with the model's prediction: (label, prediction).
lrLabelPrediction = data.map(lambda point: (point.label, logisticModel.predict(point.features)))
svmLabelPrediction = data.map(lambda point: (point.label, svmModel.predict(point.features)))
nbLabelPrediction = nbData.map(lambda point: (point.label, naiveBayesModel.predict(point.features)))

# Per the PySpark docs, tree-model predict() cannot be called inside an RDD
# transformation, so predict on the whole feature RDD and zip the labels back on.
# Both sides are mapped from `data`, so zip() pairs them element by element.
dtFeatures = data.map(lambda point: point.features)
dtPrediction = decisionTreeModel.predict(dtFeatures)
dtLabelPrediction = data.map(lambda point: point.label).zip(dtPrediction)

## evaluation

### accuracy

In [None]:
# Peek at the first ten (label, prediction) pairs of each model.
# print() is a function in Python 3; the old print statement is a SyntaxError.
print(lrLabelPrediction.take(10))
print(svmLabelPrediction.take(10))
print(nbLabelPrediction.take(10))
print(dtLabelPrediction.take(10))
 
def accuracy(labelAndPrediction):
    """Fraction of (label, prediction) pairs whose two entries compare equal.

    Triggers two count() jobs: one over all pairs, one over the matching pairs.
    """
    total = float(labelAndPrediction.count())
    correct = labelAndPrediction.filter(lambda pair: pair[0] == pair[1]).count()
    return correct / total

lrAccuracy = accuracy(lrLabelPrediction)
svmAccuracy = accuracy(svmLabelPrediction)
nbAccuracy = accuracy(nbLabelPrediction)
dtAccuracy = accuracy(dtLabelPrediction)
# print() call syntax: the Python 2 print statement is a SyntaxError under Python 3.
print('lrAccuracy:{0}\nsvmAccuracy:{1}\nnbAccuracy:{2}\ndtAccuracy:{3}'.format(lrAccuracy, svmAccuracy, nbAccuracy, dtAccuracy))

[(0.0, 1), (1.0, 0), (1.0, 0), (1.0, 0), (0.0, 1), (0.0, 1), (1.0, 1), (0.0, 1), (1.0, 1), (1.0, 1)]
[(0.0, 1), (1.0, 1), (1.0, 1), (1.0, 1), (0.0, 1), (0.0, 1), (1.0, 1), (0.0, 1), (1.0, 1), (1.0, 1)]
[(0.0, 1.0), (1.0, 0.0), (1.0, 0.0), (1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (1.0, 0.0), (1.0, 1.0)]
[(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 1.0), (0.0, 1.0), (1.0, 0.0), (0.0, 1.0), (1.0, 1.0), (1.0, 1.0)]
lrAccuracy:0.627180527383
svmAccuracy:0.514672075727
nbAccuracy:0.580392156863
dtAccuracy:0.648275862069


### precision and recall

In [None]:
def precisionAndRecall(labelAndPrediction):
    """Return (precision, recall) for the positive class 1 from (label, prediction) pairs.

    A pair counts as a true positive when label == 1 and prediction == 1, a false
    positive when label == 0 and prediction == 1, and a false negative when
    label == 1 and prediction == 0. Then precision = TP / (TP + FP) and
    recall = TP / (TP + FN).

    A ratio whose denominator is zero (e.g. the model never predicts 1) is
    reported as 0.0 instead of raising ZeroDivisionError.
    """
    # A single job counts every distinct (label, prediction) pair, instead of three
    # separate filter/count passes over the (uncached) prediction RDD. Int and
    # float keys such as (1, 1) and (1.0, 1.0) compare and hash equal, so the
    # lookups work whichever numeric type the model returns.
    counts = labelAndPrediction.map(lambda pair: (pair[0], pair[1])).countByValue()
    TP = counts.get((1, 1), 0)
    FP = counts.get((0, 1), 0)
    FN = counts.get((1, 0), 0)
    precision = float(TP) / (TP + FP) if TP + FP else 0.0
    recall = float(TP) / (TP + FN) if TP + FN else 0.0
    return (precision, recall)

# Report precision/recall for each model; the label text fixes the old
# "preciosn" misspelling.
for modelName, pairs in [('logistic regression', lrLabelPrediction),
                         ('svm', svmLabelPrediction),
                         ('naive bayes', nbLabelPrediction),
                         ('decision tree', dtLabelPrediction)]:
    print('precision and recall for ' + modelName, precisionAndRecall(pairs))

preciosn and recall for logistic regression (0.6220342964529011, 0.6975763962065332)
preciosn and recall for svm (0.5140300935339569, 0.9989462592202318)
preciosn and recall for naive bayes (0.6222222222222222, 0.46469968387776606)
preciosn and recall for decision tree (0.6673200784094091, 0.6277660695468915)


## ROC and AUC

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# evaluate with built-in function
def ROCAndAUC(labelAndPrediction):
    """Return (area under the PR curve, area under the ROC curve) for (label, prediction) pairs.

    BinaryClassificationMetrics takes (score, label) tuples, so each pair is
    swapped and both entries are cast to float. The "scores" here are the
    models' hard class predictions, so each curve has very few thresholds.
    """
    scoreAndLabels = labelAndPrediction.map(lambda pair: (float(pair[1]), float(pair[0])))
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    areaPR = metrics.areaUnderPR
    areaROC = metrics.areaUnderROC
    return (areaPR, areaROC)

# Header row, then one (area under PR, area under ROC) tuple per model.
print('model\tArea under PR\tArea under ROC')
for modelName, pairs in (('Logistic Regression', lrLabelPrediction),
                         ('SVM', svmLabelPrediction),
                         ('Naive Bayes', nbLabelPrediction),
                         ('Decision Tree', dtLabelPrediction)):
    print(modelName, ROCAndAUC(pairs))

model	Area under PR	Area under ROC
Logistic Regression (0.7374253598523676, 0.6252538830157423)
SVM (0.7567586293858841, 0.5014181143280931)
Naive Bayes (0.6808510815151734, 0.5835585110136261)
Decision Tree (0.7430805993331199, 0.6488371887050935)


# Improving model performance and tuning parameters

## Feature standardization

In [None]:
from pyspark.mllib.linalg.distributed import RowMatrix 
# Per-column statistics of the raw features. The printed means and variances
# differ by orders of magnitude across columns, which motivates standardization.
features = data.map(lambda point: point.features)
matrix = RowMatrix(features)
matrixSummary = matrix.computeColumnSummaryStatistics()
for columnStatistic in (matrixSummary.mean(), matrixSummary.variance()):
    print(columnStatistic)
# matrixSummary.min() / matrixSummary.max() give per-column extremes if needed.

[  4.12258053e-01   2.76182319e+00   4.68230473e-01   2.14079926e-01
   9.20623607e-02   4.92621604e-02   2.25510345e+00  -1.03750428e-01
   0.00000000e+00   5.64227450e-02   2.12305612e-02   2.33778177e-01
   2.75709037e-01   6.15551048e-01   6.60311021e-01   3.00770791e+01
   3.97565923e-02   5.71659824e+03   1.78754564e+02   4.96064909e+00
   1.72864050e-01   1.01220792e-01]
[  1.09742442e-01   7.43008248e+01   4.12631699e-02   2.15334363e-02
   9.21181745e-03   5.27493347e-03   3.25391871e+01   9.39698870e-02
   0.00000000e+00   1.71774103e-03   2.07826348e-02   2.75483942e-03
   3.68378892e+00   2.36679961e-01   2.24330712e-01   4.15878559e+02
   3.81811688e-02   7.87733008e+07   3.22081162e+04   1.04530090e+01
   3.35936340e-02   6.27753288e-03]


In [None]:
from pyspark.mllib.feature import StandardScaler
scaler = StandardScaler(withMean = True, withStd = True).fit(features)

# StandardScalerModel.transform cannot be called per record inside data.map(...)
# ("not allowed in distributed mode"). Instead, transform the whole feature RDD
# and zip the labels back on; both sides are mapped element-wise from `data`.
labels = data.map(lambda point: point.label)
scaledData = (labels
              .zip(scaler.transform(features))
              .map(lambda labelAndVector: LabeledPoint(labelAndVector[0], labelAndVector[1])))

# Sanity check: each standardized column should have mean ~0 and variance 1
# (a constant column stays at variance 0).
scaledFeatures = scaledData.map(lambda point: point.features)
scaledMatrix = RowMatrix(scaledFeatures)
scaledMatrixSummary = scaledMatrix.computeColumnSummaryStatistics()
for columnStatistic in (scaledMatrixSummary.mean(), scaledMatrixSummary.variance()):
    print(columnStatistic)

[ -3.28296418e-16   5.27355937e-16   1.74166237e-15   3.24393290e-16
   1.70523318e-15  -9.47538489e-16  -9.67975700e-16  -1.45716772e-16
   0.00000000e+00  -3.50414142e-16  -4.77048956e-18   3.43475248e-16
  -3.67761377e-16   8.32667268e-17  -3.29597460e-17  -2.94729519e-15
  -6.93889390e-18  -5.36029554e-16  -1.14491749e-15  -2.48065457e-16
   2.00534034e-15  -1.72258041e-15]
[ 1.  1.  1.  1.  1.  1.  1.  1.  0.  1.  1.  1.  1.  1.  1.  1.  1.  1.
  1.  1.  1.  1.]


In [None]:
# retrain the model with scaled data
iterations = 10

# logistic regression (scored on the same RDD it is trained on)
lr = LogisticRegressionWithLBFGS.train(scaledData, iterations)
lrLP = scaledData.map(lambda x: (x.label, lr.predict(x.features)))
print('precision and recall', precisionAndRecall(lrLP))
print('Area under PR and ROC',ROCAndAUC(lrLP))

('preciosn and recall', (0.62464046021093, 0.6865121180189674))
('ROC and AUC', (0.7360360592298912, 0.6256956255557466))


## Additional features

In [None]:
# Map every distinct alchemy_category value (column 3, still wrapped in its
# double quotes) to a 0-based index for the 1-of-k encoding below.
categories = (dataFile
              .map(lambda line: line.split('\t')[3])
              .distinct()
              .zipWithIndex()
              .collectAsMap())
print(categories)

{u'"recreation"': 0, u'"gaming"': 7, u'"arts_entertainment"': 11, u'"computer_internet"': 1, u'"?"': 2, u'"sports"': 8, u'"business"': 3, u'"health"': 4, u'"science_technology"': 12, u'"religion"': 10, u'"law_crime"': 5, u'"unknown"': 9, u'"culture_politics"': 6, u'"weather"': 13}


In [None]:
# NOTE: `categories` is a driver-side dict captured by the function below; for a
# much larger mapping, sc.broadcast(categories) could be used instead.
def addCateFeature(line):
    """Turn one raw (quoted) TSV line into a LabeledPoint with category features.

    The features are:
    - a 1-of-k encoding of column 3, looked up in the global ``categories``
      dict (whose keys keep their quotes);
    - every column from index 4 up to, but excluding, the last one, with quotes
      removed and '?' mapped to 0.0.

    The label is the last column, unquoted, as an int.
    """
    fields = line.strip().split('\t')
    vector = [0] * len(categories)
    vector[categories[fields[3]]] = 1
    label = int(fields[-1].replace('"', ''))
    vector.extend(
        0.0 if value == '?' else float(value)
        for value in (raw.replace('"', '') for raw in fields[4:-1])
    )
    return LabeledPoint(label, vector)

cateData = dataFile.map(addCateFeature)
print(cateData.first())

(0.0,[0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])


In [None]:
# Standardize the category-augmented features as before: fit on the feature RDD,
# transform it as a whole, and zip the labels back on.
cateFeatures = cateData.map(lambda x : x.features)
cateLabels = cateData.map(lambda x : x.label)
cateScaler = StandardScaler(withMean = True, withStd = True).fit(cateFeatures)
scaleCateData = cateLabels.zip(cateScaler.transform(cateFeatures)).map(lambda x : LabeledPoint(x[0], x[1]))
# print() call syntax: the Python 2 print statement is a SyntaxError under Python 3.
print(scaleCateData.first())

(0.0,[-0.446421204794,-0.204182210579,-0.680752790425,2.72073665645,-0.270999069693,-0.0648775723926,-0.220526884579,-0.101894690972,-0.232727977095,-0.028494000387,-0.0991499193088,-0.381813223243,-0.201654052319,-0.0232621058984,1.1376473365,-0.0819355716929,1.02513981289,-0.0558635644254,-0.468893253129,-0.354305326308,-0.317535217236,0.33845079824,0.0,0.828822173315,-0.147268943346,0.229639823578,-0.141625969099,0.790238049918,0.717194729453,-0.297996816496,-0.20346257793,-0.0329672096969,-0.0487811297558,0.940069975117,-0.108698488525,-0.278820782314])


In [None]:
iterations = 10

# logistic regression on standardized numeric + 1-of-k category features
# (scored on the same RDD it is trained on)
lr = LogisticRegressionWithLBFGS.train(scaleCateData, iterations)
lrLP = scaleCateData.map(lambda x: (x.label, lr.predict(x.features)))
print('precision and recall', precisionAndRecall(lrLP))
print('Area under PR and ROC',ROCAndAUC(lrLP))

('preciosn and recall', (0.67293997965412, 0.6970495258166491))
('Area under PR and ROC', (0.7627499927624298, 0.6698640238141318))


## Using the correct form of data for Naive Bayes

In [None]:
# For Naive Bayes, MLlib implements a multinomial model.
# This model works on input in the form of non-negative count data. This can include a
# binary representation of categorical features (such as the 1-of-k encoding covered
# previously) or frequency data (such as the frequency of occurrences of words in a document).
def cateFeature(line):
    """Build a LabeledPoint whose features are only the 1-of-k category encoding.

    Column 3 (alchemy_category, still quoted) is looked up in the global
    ``categories`` dict. The label is the last column with quotes removed, as an int.
    """
    fields = line.strip().split('\t')
    # Distinct local name so the list no longer shadows this function's own name.
    oneHot = [0] * len(categories)
    idx = categories[fields[3]]
    oneHot[idx] = 1
    label = int(fields[-1].replace('"', ''))
    return LabeledPoint(label, oneHot)

nbData = dataFile.map(cateFeature)

# naive bayes
nbModel = NaiveBayes.train(nbData, 1.0) # 1.0 is the smoothing parameter
nbLP = nbData.map(lambda x: (x.label, nbModel.predict(x.features)))
print('precision and recall', precisionAndRecall(nbLP))
print('Area under PR and ROC',ROCAndAUC(nbLP))

('preciosn and recall', (0.5916885212830341, 0.7726554267650158))
('Area under PR and ROC', (0.7405222106704076, 0.6051384941549446))
