In [1]:
! which python

/Users/benjamin/Documents/Quantmetry_Missions/RetD/pipeasy-spark/.venv/bin/python


In [2]:
# Creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('titanic').getOrCreate()

In [3]:
spark

### Loading data and exploratory analysis

In [4]:
# Reading data
df = spark.read.csv('./datasets/titanic.csv', header=True, inferSchema=True, sep='\t')
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
df.count()

156

In [6]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [7]:
# Counting missing values
from pyspark.sql.functions import col, sum

df.select(*(sum(col(c).isNull().cast('int')).alias(c) for c in df.columns)).toPandas().melt()

Unnamed: 0,variable,value
0,PassengerId,0
1,Survived,0
2,Pclass,0
3,Name,0
4,Sex,0
5,Age,30
6,SibSp,0
7,Parch,0
8,Ticket,0
9,Fare,0


In [8]:
# Count of unique values
from pyspark.sql.functions import col, countDistinct

df.select(*(countDistinct(col(c)).alias(c) for c in df.columns)).toPandas().melt()

Unnamed: 0,variable,value
0,PassengerId,156
1,Survived,2
2,Pclass,3
3,Name,156
4,Sex,2
5,Age,56
6,SibSp,6
7,Parch,5
8,Ticket,145
9,Fare,93


In [9]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import seaborn as sns

num_columns = [item[0] for item in df.drop(
    'PassengerId').dtypes if not item[1] == 'string']
corr_data = VectorAssembler(inputCols=num_columns, outputCol='features').transform(
    df.dropna()).select('features')

corr_mat = Correlation.corr(corr_data, 'features').toPandas()
corr_mat = pd.DataFrame(corr_mat['pearson(features)'][0].toArray().astype(float),
                        columns=num_columns,
                        index=num_columns)
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr_mat, cmap=cmap)

<matplotlib.axes._subplots.AxesSubplot at 0x120cad0b8>

### Limited example: output from dataframe-mapper (categorical & numerical transformations)

In [10]:
import pipeasy_spark as ppz
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler

df_pipe = df.drop('PassengerId', 'Name', 'Ticket', 'Cabin').dropna()
ppz_pipeline= ppz.build_pipeline_by_dtypes(
    df_pipe,
    exclude_columns=['Survived'],
    string_transformers=[StringIndexer(), OneHotEncoderEstimator()],
    numeric_transformers=[VectorAssembler(), StandardScaler()])
ppz_model = ppz_pipeline.fit(df_pipe)
df_pipe_out = ppz_model.transform(df_pipe)
df_pipe_out.limit(5).toPandas()

Unnamed: 0,Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked
0,0,[3.723075040654779],(1.0),[1.5021431833972851],[0.8813549847826144],[0.0],[0.1744835715812996],"(1.0, 0.0)"
1,1,[1.241025013551593],(0.0),[2.5946109531407653],[0.8813549847826144],[0.0],[1.7155537624967245],"(0.0, 1.0)"
2,1,[3.723075040654779],(0.0),[1.7752601258331553],[0.0],[0.0],[0.1907285937630068],"(1.0, 0.0)"
3,1,[1.241025013551593],(0.0),[2.389773246313863],[0.8813549847826144],[0.0],[1.2779417449609667],"(1.0, 0.0)"
4,0,[3.723075040654779],(1.0),[2.389773246313863],[0.0],[0.0],[0.1937369312040637],"(1.0, 0.0)"


### Full example: Training a random forest and defining the full pipeline (with ML)

In [11]:
# Random split into train/test
train, test = df_pipe.randomSplit([.7,.3], seed=42)
print(train.count())
print(test.count())

99
26


In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

preprocessing_pipeline = Pipeline(stages=[
    ppz_pipeline,
    # all the features are assembled in a single column
    VectorAssembler(inputCols=['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked'],
                    outputCol='features')
])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="Survived", 
                            featuresCol="features", 
                            numTrees=5, 
                            seed=42)

# Chain ppz_pipeline, rf and reverse label indexer
pipeline = Pipeline(stages=[preprocessing_pipeline, rf])

# Train model.
model = pipeline.fit(train)

# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.select("Survived", "prediction", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = BinaryClassificationEvaluator(
    labelCol="Survived", rawPredictionCol="prediction", metricName="areaUnderROC")
AUROC = evaluator.evaluate(predictions)
print("Test AUROC = {0}".format(AUROC))

+--------+----------+--------------------+
|Survived|prediction|            features|
+--------+----------+--------------------+
|       0|       0.0|[1.28430485724300...|
|       0|       0.0|[1.28430485724300...|
|       0|       0.0|[1.28430485724300...|
|       0|       0.0|[1.28430485724300...|
|       0|       0.0|[1.28430485724300...|
+--------+----------+--------------------+
only showing top 5 rows

Test AUROC = 0.8
