# Setting Up

## Init

In [None]:
import seaborn as sns
import numpy as np
import pandas as pd
import collections # For frequency counting
import findspark
# Point findspark at the local Spark installation before importing pyspark;
# the path is machine-specific and must be adjusted per host.
findspark.init("/home/alex/learn-spark-python/spark2")

import pyspark
# NOTE(review): DataFrameNaFunctions and col are not called in the cells shown
# here (df.na is used directly, and `col` is shadowed by a parameter later).
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit # Create columns of *literal* value
from pyspark.sql.functions import col # Returns a Column based on the 
                                      # given column name
from pyspark.ml.feature import StringIndexer #label encoding
from pyspark.ml import Pipeline,PipelineModel

# Create the SparkContext explicitly; the last cell shuts it down with sc.stop().
sc = pyspark.SparkContext(appName="helloworld")

## SparkSession

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() reuses the SparkContext `sc` created in the setup cell, so the
# appName below presumably does not replace "helloworld" — confirm in the UI.
# "spark.some.config.option" looks like the placeholder from the Spark SQL
# guide example; it is not a real Spark setting.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Data

In [None]:
# Read the Titanic CSVs with a header row. No schema inference is requested,
# so every column is loaded as a string; numeric columns are cast further below.
trainDF = spark.read.csv("/home/alex/learn-spark-python/data/titanic/train.csv", header="true")
testDF = spark.read.csv("/home/alex/learn-spark-python/data/titanic/test.csv", header="true")

**Combine train and test data.**

In [None]:
# Column names of the training data.
trainDF.columns

In [None]:
# collect() brings every Embarked value to the driver as a list of Rows;
# acceptable for this small file, prefer show()/limit() on large data.
trainDF.select(['Embarked']).collect()

In [None]:
trainDF.select(['Embarked']).show()

In [None]:
# Tag every training row with its origin so train and test can be
# separated again after they are combined.
trainDF = trainDF.withColumn('Mark', lit('train'))

In [None]:
trainDF.show()

In [None]:
trainDF.select(['Mark']).show()

In [None]:
## Add Survived column to test, and dataset name as a column
# train.csv was read without schema inference, so trainDF.Survived is a string
# column. Use a string placeholder so both sides of the positional union have
# the same type; it is cast to integer later. This is NOT a real label for the
# test rows.
testDF = (testDF.withColumn('Survived', lit('0'))
                .withColumn('Mark', lit('test')))
# union() matches columns by position, so reorder test to train's layout.
testDF = testDF[trainDF.columns]

## Append Test data to Train data
# union() is the Spark 2.0+ name; unionAll() is a deprecated alias of it.
df = trainDF.union(testDF)

In [None]:
testDF.show()

In [None]:
# Register the combined train+test DataFrame for SQL. The view name is
# "train", but it also contains the test rows. The view captures df as it is
# now; later reassignments of `df` (casts, fills) do not update it.
df.createOrReplaceTempView("train")

# Steps in a Machine Learning Workflow

* Data Collection
* Data Preprocessing
* Feature Engineering
* Data format translation
* Modeling
* Evaluation and Selection

## Data Collection

* Combining Datasets

## Data Preprocessing

### Exploratory Data Analysis

* Statistical Summary
* Histograms
* Correlations

#### What is the schema?

In [None]:
# Column names of the combined DataFrame.
df.columns

In [None]:
# Expected to be all strings here, because the CSVs were read without inferSchema.
df.printSchema()

#### Which ones are numeric?

In [None]:
df.show(5)

Here are the variables which should be numeric (float or integer):

* PassengerId: Integer
* Pclass: Integer
* SibSp: Integer
* Parch: Integer
* Survived: Integer
* Age: Float
* Fare: Float

In [None]:
# Here is an example
# Cast Age to float via a temporary column: add the cast copy, drop the
# original, then rename the copy back. Note that Age ends up as the last column.
df = df.withColumn("AgeTmp", df["Age"].cast("float")) \
    .drop("Age") \
    .withColumnRenamed("AgeTmp", "Age")

In [None]:
df.show(5)

In [None]:
# Let's define function
def to_anytype(df, colnames, typename):
    """Return *df* with every column in *colnames* cast to *typename*.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input data; it is not modified (a new DataFrame is returned).
    colnames : iterable of str
        Names of existing columns to cast.
    typename : str
        A Spark SQL type name accepted by ``Column.cast``, e.g. "integer".

    Values that cannot be parsed as *typename* become null.

    ``withColumn`` with an existing column name replaces that column in
    place. This keeps the original column order and, unlike a temporary
    "tmp" column followed by drop/rename, cannot clobber a real column that
    happens to be called "tmp".
    """
    for colname in colnames:
        df = df.withColumn(colname, df[colname].cast(typename))
    return df

In [None]:
# Columns listed above as numeric, grouped by target type.
intCols = ['PassengerId', 'Pclass', 'SibSp', 'Parch', 'Survived']
floatCols = ['Age', 'Fare']

# Cast in place; values that cannot be parsed become null.
df = to_anytype(df, intCols, "integer")
df = to_anytype(df, floatCols, "float")

In [None]:
df1.show()

In [None]:
df.printSchema()

#### Let's inspect data

In [None]:
# take() returns the first rows to the driver as a list of Row objects.
df.take(5)

In [None]:
# show() prints the first rows as a formatted table instead.
df.show(5)

#### Statistical Summary

In [None]:
# Summary statistics (count, mean, stddev, min, max) of a single column.
df.describe('Age').show()

In [None]:
# Name is a string column: min/max are lexicographic, mean/stddev are not meaningful.
df.describe(['Age', 'Name']).show()

In [None]:
df.describe(trainDF.columns).show()

In [None]:
# The next three cells print the summary in slices so the table fits on
# screen. Note that the slices skip trainDF.columns indices 0, 4, 8 and 12+.
df.describe(trainDF.columns[1:4]).show()

In [None]:
df.describe(trainDF.columns[5:8]).show()

In [None]:
df.describe(trainDF.columns[9:12]).show()

#### Histograms

* We need the frequency count of various levels

In [None]:
age_hist = spark.sql(
    "SELECT Age AS age, \
            count(*) AS count \
    FROM train \
    GROUP BY Age \
    ORDER BY Age")
age_hist.show(n=age_hist.count())

In [None]:
age_hist = spark.sql(
    "SELECT bucket_floor, \
        CONCAT(bucket_floor, ' to ', bucket_ceiling) as bucket_name, \
        count(*) as count \
     FROM ( \
        SELECT floor(Age/5.00)*5 as bucket_floor, \
            floor(Age/5.00)*5 + 5 as bucket_ceiling \
        FROM train \
     ) a \
     GROUP BY 1, 2 \
     ORDER BY 1")

age_hist.show(n=age_hist.count())


In [None]:
def get_column(df, colname):
    """Collect one column of *df* to the driver as a Python list.

    Row order is the order Spark returns the rows in. Null values are
    replaced by the string 'None' so that they appear as their own category
    when the list is used as plot labels.
    """
    # Project to the single column first, so only that column is shipped to
    # the driver instead of mapping a Python lambda over full rows.
    return ['None' if row[0] is None else row[0]
            for row in df.select(colname).collect()]

age = get_column(age_hist, "bucket_name")
count = get_column(age_hist, "count")

In [None]:
%matplotlib inline

barplt = sns.barplot(age, count)

In [None]:
%matplotlib inline

barplt = sns.barplot(age, count)
for item in barplt.get_xticklabels():
    item.set_rotation(45)

#### Histogram Function

In [None]:
def get_column(df, colname):
    """Collect one column of *df* to the driver as a Python list.

    Row order is the order Spark returns the rows in. Null values are
    replaced by the string 'None' so that they appear as their own category
    when the list is used as plot labels.
    """
    # Project to the single column first, so only that column is shipped to
    # the driver instead of mapping a Python lambda over full rows.
    return ['None' if row[0] is None else row[0]
            for row in df.select(colname).collect()]

