# Classification with PySpark

For this notebook I use a simple sentiment analysis dataset to demonstrate a classification pipeline with PySpark. It is the same dataset I used for my Bachelor of Science thesis. The same pipeline can be applied to other datasets.

First, let's look at the data with the help of the pandas library:

In [1]:
import pandas as pd
df = pd.read_csv('/Users/user/Jupyter/ml_notes/PySpark-Notes/datasets/simple_corpus.csv')

df.head(10)

Unnamed: 0,id,content,label
0,AEG_Electrolux_60840_Lavamat__Opinion_1506705,programa especial ropa sensible planchado fáci...,5
1,AEG_Electrolux_62610_Lavamat__Opinion_2000923,carga superior punto medio color blanco carga ...,4
2,AEG_Electrolux_L14800VI__Opinion_2005396,carga superior forma frontal programa majo efi...,5
3,AEG_Electrolux_L6227FL__Opinion_2140710,modelo electrolux,4
4,AEG_Electrolux_L62280FL__Opinion_2151025,clasificación energético sonido bajo conversac...,4
5,AEG_Electrolux_L62642VI__Opinion_1995158,ficha tecnica programa especial prenda delicado,5
6,AEG_Electrolux_L62642VI__Opinion_1995791,habitación principal control electrónico clasi...,4
7,AEG_Electrolux_L70470FL__Opinion_2085167,clasificación energético fase anterior lavado ...,4
8,AEG_Electrolux_L74650__Opinion_1856967,capazidad grande,3
9,AEG_Electrolux_L85275XFL__Opinion_2138100,temperatura medio ciclo medio factor determina...,5


The dataframe above has three columns of interest (`id`, `content` and `label`); note that pandas adds an index column in the first position. Next, we convert the pandas dataframe to a Spark dataframe, keeping only those three columns:

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # sc: the SparkContext created by the PySpark shell or kernel
spark_df = sqlContext.createDataFrame(df[['id', 'content', 'label']])
spark_df.show(10)

+--------------------+--------------------+-----+
|                  id|             content|label|
+--------------------+--------------------+-----+
|AEG_Electrolux_60...|programa especial...|    5|
|AEG_Electrolux_62...|carga superior pu...|    4|
|AEG_Electrolux_L1...|carga superior fo...|    5|
|AEG_Electrolux_L6...|   modelo electrolux|    4|
|AEG_Electrolux_L6...| clasificación en...|    4|
|AEG_Electrolux_L6...|ficha tecnica pro...|    5|
|AEG_Electrolux_L6...| habitación princ...|    4|
|AEG_Electrolux_L7...| clasificación en...|    4|
|AEG_Electrolux_L7...|    capazidad grande|    3|
|AEG_Electrolux_L8...|temperatura medio...|    5|
+--------------------+--------------------+-----+
only showing top 10 rows



In [4]:
# returns a list
spark_df.take(1)

[Row(id=u'AEG_Electrolux_60840_Lavamat__Opinion_1506705', content=u'programa especial ropa sensible planchado f\xc3\xa1cil funci\xc3\xb3n especial tiempo cercano punto fuerte se\xc3\xb1al ac\xc3\xbastico lugar visible carga m\xc3\xa1ximo consumo m\xc3\xa1ximo', label=5)]

# Train/test split

In [10]:
# Hold-out split: about 80% of the rows for training, 20% for testing

(df_train, df_test) = spark_df.randomSplit([0.8, 0.2])
(df_train, df_test)

(DataFrame[id: string, content: string, label: bigint],
 DataFrame[id: string, content: string, label: bigint])
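
`randomSplit` assigns rows at random, so the two parts are only approximately 80% and 20% of the data, and they change between runs unless a seed is passed (for example `randomSplit([0.8, 0.2], seed=42)`). The plain-Python sketch below illustrates the idea under the assumption that each row is assigned by an independent uniform draw; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by an independent uniform draw."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 800 and 200, but not exactly
```

Fixing the seed makes the split reproducible, which matters when comparing models trained in different runs.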

In [5]:
# Tokenizing: break each text into a list of tokens

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol='content', outputCol='tokens')
tokensData = tokenizer.transform(spark_df)
tokensData

DataFrame[id: string, content: string, label: bigint, tokens: array<string>]
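
`Tokenizer` lowercases the text and splits it on whitespace, which is why the tokens in the later outputs are all lowercase. The sketch below approximates that behaviour in plain Python; `simple_tokenize` is a hypothetical name used only for illustration.

```python
def simple_tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

tokens = simple_tokenize("Modelo Electrolux")
print(tokens)  # ['modelo', 'electrolux']
```

When finer control is needed (a custom split pattern or a minimum token length), `pyspark.ml.feature.RegexTokenizer` is the usual alternative.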

In [16]:
# Tokenizing
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol='content', outputCol='tokens')
df_train_words = tokenizer.transform(df_train)

# Hashing Term-Frequency
from pyspark.ml.feature import HashingTF
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='content_tf', numFeatures=10000)
df_train_tf = hashing_tf.transform(df_train_words)

# Inverse Document Frequency
from pyspark.ml.feature import IDF
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="content_tfidf")
idf_model = idf.fit(df_train_tf)  # fit() computes document frequencies over the whole training set; transform() then rescales each row
df_train_tfidf = idf_model.transform(df_train_tf)

print(df_train_words)
print(df_train_tf)
print(df_train_tfidf)

DataFrame[id: string, content: string, label: bigint, tokens: array<string>]
DataFrame[id: string, content: string, label: bigint, tokens: array<string>, content_tf: vector]
DataFrame[id: string, content: string, label: bigint, tokens: array<string>, content_tf: vector, content_tfidf: vector]
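
`HashingTF` does not store a vocabulary: it maps each token to a column index by hashing it modulo `numFeatures` and counts how often each index occurs, so two different tokens can occasionally share an index. A minimal sketch of this hashing trick follows. It uses `zlib.crc32` as a stand-in hash, which is an assumption made for illustration (Spark uses its own hash function, so the indices will differ), and `hashing_tf_sketch` is a hypothetical name.

```python
import zlib
from collections import Counter

def hashing_tf_sketch(tokens, num_features=10000):
    """Return a sparse term-frequency vector as {index: count}."""
    counts = Counter()
    for token in tokens:
        # Stand-in hash; Spark's own hash function gives different indices
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

tf = hashing_tf_sketch(["carga", "superior", "carga"])
print(tf)  # the counts always sum to 3, the number of tokens
```

A larger `numFeatures` lowers the chance of collisions at the cost of wider vectors.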


In [17]:
df_train_tfidf.show(5)

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|                  id|             content|label|              tokens|          content_tf|       content_tfidf|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|AEG_Electrolux_60...|programa especial...|    5|[programa, especi...|(10000,[205,226,3...|(10000,[205,226,3...|
|AEG_Electrolux_62...|carga superior pu...|    4|[carga, superior,...|(10000,[596,1150,...|(10000,[596,1150,...|
|AEG_Electrolux_L6...|   modelo electrolux|    4|[modelo, electrolux]|(10000,[614,1939]...|(10000,[614,1939]...|
|AEG_Electrolux_L6...| clasificación en...|    4| [clasificación, ...|(10000,[3465,3731...|(10000,[3465,3731...|
|AEG_Electrolux_L6...| habitación princ...|    4| [habitación, pri...|(10000,[565,2075,...|(10000,[565,2075,...|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
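
Each vector in `content_tf` and `content_tfidf` is printed in sparse form: the vector size, the list of non-zero indices, and the matching values. `IDF` keeps the indices and rescales each value by `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing that index. The sketch below applies this formula to dict-based sparse vectors; the function names and the toy documents are hypothetical.

```python
import math

def fit_idf(tf_vectors):
    """Compute one IDF weight per index from sparse {index: count} vectors."""
    m = len(tf_vectors)
    doc_freq = {}
    for vec in tf_vectors:
        for index in vec:  # each document counts once per index
            doc_freq[index] = doc_freq.get(index, 0) + 1
    # Smoothed formula: log((m + 1) / (df + 1))
    return {i: math.log((m + 1.0) / (df + 1.0)) for i, df in doc_freq.items()}

def apply_idf(vec, idf_weights):
    """Scale each term frequency by its IDF weight.

    This sketch only handles indices that were seen during fitting.
    """
    return {i: tf * idf_weights[i] for i, tf in vec.items()}

docs = [{3: 2, 7: 1}, {3: 1}, {3: 1, 9: 4}]  # toy term-frequency vectors
weights = fit_idf(docs)
print(weights[3])  # 0.0, because index 3 occurs in every document
print(apply_idf(docs[0], weights))
```

Terms that appear in every document carry no information for classification, so their weight drops to zero, while rare terms are boosted.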

In [20]:
# Indexing the label
from pyspark.ml.feature import StringIndexer
string_indexer = StringIndexer(inputCol='label', outputCol='target_indexed')
string_indexer_model = string_indexer.fit(df_train_tfidf)
df_train_final = string_indexer_model.transform(df_train_tfidf)
df_train_final

DataFrame[id: string, content: string, label: bigint, tokens: array<string>, content_tf: vector, content_tfidf: vector, target_indexed: double]
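
`StringIndexer` orders the labels by descending frequency, so the most frequent label becomes `0.0`, the next one `1.0`, and so on. The values in `target_indexed` are therefore positions in that ordering, not the original ratings. The following sketch reproduces the idea in plain Python; the function names and ratings are hypothetical, and breaking ties by label order is an assumption made to keep the example deterministic.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Return labels ordered by descending frequency (ties: ascending label)."""
    counts = Counter(str(label) for label in labels)
    return sorted(counts, key=lambda lab: (-counts[lab], lab))

def index_labels(labels, ordering):
    """Map each label to its position in the fitted ordering, as a float."""
    position = {lab: float(i) for i, lab in enumerate(ordering)}
    return [position[str(label)] for label in labels]

ratings = [5, 4, 5, 5, 4, 3]  # hypothetical ratings, not the real data
ordering = fit_string_indexer(ratings)
print(ordering)                         # ['5', '4', '3']
print(index_labels(ratings, ordering))  # [0.0, 1.0, 0.0, 0.0, 1.0, 2.0]
```

Because the ordering is learned from the training set, the same fitted model has to be reused on the test set so that both share one mapping.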

In [21]:
# Training a Decision Tree on training set
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol())
dt_model = dt.fit(df_train_final)

# Transform the test set
df_test_words = tokenizer.transform(df_test)
df_test_tf = hashing_tf.transform(df_test_words)
df_test_tfidf = idf_model.transform(df_test_tf)
df_test_final = string_indexer_model.transform(df_test_tfidf)

# Predictions on the test set
df_test_pred = dt_model.transform(df_test_final)

In [22]:
df_test_pred.select('id', 'label', 'prediction', 'probability').show(5)

+--------------------+-----+----------+--------------------+
|                  id|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|AEG_Electrolux_L1...|    5|       1.0|[0.33992094861660...|
|AEG_Electrolux_L6...|    5|       0.0|[0.43514328808446...|
|AEG_Electrolux_L7...|    4|       1.0|[0.33992094861660...|
|AEG_Electrolux_L_...|    4|       1.0|[0.1,0.8,0.1,0.0,...|
|AEG_Electrolux_L_...|    4|       1.0|[0.33992094861660...|
+--------------------+-----+----------+--------------------+
only showing top 5 rows
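
Note that `prediction` is expressed in the indexed label space, while `label` still holds the original rating, so the two columns cannot be compared directly. With default settings the predicted index is the position of the largest entry in `probability`. The sketch below shows both steps with hypothetical values; in Spark the ordering is available as `string_indexer_model.labels`, and `pyspark.ml.feature.IndexToString` performs the reverse mapping on a dataframe.

```python
def predict_index(probability):
    """Pick the index with the highest probability (first one on ties)."""
    best = max(range(len(probability)), key=lambda i: probability[i])
    return float(best)

def indices_to_labels(predictions, ordering):
    """Translate predicted indices back to the original label strings."""
    return [ordering[int(p)] for p in predictions]

ordering = ['5', '4', '3']  # hypothetical ordering learned by the indexer
print(predict_index([0.1, 0.8, 0.1, 0.0]))                # 1.0
print(indices_to_labels([1.0, 0.0, 1.0], ordering))       # ['4', '5', '4']
```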



# Classification Pipeline

In [38]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Instantiate all the Estimators and Transformers necessary
tokenizer = Tokenizer(inputCol='content', outputCol='content_words')
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='content_tf', numFeatures=10000)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="content_tfidf")
string_indexer = StringIndexer(inputCol='label', outputCol='label_indexed')
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol())

# Instantiate a Pipeline
pipeline = Pipeline(stages=[tokenizer, 
                            hashing_tf, 
                            idf, 
                            string_indexer, 
                            dt])

# Transform the data and train the classifier on the training set
pipeline_model = pipeline.fit(df_train)

# Transform the data and perform predictions on the test set
df_test_pred = pipeline_model.transform(df_test)
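
Conceptually, `pipeline.fit` walks through the stages in order: a Transformer (such as `Tokenizer`) is applied directly, while an Estimator (such as `IDF`) is first fitted on the data produced so far and replaced by the fitted model. `pipeline_model.transform` then replays the fitted stages, so the test set is processed with statistics learned from the training set only. The toy sketch below mimics this mechanism in plain Python; all class and function names are hypothetical and much simpler than Spark's.

```python
class Lowercase:
    """Transformer: stateless, only has transform()."""
    def transform(self, rows):
        return [r.lower() for r in rows]

class VocabularyEstimator:
    """Estimator: fit() learns state and returns a fitted transformer."""
    def fit(self, rows):
        vocab = sorted({w for r in rows for w in r.split()})
        return VocabularyModel(vocab)

class VocabularyModel:
    """Fitted model: maps known words to their vocabulary index."""
    def __init__(self, vocab):
        self.index = {w: i for i, w in enumerate(vocab)}

    def transform(self, rows):
        # Words not seen during fitting are dropped
        return [[self.index[w] for w in r.split() if w in self.index]
                for r in rows]

def fit_pipeline(stages, rows):
    """Fit estimators in order, feeding each stage the previous output."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)
        fitted.append(stage)
        rows = stage.transform(rows)
    return fitted

def transform_pipeline(fitted, rows):
    """Apply the already fitted stages without learning anything new."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows

train = ["Carga Superior", "modelo electrolux"]
test = ["Modelo superior nuevo"]
fitted = fit_pipeline([Lowercase(), VocabularyEstimator()], train)
print(transform_pipeline(fitted, test))  # [[2, 3]]
```

This is the same sequence the manual cells above performed by hand with `idf_model` and `string_indexer_model`, packaged into a single object.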

In [39]:
df_test_pred.show(3)

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+
|                  id|             content|label|       content_words|          content_tf|       content_tfidf|label_indexed|       rawPrediction|         probability|prediction|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+
|AEG_Electrolux_L1...|carga superior fo...|    5|[carga, superior,...|(10000,[596,1150,...|(10000,[596,1150,...|          0.0|[86.0,137.0,26.0,...|[0.33992094861660...|       1.0|
|AEG_Electrolux_L6...|ficha tecnica pro...|    5|[ficha, tecnica, ...|(10000,[5657,6679...|(10000,[5657,6679...|          0.0|[577.0,468.0,126....|[0.43514328808446...|       0.0|
|AEG_Electrolux_L7...| clasificación en...|    4| [clasificación, ...|(10000,[1144,1909...|(10000,[1

# Model evaluation

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Instantiate a MulticlassClassificationEvaluator with the precision metric
# (metricName='precision' is valid in Spark 1.x; Spark 2.0+ replaced it with 'accuracy')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label_indexed',
                                              metricName='precision')

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)

0.47680890538033394
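
In Spark 1.x, the multiclass `precision` metric without a per-class label is the overall fraction of rows whose prediction matches the label, so the value above means that roughly 48% of the test reviews were classified correctly. The comparison has to use `label_indexed`, since `prediction` lives in the indexed label space. A minimal sketch of the computation, with hypothetical values and a hypothetical function name:

```python
def overall_precision(label_indices, predictions):
    """Fraction of rows whose predicted index equals the true label index."""
    if not label_indices:
        raise ValueError("no rows to evaluate")
    correct = sum(1 for y, p in zip(label_indices, predictions) if y == p)
    return correct / float(len(label_indices))

# Hypothetical values in index space (label_indexed vs. prediction)
y_true = [0.0, 0.0, 1.0, 1.0, 2.0]
y_pred = [1.0, 0.0, 1.0, 1.0, 0.0]
print(overall_precision(y_true, y_pred))  # 0.6
```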

# Tune hyperparameters

In [32]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# Instantiate a ParamGridBuilder

grid = (ParamGridBuilder()
        .baseOn([evaluator.metricName, 'precision'])
        .addGrid(dt.maxDepth, [10, 30])
        .build())

# Instantiate a CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)

# Run cross-validation over the grid on the training set and refit the best model
cv_model = cv.fit(df_train)

# Transform the data and perform predictions on the test set
df_test_pred = cv_model.transform(df_test)

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)

0.47680890538033394
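
`CrossValidator` (3 folds by default) trains the whole pipeline once per fold for every point of the grid, averages the evaluator's metric over the folds, keeps the best parameter setting and refits it on the full training set. The sketch below shows that loop in plain Python on a toy one-dimensional k-nearest-neighbours classifier. Everything in it is hypothetical (data, names, contiguous folds instead of random ones) and it only illustrates the selection logic, not Spark's implementation.

```python
def k_fold_indices(n, k):
    """Yield (train_idx, valid_idx) for k contiguous folds over range(n)."""
    fold_sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        valid = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n))
        yield train, valid
        start += size

def fit_knn(train, k):
    """Toy 1-D k-nearest-neighbours classifier; k is the hyperparameter."""
    def predict(x):
        nearest = sorted(train, key=lambda row: abs(row[0] - x))[:k]
        votes = [label for _, label in nearest]
        return max(sorted(set(votes)), key=votes.count)
    return predict

def accuracy(model, rows):
    """Fraction of rows the model labels correctly."""
    return sum(1 for x, y in rows if model(x) == y) / float(len(rows))

def cross_validate(rows, grid, num_folds=3):
    """Return (best_k, mean score per k) using k-fold cross-validation."""
    scores = {}
    for k in grid:
        fold_scores = []
        for train_idx, valid_idx in k_fold_indices(len(rows), num_folds):
            model = fit_knn([rows[i] for i in train_idx], k)
            fold_scores.append(accuracy(model, [rows[i] for i in valid_idx]))
        scores[k] = sum(fold_scores) / len(fold_scores)
    best = max(sorted(scores), key=lambda k: scores[k])
    return best, scores

# Hypothetical (feature, label) rows, already in shuffled order
rows = [(0.1, 0), (0.9, 1), (0.2, 0), (0.8, 1), (0.3, 0),
        (0.7, 1), (0.4, 0), (0.6, 1), (0.35, 0)]
best, scores = cross_validate(rows, grid=[1, 3])
final_model = fit_knn(rows, best)  # refit on all training rows
print(best, scores)
```

The held-out test set plays no part in the selection; it is only used afterwards, as in the cell above, to estimate how the chosen model generalizes.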