In [1]:
import os

In [2]:
#os.environ["JAVA_HOME"] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = 'spark-2.2.0-bin-hadoop2.7'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
conf = (SparkConf().setMaster("local[4]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "1g"))

In [8]:
sc = SparkContext(conf=conf)

In [9]:
sqlcontext = SQLContext(sc)

In [10]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [11]:
from pyspark.ml.classification import GBTClassificationModel

In [12]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))



0.928638123469


In [13]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')

In [14]:
df.head(3)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S'),
 Row(PassengerId=u'2', Survived=u'1', Pclass=u'1', Name=u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex=u'female', Age=u'38', SibSp=u'1', Parch=u'0', Ticket=u'PC 17599', Fare=u'71.2833', Cabin=u'C85', Embarked=u'C'),
 Row(PassengerId=u'3', Survived=u'1', Pclass=u'3', Name=u'Heikkinen, Miss. Laina', Sex=u'female', Age=u'26', SibSp=u'0', Parch=u'0', Ticket=u'STON/O2. 3101282', Fare=u'7.925', Cabin=None, Embarked=u'S')]

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql import types

def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [16]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

In [17]:
test_df = sqlcontext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(test_df).transform(test_df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



In [18]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [19]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [20]:
data = df_t.rdd.map(transf)

In [21]:
data.take(1)

[LabeledPoint(0.0, [3.0,1.0,7.25,1.0,0.0,22.0,1.0,0.0,0.0])]

In [22]:
train, test = data.randomSplit([0.7, 0.3])

In [23]:
train.cache()
test.cache()

PythonRDD[85] at RDD at PythonRDD.scala:48

In [24]:
train.take(1)

[LabeledPoint(1.0, [1.0,0.0,71.2833,1.0,0.0,38.0,0.0,1.0,0.0])]

In [25]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)

In [26]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = m.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [27]:
acc(rfc, test)

0.8086642599277978

## Home task

In [28]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру

In [29]:
test.cache()

PythonRDD[85] at RDD at PythonRDD.scala:48

In [30]:
def precision(m, test):
    values = test.map(lambda x: x.features)
    yhat = m.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    true_positive = comp.map(lambda x: 1 if (x[0] == 1 == x[1]) else 0)
    predicted_condition_positive = comp.map(lambda x: 1 if (x[0] == 1) else 0)
    return float(true_positive.sum())/predicted_condition_positive.sum()

In [31]:
precision(rfc, test)

0.7974683544303798

In [32]:
def recall(m, test):
    values = test.map(lambda x: x.features)
    yhat = m.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    true_positive = comp.map(lambda x: 1 if (x[0] == 1 == x[1]) else 0)
    condition_condition_positive = comp.map(lambda x: 1 if (x[1] == 1) else 0)
    return float(true_positive.sum())/condition_condition_positive.sum()

In [33]:
recall(rfc, test)

0.63

In [34]:
def f1_score(m, test):
    return 2/(1/recall(m, test) + 1/precision(m, test))

In [35]:
f1_score(rfc,test)

0.7039106145251397

## feature 1  (need non-negative for Bayes)

In [36]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return 0

## feature 2

In [37]:
def parse_cabin(str_cabin):
    return 0 if str_cabin == None else 1

In [38]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')
def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

## feature 3

In [39]:
def ticket_transform(x):
    x.replace(' ', '/')
    if '/' in x:
        x = x.split('/')[0]
        return x
    return 'none'

my_udf_ticket = udf(ticket_transform, types.StringType())
df = df.withColumn('Ticket', my_udf_ticket(df['Ticket']))
df.select('Ticket').distinct().collect()

[Row(Ticket=u'SC'),
 Row(Ticket=u'none'),
 Row(Ticket=u'SW'),
 Row(Ticket=u'WE'),
 Row(Ticket=u'SO'),
 Row(Ticket=u'SOTON'),
 Row(Ticket=u'W.'),
 Row(Ticket=u'S.C.'),
 Row(Ticket=u'SCO'),
 Row(Ticket=u'A.'),
 Row(Ticket=u'C.A.'),
 Row(Ticket=u'A'),
 Row(Ticket=u'S.O.'),
 Row(Ticket=u'W'),
 Row(Ticket=u'STON'),
 Row(Ticket=u'P'),
 Row(Ticket=u'S.W.')]