def histplot(dfname, colname, binsize):
    """Plot a bar-chart histogram of a numeric column, bucketed in Spark SQL.

    Parameters
    ----------
    dfname : pyspark.sql.DataFrame
        Data to plot. It is registered as the temp view "tmpDF", replacing
        any existing view of that name.
    colname : str
        Column to bucket. It is interpolated directly into the SQL text, so
        it must be a trusted column name.
    binsize : int or float
        Bucket width; a value v falls into
        [floor(v/binsize)*binsize, floor(v/binsize)*binsize + binsize).

    Returns the seaborn Axes. Null values, and strings that Spark SQL cannot
    cast to a number, land in a single 'None' bucket.
    """
    binsize = str(binsize)
    dfname.createOrReplaceTempView("tmpDF")
    hist_query = "SELECT bucket_floor, \
        CONCAT(bucket_floor, ' to ', bucket_ceiling) as bucket_name, \
        count(*) as count \
     FROM ( \
        SELECT floor(" + colname + "/" + binsize + ")*" + binsize + " as bucket_floor, \
            floor(" + colname + "/" + binsize + ")*" + binsize + " + " + binsize + " as bucket_ceiling \
        FROM tmpDF \
     ) a \
     GROUP BY 1, 2 \
     ORDER BY 1"
    hist_data = spark.sql(hist_query)
    xvar = get_column(hist_data, "bucket_name")
    count = get_column(hist_data, "count")
    # Keyword x/y: seaborn >= 0.12 rejects positional data vectors.
    barplt = sns.barplot(x=xvar, y=count)
    # Rotate bucket labels so they do not overlap.
    for item in barplt.get_xticklabels():
        item.set_rotation(45)
    return(barplt)

In [None]:
# 5-year age buckets.
histplot(df, "Age", 5)

In [None]:
# Coarser 10-year age buckets.
histplot(df, "Age", 10)

#### All Histograms

* Play with various binsizes

In [None]:
df.printSchema()

In [None]:
histplot(df, "Age", 5)

In [None]:
histplot(df, "Survived", 1)

In [None]:
histplot(df, "Survived", 1)

In [None]:
histplot(df, "Pclass", 5)

In [None]:
histplot(df, "SibSp", 1)

In [None]:
histplot(df, "Parch", 1)

In [None]:
histplot(df, "Age", 5)

In [None]:
histplot(df, "Fare", 5)

In [None]:
histplot(df, "Fare", 10)

Let's test with a categorical variable.

In [None]:
# Embarked is a string column: the numeric floor() in histplot turns its values
# into null, so the rows collapse into a single 'None' bucket. histplot_s below
# handles categorical columns instead.
histplot(trainDF, "Embarked", 1)

In [None]:
def histplot_s(df, colname):
    """Plot the value counts of a (categorical) column as a bar chart.

    The whole column is collected to the driver with get_column (nulls
    become the string 'None') and tallied with collections.Counter. Bars
    appear in first-seen order. Returns the seaborn Axes.
    """
    counter = collections.Counter(get_column(df, colname))
    # Keyword x/y: seaborn >= 0.12 rejects positional data vectors.
    barplt = sns.barplot(x=list(counter.keys()), y=list(counter.values()))
    # Rotate category labels so they do not overlap.
    for item in barplt.get_xticklabels():
        item.set_rotation(45)
    return(barplt)

In [None]:
# Counts of each Sex value over the combined train+test rows.
histplot_s(df, "Sex")

In [None]:
# Any null Embarked values appear as a 'None' bar (see get_column).
histplot_s(df, "Embarked")

#### Correlations

In [None]:
# Pearson correlation between two numeric columns.
df.corr("Age", "Fare")

In [None]:
# NOTE: df still contains the test rows, whose Survived is the placeholder 0,
# which biases every correlation involving Survived.
df.corr("Age", "Survived")

