# Test ML Pipeline

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import findspark
findspark.init()

In [3]:
import pyspark

In [4]:
cd ..

/Users/cls/Documents/IndependentDataScience/Projects/Telekom/MLPipeline


In [5]:
from mlpipelinefactory import factory

In [6]:
data_path = "/Users/cls/Documents/IndependentDataScience/Projects/point8/data-science-101/notebooks/.assets/data/titanic/titanic.csv"

In [7]:
spark = pyspark.sql.SparkSession \
    .builder \
    .appName("MLPipelineTest") \
    .getOrCreate()


In [8]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

schema = StructType([
             StructField('PassengerId', StringType()),
             StructField('Survived', IntegerType()),
             StructField('Pclass', IntegerType()),
             StructField('Name', StringType()),
             StructField('Sex', StringType()),
             StructField('Age', IntegerType()),
             StructField('SibSp', IntegerType()),
             StructField('Parch', IntegerType()),
             StructField('Ticket', StringType()),
             StructField('Fare', DoubleType()),
             StructField('Cabin', StringType()),
             StructField('Embarked', StringType())
        ])

In [9]:
data = spark.read.format("csv").option("header", "true").schema(schema).load(data_path) 


In [10]:
data.take(5)

[Row(PassengerId='1', Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId='2', Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId='4', Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId='5', Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

In [11]:
data.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [12]:
data.dtypes

[('PassengerId', 'string'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'int'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [13]:
pf = factory.MLPipelineFactory(data=data,
                                    problemType="classification",
                                    target="Survived",
                                    numericCols=[
                                            "Age",
                                            "Fare",
                                            "SibSp",
                                            "Parch"
                                        ],
                                    categoricalCols = [
                                            "Sex",
                                            "Embarked"
                                        ],
                                    categoricalEncoding="index",
                                    algorithm="Random Forest")


In [14]:
(categoricalPipeline, preproPipeline, finalPipeline, completePipeline) = pf.make()

assembling feature vectors, using input columns:  ['Age', 'Fare', 'SibSp', 'Parch', 'Sex_indexed', 'Embarked_indexed']


In [15]:
categoricalPipeline.getStages()

[StringIndexer_46fb9af79f3143bc118f, StringIndexer_49069213b8b8e996f632]

In [16]:
# train-test split
training, test = data.randomSplit([0.8, 0.2])
training = training.cache()

In [17]:
categoricalModel = categoricalPipeline.fit(training)

In [18]:
categoricalModel.transform(training).show()

+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+--------+-------+--------+-----------+----------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|           Ticket|    Fare|  Cabin|Embarked|Sex_indexed|Embarked_indexed|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+--------+-------+--------+-----------+----------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|        A/5 21171|    7.25|   null|       S|        0.0|             0.0|
|         10|       1|     2|Nasser, Mrs. Nich...|female|  14|    1|    0|           237736| 30.0708|   null|       C|        1.0|             1.0|
|        100|       0|     2|   Kantor, Mr. Sinai|  male|  34|    1|    0|           244367|    26.0|   null|       S|        0.0|             0.0|
|        101|       0|     3|Petranec, Miss. M...|female|  28|    0|    0|           349245|  7.8958|   null|   

In [19]:
%%time
model = completePipeline.fit(training)



CPU times: user 43.9 ms, sys: 8.91 ms, total: 52.8 ms
Wall time: 3.1 s


In [20]:
test.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|       null|    null|  null|                null|  null|null| null| null|            null|   null| null|    null|
|       null|    null|  null|                null|  null|null| null| null|            null|   null| null|    null|
|       null|    null|  null|                null|  null|null| null| null|            null|   null| null|    null|
|        107|       1|     3|Salkjelsvik, Miss...|female|  21|    0|    0|          343120|   7.65| null|       S|
|        110|       1|     3| Moran, Miss. Bertha|female|null|    1|    0|          371110|  24.15| null|       Q|
|        111|       0|     1|Porter, Mr. Walte...|  male|  47|    0|    0|      

In [21]:
predictions = model.transform(test)


In [22]:
print(predictions.dtypes)

[('features', 'vector'), ('label', 'int'), ('rawPrediction', 'vector'), ('probability', 'vector'), ('prediction', 'double')]


In [23]:
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[21.0,7.65,0.0,0....|    1|[6.89377552626270...|[0.34468877631313...|       1.0|
|(6,[0,1],[47.0,52...|    0|[14.1859847504173...|[0.70929923752086...|       0.0|
|(6,[0,1],[22.0,8....|    0|[17.2613983579629...|[0.86306991789814...|       0.0|
|[2.0,31.275,4.0,2...|    0|[15.5012820512820...|[0.77506410256410...|       0.0|
|[19.0,26.2833,0.0...|    1|[2.26138981146894...|[0.11306949057344...|       1.0|
|[16.0,7.7333,0.0,...|    1|[4.83533667227300...|[0.24176683361365...|       1.0|
|[55.0,16.0,0.0,0....|    1|[5.20091572114751...|[0.26004578605737...|       1.0|
|[40.0,15.5,1.0,1....|    0|[16.0386661349680...|[0.80193330674840...|       0.0|
|[24.0,13.0,0.0,0....|    0|[4.20846415958976...|[0.21042320797948...|       1.0|
|[2.0,10.4625,0.

In [24]:
factory.evaluateClassifier(completePipeline, data)

Unnamed: 0,label,Precision,Recall,F1
0,0.0,0.924051,0.793478,0.853801
1,1.0,0.660714,0.860465,0.747475


In [25]:
model.stages[-1].featureImportances

SparseVector(6, {0: 0.1549, 1: 0.1637, 2: 0.0556, 3: 0.0681, 4: 0.5358, 5: 0.0219})