# Machine Learning Example

This notebook is used to practice machine learning using Spark.  It shows some of the things that can be done for the various steps once we have a training and test set.

## Setup Environment

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles
from pyspark.sql import functions as sqlf
from pyspark.sql import types as sqlt
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

sc = SparkContext()
sqlContext = SQLContext(sc)

## Load Training Data

In [2]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)
DF_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [3]:
#url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
#sc.addFile(url)
sc.addFile('machine-learning/train.csv')
df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- hypothesis: string (nullable = true)
 |-- lang_abv: string (nullable = true)
 |-- language: string (nullable = true)
 |-- label: string (nullable = true)



## Explore Data

In [4]:
# It looks like some labels are bad

df.groupby('label').count().show()

+--------------------+-----+
|               label|count|
+--------------------+-----+
|                  en|    9|
|To keep the peace...|    1|
|                   0| 4158|
|             English|   42|
|     It was destiny.|    1|
|To keep the peace...|    1|
|The man yelled th...|    1|
| continued the Co...|    2|
|                   1| 3862|
|The man exclaimed...|    1|
|                   2| 4042|
+--------------------+-----+



In [5]:
# Here are the ones without the right labels

df.filter(~df.label.isin([0,1,2])).show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        id|             premise|          hypothesis|            lang_abv|            language|               label|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|dd4f0d9f25|"""If you people ...| you wouldn't jok...|Many people have ...|                  en|             English|
|ad4b9214af|" ""So your girl ...| eh?"" he chortled."|Your farewell act...|                  en|             English|
|bc400f6df7|"This is one of t...| if you must wors...|"""We shouldn't w...| why do we even h...|                  en|
|26b2abe32d|"We need to be su...|            for once| seemed tongue-ti...|Tuppence was shoc...|                  en|
|34fe3bf8ea|"Well, we will co...| her hands folded...| and her grey hai...|Dorcas will be as...|                  en|
|218c1a9db3|"Very well ‚Äùbut i...|  and went inside. "|

In [6]:
# Here's the null count

df.agg(*[sqlf.count(sqlf.when(sqlf.isnull(c), c)).alias(c) for c in df.columns]).show()

+---+-------+----------+--------+--------+-----+
| id|premise|hypothesis|lang_abv|language|label|
+---+-------+----------+--------+--------+-----+
|  0|      0|         0|       0|       0|    0|
+---+-------+----------+--------+--------+-----+



## Clean Data

In [7]:
clean_df = df.filter(df.label.isin([0,1,2]))
clean_df.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 4158|
|    1| 3862|
|    2| 4042|
+-----+-----+



## Prepare Data

In [8]:
# make label numeric
clean_df = clean_df.withColumn('label', clean_df['label'].cast(sqlt.FloatType()))


In [9]:
# one-hot encoding is something that can do done for categorical columns
# we start by converting language values to a number

# assign an index number to each category using a string indexer
language_indexer = StringIndexer(inputCol='language',outputCol='language_index')
clean_df = language_indexer.fit(clean_df).transform(clean_df)
clean_df.select(['language','language_index']).show()

+---------+--------------+
| language|language_index|
+---------+--------------+
|  English|           0.0|
|  English|           0.0|
|   French|           3.0|
|  English|           0.0|
|     Thai|          10.0|
|  Turkish|          13.0|
|     Urdu|           5.0|
|  English|           0.0|
|  English|           0.0|
|  Russian|           7.0|
|Bulgarian|          14.0|
|   German|          12.0|
|   Arabic|           2.0|
|  Chinese|           1.0|
|    Hindi|           8.0|
|  Swahili|           4.0|
|  English|           0.0|
|  English|           0.0|
|  English|           0.0|
|  English|           0.0|
+---------+--------------+
only showing top 20 rows



In [10]:
# Create one-hot encoder for language
language_encoder = OneHotEncoder(inputCols=['language_index'],outputCols=['language_ohe'])
clean_df = language_encoder.fit(clean_df).transform(clean_df)
clean_df.select(['language','language_index','language_ohe']).show()


