In [1]:
# Bootstrap a local PySpark session for this notebook.
# The environment variables and sys.path entries below are set BEFORE
# `import pyspark`, so the statement order in this cell matters.
import os

# NOTE(review): absolute, machine-specific paths; adjust before running elsewhere.
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

os.environ["SPARK_HOME"] = '/home/act10nz/contest/spark-2.2.0-bin-hadoop2.7/'

import sys

# Make the Python sources under SPARK_HOME importable.
# SPARK_HOME already ends with '/', so the resulting entries contain '//'.
sys.path.append(os.environ['SPARK_HOME']+"/python")

# py4j archive located under SPARK_HOME/python/lib.
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

import py4j
from pyspark import SparkContext, SparkConf, SQLContext

# Master URL "local[8]" (presumably local mode with 8 worker threads; see the
# Spark master URL docs), application name "ML demo", executor memory "2g".
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

# Spark context built from the configuration above.
sc = SparkContext(conf=conf)

# SQL entry point; used below to read the CSV into a DataFrame.
sqlcontext = SQLContext(sc)

from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np


In [2]:
# Read train.csv (relative to the working directory) with the first line
# treated as the header. No schema inference is requested, so the columns
# presumably arrive as strings; the later int()/float() conversions rely on that.
df = (
    sqlcontext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .load('train.csv')
)

In [3]:
from pyspark.sql.functions import udf

from pyspark.sql import types



def Embarked_transform(x):
    """Replace a missing ``Embarked`` value with an empty string.

    Args:
        x: the raw column value, or ``None`` when the value is missing.

    Returns:
        ``x`` unchanged when it is not ``None``, otherwise ``''``.
    """
    # Identity check instead of ``!= None``: it cannot be affected by a
    # value's custom comparison methods.
    return '' if x is None else x

# Expose the null-replacement helper as a Spark SQL UDF that returns strings.
my_udf = udf(Embarked_transform, returnType=types.StringType())

# Overwrite the 'Embarked' column with its null-free version.
df = df.withColumn('Embarked', my_udf(df.Embarked))

# Show the distinct values that remain after the replacement.
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [4]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Two feature stages for the port of embarkation:
#   1. StringIndexer: 'Embarked' -> 'EmbarkedIndex'
#   2. OneHotEncoder: 'EmbarkedIndex' -> 'EmbarkedVec'
stringIndexer = StringIndexer(outputCol="EmbarkedIndex", inputCol="Embarked")
encoder = OneHotEncoder(outputCol="EmbarkedVec", inputCol="EmbarkedIndex")

# Fit the indexer on the full frame, then add the index and vector columns.
model = stringIndexer.fit(df)
indexed = model.transform(df)
df_t = encoder.transform(indexed)


In [5]:
def parse_age(str_age):
    """Convert a raw age value to ``float``, using ``-1`` for unusable input.

    Args:
        str_age: the raw value, typically a string; may be ``None`` or
            a string that is not a number.

    Returns:
        ``float(str_age)`` when the conversion succeeds, otherwise ``-1``.
    """
    try:
        return float(str_age)
    except (TypeError, ValueError):
        # TypeError: None or another non-convertible type.
        # ValueError: a string that is not a number (including '').
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return -1

In [6]:
def transf(r):
    """Build an MLlib ``LabeledPoint`` from one passenger row.

    The label is ``int(r.Survived)``. The feature list is, in order:
    Pclass, the boolean ``Sex == 'male'``, Fare, SibSp, Parch, the age as
    parsed by ``parse_age``, followed by the entries of ``r.EmbarkedVec``.

    Args:
        r: a row exposing the attributes read below.

    Returns:
        LabeledPoint with the label and features described above.
    """
    label = int(r.Survived)
    numeric = [
        int(r.Pclass),
        r.Sex == 'male',
        float(r.Fare),
        int(r.SibSp),
        int(r.Parch),
        parse_age(r.Age),
    ]
    embarked = list(r.EmbarkedVec.toArray())
    return LabeledPoint(label, numeric + embarked)

