In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/usr/local/Cellar/apache-spark/2.2.0/libexec'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

In [8]:
sc = SparkContext(conf=conf)

In [9]:
sqlcontext = SQLContext(sc)

In [10]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [11]:
from pyspark.ml.classification import GBTClassificationModel

In [12]:
a = LabeledPoint(0.0,[0.0, 1.0, 2.0])

In [13]:
a.label

0.0

In [14]:
a.features

DenseVector([0.0, 1.0, 2.0])

In [15]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))



0.928638123469


### Titanic dataset


|**Variable**|Definition| Keys|
|-------------------|-|-|
|**survival**|	Survival|	0 = No, 1 = Yes|
|**pclass**	|Ticket class|	1 = 1st (upper), 2 = 2nd, 3 = 3rd (lower)|
|**sex**|Sex|(male/female)|
|**Age**|	Age in years |Fractional if < 1. If the age is estimated -- in the form of xx.5
|**sibsp**| # of siblings / spouses aboard the Titanic||
|**parch**| # of parents / children aboard the Titanic||
|**ticket**|Ticket number||
|**fare**|Passenger fare (плата за проезд)||
|**cabin**|Cabin number||
|**embarked**|Port of Embarkation|C = Cherbourg, Q = Queenstown, S = Southampton|

In [16]:
import pandas as pd
pdf = pd.read_csv('titanic.csv')
pdf.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [17]:
pdf.columns

Index(['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp',
       'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked'],
      dtype='object')

In [23]:
# Pairwise columns correlation
pdf.corr(method='pearson')

Unnamed: 0,PassengerId,Survived,Pclass,Age,SibSp,Parch,Fare
PassengerId,1.0,-0.005007,-0.035144,0.036847,-0.057527,-0.001652,0.012658
Survived,-0.005007,1.0,-0.338481,-0.077221,-0.035322,0.081629,0.257307
Pclass,-0.035144,-0.338481,1.0,-0.369226,0.083081,0.018443,-0.5495
Age,0.036847,-0.077221,-0.369226,1.0,-0.308247,-0.189119,0.096067
SibSp,-0.057527,-0.035322,0.083081,-0.308247,1.0,0.414838,0.159651
Parch,-0.001652,0.081629,0.018443,-0.189119,0.414838,1.0,0.216225
Fare,0.012658,0.257307,-0.5495,0.096067,0.159651,0.216225,1.0


In [18]:
data = pdf['Name']

In [19]:
clean_data = pdf.drop('Name', axis=1)
#clean_data = pdf.drop('Ticket', axis=1)
clean_data.tail(10)

Unnamed: 0,PassengerId,Survived,Pclass,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
881,882,0,3,male,33.0,0,0,349257,7.8958,,S
882,883,0,3,female,22.0,0,0,7552,10.5167,,S
883,884,0,2,male,28.0,0,0,C.A./SOTON 34068,10.5,,S
884,885,0,3,male,25.0,0,0,SOTON/OQ 392076,7.05,,S
885,886,0,3,female,39.0,0,5,382652,29.125,,Q
886,887,0,2,male,27.0,0,0,211536,13.0,,S
887,888,1,1,female,19.0,0,0,112053,30.0,B42,S
888,889,0,3,female,,1,2,W./C. 6607,23.45,,S
889,890,1,1,male,26.0,0,0,111369,30.0,C148,C
890,891,0,3,male,32.0,0,0,370376,7.75,,Q


In [20]:
# Создаем новый столбец, чтобы посмотреть общее количество попутчиков пассажира (братья, сестры, жены, мужья + родители)
clean_data['Companions'] = clean_data['SibSp'] + clean_data['Parch']
clean_data.head(10)

