In [1]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession \
        .builder \
        .appName("My-App") \
        .getOrCreate()

# Create Dataframe
sdf_train = spark.read.format("csv").option("header","true").load("train.csv")
sdf_test = spark.read.format("csv").option("header","true").load("test.csv")

# createOrReplaceTempView TempView
#sdf_train.createOrReplaceTempView("sdf_train")
#sdf_test.createOrReplaceTempView("sdf_test")

## Fill missing values

In [3]:
from pyspark.sql.functions import col
for i in sdf_test.columns:
    if sdf_train.where(col(i).isNull()).count() or sdf_train.where(col(i).isNull()).count():
        print(f'{i} has {sdf_train.where(col(i).isNull()).count()} N/A out of {sdf_train.count()} at train')
        print(f'{i} has {sdf_test.where(col(i).isNull()).count()} N/A out of {sdf_test.count()} at test')
print(sdf_train.select("Cabin").limit(5).collect())

Age has 177 N/A out of 891 at train
Age has 86 N/A out of 418 at test
Cabin has 687 N/A out of 891 at train
Cabin has 327 N/A out of 418 at test
Embarked has 2 N/A out of 891 at train
Embarked has 0 N/A out of 418 at test
[Row(Cabin=None), Row(Cabin='C85'), Row(Cabin=None), Row(Cabin='C123'), Row(Cabin=None)]


In [4]:
sdf_train.where(col('Cabin').isNull()).count()

687

In [5]:
for i in sdf_test.columns:
    if sdf_train.where(col(i).isNull()).count() or sdf_train.where(col(i).isNull()).count():
        print(f'{i} has {sdf_train.where(col(i).isNull()).count()} N/A out of {sdf_train.count()} at train')
        print(f'{i} has {sdf_test.where(col(i).isNull()).count()} N/A out of {sdf_test.count()} at test')
print(sdf_train.select("Cabin").limit(5).collect())

Age has 177 N/A out of 891 at train
Age has 86 N/A out of 418 at test
Cabin has 687 N/A out of 891 at train
Cabin has 327 N/A out of 418 at test
Embarked has 2 N/A out of 891 at train
Embarked has 0 N/A out of 418 at test
[Row(Cabin=None), Row(Cabin='C85'), Row(Cabin=None), Row(Cabin='C123'), Row(Cabin=None)]


In [6]:
sdf_train = sdf_train.fillna({'Cabin':'None'})
sdf_test = sdf_test.fillna({'Cabin':'None'})
sdf_train.createOrReplaceTempView("sdf_train")
sdf_test.createOrReplaceTempView("sdf_test")

print(sdf_train.select("Cabin").limit(5).collect())

[Row(Cabin='None'), Row(Cabin='C85'), Row(Cabin='None'), Row(Cabin='C123'), Row(Cabin='None')]


In [7]:
for i in sdf_test.columns:
    if sdf_train.where(col(i).isNull()).count() or sdf_train.where(col(i).isNull()).count():
        print(f'{i} has {sdf_train.where(col(i).isNull()).count()} N/A out of {sdf_train.count()} at train')
        print(f'{i} has {sdf_test.where(col(i).isNull()).count()} N/A out of {sdf_test.count()} at test')

Age has 177 N/A out of 891 at train
Age has 86 N/A out of 418 at test
Embarked has 2 N/A out of 891 at train
Embarked has 0 N/A out of 418 at test


## Feature Enginnering

### Name

In [8]:
foo = sdf_train.where(sdf_train.Sex == "female"). \
    where(~sdf_train.Name.rlike('Mrs')).\
    where(~sdf_train.Name.rlike('Miss')).\
    count()

In [9]:
from pyspark.sql.types import *

name_regex = {
#female_regex
    "Name_Mrs": '(Mrs|Mme|Dona|Countess)',
    "Name_Miss": '(Mlle|Miss)',
#male_regex 
    "Name_Mr": '(Don|Mr\.|Jonkheer)',
    "Name_Master": '(Mlle|Miss)',
    "Name_Dr": '(Dr)',
    "Name_Rev": '(Rev)',
    "Name_Soldier": '(Major|Col|Capt)'
}

for new_col in name_regex.keys():
    sdf_train = sdf_train.withColumn(new_col, \
                                     sdf_train. \
                                         Name. \
                                         rlike(name_regex[new_col]). \
                                         cast(IntegerType()))
    sdf_test = sdf_test.withColumn(new_col, \
                                   sdf_test. \
                                       Name. \
                                       rlike(name_regex[new_col]). \
                                       cast(IntegerType()))

sdf_train.createOrReplaceTempView("sdf_train")
sdf_test.createOrReplaceTempView("sdf_test")

## Ticket

In [10]:
from pyspark.sql.functions import split
from pyspark.sql.functions import element_at,log1p


