# Machine Learning Baseline

## Initialization

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

# Build the Spark configuration one setting at a time; the setters modify
# `conf` in place (and return it), so this matches the chained form.
conf = SparkConf()
conf.setAppName("preprocess")
conf.setMaster("local")

# Reuse an already-running SparkContext if one exists, otherwise start one.
sc = SparkContext.getOrCreate(conf)

# No SparkSession is built here: the kernel start-up output reports
# "SparkSession available as 'spark'", and later cells rely on that name.
#spark = SparkSession.builder.master("local").appName("preprocess").getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1490841303863_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


### Load and Split Data

In [2]:
from pyspark.mllib.util import Vectors, MLUtils
from pyspark.mllib.linalg import VectorUDT
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DataType, StringType

def output_csv(df, path):
    """Write ``df`` to ``path`` as CSV with a header row.

    Before writing, the ``features`` column is replaced by the text that
    ``Vectors.stringify`` produces for each value, so that it is stored as a
    plain string column.

    :param df: DataFrame containing a ``features`` vector column.
    :param path: destination handed to ``DataFrameWriter.csv``.
    """
    to_text = UserDefinedFunction(lambda vec: Vectors.stringify(vec), StringType())
    serialisable = df.withColumn('features', to_text(df['features']))

    serialisable.write.csv(path, header=True)
    