In [7]:
# Map every row of df_t through transf, giving an RDD of LabeledPoint.
data = df_t.rdd.map(transf)

In [8]:
# Inspect the 16th element. take(16) fetches only the leading elements
# instead of collecting the entire RDD to the driver.
data.take(16)[15]

LabeledPoint(1.0, [2.0,0.0,16.0,0.0,0.0,55.0,1.0,0.0,0.0])

In [9]:
# 70/30 train/test split. The seed is fixed so that a fresh
# "Restart & Run All" reproduces the same partition and metrics.
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [10]:
# Mark the training RDD for caching so later jobs over `train` can reuse it.
# NOTE(review): `test` is not cached; confirm whether that is intentional.
train.cache()

PythonRDD[39] at RDD at PythonRDD.scala:48

In [11]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
# Random forest baseline: 2 classes, 100 trees, all features treated as
# continuous (empty categoricalFeaturesInfo). The seed is fixed so the
# forest is reproducible between runs.
rfc = RandomForest.trainClassifier(train, numClasses=2,
                                   categoricalFeaturesInfo={},
                                   numTrees=100, seed=42)

In [12]:
def acc(m, test):
    """Return ``1 - mean(|prediction - label|)`` for model ``m`` on ``test``.

    For 0/1 labels and 0/1 predictions this equals the classification
    accuracy.

    Args:
        m: object whose ``predict`` accepts the mapped features collection
            and returns a collection that can be zipped with the labels.
        test: collection of points exposing ``features`` and ``label``.

    Returns:
        One minus the sum of absolute differences divided by their count.
    """
    actual = test.map(lambda point: point.label)
    predicted = m.predict(test.map(lambda point: point.features))
    abs_diff = predicted.zip(actual).map(lambda pair: abs(pair[0] - pair[1]))
    return 1 - abs_diff.sum() / abs_diff.count()

In [13]:
# Accuracy of the random forest on the held-out split.
acc(rfc, test)

0.8231292517006803

In [14]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# посчитать F1-меру

### Фича через udf: обращение из имени пассажира (Mrs или Mr)

In [15]:
def name_transform(x):
    """Reduce a passenger name to a coarse title.

    Args:
        x: the full name string, or ``None`` when the name is missing.

    Returns:
        ``"Mrs"`` if the name contains ``"Mrs."``, ``"Mr"`` if it contains
        ``"Mr."``, and a single space ``' '`` otherwise (every other title,
        and a missing name).
    """
    # A missing name would make the ``in`` checks raise TypeError; map it
    # to the same "other" bucket as unrecognised titles.
    if x is None:
        return ' '
    if "Mrs." in x:
        return "Mrs"
    if "Mr." in x:
        return "Mr"
    return ' '

# Re-bind my_udf to a string-returning UDF around name_transform.
my_udf = udf(name_transform, types.StringType())
# Overwrite 'Name' in df_t with the extracted title.
# NOTE(review): the input column is taken from `df`, not `df_t`, and the
# original 'Name' values are replaced in place, so this cell is presumably
# not safe to run twice on the same kernel (confirm before relying on it).
df_t = df_t.withColumn('Name', my_udf(df['Name']))
# Show the distinct titles produced.
df_t.select('Name').distinct().collect()

[Row(Name=u'Mr'), Row(Name=u'Mrs'), Row(Name=u' ')]

In [16]:
# Same two-stage encoding as for 'Embarked', applied to the title column:
#   'Name' -> 'NameIndex' -> 'NameVec'
stringIndexer = StringIndexer(outputCol="NameIndex", inputCol="Name")
encoder = OneHotEncoder(outputCol="NameVec", inputCol="NameIndex")

# Fit on df_t, then append the index and vector columns to it.
model = stringIndexer.fit(df_t)
indexed = model.transform(df_t)
df_t = encoder.transform(indexed)

