# Machine Learning with Spark in Python

##  I. Seminar

### Setting the environment

In [1]:
import os

# Point PySpark at the local Java / Spark / Python installations.
# These are machine-specific absolute paths; adjust them for your setup.
os.environ.update({
    "JAVA_HOME": '/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre',
    "PYSPARK_SUBMIT_ARGS": 'pyspark-shell',
    "SPARK_HOME": '/usr/local/Cellar/apache-spark/2.2.0/libexec',
    "PYSPARK_PYTHON": '/usr/local/bin/python2.7',  # because default is python3 for some reason
})

In [2]:
import sys

# make the PySpark sources and the py4j archive shipped with Spark importable
spark_python_dir = os.environ['SPARK_HOME'] + "/python"
sys.path.extend([
    spark_python_dir,
    spark_python_dir + "/lib/py4j-0.10.4-src.zip",
])

In [3]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [4]:
# create a SparkConf object to configure the application;
# every setter hands back the SparkConf, so it is re-bound step by step

conf = SparkConf()
conf = conf.setMaster("local[8]")                # cluster url
conf = conf.setAppName("ML demo")                # application name
conf = conf.set("spark.executor.memory", "2g")   # memory setting for executors

In [5]:
# initialize a SparkContext object to work with Spark,
# using the settings collected in `conf`
sc = SparkContext(conf=conf)

# to shut down Spark call sc.stop()

In [6]:
# initialize an SQLContext object to work with Spark SQL;
# it wraps the SparkContext `sc` and is used below to read the CSV into a DataFrame
sqlcontext = SQLContext(sc)

### Toy example

In [7]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
from pyspark.ml.classification import GBTClassificationModel
import numpy as np

In [8]:
# four toy observations, each written as LabeledPoint(label, features)
data = [
    LabeledPoint(0.0, [0.0]),
    LabeledPoint(1.0, [1.0]),
    LabeledPoint(3.0, [2.0]),
    LabeledPoint(2.0, [3.0]),
]

# distribute the observations and fit a linear regression with SGD
toy_rdd = sc.parallelize(data)
start_weights = np.array([1.0])
lrm = LinearRegressionWithSGD.train(toy_rdd, iterations=10, initialWeights=start_weights)

# predict the label of a single feature vector
query_point = np.array([1.0])
print(lrm.predict(query_point))



0.928638123469


### Titanic dataset

Preparing the data.

In [9]:
# load titanic dataset as DataFrame
# (uses Spark's built-in CSV reader instead of the legacy external
#  'com.databricks.spark.csv' format name; the first line of the file is the header,
#  and because no schema is inferred every column is read as a string)

df = sqlcontext.read.csv('train.csv', header=True)

In [10]:
# display the DataFrame's schema (column names and types)
df

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [11]:
# peek at the first three rows (returned as a list of Row objects)
df.head(3)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S'),
 Row(PassengerId=u'2', Survived=u'1', Pclass=u'1', Name=u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex=u'female', Age=u'38', SibSp=u'1', Parch=u'0', Ticket=u'PC 17599', Fare=u'71.2833', Cabin=u'C85', Embarked=u'C'),
 Row(PassengerId=u'3', Survived=u'1', Pclass=u'3', Name=u'Heikkinen, Miss. Laina', Sex=u'female', Age=u'26', SibSp=u'0', Parch=u'0', Ticket=u'STON/O2. 3101282', Fare=u'7.925', Cabin=None, Embarked=u'S')]

Adding new features based on existing ones.

In [12]:
from pyspark.sql.functions import udf
from pyspark.sql import types

# define a udf

def Embarked_transform(x):
    '''Return the embarkation value unchanged, or '' when it is missing.

    x -- value of the 'Embarked' column for one row, or None if it is absent
    '''
    # `is None` is an identity test; `!= None` goes through the operand's
    # comparison methods and is not a reliable way to detect None
    if x is None:
        return ''
    return x

# wrap the Python function as a Spark UDF that returns strings
my_udf = udf(Embarked_transform, returnType=types.StringType())

# compute the cleaned values and overwrite column 'Embarked' with them
embarked_filled = my_udf(df['Embarked'])
df = df.withColumn('Embarked', embarked_filled)

# list the distinct values now present in column 'Embarked'
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

We need to replace every value in column `Embarked` with a one-hot vector because it is a categorical feature.

In [13]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# assign indexes to values in 'Embarked'
# (the numeric index is written to a new column 'EmbarkedIndex')
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")

# define transformation: fitting on `df` learns the value -> index mapping
model = stringIndexer.fit(df)

