In [1]:
import os

os.environ["PYSPARK_SUBMIT_ARGS"]='--packages com.databricks:spark-csv_2.10:1.1.0 pyspark-shell'
os.environ["SPARK_HOME"]='/home/cheshire/spark/'

import sys

sys.path.append(os.environ.get('SPARK_HOME', None)+"/python")
sys.path.append(os.environ.get('SPARK_HOME', None)+"/python/lib/py4j-0.8.2.1-src.zip")

import py4j
from pyspark import SparkContext,SparkConf,SQLContext

conf = (SparkConf().setMaster("local[2]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "2"))

sc = SparkContext(conf=conf)

sqlCtx = SQLContext(sc)

In [2]:
sc.version

u'1.5.0'

In [3]:
df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true').load("/home/cheshire/Documents/spark_demo/titanic_train.csv")

In [4]:
df

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [5]:
df.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05|     |       S|
|          6|       0|     3|    Moran, Mr. James|  male|   |    0|    0|          33087

In [6]:
df.select('Age').distinct().collect()

[Row(Age=u'70.5'),
 Row(Age=u'36.5'),
 Row(Age=u'50'),
 Row(Age=u'51'),
 Row(Age=u'0.75'),
 Row(Age=u'52'),
 Row(Age=u'53'),
 Row(Age=u'54'),
 Row(Age=u'55.5'),
 Row(Age=u'55'),
 Row(Age=u'56'),
 Row(Age=u'57'),
 Row(Age=u'58'),
 Row(Age=u'40.5'),
 Row(Age=u'59'),
 Row(Age=u'45.5'),
 Row(Age=u'30.5'),
 Row(Age=u'20.5'),
 Row(Age=u'1'),
 Row(Age=u'2'),
 Row(Age=u'3'),
 Row(Age=u'0.83'),
 Row(Age=u'4'),
 Row(Age=u'60'),
 Row(Age=u'5'),
 Row(Age=u'61'),
 Row(Age=u'62'),
 Row(Age=u'6'),
 Row(Age=u'7'),
 Row(Age=u'63'),
 Row(Age=u'8'),
 Row(Age=u'64'),
 Row(Age=u'65'),
 Row(Age=u'9'),
 Row(Age=u'66'),
 Row(Age=u'0.92'),
 Row(Age=u'34.5'),
 Row(Age=u'70'),
 Row(Age=u'71'),
 Row(Age=u'74'),
 Row(Age=u'24.5'),
 Row(Age=u'10'),
 Row(Age=u'11'),
 Row(Age=u'12'),
 Row(Age=u'14.5'),
 Row(Age=u'13'),
 Row(Age=u'14'),
 Row(Age=u'15'),
 Row(Age=u'16'),
 Row(Age=u'17'),
 Row(Age=u'18'),
 Row(Age=u'19'),
 Row(Age=u'80'),
 Row(Age=u'0.42'),
 Row(Age=u'20'),
 Row(Age=u'21'),
 Row(Age=u'22'),
 Row(Age=u'2

In [7]:
df.select('Embarked').distinct().collect()

[Row(Embarked=u'C'), Row(Embarked=u'Q'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [8]:
df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       0|  549|
|       1|  342|
+--------+-----+



In [9]:
df.registerTempTable("titanic")
sqlCtx.sql('SELECT Survived, count(Survived) as count FROM titanic GROUP BY Survived').show()

+--------+-----+
|Survived|count|
+--------+-----+
|       0|  549|
|       1|  342|
+--------+-----+



In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)
df_t.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|EmbarkedIndex|  EmbarkedVec|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|          0.0|(3,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|          1.0|(3,[1],[1.0])|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|          0.0|(3,[0],[1.0])|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|          0.0|(3,[0],[1.0])|
|     

In [11]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array
import random

In [12]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1.0
    
def transform_row(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            parse_age(r.Age),
            int(r.SibSp),
            int(r.Parch),
            float(r.Fare),
        ] + list(r.EmbarkedVec.toArray())
    )

In [13]:
data = df_t.map(transform_row)

In [14]:
data.take(10)

[LabeledPoint(0.0, [3.0,1.0,22.0,1.0,0.0,7.25,1.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,0.0,38.0,1.0,0.0,71.2833,0.0,1.0,0.0]),
 LabeledPoint(1.0, [3.0,0.0,26.0,0.0,0.0,7.925,1.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,0.0,35.0,1.0,0.0,53.1,1.0,0.0,0.0]),
 LabeledPoint(0.0, [3.0,1.0,35.0,0.0,0.0,8.05,1.0,0.0,0.0]),
 LabeledPoint(0.0, [3.0,1.0,-1.0,0.0,0.0,8.4583,0.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,1.0,54.0,0.0,0.0,51.8625,1.0,0.0,0.0]),
 LabeledPoint(0.0, [3.0,1.0,2.0,3.0,1.0,21.075,1.0,0.0,0.0]),
 LabeledPoint(1.0, [3.0,0.0,27.0,0.0,2.0,11.1333,1.0,0.0,0.0]),
 LabeledPoint(1.0, [2.0,0.0,14.0,1.0,0.0,30.0708,0.0,1.0,0.0])]

In [15]:
(train, test) = data.randomSplit([0.7, 0.3])
train.cache()

PythonRDD[75] at RDD at PythonRDD.scala:43

In [16]:
print train.count(), test.count()

602 289


In [17]:
def evaluate_model(model, test_data):
    example_values = test_data.map(lambda x: x.features)
    example_target = test_data.map(lambda x: x.label)
    
    example_predictions = model.predict(example_values)
    
    classifier_eval = example_target.zip(example_predictions)
    errors = classifier_eval.map(lambda x: abs(x[0]-x[1]))
    
    return 1.0-errors.sum()/errors.count()

In [18]:
from pyspark.mllib.classification import \
  LogisticRegressionWithLBFGS, LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

In [19]:
model_bf = LogisticRegressionWithLBFGS.train(train)
evaluate_model(model_bf, test)

0.7785467128027681

In [20]:
model_svm = SVMWithSGD.train(train)
evaluate_model(model_svm, test)

0.6262975778546713

In [21]:
model_lrsgd = LogisticRegressionWithSGD.train(train)
evaluate_model(model_lrsgd, test)

0.6885813148788927

In [22]:
model_rf = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)
evaluate_model(model_rf, test)

0.685121107266436

In [23]:
model_rf2 = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=30, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)
evaluate_model(model_rf2, test)

0.7958477508650519

In [24]:
model_gbt = GradientBoostedTrees.trainClassifier(train,
    categoricalFeaturesInfo={}, numIterations=3)
evaluate_model(model_gbt, test)

0.7889273356401384

In [25]:
model_gbt2 = GradientBoostedTrees.trainClassifier(train,
    categoricalFeaturesInfo={}, numIterations=10, maxDepth=7)
evaluate_model(model_gbt2, test)

0.7854671280276817