In [1]:
import string
import pyspark
from IPython.display import clear_output, display
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    max,
    min,
    udf,
)
from pyspark.sql.types import StringType

In [2]:
spark = SparkSession.builder.appName("TitanicData").getOrCreate()
# Read the CSV file into a DataFrame
df_pyspark = spark.read.option("header", "true").option("inferSchema", "true").option("header", "false").csv("data/titanic.csv")
# Define the column names of the Titanic dataset
column_names = ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked','Timestamp']

23/05/19 08:24:34 WARN Utils: Your hostname, all-MS-7D35 resolves to a loopback address: 127.0.1.1; using 192.168.1.142 instead (on interface enp2s0)
23/05/19 08:24:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 08:24:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Assign the column names to the DataFrame
df_pyspark = df_pyspark.toDF(*column_names)
# Display the DataFrame schema
df_pyspark.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [4]:
# Print the column names
df_pyspark.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'Timestamp']

In [6]:
df_pyspark.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [7]:
num_cols = [col for col, dtype in df_pyspark.dtypes if dtype in ['int', 'double']]
num_stats = df_pyspark.select(num_cols).describe().drop("Survived")
num_stats.show()

23/05/19 08:27:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----------------+------------------+------------------+------------------+-------------------+-----------------+
|summary|      PassengerId|            Pclass|               Age|             SibSp|              Parch|             Fare|
+-------+-----------------+------------------+------------------+------------------+-------------------+-----------------+
|  count|              891|               891|               714|               891|                891|              891|
|   mean|            446.0| 2.308641975308642|29.679271708683473|0.5230078563411896|0.38159371492704824| 32.2042079685746|
| stddev|257.3538420152301|0.8360712409770491|14.536482769437564|1.1027434322934315| 0.8060572211299488|49.69342859718089|
|    min|                1|                 1|                 0|                 0|                  0|              0.0|
|    max|              891|                 3|                80|                 8|                  6|         512.3292|
+-------+-------

In [8]:
numeric_columns = df_pyspark.select("PassengerId", "Pclass", "Age", "Fare", "SibSp", "Parch", "Embarked").columns
min_values = df_pyspark.select(numeric_columns).agg(*(min(c).alias(c) for c in numeric_columns)).collect()[0]
max_values = df_pyspark.select(numeric_columns).agg(*(max(c).alias(c) for c in numeric_columns)).collect()[0]
avg_values = df_pyspark.select(numeric_columns).agg(*(avg(c).alias(c) for c in numeric_columns)).collect()[0]
print("Minimum values:")
print(min_values)
print("Maximum values:")
print(max_values)
print("Average values:")
print(avg_values)

Minimum values:
Row(PassengerId=1, Pclass=1, Age=0, Fare=0.0, SibSp=0, Parch=0, Embarked='C')
Maximum values:
Row(PassengerId=891, Pclass=3, Age=80, Fare=512.3292, SibSp=8, Parch=6, Embarked='S')
Average values:
Row(PassengerId=446.0, Pclass=2.308641975308642, Age=29.679271708683473, Fare=32.2042079685746, SibSp=0.5230078563411896, Parch=0.38159371492704824, Embarked=None)


In [11]:
str_columns = ["Sex", "Pclass", "Survived", "Embarked"]

def change_last_letter_after_space(word):
    if word is not None:
        words = word.split()
        for i in range(len(words)):
            words[i] = words[i][:-1] + "1"
        return " ".join(words)
    return word

change_last_letter_udf = udf(change_last_letter_after_space, StringType())

for column in str_columns:
    if column == "Pclass" or column == "Survived":
        df_pyspark = df_pyspark.withColumn(column, col(column).cast(StringType()))
    df_pyspark = df_pyspark.withColumn(column, change_last_letter_udf(col(column)))

df_pyspark.show()





+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|       1|     1|Braund, Mr. Owen ...|  mal1|  22|    1|    0|       A/5 21171|   7.25| null|       1|2020-01-01 13:45:25|
|          2|       1|     1|Cumings, Mrs. Joh...|femal1|  38|    1|    0|        PC 17599|71.2833|  C85|       1|2020-01-01 13:44:48|
|          3|       1|     1|Heikkinen, Miss. ...|femal1|  26|    0|    0|STON/O2. 3101282|  7.925| null|       1|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|femal1|  35|    1|    0|          113803|   53.1| C123|       1|2020-01-01 13:32:00|
|          5|       1|     1|Allen, Mr. Willia...|  mal

In [22]:
# Sort DataFrame by the first column
sorted_df = df.orderBy(df.columns[0])
# Save the sorted DataFrame to Parquet file
sorted_df.write.parquet("./sorted_titanic/titanic.parquet")