sdf_train = sdf_train.\
        withColumn("tmp", split("Ticket", " ")).\
        withColumn("last_element", element_at(col('tmp'), -1,)).\
        withColumn("Ticket_Num_Log", log1p(col('last_element'))).\
        drop("tmp","last_element")

In [11]:
ticket_regex = {
    "Ticket_PC": '(PC)',
    "Ticket_TON": '(TON)',
    "Ticket_NO_PREFIEX": '(^[0-9])'
}

for new_col in ticket_regex.keys():
    sdf_train = sdf_train.withColumn(new_col, sdf_train["Ticket"].rlike(ticket_regex[new_col]).cast(IntegerType()))
    sdf_test = sdf_test.withColumn(new_col, sdf_test["Ticket"].rlike(ticket_regex[new_col]).cast(IntegerType()))

## Cabin

In [12]:
cabin_regex = {
    "Cabin_A": "^A",
    "Cabin_B": "^B",
    "Cabin_C": "^C",
    "Cabin_D": "^D",
    "Cabin_E": "^E",
    "Cabin_F": "^F",
    "Cabin_G": "^G",
    "Cabin_T": "^T",
    "Cabin_None": "None"
}

for new_col in cabin_regex.keys():
    sdf_train = sdf_train.withColumn(new_col, sdf_train["Cabin"].rlike(cabin_regex[new_col]).cast(IntegerType()))

In [13]:
# Later need one-hot
cols_int_to_str = ["Pclass","SibSp","Parch"]
for cols in cols_int_to_str:
    sdf_train = sdf_train.withColumn(cols, sdf_train[cols].cast("string"))

In [14]:
mylist = sdf_train.columns
mylist.pop(mylist.index("Name"))
mylist.pop(mylist.index("Sex"))
mylist.pop(mylist.index("Embarked"))
mylist.pop(mylist.index("Age"))

'Age'

In [15]:
for i in sdf_test.columns:
    if sdf_train.where(col(i).isNull()).count() or sdf_train.where(col(i).isNull()).count():
        print(f'{i} has {sdf_train.where(col(i).isNull()).count()} N/A out of {sdf_train.count()} at train')
        print(f'{i} has {sdf_test.where(col(i).isNull()).count()} N/A out of {sdf_test.count()} at test')
print(sdf_train.select("Cabin").limit(5).collect())

Age has 177 N/A out of 891 at train
Age has 86 N/A out of 418 at test
Embarked has 2 N/A out of 891 at train
Embarked has 0 N/A out of 418 at test
[Row(Cabin='None'), Row(Cabin='C85'), Row(Cabin='None'), Row(Cabin='C123'), Row(Cabin='None')]


In [16]:
#mylist = sdf_train.columns.pop(11)

for i in mylist:
    if str(type(sdf_train.schema[i].dataType)) == "<class 'pyspark.sql.types.StringType'>":
        #print(i)
        sdf_train = sdf_train.withColumn(i, sdf_train[i].cast("int"))
for i in mylist:
    if str(type(sdf_train.schema[i].dataType)) == "<class 'pyspark.sql.types.StringType'>":
        print(i)   

In [17]:
import pyspark.sql.functions as F

def drop_null_columns(df):
    """
    This function drops all columns which contain null values.
    :param df: A PySpark DataFrame
    """
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    to_drop = [k for k, v in null_counts.items() if v > 0]
    df = df.drop(*to_drop)
    return df
           
sdf_train = drop_null_columns(sdf_train)

In [18]:
for i in sdf_train.columns:
    if sdf_train.where(col(i).isNull()).count() :
        print(f'{i} has {sdf_train.where(col(i).isNull()).count()} N/A out of {sdf_train.count()} at train')
        #print(f'{i} has {sdf_test.where(col(i).isNull()).count()} N/A out of {sdf_test.count()} at test')

sdf_train = sdf_train.dropna()

In [None]:
sdf_train.dropna().count()

In [None]:
(traindf, testdf) = sdf_train.randomSplit([0.7,0.3])

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
PclassIndexer = StringIndexer(inputCol="Pclass", outputCol="indexedPclass")
#genderIndexer = StringIndexer(inputCol="SibSp", outputCol="indexedSibSp")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
 
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")
 
# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
PclassEncoder = OneHotEncoder(inputCol="indexedPclass", outputCol="PclassVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")


# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features")

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")
 
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, PclassIndexer, embarkIndexer, PclassEncoder,embarkEncoder,genderEncoder, assembler, rf]) # genderIndexer,embarkIndexer,genderEncoder,embarkEncoder,
 

model = pipeline.fit(traindf)


## References

  - <https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession>
  - <https://creativedata.atlassian.net/wiki/spaces/SAP/pages/83237142/Pyspark+-+Tutorial+based+on+Titanic+Dataset>