# perform transformation: returns `df` with the extra 'EmbarkedIndex' column
indexed = model.transform(df)

# do one-hot encoding of a column
# (reads 'EmbarkedIndex', writes the vector column 'EmbarkedVec')
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")

# perform transformation; `df_t` is the frame all later feature cells build on
df_t = encoder.transform(indexed)

In [14]:
# display the schema; it now includes 'EmbarkedIndex' and 'EmbarkedVec'
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [15]:
def parse_age(str_age):
    '''Return age as float.

    str_age -- age as read from the dataset (a string, or None when missing)

    Returns -1.0 as a sentinel when the value is missing or not numeric.
    '''
    try:
        return float(str_age)
    except (TypeError, ValueError):
        # TypeError: value is None; ValueError: text that is not a number.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return -1.0

In [16]:
def transf(r):
    '''Turn one DataFrame row into a LabeledPoint.

    The label is the 'Survived' flag; the features are the ticket class, a
    male/female flag, fare, SibSp, Parch and age, followed by the one-hot
    embarkation vector.
    '''
    label = int(r.Survived)
    scalar_features = [
        int(r.Pclass),
        r.Sex == 'male',
        float(r.Fare),
        int(r.SibSp),
        int(r.Parch),
        parse_age(r.Age),
    ]
    embarked_onehot = list(r.EmbarkedVec.toArray())
    return LabeledPoint(label, scalar_features + embarked_onehot)

In [17]:
# create new dataset with custom features:
# every row of `df_t` is converted to a LabeledPoint by `transf`
data = df_t.rdd.map(transf)

Training Random Forest model on the modified dataset.

In [18]:
# split data into two parts (roughly 70% train / 30% test);
# the seed is fixed so that the split, and every score computed from it,
# can be reproduced when the notebook is re-run
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [19]:
# the training RDD is read repeatedly while fitting the model
train.cache()  # persist this rdd (prevent from recomputing)

PythonRDD[98] at RDD at PythonRDD.scala:48

In [20]:
from pyspark.mllib.tree import RandomForest, RandomForestModel

# train random forest model on our 'train' data
# - numClasses=2: survived / did not survive
# - categoricalFeaturesInfo={}: every feature is treated as continuous
# - seed: fixed so that the randomised training is reproducible between runs

rfc = RandomForest.trainClassifier(train,
                                   numClasses=2,
                                   categoricalFeaturesInfo={},
                                   numTrees=100,
                                   seed=42)

In [21]:
def acc(rfc, test):
    '''Return the accuracy of model 'rfc' on the labeled points in 'test'.

    rfc  -- trained model whose predict() takes an RDD of feature vectors
    test -- RDD of elements exposing .features and .label
    '''
    predicted = rfc.predict(test.map(lambda point: point.features))  # model output per element
    actual = test.map(lambda point: point.label)                     # ground-truth labels

    # absolute difference between prediction and truth for every element
    abs_errors = predicted.zip(actual).map(lambda pair: abs(pair[0] - pair[1]))

    mean_error = abs_errors.sum()/abs_errors.count()
    return 1 - mean_error

In [22]:
# evaluate model on test data:
# accuracy of the random forest `rfc` on the held-out `test` split
acc(rfc, test)

0.7969924812030076

## II. Home Assignment

1. Add 5 new features
    - 3 of which are computed based on existing ones
    - at least one uses UDF
2. Train 3 new ML models
3. Compute F1-measure

This is the data that we are going to start with:

In [23]:
# display the schema of the frame produced in the seminar part
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]


|Variable|Definition|
|:---|:---|
|Survived|Survival|
|Pclass|Ticket class|
|Name|Name|
|Sex|Sex|
|Age|Age in years|
|SibSp|# of siblings / spouses aboard the Titanic|
|Parch|# of parents / children aboard the Titanic|
|Ticket|	Ticket number	|
|Fare|	Passenger fare	|
|Cabin|	Cabin number	|
|Embarked|	Port of Embarkation|

### New Features

Let's make new features in addition to those that we've defined during class. We'll write a function that extracts the existing features and combines them with the new ones into a single element.