Unnamed: 0,PassengerId,Survived,Pclass,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Companions
0,1,0,3,male,22.0,1,0,A/5 21171,7.25,,S,1
1,2,1,1,female,38.0,1,0,PC 17599,71.2833,C85,C,1
2,3,1,3,female,26.0,0,0,STON/O2. 3101282,7.925,,S,0
3,4,1,1,female,35.0,1,0,113803,53.1,C123,S,1
4,5,0,3,male,35.0,0,0,373450,8.05,,S,0
5,6,0,3,male,,0,0,330877,8.4583,,Q,0
6,7,0,1,male,54.0,0,0,17463,51.8625,E46,S,0
7,8,0,3,male,2.0,3,1,349909,21.075,,S,4
8,9,1,3,female,27.0,0,2,347742,11.1333,,S,2
9,10,1,2,female,14.0,1,0,237736,30.0708,,C,1


Пропуски в данных в колонках: Age (тип float, кстати), Cabin (ооочень много пропусков) и 2 пропуска в Embarked. 

In [21]:
clean_data['Cabin'].values

array([nan, 'C85', nan, 'C123', nan, nan, 'E46', nan, nan, nan, 'G6',
       'C103', nan, nan, nan, nan, nan, nan, nan, nan, nan, 'D56', nan,
       'A6', nan, nan, nan, 'C23 C25 C27', nan, nan, nan, 'B78', nan, nan,
       nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
       nan, nan, nan, nan, nan, 'D33', nan, 'B30', 'C52', nan, nan, nan,
       nan, nan, 'B28', 'C83', nan, nan, nan, 'F33', nan, nan, nan, nan,
       nan, nan, nan, nan, 'F G73', nan, nan, nan, nan, nan, nan, nan, nan,
       nan, nan, nan, nan, 'C23 C25 C27', nan, nan, nan, 'E31', nan, nan,
       nan, 'A5', 'D10 D12', nan, nan, nan, nan, 'D26', nan, nan, nan, nan,
       nan, nan, nan, 'C110', nan, nan, nan, nan, nan, nan, nan, 'B58 B60',
       nan, nan, nan, nan, 'E101', 'D26', nan, nan, nan, 'F E69', nan, nan,
       nan, nan, nan, nan, nan, 'D47', 'C123', nan, 'B86', nan, nan, nan,
       nan, nan, nan, nan, nan, 'F2', nan, nan, 'C2', nan, nan, nan, nan,
       nan, nan, nan, nan, nan, nan, na

In [22]:
import matplotlib.pyplot as plt
%matplotlib inline

In [26]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('/Users/MarinaAnanyeva/Downloads/titanic.csv')

In [27]:
df.head(1)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S')]

In [33]:
from pyspark.sql.functions import udf
from pyspark.sql import types

def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked='Q'), Row(Embarked='C'), Row(Embarked='S'), Row(Embarked='')]

In [34]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

In [35]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [36]:
df_t.head(1)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S', EmbarkedIndex=0.0, EmbarkedVec=SparseVector(3, {0: 1.0}))]

In [37]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [38]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [39]:
data = df_t.rdd.map(transf)

In [46]:
train, test = data.randomSplit([0.7, 0.3])

In [47]:
train.cache()
test.cache()

PythonRDD[162] at RDD at PythonRDD.scala:48

In [48]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)

In [50]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [51]:
acc(rfc, test)

0.8345323741007195

## Hometask

In [26]:
# добавить 5 новых фичей (всего)
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf (user defined function)

# попробовать 3 новых модели

# Посчитать F-1 меру

### Feature 0. Companions (Siblings, Spouse, Parents) 

In [81]:
def Companions(x, y):
    return(str(int(x) + int(y)))

udf2 = udf(Companions, types.StringType())
df_t = df_t.withColumn('Companions', udf2(df_t['SibSp'],df_t['Parch']))

### Feature 1. Name title

In [83]:
def name_title(x):
    y = x.split(", ")[1]
    z = y.split(".")[0]
    return(z)

udf3 = udf(name_title, types.StringType())
df_t = df_t.withColumn('Title', udf3(df_t['Name']))
df_t.select('Title').distinct().collect()

