In [1]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd


from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import rand 
from sklearn.metrics import classification_report
from time import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

In [8]:

sc =SparkContext()
print (sc.version)



3.0.1


In [9]:
rdd1 = sc.textFile("amazon_cells_labelled.txt")
rdd1.take(5)

['So there is no way for me to plug it in here in the US unless I go by a converter.\t0',
 'Good case, Excellent value.\t1',
 'Great for the jawbone.\t1',
 'Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!\t0',
 'The mic is great.\t1']

In [10]:
# Split a review sentence and a corresponding label 
rdd2 = rdd1.map(lambda x: x.split("\t"))
rdd2.take(5)

[['So there is no way for me to plug it in here in the US unless I go by a converter.',
  '0'],
 ['Good case, Excellent value.', '1'],
 ['Great for the jawbone.', '1'],
 ['Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!',
  '0'],
 ['The mic is great.', '1']]

In [11]:
# Create a RDD of Rows with the field names : label and review
rdd3 = rdd2.map(lambda x: Row(review=x[0],label=x[1]))
rdd3.take(5)

[Row(review='So there is no way for me to plug it in here in the US unless I go by a converter.', label='0'),
 Row(review='Good case, Excellent value.', label='1'),
 Row(review='Great for the jawbone.', label='1'),
 Row(review='Tied to charger for conversations lasting more than 45 minutes.MAJOR PROBLEMS!!', label='0'),
 Row(review='The mic is great.', label='1')]

In [20]:
sqlContext = SQLContext(sc)
# Converting an RDD to DataFrame
df=sqlContext.createDataFrame(rdd3)
# Registers this DataFrame as a temporary table using the given name
df.registerTempTable("df")
# convert String labels to Double type
df = df.withColumn("label", df.label.cast(DoubleType()))
df.show()

+--------------------+-----+
|              review|label|
+--------------------+-----+
|So there is no wa...|  0.0|
|Good case, Excell...|  1.0|
|Great for the jaw...|  1.0|
|Tied to charger f...|  0.0|
|   The mic is great.|  1.0|
|I have to jiggle ...|  0.0|
|If you have sever...|  0.0|
|If you are Razr o...|  1.0|
|Needless to say, ...|  0.0|
|What a waste of m...|  0.0|
|And the sound qua...|  1.0|
|He was very impre...|  1.0|
|If the two were s...|  0.0|
|Very good quality...|  1.0|
|The design is ver...|  0.0|
|Highly recommend ...|  1.0|
|I advise EVERYONE...|  0.0|
|    So Far So Good!.|  1.0|
|       Works great!.|  1.0|
|It clicks into pl...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [21]:
# convert the distinct labels in the input dataset to index values
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
# tokenizer 
tokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\W")##'\w' remove none-word letters
df_tokenized = tokenizer.transform(df)
# remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_removed = remover.transform(df_tokenized)
# Convert to TF words vector
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
df_TF = hashingTF.transform(df_removed)
# Convert to TF*IDF words vector
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(df_TF)
df_idf = idfModel.transform(df_TF)
for features_label in df_idf.select("features", "label").take(3):
    print(features_label)

Row(features=SparseVector(262144, {51471: 4.8293, 109156: 5.8101, 113100: 4.4238, 148675: 4.9628, 235273: 5.5225, 254682: 6.2156}), label=0.0)
Row(features=SparseVector(262144, {78745: 3.5766, 113432: 2.5913, 123499: 5.117, 192310: 3.5076}), label=1.0)
Row(features=SparseVector(262144, {135499: 5.5225, 261870: 2.3238}), label=1.0)


In [23]:
# Split data aproximately into training (80%) and test (20%)
(train, test)=df.randomSplit([0.8,0.2], seed = 0)
# Cache the train and test data in-memory 
train = train.cache()
test = test.cache()
print ('Sample number in the train set : {}'.format(train.count()))
print ('Sample number in the test set : {}'.format(test.count()))
train.groupby('label').count().toPandas()

Sample number in the train set : 808
Sample number in the test set : 192


Unnamed: 0,label,count
0,0.0,422
1,1.0,386


In [25]:
def grid_search(p1,p2,p3,p4):
    lr = LogisticRegression()
    pipeline = Pipeline(stages=[labelIndexer,tokenizer, remover, hashingTF, idfModel, lr])
  
    #Create ParamGrid for Cross Validation
    paramGrid = (ParamGridBuilder()
                 .addGrid(hashingTF.numFeatures, [p1])
                 .addGrid(lr.regParam, [p2])
                 .addGrid(lr.elasticNetParam, [p3])
                 .addGrid(lr.maxIter, [p4])
                 .build())
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)
    
    ########  Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(train)
    # average cross-validation accuracy metric/s on all folds
    average_score = cvModel.avgMetrics
    print ('average cross-validation accuracy = {}'.format(average_score[0]))
    return average_score[0]

In [27]:
score=0.0
for p1 in [45000,50000,55000]:
    for p2 in [0.09,0.10,0.11]:
        for p3 in [0.09,0.10,0.11]:
            for p4 in [9,10,11]:
                t0 = time()
                print ('(numFeatures,regParam,elasticNetParam,maxIter)=({},{},{},{})'.format(p1,p2,p3,p4))
                average_score=grid_search(p1,p2,p3,p4)
                tt = time() - t0
                print ("Classifier trained in {} seconds".format(round(tt,3)))
                if average_score > score:
                    print ('################ Best score ######################')
                    params=(p1,p2,p3,p4)
                    score=average_score
print ('Best score is {} at params ={}'.format(score, params))

(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.09,0.09,9)
average cross-validation accuracy = 0.8083603645874418
Classifier trained in 11.988 seconds
################ Best score ######################
(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.09,0.09,10)
average cross-validation accuracy = 0.8110226363927155
Classifier trained in 8.387 seconds
################ Best score ######################
(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.09,0.09,11)
average cross-validation accuracy = 0.8060339779528034
Classifier trained in 8.172 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.09,0.1,9)
average cross-validation accuracy = 0.809247344073933
Classifier trained in 8.399 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.09,0.1,10)
average cross-validation accuracy = 0.8118715853453029
Classifier trained in 7.478 seconds
################ Best score ######################
(numFeatures,regParam,elasticNetParam,maxIter)=(45000,0.

average cross-validation accuracy = 0.8092447185349815
Classifier trained in 8.223 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(50000,0.11,0.11,9)
average cross-validation accuracy = 0.808908833364346
Classifier trained in 8.068 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(50000,0.11,0.11,10)
average cross-validation accuracy = 0.8056954672432163
Classifier trained in 7.68 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(50000,0.11,0.11,11)
average cross-validation accuracy = 0.8016370699016028
Classifier trained in 8.728 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(55000,0.09,0.09,9)
average cross-validation accuracy = 0.8183783375401217
Classifier trained in 7.533 seconds
(numFeatures,regParam,elasticNetParam,maxIter)=(55000,0.09,0.09,10)
average cross-validation accuracy = 0.822393453269928
Classifier trained in 8.447 seconds
################ Best score ######################
(numFeatures,regParam,elasticNetParam,maxIter)=(55000,0.09,0.09,11)