Some features are inspired by [an article](https://triangleinequality.wordpress.com/2013/09/08/basic-feature-engineering-with-the-titanic-data/) on feature engineering for Titanic dataset.

1. **Age groups**
    - I started with defining children, but then decided to go further and defined a whole bunch of age groups.
    - It is a categorical feature and thus needs to be encoded for ML.
2. **Women's marriage status**
    - I tried to differentiate between married and unmarried women judging by the titles in their names. I realize that this is a somewhat lazy approach, since there are many more titles in the dataset, but I hope it's okay.
    - This produces two boolean features: one flag for unmarried women and one for married women.
3. **Deck**
    - Deck letter roughly corresponds to a floor of the ship and can be obtained directly from the cabin number.
4. **Family on board**
    - This boolean feature indicates whether a passenger has any family members on board at all, or is considered alone.

**Age groups** feature derived using UDF.

In [24]:
def age_group(age_str):
    '''Map an age (as read from the dataset) to the name of an age group.

    Ages that cannot be parsed, or that are not greater than zero, yield 'unknown'.
    '''
    age = parse_age(age_str)

    # guard: sentinel / non-positive ages carry no usable information
    if not age > 0:
        return 'unknown'

    # (exclusive upper bound, group name), checked from youngest to oldest
    brackets = (
        (3, 'toddler'),
        (14, 'child'),
        (20, 'teenager'),
        (60, 'adult'),
    )
    for upper_bound, group in brackets:
        if age < upper_bound:
            return group

    return 'oldster'

# wrap the Python function as a Spark UDF that returns strings
age_udf = udf(age_group, returnType=types.StringType())

# compute the group for every row and store it in column 'AgeGroup'
age_group_col = age_udf(df_t['Age'])
df_t = df_t.withColumn('AgeGroup', age_group_col)

We need to replace every value in column `AgeGroup` with a one-hot vector because it is a categorical feature.

In [25]:
# assign indexes to values in 'AgeGroup'
# (the numeric index is written to a new column 'AgeGroupIndex')
stringIndexer = StringIndexer(inputCol="AgeGroup", outputCol="AgeGroupIndex")

# define transformation: fitting on `df_t` learns the value -> index mapping
model = stringIndexer.fit(df_t)

# perform transformation: returns `df_t` with the extra 'AgeGroupIndex' column
indexed = model.transform(df_t)

# do one-hot encoding of a column
# (reads 'AgeGroupIndex', writes the vector column 'AgeGroupVec')
encoder = OneHotEncoder(inputCol="AgeGroupIndex", outputCol="AgeGroupVec")

# perform transformation
# NOTE(review): `df_t` is overwritten with a frame that already contains the new
# columns, so re-running this cell alone presumably fails because the output
# columns exist already - confirm, and restart the kernel before re-running
df_t = encoder.transform(indexed)

In [26]:
# display the schema; it now includes 'AgeGroup', 'AgeGroupIndex' and 'AgeGroupVec'
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, AgeGroup: string, AgeGroupIndex: double, AgeGroupVec: vector]

**Deck** feature derived using UDF.

In [27]:
def get_deck(cabin_str):
    '''Get deck letter based on cabin number.

    cabin_str -- cabin number such as 'C85', or None when it is missing

    Returns the first character of the cabin number, or 'Unknown' when the
    cabin is missing or an empty string.
    '''
    # truthiness covers both None and '' (indexing '' would raise IndexError)
    if cabin_str:
        return cabin_str[0]
    return 'Unknown'

# wrap the Python function as a Spark UDF that returns strings
deck_udf = udf(get_deck, returnType=types.StringType())

# derive the deck from 'Cabin' for every row and store it in column 'Deck'
deck_col = deck_udf(df_t['Cabin'])
df_t = df_t.withColumn('Deck', deck_col)

We need to replace every value in column `Deck` with a one-hot vector because it is a categorical feature.

In [28]:
# assign indexes to values in 'Deck'
# (the numeric index is written to a new column 'DeckIndex')
stringIndexer = StringIndexer(inputCol="Deck", outputCol="DeckIndex")

# define transformation: fitting on `df_t` learns the value -> index mapping
model = stringIndexer.fit(df_t)

# perform transformation: returns `df_t` with the extra 'DeckIndex' column
indexed = model.transform(df_t)

# do one-hot encoding of a column
# (reads 'DeckIndex', writes the vector column 'DeckVec')
encoder = OneHotEncoder(inputCol="DeckIndex", outputCol="DeckVec")

# perform transformation
# NOTE(review): `df_t` is overwritten with a frame that already contains the new
# columns, so re-running this cell alone presumably fails because the output
# columns exist already - confirm, and restart the kernel before re-running
df_t = encoder.transform(indexed)

In [29]:
# display the schema; it now includes 'Deck', 'DeckIndex' and 'DeckVec'
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, AgeGroup: string, AgeGroupIndex: double, AgeGroupVec: vector, Deck: string, DeckIndex: double, DeckVec: vector]

Final feature definition.