In [None]:
df.corr("Fare", "Survived")

Currently, `DataFrame.corr` supports only the *pearson* correlation method.

### Missing Value Imputation

In [None]:
# Numeric columns (Survived is later used as the label) and categorical ones.
# Pclass is listed as categorical even though it was cast to integer above.
# NOTE(review): stringVars is not used by the cells shown here.
numVars = ['Survived','Age','SibSp','Parch','Fare']
stringVars = ['Cabin', 'Embarked', 'Pclass', 'Sex']


def countNull(df, var):
    """Return how many rows of *df* have a null in column *var*."""
    null_rows = df.filter(df[var].isNull())
    return null_rows.count()


def countEmptyString(df, var):
    """Return how many rows of *df* have the empty string in column *var*."""
    empty_rows = df.filter(df[var].isin(""))
    return empty_rows.count()


def countZero(df, var):
    """Return how many rows of *df* have the value 0 in column *var*."""
    zero_rows = df.filter(df[var].isin(0))
    return zero_rows.count()

In [None]:
# Number of nulls in every column.
missing = {var: countNull(df,var) for var in df.columns}
missing

In [None]:
# Number of empty strings in every column.
missing = {var: countEmptyString(df, var) for var in df.columns}
missing

In [None]:
# Number of zeros in every column. For Survived this includes every test
# row, because those rows were given the placeholder 0.
missing = {var: countZero(df, var) for var in df.columns}
missing

In [None]:
# groupBy() with no keys aggregates over the whole DataFrame; first() returns
# the single result Row holding avg(Age) (nulls are ignored by the average).
age_mean = df.groupBy().mean('Age').first()
age_mean

In [None]:
age_mean[0]

In [None]:
# Unwrap the scalar means directly.
# NOTE(review): df still contains the test rows (Mark == 'test'), so these
# means are computed over train and test combined.
age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
age_mean, fare_mean

In [None]:
# Replace nulls in Age and Fare with their means; other columns are untouched.
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})

In [None]:
df.show()

**What is wrong with what I just did?**

### Outlier Treatment

* Univariate
    - Winsorization
* Multivariate

* Is it a good idea?
* Know your data

## Feature Engineering

### Applying Domain Expertise

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
## create user defined function to extract title
# First attempt: keeps everything before the first '.', which for names
# formatted like "Surname, Title. Given" still includes the surname and comma.
getTitle = udf(lambda name: name.split('.')[0].strip(), StringType())
df = df.withColumn('Title', getTitle(df['Name']))
 
df.select('Name','Title').show(5)

In [None]:
# Second version: from the text before the first '.', keep the part after the
# first ','. Assumes names look like "Surname, Title. Given names".
# NOTE(review): the UDF raises if Name is null or contains no ','.
getTitle = udf(lambda name: name.split('.')[0].split(',')[1].strip(), StringType())
df = df.withColumn('Title', getTitle(df['Name']))
 
df.select('Name','Title').show(5)

### Categorical Variable treatment

**Some algorithms can handle categorical variables directly, some can't.**

* Label Encoder
    - Transforms non-numerical labels (or nominal categorical variables) into numerical labels
    - Numerical labels are always between 0 and n_classes-1
    - May introduce a spurious ordinal relationship between categories (e.g. suggesting that category 2 is "greater" than category 1)
        * Age and City
* One Hot Encoding
    - Encodes categorical integer features using a one-hot aka one-of-K scheme
    - Preferable

#### Label Encoding (Indexing)

In [None]:
# Categorical columns to label-encode.
catVars = ['Pclass','Sex','Embarked','Title']

In [None]:
## index Sex variable
# Single-column example: fit the indexer, replace Sex by its index column.
# df_indexed is overwritten by the pipeline cells below.
si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
df_indexed = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')

In [None]:
df_indexed.show(5)

