### Importing required libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [2]:

scSpark = SparkSession.builder.appName("Spark Assignment 1").getOrCreate()

### Defining schema and loading titanic dataset in a dataframe, also giving header

In [3]:
file = 'titanic.csv'
header = StructType([
    StructField("PassengerId", IntegerType(), nullable=True),
    StructField("Survived", IntegerType(), nullable=True),
    StructField("Pclass", IntegerType(), nullable=True),
    StructField("Name", StringType(), nullable=True),
    StructField("Sex", StringType(), nullable=True),
    StructField("Age", DoubleType(), nullable=True),
    StructField("SibSp", IntegerType(), nullable=True),
    StructField("Parch", IntegerType(), nullable=True),
    StructField("Ticket", StringType(), nullable=True),
    StructField("Fare", DoubleType(), nullable=True),
    StructField("Cabin", StringType(), nullable=True),
    StructField("Embarked", StringType(), nullable=True),
    StructField("Timestamp", StringType(), nullable=True)
])
df_titanic = scSpark.read.csv(file, header= True, schema=header,inferSchema=True)

df_titanic

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, Timestamp: string]

In [4]:
df_titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|2020-01-01 13:36:30|
|          6|       0|     3|    Moran, Mr. James|  mal

### Droping null value for avoiding nonetype error in UDF for categorical data 

In [5]:
df_titanic = df_titanic.na.drop()

### Defining function for distributing columns according to their datatype and returning results in to a dictionary

In [6]:
def preprocess_fun(dataframe):
    
    numeric_columns = []
    categorical_columns = []
    others = []

    column_types = dataframe.dtypes
    for column_name, column_type in column_types:
        if column_type == 'int' or column_type == 'float' or column_type == 'double':
            numeric_columns.append(column_name)
        elif column_type == 'string':
            categorical_columns.append(column_name)
        else:
            others.append(column_name)
                
    column_types_dic = {
        "numeric_columns" : numeric_columns,
        "categorical_columns" : categorical_columns,
        "others" : others
    }
    
    return column_types_dic
            

In [7]:
column_types_dict = preprocess_fun(df_titanic)

### Calculating min, max, average of numeric columns

In [8]:
min_values = df_titanic.select([min(column).alias(column) for column in column_types_dict['numeric_columns']])
min_values.show()

+-----------+--------+------+---+-----+-----+----+
|PassengerId|Survived|Pclass|Age|SibSp|Parch|Fare|
+-----------+--------+------+---+-----+-----+----+
|          2|       0|     1|0.0|    0|    0| 0.0|
+-----------+--------+------+---+-----+-----+----+



In [9]:
max_values = df_titanic.select([max(column).alias(column) for column in column_types_dict['numeric_columns']])
max_values.show()

+-----------+--------+------+----+-----+-----+--------+
|PassengerId|Survived|Pclass| Age|SibSp|Parch|    Fare|
+-----------+--------+------+----+-----+-----+--------+
|        890|       1|     3|80.0|    3|    4|512.3292|
+-----------+--------+------+----+-----+-----+--------+



In [10]:
avg_values = df_titanic.select([avg(column).alias(column) for column in column_types_dict['numeric_columns']])
avg_values.show()

+-----------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+
|      PassengerId|          Survived|            Pclass|              Age|             SibSp|              Parch|             Fare|
+-----------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+
|455.3661202185792|0.6721311475409836|1.1912568306010929|35.66120218579235|0.4644808743169399|0.47540983606557374|78.68246885245901|
+-----------------+------------------+------------------+-----------------+------------------+-------------------+-----------------+



### defining function for categorical columns to replace last letter of word with 1

In [11]:
def categorical_udf(words):
    text = ''
    for word in words.split():
        text = text + word[:-1] + '1' + " " 
    return text.strip()

categorical_udf = udf(categorical_udf, StringType())


In [12]:
for column in column_types_dict["categorical_columns"]:
    df_titanic = df_titanic.withColumn(column+"_changed", categorical_udf(col(column)))


In [13]:
df_titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'Timestamp',
 'Name_changed',
 'Sex_changed',
 'Ticket_changed',
 'Cabin_changed',
 'Embarked_changed',
 'Timestamp_changed']

In [14]:

df_titanic.select('Name_changed', 'Sex_changed', 'Ticket_changed', 'Cabin_changed','Embarked_changed','Timestamp_changed').show()

+--------------------+-----------+--------------+-------------+----------------+-------------------+
|        Name_changed|Sex_changed|Ticket_changed|Cabin_changed|Embarked_changed|  Timestamp_changed|
+--------------------+-----------+--------------+-------------+----------------+-------------------+
|Cumings1 Mrs1 Joh...|     femal1|      P1 17591|          C81|               1|2020-01-01 13:44:41|
|Futrelle1 Mrs1 Ja...|     femal1|        113801|         C121|               1|2020-01-01 13:32:01|
|McCarthy1 Mr1 Tim...|       mal1|         17461|          E41|               1|2020-01-01 13:37:31|
|Sandstrom1 Miss1 ...|     femal1|       P1 9541|           G1|               1|2020-01-01 13:32:21|
|Bonnell1 Miss1 El...|     femal1|        113781|         C101|               1|2020-01-01 13:30:11|
|Beesley1 Mr1 Lawr...|       mal1|        248691|          D51|               1|2020-01-01 13:33:31|
|Sloper1 Mr1 Willi...|       mal1|        113781|           A1|               1|2020-01-01 