[Row(Title='Don'),
 Row(Title='Miss'),
 Row(Title='Col'),
 Row(Title='Rev'),
 Row(Title='Lady'),
 Row(Title='Master'),
 Row(Title='Mme'),
 Row(Title='Capt'),
 Row(Title='Mr'),
 Row(Title='Dr'),
 Row(Title='Mrs'),
 Row(Title='Sir'),
 Row(Title='Jonkheer'),
 Row(Title='Mlle'),
 Row(Title='Major'),
 Row(Title='Ms'),
 Row(Title='the Countess')]

In [85]:
stringIndexer2 = StringIndexer(inputCol="Title", outputCol="TitleIndex")
model2 = stringIndexer2.fit(df_t)
indexed2 = model2.transform(df_t)
encoder2 = OneHotEncoder(inputCol="TitleIndex", outputCol="TitleVec")
df_t = encoder2.transform(indexed2)

In [86]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, Companions: string, Title: string, TitleIndex: double, TitleVec: vector]

### Feature 3. Sex

In [134]:
def sex_transf(sex):
    '''Function transforms sex into binary output
    Input: string
    Output: vector'''
    if sex == 'male':
        return 1
    elif sex == 'female':
        return 2
    else:
        return 0

### Feature 4. Age

We need the feature of 'age' to be non-negative
That's why we use the function to get rid of zeroes. 

In [135]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return 0

### Feature 5. Cabin type

In [136]:
#binary 
def parse_cabin(str_cabin):
    return 0 if str_cabin == None else 1

In [None]:
# multi function
def cabin_type(str_cabin):
    if str_cabin == 'None':
        return 0
    elif str_cabin == '1':
        return 1
    elif str_cabin == '2':
        return 2
    elif str_cabin == '3':
        return 3
    else:
        return 0

### F1-score (среднее гармоническое)
$F1 = \frac{2 * Precision * Recall}{Precision + Recall} $

In [127]:
# lazy approach - from sklearn.metrics import f1_score

# f-1 мера ручками
def recall(true, test):
    values = test.map(lambda x: x.features)
    yhat = true.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    true_positive = comp.map(lambda x: 1 if (x[0] == 1 == x[1]) else 0)
    condition_condition_positive = comp.map(lambda x: 1 if (x[1] == 1) else 0)
    return float(true_positive.sum())/ condition_condition_positive.sum()

In [128]:
def precision(true, test):
    values = test.map(lambda x: x.features)
    yhat = true.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    true_positive = comp.map(lambda x: 1 if (x[0] == 1 == x[1]) else 0)
    predicted_condition_positive = comp.map(lambda x: 1 if (x[0] == 1) else 0)
    return float(true_positive.sum()) / predicted_condition_positive.sum()

In [129]:
def f1_score(true, test):
    return 2 * (recall(true, test) * precision(true, test)) / (recall(true, test) + precision(true, test))

In [130]:
f1_score(rfc, test)

0.7604166666666665

In [None]:
# def acc(m, test):
#     values = test.map(lambda x: x.features)
#     yhat = m.predict(values)
#     y = test.map(lambda x: x.label)
#     comp = yhat.zip(y)
#     errors = comp.map(lambda x: abs(x[0] - x[1]))
#     return 1 - errors.sum()/errors.count()

# def f1_score(m, test):
#     values = test.map(lambda x: x.features)
#     yhat = m.predict(values)
#     y = test.map(lambda x: x.label)
#     comp = yhat.zip(y)
#     precision_1 = comp.map(lambda x:  x[0]*x[1])
#     precision_2 = comp.map(lambda x: x[0] == x[1])
#     precision = precision_1.sum()/ precision_2.sum()
#     recall_1 = comp.map(lambda x:  x[0]*x[1])
#     recall_2 = comp.map(lambda x: x[1]>0)
#     recall = recall_1.sum()/recall_2.sum()
#     f1_score = 2*precision*recall/(precision + recall)
#     return f1_score

### Preprocessing for 'Embarked' 

