In [1]:
from pyspark import SparkContext
sc = SparkContext("local[*]")
train_rdd = sc.textFile("./input/train.csv")
test_rdd = sc.textFile("./input/test.csv")
train_rdd.take(3)

['PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 '2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']

In [2]:
# Parse RDD to DF
def parseTrain(rdd):
    # step 1
    header = rdd.first()
    body = rdd.filter(lambda r: r!=header)
    
    # prepare for step 2
    def parseRow(row):
        row_list = row.replace('"','').split(',')
        row_tuple = tuple(row_list)
        return row_tuple
    
    # step2
    rdd_parsed = body.map(parseRow)
 
    # step 3
    colnames = header.split(',')
    colnames.insert(3,'FirstName')
 
    return rdd_parsed.toDF(colnames)

def parseTest(rdd):
    # step 1
    header = rdd.first()
    body = rdd.filter(lambda r: r!=header)
 
    def parseRow(row):
        row_list = row.replace('"','').split(',')
        row_tuple = tuple(row_list)
        return row_tuple
    
    # step2
    rdd_parsed = body.map(parseRow)
    
    # step 3
    colnames = header.split(',')
    colnames.insert(2,'FirstName')
    
    return rdd_parsed.toDF(colnames)

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

train_df = parseTrain(train_rdd)
test_df = parseTest(test_rdd)

train_df.show(5)

+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|FirstName|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|   Braund|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|  Cumings| Mrs. John Bradle...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
|          4|       1|     1| Futrelle| Mrs. Jacques Hea...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|    Allen|   Mr. William Henry|  male| 35|    0|    0|          373450|   8.05|     |       S|
+-------

In [5]:
test_df.show(5)

+-----------+------+---------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|FirstName|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+---------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly|           Mr. James|  male|34.5|    0|    0| 330911| 7.8292|     |       Q|
|        893|     3|   Wilkes| Mrs. James (Elle...|female|  47|    1|    0| 363272|      7|     |       S|
|        894|     2|    Myles|  Mr. Thomas Francis|  male|  62|    0|    0| 240276| 9.6875|     |       Q|
|        895|     3|     Wirz|          Mr. Albert|  male|  27|    0|    0| 315154| 8.6625|     |       S|
|        896|     3| Hirvonen| Mrs. Alexander (...|female|  22|    1|    1|3101298|12.2875|     |       S|
+-----------+------+---------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [11]:
## Add Survived column to test
from pyspark.sql.functions import lit, col
train_df = train_df.withColumn('Mark',lit('train')) #新增一欄Mark,分成train 跟test 兩種型態
test_df = (test_df.withColumn('Survived',lit(0)).withColumn('Mark',lit('test')))

test_df = test_df[train_df.columns]#這句話的意思？
## Append Test data to Train data
df = train_df.unionAll(test_df)
df.show(5)

+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|FirstName|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Mark|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|   Braund|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|train|
|          2|       1|     1|  Cumings| Mrs. John Bradle...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|train|
|          3|       1|     3|Heikkinen|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|train|
|          4|       1|     1| Futrelle| Mrs. Jacques Hea...|female| 35|    1|    0|          113803|   53.1| C123|       S|train|
|          5|       0|     3|    Allen|   Mr. William Henry|  male| 35|    0|    0|       

In [12]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)



In [13]:
#transform it into “feature vectors”.將其中五個型態轉為double
df = (df.withColumn('Age',df['Age'].cast("double"))\
      .withColumn('SibSp',df['SibSp'].cast("double")) \
      .withColumn('Parch',df['Parch'].cast("double")) \
      .withColumn('Fare',df['Fare'].cast("double")) \
      .withColumn('Survived',df['Survived'].cast("double")) \
)
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)



In [23]:
#針對這五個variable,檢查有哪些數字是missing的
numVars = ['Survived','Age','SibSp','Parch','Fare','Sex']
def countNull(df,var):
    return df.where(df[var].isNull()).count()
 
missing = {var: countNull(df,var) for var in numVars}
print(missing)

{'Survived': 0, 'Age': 0, 'SibSp': 0, 'Parch': 0, 'Fare': 0, 'Sex': 0, 'Embarked': 0}


In [15]:
#missing 的數字填上平均值
age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})

In [16]:
#取得名字跟title

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
## created user defined function to extract title
getTitle = udf(lambda name: name.split('.')[0].strip(),StringType())
df = df.withColumn('Title', getTitle(df['Name']))
 
df.select('Name','Title').show(3)

+--------------------+-----+
|                Name|Title|
+--------------------+-----+
|     Mr. Owen Harris|   Mr|
| Mrs. John Bradle...|  Mrs|
|         Miss. Laina| Miss|
+--------------------+-----+
only showing top 3 rows



In [20]:
catVars = ['Pclass','Sex','Embarked','Title']

## index Sex variable
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
df_indexed = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')

df_indexed.printSchema()
df_indexed.select('sex').show(5)
 


root
 |-- PassengerId: string (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Sex: double (nullable = true)

+---+
|sex|
+---+
|0.0|
|1.0|
|1.0|
|1.0|
|0.0|
+---+
only showing top 5 rows



In [22]:
## make use of pipeline to index all categorical variables
def indexer(df,col):
    si = StringIndexer(inputCol = col, outputCol = col+'_indexed').fit(df)
    return si
 
indexers = [indexer(df,col) for col in catVars]
 
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = indexers)
df_indexed = pipeline.fit(df).transform(df)
 
df_indexed.select('Embarked','Embarked_indexed').show(4)#Embark有空字串會怎麼處理


+--------+----------------+
|Embarked|Embarked_indexed|
+--------+----------------+
|       S|             0.0|
|       C|             1.0|
|       S|             0.0|
|       S|             0.0|
+--------+----------------+
only showing top 4 rows

