# Learning PySpark

Notebook of messing around with PySpark

## Setup

Code below taken from Spark By Examples website to validate installation

In [1]:
# Import PySpark
from pyspark.sql import SparkSession

In [2]:
# Create SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [3]:
# Data
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

# Columns
columns = ["language","users_count"]

In [4]:
# Create DataFrame
df = spark.createDataFrame(data).toDF(*columns)

In [5]:
# Print DataFrame
df.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [6]:
spark.stop()

## Practice

Mess around on Titanic

In [66]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, Imputer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
conf = (
    SparkConf()
    .setAppName("house price practice")
    .setMaster("local")
)

spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .getOrCreate()
)

In [3]:
train_df = spark.read.csv("./data/train.csv", inferSchema=True, header=True)
train_df.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [4]:
train_df.count(), len(train_df.columns)

(891, 12)

In [8]:
# null counts
train_df.agg(
    *[F.count_if(F.isnull(c)).alias(c) for c in train_df.columns]
).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [54]:
train_df2 = (
    train_df
    .withColumn("CabinZone", F.substring("Cabin", 1, 1))
    .withColumn("FamilySize", F.col("SibSp") + F.col("Parch") + 1)
    .withColumn("Title", F.regexp_extract(F.col("Name"), r", (.*?)\. ", 1))
    .withColumn("Title", F.when(F.col("Title").isin(["Mr", "Mrs", "Miss", "Master"]), F.col("Title")).otherwise("Other"))
    .withColumn("Pclass", 
                F.when(F.col("Pclass") == "1", "Upper")
                .when(F.col("Pclass") == "2", "Middle")
                .when(F.col("Pclass") == "3", "Lower"))
    .drop("Cabin", "SibSp", "Parch", "PassengerId", "Ticket", "Name")
)
train_df2.show(10)

+--------+------+------+----+-------+--------+---------+----------+------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|CabinZone|FamilySize| Title|
+--------+------+------+----+-------+--------+---------+----------+------+
|       0| Lower|  male|22.0|   7.25|       S|     NULL|         2|    Mr|
|       1| Upper|female|38.0|71.2833|       C|        C|         2|   Mrs|
|       1| Lower|female|26.0|  7.925|       S|     NULL|         1|  Miss|
|       1| Upper|female|35.0|   53.1|       S|        C|         2|   Mrs|
|       0| Lower|  male|35.0|   8.05|       S|     NULL|         1|    Mr|
|       0| Lower|  male|NULL| 8.4583|       Q|     NULL|         1|    Mr|
|       0| Upper|  male|54.0|51.8625|       S|        E|         1|    Mr|
|       0| Lower|  male| 2.0| 21.075|       S|     NULL|         5|Master|
|       1| Lower|female|27.0|11.1333|       S|     NULL|         3|   Mrs|
|       1|Middle|female|14.0|30.0708|       C|     NULL|         2|   Mrs|
+--------+------+------+-

In [63]:
train_df2.agg(
    *[F.count_if(F.isnull(c)).alias(c) for c in train_df2.columns]
).show()

+--------+------+---+---+----+--------+---------+----------+-----+
|Survived|Pclass|Sex|Age|Fare|Embarked|CabinZone|FamilySize|Title|
+--------+------+---+---+----+--------+---------+----------+-----+
|       0|     0|  0|177|   0|       2|      687|         0|    0|
+--------+------+---+---+----+--------+---------+----------+-----+



In [65]:
train_df2.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- CabinZone: string (nullable = true)
 |-- FamilySize: integer (nullable = true)
 |-- Title: string (nullable = true)



In [69]:
cat_cols = [field for (field, dtype) in train_df2.dtypes if dtype == "string"]
cat_idx_cols = [x + "Index" for x in cat_cols]

string_indexer = StringIndexer(inputCols=cat_cols, outputCols=cat_idx_cols, handleInvalid="keep")
imputer = Imputer(strategy="median", inputCols=["Age"], outputCols=["AgeImputed"])

num_cols = [field for (field, dtype) in train_df2.dtypes if (dtype != "string" and field not in ["Survived", "Age"])]

vec_assembler = VectorAssembler(inputCols=cat_idx_cols + num_cols, 
                                outputCol="features", handleInvalid="keep")
rf = RandomForestClassifier(featuresCol="features", labelCol="Survived")

pipeline = Pipeline(stages=[string_indexer, imputer, vec_assembler, rf])
pipelineModel = pipeline.fit(train_df2)

In [70]:
pred_train = pipelineModel.transform(train_df2)
pred_train.select("features", "Survived", "Prediction").show()

+--------------------+--------+----------+
|            features|Survived|Prediction|
+--------------------+--------+----------+
|(7,[3,5,6],[8.0,7...|       0|       0.0|
|[1.0,1.0,1.0,0.0,...|       1|       1.0|
|[0.0,1.0,0.0,8.0,...|       1|       1.0|
|[1.0,1.0,0.0,0.0,...|       1|       1.0|
|(7,[3,5,6],[8.0,8...|       0|       0.0|
|[0.0,0.0,2.0,8.0,...|       0|       0.0|
|[1.0,0.0,0.0,3.0,...|       0|       0.0|
|[0.0,0.0,0.0,8.0,...|       0|       0.0|
|[0.0,1.0,0.0,8.0,...|       1|       1.0|
|[2.0,1.0,1.0,8.0,...|       1|       1.0|
|[0.0,1.0,0.0,6.0,...|       1|       1.0|
|[1.0,1.0,0.0,0.0,...|       1|       1.0|
|(7,[3,5,6],[8.0,8...|       0|       0.0|
|(7,[3,5,6],[8.0,3...|       0|       0.0|
|[0.0,1.0,0.0,8.0,...|       0|       1.0|
|[2.0,1.0,0.0,8.0,...|       1|       1.0|
|[0.0,0.0,2.0,8.0,...|       0|       0.0|
|[2.0,0.0,0.0,8.0,...|       1|       0.0|
|[0.0,1.0,0.0,8.0,...|       0|       1.0|
|[0.0,1.0,1.0,8.0,...|       1|       1.0|
+----------

In [None]:
classificationEvaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction",
    labelCol="Survived",
    metricName="areaU"
)

In [74]:
spark.stop()