+---------+--------------+---------------+
| language|language_index|   language_ohe|
+---------+--------------+---------------+
|  English|           0.0| (14,[0],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|   French|           3.0| (14,[3],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|     Thai|          10.0|(14,[10],[1.0])|
|  Turkish|          13.0|(14,[13],[1.0])|
|     Urdu|           5.0| (14,[5],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|  Russian|           7.0| (14,[7],[1.0])|
|Bulgarian|          14.0|     (14,[],[])|
|   German|          12.0|(14,[12],[1.0])|
|   Arabic|           2.0| (14,[2],[1.0])|
|  Chinese|           1.0| (14,[1],[1.0])|
|    Hindi|           8.0| (14,[8],[1.0])|
|  Swahili|           4.0| (14,[4],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
|  English|           0.0| (14,[0],[1.0])|
+---------+

In [11]:
# TODO: use word embeddings to vectorize string columns (maybe BERT?)

In [12]:
# Assemble a features column that can be used for creating a model later
# TODO: add vectors from word embeddings

assembler = VectorAssembler(inputCols=['language_ohe'], outputCol="features")
clean_df = assembler.transform(clean_df)
clean_df.select(['id','features']).show()

+----------+---------------+
|        id|       features|
+----------+---------------+
|5130fd2cb5| (14,[0],[1.0])|
|5b72532a0b| (14,[0],[1.0])|
|3931fbe82a| (14,[3],[1.0])|
|5622f0c60b| (14,[0],[1.0])|
|86aaa48b45|(14,[10],[1.0])|
|ed7d6a1e62|(14,[13],[1.0])|
|5a0f4908a0| (14,[5],[1.0])|
|fdcd1bd867| (14,[0],[1.0])|
|7cfb3d272c| (14,[0],[1.0])|
|8c10229663| (14,[7],[1.0])|
|a1971593d5|     (14,[],[])|
|2bf4b86d4f|(14,[12],[1.0])|
|91b03f6bf4| (14,[2],[1.0])|
|4c25aa4c06| (14,[1],[1.0])|
|82f24422eb| (14,[8],[1.0])|
|6d63ae6397| (14,[4],[1.0])|
|0a3f52c547| (14,[0],[1.0])|
|4b0eca3ccb| (14,[0],[1.0])|
|cad235551c| (14,[0],[1.0])|
|d8b3a4fb06| (14,[0],[1.0])|
+----------+---------------+
only showing top 20 rows



## Train Model

In [13]:
# Use logistic regression
model = LogisticRegression(featuresCol='features',labelCol='label')
clean_df = model.fit(clean_df).transform(clean_df)
clean_df.select(['id','language','label','prediction']).show()


+----------+---------+-----+----------+
|        id| language|label|prediction|
+----------+---------+-----+----------+
|5130fd2cb5|  English|  0.0|       0.0|
|5b72532a0b|  English|  2.0|       0.0|
|3931fbe82a|   French|  0.0|       0.0|
|5622f0c60b|  English|  0.0|       0.0|
|86aaa48b45|     Thai|  1.0|       1.0|
|ed7d6a1e62|  Turkish|  0.0|       2.0|
|5a0f4908a0|     Urdu|  0.0|       2.0|
|fdcd1bd867|  English|  2.0|       0.0|
|7cfb3d272c|  English|  1.0|       0.0|
|8c10229663|  Russian|  0.0|       0.0|
|a1971593d5|Bulgarian|  0.0|       0.0|
|2bf4b86d4f|   German|  1.0|       2.0|
|91b03f6bf4|   Arabic|  0.0|       2.0|
|4c25aa4c06|  Chinese|  2.0|       1.0|
|82f24422eb|    Hindi|  2.0|       2.0|
|6d63ae6397|  Swahili|  0.0|       0.0|
|0a3f52c547|  English|  0.0|       0.0|
|4b0eca3ccb|  English|  1.0|       0.0|
|cad235551c|  English|  2.0|       0.0|
|d8b3a4fb06|  English|  2.0|       0.0|
+----------+---------+-----+----------+
only showing top 20 rows



## Load Test Data

In [14]:
sc.addFile('machine-learning/test.csv')
test_df = sqlContext.read.csv(SparkFiles.get("test.csv"), header=True, inferSchema= True)
test_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- hypothesis: string (nullable = true)
 |-- lang_abv: string (nullable = true)
 |-- language: string (nullable = true)



## Test Model