In [73]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('titanic.csv')
def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked='Q'), Row(Embarked='C'), Row(Embarked='S'), Row(Embarked='')]

### Preprocessing for 'Ticket' 

In [74]:
def ticket_transform(x):
    if '/' in x:
        x = x.split('/')[0]
        return x
    return 'none'

my_udf_ticket = udf(ticket_transform, types.StringType())
df = df.withColumn('Ticket', my_udf_ticket(df['Ticket']))
df.select('Ticket').distinct().collect()

[Row(Ticket='SC'),
 Row(Ticket='none'),
 Row(Ticket='SW'),
 Row(Ticket='WE'),
 Row(Ticket='SO'),
 Row(Ticket='SOTON'),
 Row(Ticket='W.'),
 Row(Ticket='S.C.'),
 Row(Ticket='SCO'),
 Row(Ticket='A.'),
 Row(Ticket='C.A.'),
 Row(Ticket='A'),
 Row(Ticket='S.O.'),
 Row(Ticket='W'),
 Row(Ticket='STON'),
 Row(Ticket='P'),
 Row(Ticket='S.W.')]

In [35]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

## One Hot Encoding

In [75]:
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

In [76]:
my_stringIndexer = StringIndexer(inputCol="Ticket", outputCol="TicketIndex")
my_model = my_stringIndexer.fit(df_t)
my_indexed = my_model.transform(df_t)
my_encoder = OneHotEncoder(inputCol="TicketIndex", outputCol="TicketVec")
my_df_t = my_encoder.transform(my_indexed)

## Data transformation to LabelPoint

In [112]:
def my_transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            #int(r.SibSp),
            #int(r.Parch),
            r.Companions == '0', #feature1
            parse_age(r.Age),
            #int(len(r.Name)),
            r.Title == 'Mr',
            parse_cabin(r.Cabin) # 2
        ] + list(r.EmbarkedVec.toArray()) \
          #+ list(r.TicketVec)  # 3 UDF
    )

In [113]:
my_data = df_t.rdd.map(my_transf)
my_train, my_test = my_data.randomSplit([0.7, 0.3])
my_train.cache()
my_test.cache()

PythonRDD[462] at RDD at PythonRDD.scala:48

In [114]:
my_NaiveBayesModel = NaiveBayes.train(my_train)

In [115]:
predictionAndLabel = my_train.map(lambda p: (my_NaiveBayesModel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / my_train.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.6840390879478827


## Different models testing

In [None]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
from pyspark.mllib.classification import SVMWithSGD, NaiveBayes, LogisticRegressionWithSGD
from pyspark.ml.classification import GBTClassificationModel

###  #1 Naive Bayes </font>

In [222]:
my_NaiveBayesModel = NaiveBayes.train(my_train)

In [223]:
predictionAndLabel = my_train.map(lambda p: (my_NaiveBayesModel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / my_train.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.665625


In [131]:
print('F1-score: ', f1_score(my_NaiveBayesModel, my_test))

F1-score:  0.5533980582524272


### #2 SVM with Stohastis Gradient Descent </font>

In [116]:
my_SVMWithSGD = SVMWithSGD.train(my_train)

In [117]:
predictionAndLabel = my_train.map(lambda p: (my_SVMWithSGD.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / my_train.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.6482084690553745


In [132]:
print("F1-score: ", f1_score(my_SVMWithSGD, my_test))

F1-score:  0.6192170818505337


### #3 Logistic Regression with LBFGS

In [121]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logreg_model = LogisticRegressionWithLBFGS.train(my_train)

In [122]:
def my_acc(m, test):
    predictionAndLabel = test.map(lambda p: (m.predict(p.features), p.label))
    accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()
    print('model accuracy {}'.format(accuracy))

my_acc(logreg_model, my_test)

model accuracy 0.7509025270758123


In [133]:
print('F1-score: ', f1_score(logreg_model, my_test))

F1-score:  0.6986899563318777