def read_csv(path):
    """Load a CSV written by ``output_csv`` and restore its vector column.

    The file is read with a header row and schema inference through the
    kernel-provided ``spark`` session. The textual ``features`` column is
    parsed back into ``pyspark.mllib`` vectors and then converted to the
    ``pyspark.ml`` vector type.

    :param path: location of the CSV to read.
    :return: DataFrame whose ``features`` column holds ML vectors.
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)

    parse_vector = UserDefinedFunction(lambda text: Vectors.parse(text), VectorUDT())
    with_mllib_vectors = raw.withColumn('features', parse_vector(raw['features']))

    # The parser yields mllib vectors; the ml estimators need ml vectors, see
    # https://spark.apache.org/docs/latest/ml-migration-guides.html
    return MLUtils.convertVectorColumnsToML(with_mllib_vectors)

In [3]:
# Load the feature/label table (an id column, one column per label and a
# sparse `features` vector, as shown in the output below).
df = read_csv("/HdiNotebooks/DATA_TFIDF_HADM_TOP10.csv")

# A fixed seed keeps the 75/25 split reproducible between runs.
df_train, df_test = df.randomSplit(weights=[0.75, 0.25], seed=12345)

# Both splits are reused by every model fitted below, so keep them cached.
df_train.cache()
df_test.cache()

# Single pre-formatted argument: prints the same text under the Python 2
# print statement and the Python 3 print function.
print("Train: %s" % df_train.count())
print("Test: %s" % df_test.count())
df_train.show()
df_test.show()

Train: 30400
Test: 10162
+------+----+----+-----+----+-----+-----+-----+-----+----+----+--------------------+
|    id|4019|2724|25000|4280|41401|53081|51881|42731|5849|5990|            features|
+------+----+----+-----+----+-----+-----+-----+-----+----+----+--------------------+
|100050|   0|   0|    0|   1|    1|    1|    0|    1|   0|   1|(40000,[69,78,104...|
|100053|   0|   0|    1|   0|    0|    0|    0|    1|   0|   0|(40000,[794,2044,...|
|100059|   1|   0|    1|   0|    1|    0|    0|    0|   0|   0|(40000,[130,207,3...|
|100061|   0|   0|    0|   1|    0|    0|    0|    0|   1|   0|(40000,[24,151,20...|
|100281|   0|   0|    0|   1|    1|    0|    0|    1|   0|   0|(40000,[30,48,78,...|
|100282|   0|   0|    0|   1|    0|    0|    0|    0|   1|   0|(40000,[379,585,7...|
|100289|   0|   0|    0|   0|    0|    0|    0|    0|   1|   0|(40000,[228,574,1...|
|100290|   1|   0|    0|   0|    1|    0|    0|    0|   0|   0|(40000,[1,20,115,...|
|100292|   0|   1|    0|   0|    0|    0

### Evaluator

In [4]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import StringType, IntegerType
import pyspark.sql.functions as F

def _labels_to_class_id(cols):
    """Collapse a sequence of 0/1 flags into a single numeric class id.

    Each value is truncated to an int, the digits are joined into a string,
    and that string is read as a base-2 number, e.g. ``[1, 0, 1] -> 5.0``.
    """
    bit_string = "".join(str(int(flag)) for flag in cols)
    return float(int(bit_string, 2))


# Spark UDF wrapper: takes an array column of flags, returns a double.
concat_udf = F.udf(_labels_to_class_id, DoubleType())

def evaluate(df, labelCols, metrics=["f1", "weightedPrecision", "weightedRecall", "accuracy"]):
    """Score multi-label predictions with ``MulticlassClassificationEvaluator``.

    The true label columns, and separately the matching ``<label>_pred``
    columns, are each folded into one numeric class id by ``concat_udf``.
    The evaluator then compares those two ids, so a row only counts as
    correct when every label column was predicted correctly.

    :param df: DataFrame holding each label column and its ``<label>_pred``
        counterpart.
    :param labelCols: names of the true label columns.
    :param metrics: evaluator metric names to compute.
    :return: dict mapping each metric name to its value.
    """
    predCols = [name + "_pred" for name in labelCols]
    scored = (df
              .withColumn("_label", concat_udf(F.array(labelCols)))
              .withColumn("_pred", concat_udf(F.array(predCols))))

    evaluator = MulticlassClassificationEvaluator()
    return {
        metric: evaluator.evaluate(scored, {evaluator.metricName: metric,
                                            evaluator.predictionCol: "_pred",
                                            evaluator.labelCol: "_label"})
        for metric in metrics
    }
    

## Logistic Regression

Define our custom Logistic Regression class

In [5]:
from pyspark.ml.classification import LogisticRegression

class CustomLogisticRegression:
    """Multi-label wrapper that trains one binomial LogisticRegression per label column.

    After ``fit``:
      * ``featuresCol`` is the name of the feature column that was used,
      * ``labelCols`` lists the label column names, in DataFrame order,
      * ``models`` holds the fitted model for each label, in the same order.
    """

    def __init__(self):
        pass

    def fit(self, df, maxIter=100, regParam=0.0, featuresCol="features", ignoreCols=["id"]):
        """Fit one binary classifier for every label column of ``df``.

        Every column that is neither ``featuresCol`` nor listed in
        ``ignoreCols`` is treated as a label.

        :param df: training DataFrame.
        :param maxIter: forwarded to ``LogisticRegression``.
        :param regParam: forwarded to ``LogisticRegression``.
        :param featuresCol: name of the feature vector column.
        :param ignoreCols: columns that are neither features nor labels;
            names that are absent from ``df`` are simply skipped.
        """
        self.featuresCol = featuresCol

        # Exclude the column actually used as features (previously the literal
        # "features" was removed regardless of `featuresCol`).
        skipped = set(ignoreCols)
        skipped.add(featuresCol)
        self.labelCols = [name for name in df.columns if name not in skipped]
        self.models = []

        for c in self.labelCols:
            # Per-label output column names keep the models from clashing when
            # their transforms are chained in `predict`.
            lr = LogisticRegression(featuresCol=featuresCol,
                                    labelCol=c,
                                    predictionCol=c+"_pred",
                                    probabilityCol=c+"_prob",
                                    rawPredictionCol=c+"_rpred",
                                    maxIter=maxIter,
                                    regParam=regParam,
                                    family="binomial")
            self.models.append(lr.fit(df))

    def predict(self, df):
        """Apply every fitted model to ``df`` in turn.

        :param df: DataFrame containing the feature column used in ``fit``.
        :return: the result of chaining each model's ``transform``; every model
            was configured with ``<label>_pred``, ``<label>_prob`` and
            ``<label>_rpred`` as its output column names.
        """
        df_out = df
        for m in self.models:
            df_out = m.transform(df_out)

        return df_out
        
        

Evaluate with our data

In [6]:
# Sweep the iteration budget and report metrics for each setting.
for maxIter in [5, 10, 25, 50, 75, 100]:
    clr = CustomLogisticRegression()
    clr.fit(df_train, maxIter=maxIter)
    df_pred_train = clr.predict(df_train)
    df_pred_test = clr.predict(df_test)

    # Single-argument print(...) calls produce the same text under the
    # Python 2 print statement and the Python 3 print function.
    print("maxIter:  %s" % maxIter)
    # First dict: metrics on the training split; second: on the test split.
    print(evaluate(df_pred_train, clr.labelCols))
    print(evaluate(df_pred_test, clr.labelCols))

maxIter:  5
{'weightedPrecision': 0.32422971544867885, 'f1': 0.2487607767288541, 'weightedRecall': 0.26467105263157803, 'accuracy': 0.264671052631579}
{'weightedPrecision': 0.10921601442253366, 'f1': 0.09381365749094564, 'weightedRecall': 0.12595945679984275, 'accuracy': 0.12595945679984255}
maxIter:  10
{'weightedPrecision': 0.7887577146619332, 'f1': 0.7696060340197072, 'weightedRecall': 0.7609539473684176, 'accuracy': 0.760953947368421}
{'weightedPrecision': 0.1290803781671321, 'f1': 0.1253548498061775, 'weightedRecall': 0.13225742963983492, 'accuracy': 0.13225742963983467}
maxIter:  25
{'weightedPrecision': 0.9998710805070459, 'f1': 0.9998681263464682, 'weightedRecall': 0.9998684210526279, 'accuracy': 0.9998684210526316}
{'weightedPrecision': 0.10314755726388049, 'f1': 0.09796554367340238, 'weightedRecall': 0.09781539067112782, 'accuracy': 0.09781539067112772}
maxIter:  50
{'weightedPrecision': 0.9999999999999962, 'f1': 0.9999999999999962, 'weightedRecall': 0.9999999999999962, 'accu

## Random Forest

Define our custom Random Forest class

In [7]:
from pyspark.ml.classification import RandomForestClassifier

class CustomRandomForestClassifier:
    """Multi-label wrapper that trains one RandomForestClassifier per label column.

    After ``fit``:
      * ``featuresCol`` is the name of the feature column that was used,
      * ``labelCols`` lists the label column names, in DataFrame order,
      * ``models`` holds the fitted model for each label, in the same order.
    """

    def __init__(self):
        pass

    def fit(self, df, maxDepth=5, maxBins=32, numTrees=20, regParam=0.0, featuresCol="features",
            ignoreCols=["id"], seed=None):
        """Fit one random forest for every label column of ``df``.

        Every column that is neither ``featuresCol`` nor listed in
        ``ignoreCols`` is treated as a label.

        :param df: training DataFrame.
        :param maxDepth: forwarded to ``RandomForestClassifier``.
        :param maxBins: forwarded to ``RandomForestClassifier``.
        :param numTrees: forwarded to ``RandomForestClassifier``.
        :param regParam: unused; kept so existing callers that pass it keep
            working.
        :param featuresCol: name of the feature vector column.
        :param ignoreCols: columns that are neither features nor labels;
            names that are absent from ``df`` are simply skipped.
        :param seed: forwarded to ``RandomForestClassifier``; pass an integer
            for repeatable forests. The default ``None`` matches the previous
            behaviour.
        """
        self.featuresCol = featuresCol

        # Exclude the column actually used as features (previously the literal
        # "features" was removed regardless of `featuresCol`).
        skipped = set(ignoreCols)
        skipped.add(featuresCol)
        self.labelCols = [name for name in df.columns if name not in skipped]
        self.models = []

        for c in self.labelCols:
            # Per-label output column names keep the models from clashing when
            # their transforms are chained in `predict`.
            rf = RandomForestClassifier(featuresCol=featuresCol,
                                        labelCol=c,
                                        predictionCol=c+"_pred",
                                        probabilityCol=c+"_prob",
                                        rawPredictionCol=c+"_rpred",
                                        maxDepth=maxDepth,
                                        maxBins=maxBins,
                                        impurity="gini",
                                        numTrees=numTrees,
                                        seed=seed)
            self.models.append(rf.fit(df))

    def predict(self, df):
        """Apply every fitted model to ``df`` in turn.

        :param df: DataFrame containing the feature column used in ``fit``.
        :return: the result of chaining each model's ``transform``; every model
            was configured with ``<label>_pred``, ``<label>_prob`` and
            ``<label>_rpred`` as its output column names.
        """
        df_out = df
        for m in self.models:
            df_out = m.transform(df_out)

        return df_out

Evaluate with our data

In [9]:
# Sweep the tree depth and report metrics for each setting.
for maxDepth in [5, 10, 20, 30]:
    clr = CustomRandomForestClassifier()
    clr.fit(df_train, maxDepth=maxDepth)
    df_pred_train = clr.predict(df_train)
    df_pred_test = clr.predict(df_test)

    # Single-argument print(...) calls produce the same text under the
    # Python 2 print statement and the Python 3 print function.
    print("maxDepth:  %s" % maxDepth)
    # First dict: metrics on the training split; second: on the test split.
    print(evaluate(df_pred_train, clr.labelCols))
    print(evaluate(df_pred_test, clr.labelCols))

maxDepth:  5
{'weightedPrecision': 0.06003821647001132, 'f1': 0.02490492183351698, 'weightedRecall': 0.06743421052631579, 'accuracy': 0.06743421052631579}
{'weightedPrecision': 0.06822878344400911, 'f1': 0.022155707150682047, 'weightedRecall': 0.06366856917929542, 'accuracy': 0.06366856917929542}
maxDepth:  10
{'weightedPrecision': 0.13480532836332554, 'f1': 0.06533599216963552, 'weightedRecall': 0.10861842105263164, 'accuracy': 0.10861842105263157}
{'weightedPrecision': 0.07147709071387036, 'f1': 0.04029352952448755, 'weightedRecall': 0.08364495178114549, 'accuracy': 0.08364495178114545}
maxDepth:  20
{'weightedPrecision': 0.4457382831587539, 'f1': 0.2741693567900213, 'weightedRecall': 0.29098684210526216, 'accuracy': 0.29098684210526315}
{'weightedPrecision': 0.08816127543442014, 'f1': 0.06109304953121328, 'weightedRecall': 0.10096437709112385, 'accuracy': 0.1009643770911238}
maxDepth:  30
{'weightedPrecision': 0.6360990909870188, 'f1': 0.48942733688808804, 'weightedRecall': 0.484802