In [60]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.tree import RandomForest, RandomForestModel, GradientBoostedTrees
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.regression import *
from pyspark.mllib.classification import *
import math
import pandas as pd
import numpy as np
from pyspark.sql.functions import udf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import types
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [2]:
conf = (SparkConf().setMaster("local[4]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

In [3]:
sc = SparkContext(conf=conf)

In [4]:
sqlcontext = SQLContext(sc)

In [5]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [6]:
from pyspark.ml.classification import GBTClassificationModel

In [7]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('./train.csv')

In [9]:
df.head(3)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S'),
 Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived='1', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S')]

Features:

In [17]:
#age
def age_nominal(y):
    try:
        y = int(y)
    except:
        return -1
    return y // 20

udf_age = udf(age_nominal, returnType=IntegerType())
df = df.withColumn('age_nominal', udf_age(df['Age']))
# add onehot 
str_ind = StringIndexer(inputCol="age_nominal", outputCol="age_nominal_ind")
model = str_ind.fit(df)
ind = model.transform(df)
encoder = OneHotEncoder(inputCol="age_nominal_ind", outputCol="age_nominal_map")
df = encoder.transform(ind)

In [26]:
#fare
def fare_nominal(p):
    try:
        p = float(p)
    except:
        return -1
    return int(p // 5)

udf_price = udf(fare_nominal, returnType=IntegerType())
df = df.withColumn('fare_nominal', udf_price(df['Fare']))
# add onehot 
str_ind = StringIndexer(inputCol="fare_nominal", outputCol="fare_nominal_ind")
model = str_ind.fit(df)
ind = model.transform(df)
encoder = OneHotEncoder(inputCol="fare_nominal_ind", outputCol="fare_nominal_map")
df = encoder.transform(ind)

In [28]:
#alone
def not_alone(sibsp, parch):
    return 0 if int(sibsp) + int(parch) == 0 else 1
udf_alone = udf(not_alone, returnType=IntegerType())
df = df.withColumn('not_alone', udf_alone(df['Sibsp'], df['Parch']))

In [None]:
##encode pclass

str_ind = StringIndexer(inputCol="Pclass", outputCol="Pclass_ind")
model = str_ind.fit(df)
ind = model.transform(df)
encoder = OneHotEncoder(inputCol="Pclass_ind", outputCol="Pclass_map")
df = encoder.transform(ind)

In [29]:




def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked='Q'), Row(Embarked='C'), Row(Embarked='S'), Row(Embarked='')]

In [30]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)


In [31]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, age_nominal: int, age_nominal_ind: double, age_nominal_map: vector, fare_nominal: int, fare_nominal_ind: double, fare_nominal_map: vector, not_alone: int, EmbarkedIndex: double, EmbarkedVec: vector]

In [32]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [33]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [34]:
data = df_t.rdd.map(transf)

In [35]:
train, test = data.randomSplit([0.7, 0.3])

In [36]:
train.cache()

PythonRDD[140] at RDD at PythonRDD.scala:48

In [37]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)

In [38]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: np.absolute(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [39]:
acc(rfc, test)

0.82229965156794427

In [43]:
#f1 metric
def f1(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    yhat = yhat.map(lambda x: float(x))
    y = test.map(lambda x: float(x.label))
    predictions_and_labels = yhat.zip(y)
    metrics = MulticlassMetrics(predictions_and_labels)
    f1Score = metrics.fMeasure()
    
    return f1Score

In [44]:
f1(rfc,test)



0.8222996515679443

In [55]:
rfc = LogisticRegressionWithSGD.train(train)

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


In [56]:
acc(rfc, test), f1(rfc, test)



(0.61672473867595823, 0.6167247386759582)

In [58]:
rfc = LogisticRegressionWithLBFGS.train(train)

In [59]:
acc(rfc, test), f1(rfc, test)



(0.80836236933797911, 0.8083623693379791)

In [62]:
rfc = GradientBoostedTrees.trainClassifier(train,
                                           categoricalFeaturesInfo={})

In [63]:
acc(rfc, test), f1(rfc, test)



(0.83275261324041816, 0.8327526132404182)

In [None]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру