In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64/jre'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/home/dev/spark-2.2.0-bin-hadoop2.7'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

In [8]:
sc = SparkContext(conf=conf)

In [9]:
sqlcontext = SQLContext(sc)

In [10]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [11]:
from pyspark.ml.classification import GBTClassificationModel

In [24]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))

0.928638123469


In [25]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')

In [26]:
df.head(3)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S'),
 Row(PassengerId=u'2', Survived=u'1', Pclass=u'1', Name=u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex=u'female', Age=u'38', SibSp=u'1', Parch=u'0', Ticket=u'PC 17599', Fare=u'71.2833', Cabin=u'C85', Embarked=u'C'),
 Row(PassengerId=u'3', Survived=u'1', Pclass=u'3', Name=u'Heikkinen, Miss. Laina', Sex=u'female', Age=u'26', SibSp=u'0', Parch=u'0', Ticket=u'STON/O2. 3101282', Fare=u'7.925', Cabin=None, Embarked=u'S')]

In [27]:
from pyspark.sql.functions import udf

from pyspark.sql import types



def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''
    

    
# def Pclass_transform(x)

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [28]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer


stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)


In [29]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]

In [30]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [31]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [32]:
data = df_t.rdd.map(transf)

In [33]:
train, test = data.randomSplit([0.7, 0.3])

In [34]:
train.cache()

PythonRDD[192] at RDD at PythonRDD.scala:48

In [35]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                                categoricalFeaturesInfo={},
                                numTrees=100)

In [36]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [37]:
acc(rfc, test)

0.7870036101083032

In [38]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру

In [39]:
import pandas as pd
df_pd = pd.read_csv('train.csv')
df_pd.head(10)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In [40]:
from pyspark.sql.functions import udf
from pyspark.sql import types

def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''
    
def Name_transform(x):
    if x != None:
        return len(x)
    else:
        return 0
    
my_udf = udf(Name_transform, types.StringType())
df = df.withColumn('Name', my_udf(df['Name']))
my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [41]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)


In [42]:
stringIndexer = StringIndexer(inputCol="Pclass", outputCol="PclassIndex")
model = stringIndexer.fit(df_t)
indexed = model.transform(df_t)
encoder = OneHotEncoder(inputCol="PclassIndex", outputCol="PclassVec")
df_t = encoder.transform(indexed)

In [43]:
print df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, PclassIndex: double, PclassVec: vector]


In [44]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [45]:
def parse_age_sq(str_age):
    try:
        return float(str_age) ** 2
    except:
        return -1

In [46]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
            parse_age_sq(r.Age),
        ] + list(r.EmbarkedVec.toArray()) + list(r.PclassVec.toArray())
    )

In [47]:
data = df_t.rdd.map(transf)

In [48]:
train, test = data.randomSplit([0.7, 0.3])

In [49]:
train.cache()

PythonRDD[263] at RDD at PythonRDD.scala:48

In [50]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                                categoricalFeaturesInfo={},
                                numTrees=100)

In [51]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = rfc.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [52]:
acc(rfc, test)

0.8076923076923077