In [30]:
def extract_features(r):
    '''Turn one DataFrame row into a LabeledPoint with the extended feature set.

    The label is the 'Survived' flag. The feature vector is the concatenation of
    the features used during class, three title/family flags, and the one-hot
    vectors for age group and deck.
    '''
    name = r.Name
    label = int(r.Survived)

    # features defined previously
    class_features = [
        int(r.Pclass),     # ticket class
        r.Sex == 'male',   # sex
        parse_age(r.Age),  # age
        int(r.SibSp),      # number of siblings / spouses
        int(r.Parch),      # number of parents / children
        float(r.Fare),     # ticket cost
    ]
    embarked_onehot = list(r.EmbarkedVec.toArray())  # embarkation location (one-hot vector)

    # new features
    is_unmarried_woman = 'Miss.' in name or 'Mlle.' in name or 'Ms.' in name
    is_married_woman = 'Mrs.' in name or 'Mme' in name
    has_family = int(r.SibSp)+int(r.Parch) > 0       # has family members on board
    flag_features = [is_unmarried_woman, is_married_woman, has_family]

    age_group_onehot = list(r.AgeGroupVec.toArray())  # age group (one-hot vector)
    deck_onehot = list(r.DeckVec.toArray())           # deck (one-hot vector)

    features = (class_features
                + embarked_onehot
                + flag_features
                + age_group_onehot
                + deck_onehot)
    return LabeledPoint(label, features)

In [31]:
# create new dataset with custom features:
# every row of `df_t` is converted to a LabeledPoint by `extract_features`
data = df_t.rdd.map(extract_features)

### Model Training

Preparation

In [32]:
# split data into two parts (roughly 70% train / 30% test);
# the seed is fixed so that the split, and every F1 score computed from it,
# can be reproduced when the notebook is re-run
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [33]:
# the same training RDD is reused by all three models trained below
train.cache()  # persist this rdd (prevent from recomputing)

PythonRDD[166] at RDD at PythonRDD.scala:48

In [34]:
def f1(model, test):
    '''Compute F1-measure of model applied to test data.

    model -- trained binary classifier whose predict() takes an RDD of feature vectors
    test  -- RDD of elements exposing .features and .label (labels are 0 or 1)

    Returns the F1 score as a float; 0.0 when there are no true positives.
    '''
    # F1 = 2/(1/recall + 1/precision) = 2*TP/(2*TP + FP + FN)
    # recall = TP/(TP + FN)
    # precision = TP/(TP + FP)

    values = test.map(lambda x: x.features)      # get features from elements in 'test' (new rdd)
    yhat = model.predict(values)                 # get predictions via the trained model
    y = test.map(lambda x: x.label)              # get labels from elements in 'test'
    comp = yhat.zip(y)                           # zip predictions with true labels
    # comp == (prediction, truth)

    # count every (prediction, truth) combination in a single pass instead of
    # running one filter/count job per confusion-matrix cell
    counts = comp.countByValue()
    tp = float(counts.get((1, 1), 0))   # predicted 1, truth 1
    fp = float(counts.get((1, 0), 0))   # predicted 1, truth 0
    fn = float(counts.get((0, 1), 0))   # predicted 0, truth 1

    if tp == 0:
        # no true positives: precision and recall are zero or undefined
        return 0.0
    return 2.0*tp/(2.0*tp + fp + fn)

Logistic Regression

In [35]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# fit logistic regression on the training split, with at most 100 L-BFGS iterations
lrm = LogisticRegressionWithLBFGS.train(train, iterations=100)

In [36]:
# F1-measure of the logistic regression model on the held-out test split
f1(lrm, test)

0.7439613526570049

SVM

In [57]:
from pyspark.mllib.classification import SVMWithSGD

# fit an SVM with SGD on the training split: 100 iterations,
# each using a 20% mini-batch of the data (miniBatchFraction=0.2)
svm = SVMWithSGD.train(train, iterations=100, miniBatchFraction=0.2)

In [58]:
# F1-measure of the SVM model on the held-out test split
f1(svm, test)

0.5617021276595745

Gradient Boosted Trees

In [39]:
from pyspark.mllib.tree import GradientBoostedTrees

# fit gradient boosted trees on the training split: 100 boosting iterations,
# no feature declared categorical (categoricalFeaturesInfo is empty)
gbt = GradientBoostedTrees.trainClassifier(train, categoricalFeaturesInfo={}, numIterations=100)

In [41]:
# F1-measure of the gradient boosted trees model on the held-out test split
f1(gbt, test)

0.7922705314009661