In [None]:
# make use of pipeline to index all categorical variables
def indexer(df, col):
    """Return a StringIndexerModel fitted on column *col* of *df*.

    The model writes its indices to '<col>_indexed'. Because it is already
    fitted, a Pipeline uses it as a Transformer stage.
    """
    output_name = col + '_indexed'
    return StringIndexer(inputCol=col, outputCol=output_name).fit(df)
 
indexers = [indexer(df, name) for name in catVars]

In [None]:
# The stages are already-fitted StringIndexerModels, so fit() only wraps them
# into a PipelineModel; transform() appends one *_indexed column per variable.
pipeline = Pipeline(stages = indexers)
df_indexed = pipeline.fit(df).transform(df)
df_indexed.select('Embarked','Embarked_indexed').show(3)

In [None]:
# Fit on the raw df: running the indexers on df_indexed would fail, because
# its *_indexed output columns already exist. Keep the fitted PipelineModel
# so the same indexing can be applied to other data later.
pipeline = Pipeline(stages = indexers)
pipelineModel = pipeline.fit(df)
df_indexed = pipelineModel.transform(df)
 
df_indexed.select('Embarked','Embarked_indexed').show(3)

* The categorical features are indexed in resulting data
* Embarked is mapped S=>0, C=>1, Q=>2

#### StringIndexer

* Maps a string column of labels to a column of label indices
* If the input column is numeric, it is cast to string and the string values are indexed
* The indices are in [0, numLabels), ordered by label frequencies
    - So the most frequent label gets index 0.

#### Transformer

* transform one dataset into another

#### Estimator

* fit models to data

#### Pipelines

 
* A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer
* When Pipeline.fit() is called, the stages are executed in order
    - If a stage is an Estimator, its Estimator.fit() method will be called on the input dataset to fit a model
        * Then the model, which is a transformer, will be used to transform the dataset as the input to the next stage
    - If a stage is a Transformer, its Transformer.transform() method will be called to produce the dataset for the next stage
* The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages
* If there are no stages, the pipeline acts as an identity transformer.

### Timeseries Variable treatments

* Shattering
* No time/day variables here

## Data format translation

* In this step, we get the data in the format or data type expected by the algorithms
* In the case of Spark MLlib, this includes 
    - local vector
    - dense or sparse vectors
    - labeled points
    - local matrix
    - distributed matrix with row matrix
    - indexed row matrix
    - coordinate matrix
    - block matrix

In our case, we need to convert the features to Vectors (either SparseVector or DenseVector).

In [None]:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

In [None]:
catVarsIndexed = [i + '_indexed' for i in catVars]
catVarsIndexed

In [None]:
featuresCol = numVars + catVarsIndexed
featuresCol

In [None]:
featuresCol.remove('Survived')
featuresCol

In [None]:
labelCol = ['Mark','Survived']
labelCol

In [None]:
row = Row('mark','label','features') 
row

In [None]:
df_indexed = df_indexed[labelCol + featuresCol]
df_indexed

In [None]:
# 0-mark, 1-label, 2-features
# map features to DenseVector
lf = df_indexed.rdd.map(lambda r: (row(r[0], r[1],DenseVector(r[2:])))).toDF()
lf.show()

In [None]:
# index label
# convert numeric label to categorical, which is required by
# decisionTree and randomForest
# NOTE(review): StringIndexer orders labels by frequency, so 'index' equals
# 'label' only if 0 is the most frequent label. The placeholder 0 given to
# every test row (test rows are part of lf) makes that likely — confirm.
lf = StringIndexer(inputCol = 'label', outputCol='index').fit(lf).transform(lf)
 
lf.show(3)

### Split back into train/test data

In [None]:
# Separate the rows again using the marker column. Test rows carry only the
# placeholder label 0, so they cannot be used to measure model quality.
train = lf.where(lf.mark =='train')
test = lf.where(lf.mark =='test')

In [None]:
# random split further to get train/validate
# 70/30 split; the fixed seed makes the split reproducible.
train, validate = train.randomSplit([0.7,0.3], seed =121)