In [17]:
def transf(r):
    """Build an MLlib ``LabeledPoint`` with the extended feature set.

    This definition replaces the earlier ``transf`` in the notebook.

    Features, in order: Pclass, ``Sex == 'male'``, Fare, SibSp, Parch, age,
    age * Fare, age squared, Fare squared, then the entries of
    ``r.EmbarkedVec`` followed by the entries of ``r.NameVec``.

    Args:
        r: a row exposing the attributes read below.

    Returns:
        LabeledPoint labelled with ``int(r.Survived)``.
    """
    label = int(r.Survived)
    pclass = int(r.Pclass)
    is_male = r.Sex == 'male'
    fare = float(r.Fare)
    sibsp, parch = int(r.SibSp), int(r.Parch)
    # NOTE(review): parse_age yields -1 for unusable input, so for those rows
    # age * fare is negative and age ** 2 equals 1, the same as for age 1.
    age = parse_age(r.Age)

    base = [pclass, is_male, fare, sibsp, parch, age]
    derived = [age * fare, age ** 2, fare ** 2]
    one_hot = list(r.EmbarkedVec.toArray()) + list(r.NameVec.toArray())
    return LabeledPoint(label, base + derived + one_hot)

### Обучаем разные модели

In [18]:
# Rebuild the LabeledPoint RDD from df_t with the transf currently in scope.
data = df_t.rdd.map(transf)

# 70/30 split with a fixed seed so that the model comparison below is
# reproducible across kernel restarts.
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [19]:
from pyspark.mllib.classification import SVMWithSGD #1
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel #2
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel #3
from pyspark.mllib.evaluation import MulticlassMetrics
# Model 1: linear SVM trained with SGD, with an intercept term.
# `validateData` is a boolean flag ("validate the input before training"),
# not a validation set: the original passed the test RDD, which was merely
# coerced to True. Pass the boolean explicitly.
m1 = SVMWithSGD.train(train, intercept=True, validateData=True)
# Model 2: gradient-boosted trees, no categorical features, depth 3.
m2 = GradientBoostedTrees.trainClassifier(train, {}, maxDepth=3)
# Model 3: single decision tree, 2 classes, no categorical features, depth 10.
m3 = DecisionTree.trainClassifier(train, 2, {}, maxDepth=10)

## Считаем для них fMeasure

In [20]:
from pyspark.mllib.evaluation import MulticlassMetrics
def fm(m, test, label=None):
    """Compute an F-measure for model ``m`` on ``test`` via MulticlassMetrics.

    Args:
        m: trained model whose ``predict`` accepts an RDD of feature vectors.
        test: RDD of points exposing ``features`` and ``label``.
        label: optional class label. When given, the F1 score of that class
            is returned (e.g. ``label=1.0`` for the positive class). When
            omitted, ``fMeasure()`` is called without arguments, as before.
            NOTE(review): per the Spark docs that no-argument form is the
            deprecated overall value, which equals accuracy rather than a
            per-class F1; confirm which one is wanted.

    Returns:
        The value returned by ``MulticlassMetrics.fMeasure``.
    """
    values = test.map(lambda x: x.features)
    # Predictions and labels are both converted to float before they are
    # paired up as (prediction, label) for MulticlassMetrics.
    yhat = m.predict(values).map(lambda x: float(x))
    y = test.map(lambda x: float(x.label))
    metrics = MulticlassMetrics(yhat.zip(y))
    if label is None:
        return metrics.fMeasure()
    # The label is passed as a float, matching the float labels built above.
    return metrics.fMeasure(float(label))

In [25]:
# Report the F-measure of each of the three models on the held-out split.
for _tag, _model in (("m1_f ", m1), ("m2_f ", m2), ("m3_f ", m3)):
    print(_tag, fm(_model, test))

('m1_f ', 0.5691056910569106)
('m2_f ', 0.8333333333333334)
('m3_f ', 0.7723577235772358)