## feature 4

In [40]:
def name_transform(x):
    x = x.encode('ascii').lower()
    if 'miss' in x:
        return 1
    elif 'mrs' in x:
        return 2
    elif 'mr' in x:
        return 3
    else:
        return 0

my_udf_name = udf(name_transform, types.IntegerType())
df = df.withColumn('Name', my_udf_name(df['Name']))
df.select('Name').distinct().collect()

[Row(Name=1), Row(Name=3), Row(Name=2), Row(Name=0)]

In [41]:
df

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: int, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [42]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [43]:
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

## feature 5 (OneHotEncoder)

In [44]:
my_stringIndexer = StringIndexer(inputCol="Ticket", outputCol="TicketIndex")
my_model = my_stringIndexer.fit(df_t)
my_indexed = my_model.transform(df_t)
my_encoder = OneHotEncoder(inputCol="TicketIndex", outputCol="TicketVec")
my_df_t = my_encoder.transform(my_indexed)

In [45]:
def my_transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age), # feature 1
            int(r.Name),  # feature 4
            parse_cabin(r.Cabin) # feature 2
        ] + list(r.EmbarkedVec.toArray()) \
          + list(r.TicketVec.toArray())  # feature 3 + UDF + feature 5 (OneHot)
    )

In [46]:
from pyspark.mllib.classification import NaiveBayes, SVMWithSGD

In [47]:
df.take(1)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=3, Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A', Fare=u'7.25', Cabin=None, Embarked=u'S')]

In [48]:
df_t.take(1)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=3, Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A', Fare=u'7.25', Cabin=None, Embarked=u'S', EmbarkedIndex=0.0, EmbarkedVec=SparseVector(3, {0: 1.0}))]

In [49]:
my_df_t.take(1)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=3, Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A', Fare=u'7.25', Cabin=None, Embarked=u'S', EmbarkedIndex=0.0, EmbarkedVec=SparseVector(3, {0: 1.0}), TicketIndex=1.0, TicketVec=SparseVector(16, {1: 1.0}))]

In [50]:
my_data = my_df_t.rdd.map(my_transf)

In [51]:
my_data.take(1)

[LabeledPoint(0.0, [3.0,1.0,7.25,1.0,0.0,22.0,3.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])]

In [52]:
my_train, my_test = my_data.randomSplit([0.7, 0.3])

In [53]:
my_train.cache()
my_test.cache()

PythonRDD[245] at RDD at PythonRDD.scala:48

In [54]:
my_train.take(2)

[LabeledPoint(0.0, [3.0,1.0,7.25,1.0,0.0,22.0,3.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,0.0,71.2833,1.0,0.0,38.0,2.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])]

# Models

In [55]:
def my_acc(m, test):
    predictionAndLabel = test.map(lambda p: (m.predict(p.features), p.label))
    accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
    print('model accuracy {}'.format(accuracy))

In [56]:
my_NaiveBayesModel = NaiveBayes.train(my_train)

In [57]:
my_acc(my_NaiveBayesModel, my_test)

model accuracy 0.68275862069


In [58]:
from pyspark.mllib.classification import SVMWithSGD

In [59]:
my_SVMWithSGD = SVMWithSGD.train(my_train)

In [60]:
my_acc(my_SVMWithSGD, my_test)

model accuracy 0.634482758621


In [61]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

In [62]:
my_LogisticRegressionWithSGD = LogisticRegressionWithSGD.train(my_train)

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


In [63]:
my_acc(my_LogisticRegressionWithSGD, my_test)

model accuracy 0.634482758621


In [64]:
acc(my_NaiveBayesModel, my_test)

0.6827586206896552

In [65]:
acc(my_SVMWithSGD, my_test)

0.6344827586206896

In [66]:
acc(my_LogisticRegressionWithSGD, my_test)

0.6344827586206896