In [None]:
print('Train Data Number of Row: '+ str(train.count()))
print('Validate Data Number of Row: '+ str(validate.count()))
print('Test Data Number of Row: '+ str(test.count()))

## Modeling

* `spark.ml` is built on DataFrames, while `spark.mllib` is based on RDDs
* We'll fit the logistic, decision tree and random forest models from ML packages

#### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
 
# regParam: regularization strength. With the default elasticNetParam=0.0 the
# penalty is L2 (ridge); elasticNetParam=1.0 would make it L1 (lasso).
# Features are read from the default 'features' column, labels from 'index'.
lr = LogisticRegression(maxIter = 100, regParam = 0.05, labelCol='index').fit(train)

In [None]:
# Evaluate model based on auc ROC(default for binary classification)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
def testModel(model, validate=validate):
    """Return the evaluator's default metric (area under ROC) of *model*.

    *validate* defaults to the validation split that exists when this
    function is defined (the default is bound once, at definition time).
    True labels are read from the 'index' column.
    """
    auc_evaluator = BinaryClassificationEvaluator(labelCol='index')
    scored = model.transform(validate)
    return auc_evaluator.evaluate(scored)

In [None]:
# AUC of the logistic regression on the validation split.
print('AUC ROC of Logistic Regression model is: ' + str(testModel(lr)))

In [None]:
# NOTE: every test row carries the placeholder label 0 (no ground truth), so
# this "AUC" is not a meaningful measure of performance.
print('AUC ROC of Logistic Regression model is: ' + str(testModel(lr, validate=test)))

In [None]:
# Predictions for the test rows.
pred_test = lr.transform(test)
pred_test.show(5)

#### More Models

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
 
# Fit three tree-based classifiers on the same training split, labels from 'index'.
dt = DecisionTreeClassifier(maxDepth = 3, labelCol ='index').fit(train)
rf = RandomForestClassifier(numTrees = 100, labelCol = 'index').fit(train)
gbt = GBTClassifier(maxIter = 10, labelCol = 'index').fit(train)


In [None]:
# AUC on the validation split for each model.
# NOTE(review): gbt is left out, presumably because GBT models in older Spark
# 2.x releases produce no rawPrediction column for the evaluator — confirm.
models = {'LogisticRegression':lr,
          'DecisionTree':dt,
          'RandomForest':rf}
 
modelPerf = {k:testModel(v) for k,v in models.items()}
print(modelPerf)

In [None]:
def model_acc(model, validate=validate):
    """Return the accuracy of *model* on *validate*.

    *validate* defaults to the validation split that exists when this
    function is defined. Predictions are compared with the 'index' column
    the models were trained on. Comparing against the raw 'label' column is
    only correct if StringIndexer happened to give every label an equal
    index. Accuracy is computed by Spark in a single job, instead of
    collecting the label and prediction columns separately and assuming
    their row orders match.
    """
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    evaluator = MulticlassClassificationEvaluator(
        labelCol='index', predictionCol='prediction', metricName='accuracy')
    return evaluator.evaluate(model.transform(validate))

In [None]:
model_acc(gbt)

In [None]:
models = {'LogisticRegression':lr,
          'DecistionTree':dt,
          'RandomForest':rf,
          'GradientBoostingMachines':gbt}

modelPerf = {k:model_acc(v) for k,v in models.items()}
print(modelPerf)

#### Tuning

In [None]:
for i in range(10):
    dt = DecisionTreeClassifier(maxDepth = i, labelCol ='index').fit(train)
    print('AUC ROC of Decision Tree model is' + '(for maxDepth= ' + str(i) + '): ' + str(testModel(dt)))

In [None]:
for i in range(5, 200):
    rf = RandomForestClassifier(numTrees = i, labelCol = 'index').fit(train)
    print('AUC ROC of Random Forest model is' + '(for numTrees= ' + str(i) + '): ' + str(testModel(rf)))

In [None]:
# Shut down the SparkContext created in the setup cell. The SparkSession
# obtained via getOrCreate() presumably shares it and is unusable afterwards.
sc.stop()