In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/usr/local/Cellar/apache-spark/2.2.0/libexec'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

In [8]:
sc = SparkContext(conf=conf)

In [9]:
sqlcontext = SQLContext(sc)

In [10]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [11]:
from pyspark.ml.classification import GBTClassificationModel

In [12]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))



0.928638123469


In [13]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')

In [14]:
df.head(3)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S'),
 Row(PassengerId=u'2', Survived=u'1', Pclass=u'1', Name=u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex=u'female', Age=u'38', SibSp=u'1', Parch=u'0', Ticket=u'PC 17599', Fare=u'71.2833', Cabin=u'C85', Embarked=u'C'),
 Row(PassengerId=u'3', Survived=u'1', Pclass=u'3', Name=u'Heikkinen, Miss. Laina', Sex=u'female', Age=u'26', SibSp=u'0', Parch=u'0', Ticket=u'STON/O2. 3101282', Fare=u'7.925', Cabin=None, Embarked=u'S')]

In [15]:
from pyspark.sql.functions import udf

from pyspark.sql import types



def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked'])) # применяем my_udf(df['Embarked'])
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [16]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

In [17]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [18]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [19]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [20]:
data = df_t.rdd.map(transf)

In [21]:
train, test = data.randomSplit([0.7, 0.3])

In [22]:
train.cache()

PythonRDD[98] at RDD at PythonRDD.scala:48

In [23]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)

In [24]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [25]:
acc(rfc, test)

0.8007380073800738

In [26]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру

# Начало домашки

In [27]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [28]:
def Little_girls(age, sex):
    return int((sex == 'female') * (age < 18))

# Old в смысле взрослый, а не старый
def Old_mans(age, sex):
    return int((sex == 'male') * (age > 45))

def Cabin_letters(cabin_string):
    try:
        return cabin_string[0]
    except:
        return 'na'

# имеет смысл разбить 'age' на 5 групп
def Age_group(age):
    try:
        age = int(age)
        if age < 12:
            return 1
        elif 12 <= age < 24:
            return 2
        elif 24 <= age < 36:
            return 3
        else:
            return 4
    except:
        return -1

def Single(sibsp, parch):
    if sibsp == '0' and parch == '0':
        return 1
    else:
        return 0

lg_udf = udf(Little_girls, types.IntegerType())
om_udf = udf(Old_mans, types.IntegerType())
cl_udf = udf(Cabin_letters, types.StringType())
ag_udf = udf(Age_group, types.IntegerType())
si_udf = udf(Single, types.IntegerType())

df_t = df_t.withColumn('Little_Girl', lg_udf(df_t['age'], df_t['sex']))
df_t = df_t.withColumn('Old_Man', om_udf(df_t['age'], df_t['sex']))
df_t = df_t.withColumn('Cabin_letter', cl_udf(df_t['Cabin']))
df_t = df_t.withColumn('Age_group', ag_udf(df_t['age']))
df_t = df_t.withColumn('Single', si_udf(df_t['SibSp'], df_t['Parch']))

In [29]:
# проверки, что все пошло по плану

In [30]:
df_t.select('Little_Girl').distinct().collect()

[Row(Little_Girl=1), Row(Little_Girl=0)]

In [31]:
df_t.select('Old_Man').distinct().collect()

[Row(Old_Man=1), Row(Old_Man=0)]

In [32]:
df_t.select('Cabin_letter').distinct().collect()

[Row(Cabin_letter=u'F'),
 Row(Cabin_letter=u'E'),
 Row(Cabin_letter=u'T'),
 Row(Cabin_letter=u'B'),
 Row(Cabin_letter=u'D'),
 Row(Cabin_letter=u'C'),
 Row(Cabin_letter=u'A'),
 Row(Cabin_letter=u'G'),
 Row(Cabin_letter=u'na')]

In [33]:
df_t.select('Age_group').distinct().collect()

[Row(Age_group=-1),
 Row(Age_group=1),
 Row(Age_group=3),
 Row(Age_group=4),
 Row(Age_group=2)]

In [34]:
df_t.select('Single').distinct().collect()

[Row(Single=1), Row(Single=0)]

In [35]:
# преобразование 'Cabin letter' и 'Age group' в one-hot представление

In [36]:
stringIndexer = StringIndexer(inputCol="Cabin_letter", outputCol="Cabin_letter_index")
model = stringIndexer.fit(df_t)
indexed = model.transform(df_t)
encoder = OneHotEncoder(inputCol="Cabin_letter_index", outputCol="Cabin_letter_vec")
df_t = encoder.transform(indexed)

In [37]:
stringIndexer = StringIndexer(inputCol="Age_group", outputCol="Age_group_index")
model = stringIndexer.fit(df_t)
indexed = model.transform(df_t)
encoder = OneHotEncoder(inputCol="Age_group_index", outputCol="Age_group_vec")
df_t = encoder.transform(indexed)

In [38]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, Little_Girl: int, Old_Man: int, Cabin_letter: string, Age_group: int, Single: int, Cabin_letter_index: double, Cabin_letter_vec: vector, Age_group_index: double, Age_group_vec: vector]

In [39]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            # исходные признаки
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
            
            # добавленные признаки
            int(r.Little_Girl),
            int(r.Old_Man),
            int(r.Single),
        ] + list(r.EmbarkedVec.toArray()) + 
        list(r.Cabin_letter_vec.toArray()) + 
        list(r.Age_group_vec.toArray())
    )

In [40]:
data_ = df_t.rdd.map(transf)

In [41]:
train, test = data_.randomSplit([0.7, 0.3])

In [42]:
train.cache()

PythonRDD[211] at RDD at PythonRDD.scala:48

In [43]:
def f1(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    true_pos = comp.map(lambda x: (x[0] == 1 and x[1] == 1)).sum()
    precision = true_pos / yhat.sum()
    recall = true_pos / y.sum()
    f1 = 2 * precision * recall / (precision + recall)
    return f1

def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [44]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, SVMWithSGD
from pyspark.mllib.tree import GradientBoostedTrees

In [45]:
lrc = LogisticRegressionWithLBFGS.train(train)
svm = SVMWithSGD.train(train, regType='l1')
gbt = GradientBoostedTrees.trainClassifier(train, categoricalFeaturesInfo={})

In [46]:
for m in [lrc, svm, gbt]:
    print('Accuracy: {}, F1: {}'.format(acc(m, test), f1(m, test)))

Accuracy: 0.825095057034, F1: 0.762886597938
Accuracy: 0.825095057034, F1: 0.762886597938
Accuracy: 0.825095057034, F1: 0.762886597938


In [47]:
# очень странно что точность у классификаторов одинаковая. 
# Я пробовал убрать добавленные признаки, точность все равно одинаковая, 
# Прбовал увеличить количество итераций - вылетала ошибка