In [1]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder.appName("Titanic-RDD").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

In [3]:
data = spark.read.option("header", True).csv('titanic.csv').rdd

In [4]:
data.take(5)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S'),
 Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived='1', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S'),
 Row(PassengerId='4', Survived='1', Pclass='1', Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age='35', SibSp='1', Parch='0', Ticket='113803', Fare='53.1', Cabin='C123', Embarked='S'),
 Row(PassengerId='5', Survived='0', Pclass='3', Name='Allen, Mr. William Henry', Sex='male', Age='35', SibSp='0', Parch='0', Ticket='373450', Fare='8.05', Cabin=None, Embarked='S')]

In [5]:
data.filter(lambda row: row.Survived == '1').take(5)

[Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived='1', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S'),
 Row(PassengerId='4', Survived='1', Pclass='1', Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age='35', SibSp='1', Parch='0', Ticket='113803', Fare='53.1', Cabin='C123', Embarked='S'),
 Row(PassengerId='9', Survived='1', Pclass='3', Name='Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)', Sex='female', Age='27', SibSp='0', Parch='2', Ticket='347742', Fare='11.1333', Cabin=None, Embarked='S'),
 Row(PassengerId='10', Survived='1', Pclass='2', Name='Nasser, Mrs. Nicholas (Adele Achem)', Sex='female', Age='14', SibSp='1', Parch='0', Ticket='237736', Fare='30.0708', 

In [6]:
def mean_age(survived):
    return data.filter(lambda row: row.Survived == survived) \
            .filter(lambda row: row.Age is not None) \
            .map(lambda row: float(row.Age)).mean()

In [7]:
mean_age('0'), mean_age('1')

(30.62617924528302, 28.34368965517242)

In [8]:
def calc_mean(rows):
    cur_sum = 0.
    cur_count = 0
    for row in rows:
        cur_sum += float(row.Age)
        cur_count += 1
    return cur_sum / cur_count

data.filter(lambda row: row.Age is not None) \
    .groupBy(lambda row: row.Survived) \
    .mapValues(calc_mean) \
    .collect()

[('0', 30.62617924528302), ('1', 28.343689655172415)]

In [9]:
from pyspark.sql.types import Row

def add_column(row):
    d = row.asDict()
    d['SurvivedBool'] = False if d['Survived'] == '0' else True
    del d['Survived']
    return Row(**d)

data.map(add_column).take(5)

[Row(PassengerId='1', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S', SurvivedBool=False),
 Row(PassengerId='2', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C', SurvivedBool=True),
 Row(PassengerId='3', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S', SurvivedBool=True),
 Row(PassengerId='4', Pclass='1', Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age='35', SibSp='1', Parch='0', Ticket='113803', Fare='53.1', Cabin='C123', Embarked='S', SurvivedBool=True),
 Row(PassengerId='5', Pclass='3', Name='Allen, Mr. William Henry', Sex='male', Age='35', SibSp='0', Parch='0', Ticket='373450', Fare='8.05', Cabin=None, Embarked='S', SurvivedBool

In [10]:
def expand_row(row, column_name, enum_vals):
    d = row.asDict()
    
    for cur_val in enum_vals:
        cur_name = f'{column_name}_{cur_val}'
        is_eq = d[column_name] == cur_val
        d[cur_name] = is_eq
    
    del d[column_name]
    return Row(**d)

def expand_enum_column(data, column_name):
    enum_vals = data.map(lambda row: row.asDict()[column_name]).distinct().collect()
    return data.map(lambda row: expand_row(row, column_name, enum_vals))

In [11]:
expand_enum_column(data, 'Survived').take(3)

[Row(PassengerId='1', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S', Survived_0=True, Survived_1=False),
 Row(PassengerId='2', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C', Survived_0=False, Survived_1=True),
 Row(PassengerId='3', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S', Survived_0=False, Survived_1=True)]

In [12]:
expand_enum_column(expand_enum_column(data, 'Survived'), 'Pclass').take(3)

[Row(PassengerId='1', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S', Survived_0=True, Survived_1=False, Pclass_3=True, Pclass_1=False, Pclass_2=False),
 Row(PassengerId='2', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C', Survived_0=False, Survived_1=True, Pclass_3=False, Pclass_1=True, Pclass_2=False),
 Row(PassengerId='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S', Survived_0=False, Survived_1=True, Pclass_3=True, Pclass_1=False, Pclass_2=False)]

In [13]:
ports = spark.sparkContext.parallelize([
    ('C', {'PortName': 'Cherbourg'}),
    ('Q', {'PortName': 'Queenstown'}),
    ('S', {'PortName': 'Southampton'})
])

def prepare_for_join(row):
    d = row.asDict()
    port = d['Embarked']
    del d['Embarked']
    return port, d
    
def process_after_join(row):
    passenger = row[0]
    port = row[1]
    return Row(**dict(passenger, **port))

data.map(prepare_for_join).join(ports) \
    .map(lambda kv: process_after_join(kv[1])) \
    .take(5)

[Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', PortName='Cherbourg'),
 Row(PassengerId='10', Survived='1', Pclass='2', Name='Nasser, Mrs. Nicholas (Adele Achem)', Sex='female', Age='14', SibSp='1', Parch='0', Ticket='237736', Fare='30.0708', Cabin=None, PortName='Cherbourg'),
 Row(PassengerId='20', Survived='1', Pclass='3', Name='Masselmani, Mrs. Fatima', Sex='female', Age=None, SibSp='0', Parch='0', Ticket='2649', Fare='7.225', Cabin=None, PortName='Cherbourg'),
 Row(PassengerId='27', Survived='0', Pclass='3', Name='Emir, Mr. Farred Chehab', Sex='male', Age=None, SibSp='0', Parch='0', Ticket='2631', Fare='7.225', Cabin=None, PortName='Cherbourg'),
 Row(PassengerId='31', Survived='0', Pclass='1', Name='Uruchurtu, Don. Manuel E', Sex='male', Age='40', SibSp='0', Parch='0', Ticket='PC 17601', Fare='27.7208', Cabin=None, PortName='Cherb