In [1]:
# Install Spark 3.5.0
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Install Findspark
!pip install -q findspark

import findspark
findspark.init()

## creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Midterm').getOrCreate()

## Question 1:
Read the data set from Google Drive, and create a new RDD with it. Show the first 20 rows and the schema of the Titanic RDD

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [5]:

titanic_df = spark.read.csv('/content/gdrive/My Drive/Titanic-Dataset.csv', header=True, inferSchema=True)


titanic_rdd = titanic_df.rdd

In [6]:

print(titanic_rdd.take(20))

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'), Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'), Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'), Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'), Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S'), Row(PassengerId=6, Survived=0, Pclass=3, Name='Moran, Mr. James', Sex='male', Age=

## Extra Credit Opportunity!

Convert the RDD into a Pandas Dataframe.

In [9]:
titanic_df_from_rdd = spark.createDataFrame(titanic_rdd)


titanic_pd_df = titanic_df_from_rdd.toPandas()




Index(['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp',
       'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked'],
      dtype='object')


## Question 2:

`sibsp` is the number of siblings/spouses onboard

`parch` is the number of parents/children onboard

Copy the contents of the `sibsp` to a new one with a clearer name, then re-name the `parch` column without adding a new column. Print the schema and the first 10 rows of the new RDD.

In [12]:
from pyspark.sql.functions import col


titanic_df_from_rdd = titanic_df_from_rdd.withColumn("siblings_spouses_onboard", col("sibsp"))


titanic_df_from_rdd = titanic_df_from_rdd.withColumnRenamed("parch", "parents_children_onboard")


titanic_df_from_rdd.printSchema()


titanic_df_from_rdd.show(10)


root
 |-- PassengerId: long (nullable = true)
 |-- Survived: long (nullable = true)
 |-- Pclass: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: long (nullable = true)
 |-- parents_children_onboard: long (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- siblings_spouses_onboard: long (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+------------------------+----------------+-------+-----+--------+------------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|parents_children_onboard|          Ticket|   Fare|Cabin|Embarked|siblings_spouses_onboard|
+-----------+--------+------+--------------------+------+----+-----+------------------------+----------------+-------+-----+--------+------------------------+
| 

## Extra Credit Opportunity!

Count how many unique families were onboard the Titanic.

In [15]:
from pyspark.sql.functions import split, col


titanic_df_with_family = titanic_df_from_rdd.withColumn("LastName", split(col("Name"), ",")[0]) \
                                             .withColumn("FamilySize", 1 + col("siblings_spouses_onboard") + col("parents_children_onboard"))

unique_families_count = titanic_df_with_family.groupBy("LastName", "FamilySize").count()


number_of_unique_families = unique_families_count.count()

number_of_unique_families




720

## Question 3:

Use Spark SQL to select the ten most expensive fares, and a 2nd query to select the ten least expensive fares.

In [16]:

titanic_df_from_rdd.createOrReplaceTempView("titanic_data")



In [17]:

most_expensive_fares = spark.sql("""
    SELECT Fare, PassengerId, Name
    FROM titanic_data
    ORDER BY Fare DESC
    LIMIT 10
""")


most_expensive_fares.show()



+--------+-----------+--------------------+
|    Fare|PassengerId|                Name|
+--------+-----------+--------------------+
|512.3292|        680|Cardeza, Mr. Thom...|
|512.3292|        259|    Ward, Miss. Anna|
|512.3292|        738|Lesurer, Mr. Gust...|
|   263.0|         89|Fortune, Miss. Ma...|
|   263.0|         28|Fortune, Mr. Char...|
|   263.0|        342|Fortune, Miss. Al...|
|   263.0|        439|   Fortune, Mr. Mark|
| 262.375|        312|Ryerson, Miss. Em...|
| 262.375|        743|"Ryerson, Miss. S...|
|247.5208|        119|Baxter, Mr. Quigg...|
+--------+-----------+--------------------+



In [18]:

least_expensive_fares = spark.sql("""
    SELECT Fare, PassengerId, Name
    FROM titanic_data
    WHERE Fare > 0
    ORDER BY Fare ASC
    LIMIT 10
""")


least_expensive_fares.show()


+------+-----------+--------------------+
|  Fare|PassengerId|                Name|
+------+-----------+--------------------+
|4.0125|        379| Betros, Mr. Tannous|
|   5.0|        873|Carlsson, Mr. Fra...|
|6.2375|        327|Nysveen, Mr. Joha...|
|6.4375|        844|Lemberopolous, Mr...|
|  6.45|        819|Holm, Mr. John Fr...|
|6.4958|        203|Johanson, Mr. Jak...|
|6.4958|        372|Wiklund, Mr. Jako...|
|  6.75|        144| Burke, Mr. Jeremiah|
|  6.75|        655|"Hegarty, Miss. H...|
|6.8583|        412|     Hart, Mr. Henry|
+------+-----------+--------------------+



## Question 4:

Drop any rows with 2 or more null values. Print the counts for the number of rows before and after dropping. (You won't need to use the new RDD with the dropped rows after this question)

In [19]:

before_drop_count = titanic_df_from_rdd.count()
print(f"Number of rows before dropping: {before_drop_count}")


titanic_df_dropped = titanic_df_from_rdd.dropna(thresh=len(titanic_df_from_rdd.columns) - 1)


after_drop_count = titanic_df_dropped.count()
print(f"Number of rows after dropping: {after_drop_count}")


Number of rows before dropping: 891
Number of rows after dropping: 733


## Question 5:

The Titanic sank in 1912, but the Bureau of Labor Statistics didn't keep track of inflation until 1913. Write a UDF to convert the fare from 1913 dollars to today's dollars, and apply it to the `fare` column.

- Hint: Use a library to find the Consumer Price Index (CPI) to adjust for inflation! Don't try to re-invent the wheel!

In [37]:
!pip install cpi



In [40]:
import cpi
cpi.inflate(1, 1913, to=2023)


30.777979797979796

In [41]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


inflation_rate = 30.777979797979796

def adjust_fare_for_inflation(fare):
    adjusted_fare = fare * inflation_rate
    return adjusted_fare


adjust_fare_udf = udf(adjust_fare_for_inflation, DoubleType())


titanic_df_adjusted_fare = titanic_df_from_rdd.withColumn("AdjustedFare", adjust_fare_udf("Fare"))


titanic_df_adjusted_fare.select("Fare", "AdjustedFare").show(10)


+-------+------------------+
|   Fare|      AdjustedFare|
+-------+------------------+
|   7.25| 223.1403535353535|
|71.2833|2193.9559673333333|
|  7.925|243.91548989898988|
|   53.1|1634.3107272727273|
|   8.05| 247.7627373737374|
| 8.4583| 260.3293865252525|
|51.8625|1596.2229772727271|
| 21.075| 648.6459242424241|
|11.1333|342.66048248484844|
|30.0708| 925.5184749090909|
+-------+------------------+
only showing top 10 rows



## Question 6:

Write an accumulator variables to counts minors (Persons under 18) who were onboard the Titanic

In [42]:
from pyspark import AccumulatorParam


class IntAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return 0

    def addInPlace(self, v1, v2):
        return v1 + v2


minors_count = spark.sparkContext.accumulator(0, IntAccumulatorParam())


def count_minors(row):
    age = row['Age']
    if age is not None and age < 18:
        minors_count.add(1)
    return row


titanic_df_from_rdd.rdd.foreach(count_minors)


print(f"Number of minors onboard: {minors_count.value}")


Number of minors onboard: 113


## Extra Credit Opportunity!

The [Birkenhead drill](https://en.wikipedia.org/wiki/Women_and_children_first) was meant to prioritize the lives of women and children in a life-threatening situation, and was commonplace on navel vessels in 1912.

Find the ratio of women and minors who survived/perished compared to men 18 and older.

In [43]:
from pyspark.sql.functions import when, col


classified_df = titanic_df_from_rdd.withColumn(
    "Group",
    when((col("Sex") == "female") | (col("Age") < 18), "Women_and_Minors")
    .otherwise("Men_18_and_Over")
)


survival_counts = classified_df.groupBy("Group", "Survived").count()


survival_ratio = survival_counts.groupBy("Group").pivot("Survived").sum("count")


survival_ratio = survival_ratio.withColumn(
    "SurvivalRatio",
    col("1") / (col("1") + col("0"))
).select("Group", "SurvivalRatio")

survival_ratio.show()


+----------------+-------------------+
|           Group|      SurvivalRatio|
+----------------+-------------------+
|Women_and_Minors| 0.6881720430107527|
| Men_18_and_Over|0.16570327552986513|
+----------------+-------------------+



## Question 7:

In the `embarked` column, each letter represents where the passenger embarked on their voyage from. Store the below information in a broadcast variable, and add an additional column with the full name of the port using the information in the broadcast variable.

C = Cherbourg
Q = Queenstown
S = Southampton

In [56]:
embarked_mapping = {
    "C": "Cherbourg",
    "Q": "Queenstown",
    "S": "Southampton"
}


broadcast = spark.sparkContext.broadcast(embarked_mapping)


In [63]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def get_full_port_name(code):
    return broadcast.value.get(code, "Unknown")


get_full_port_name_udf = udf(get_full_port_name, StringType())


titanic_df_with_port_name = titanic_df_from_rdd.withColumn("PortFullName", get_full_port_name_udf(col("Embarked")))


titanic_df_with_port_name.select("Embarked", "PortFullName").show()
titanic_df_with_port_name.show()


+--------+------------+
|Embarked|PortFullName|
+--------+------------+
|       S| Southampton|
|       C|   Cherbourg|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       Q|  Queenstown|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       C|   Cherbourg|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       S| Southampton|
|       Q|  Queenstown|
|       S| Southampton|
|       S| Southampton|
|       C|   Cherbourg|
+--------+------------+
only showing top 20 rows

+-----------+--------+------+--------------------+------+----+-----+------------------------+----------------+-------+-----+--------+------------------------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|parents_children_onboard|          Ticket|   Fare|Cabin|Embarked|siblings_spouses_onboard|PortFullName|
+-----------+--------+------+--------------------+----

## Question 8:

Find the ratio of survived/perished passengers for each Passenger Class or `pclass`

In [67]:
from pyspark.sql.functions import col

survival_counts = titanic_df_with_port_name.groupBy("Pclass", "Survived").count()


pivot_df = survival_counts.groupBy("Pclass").pivot("Survived").sum("count").withColumnRenamed("1", "Survived").withColumnRenamed("0", "Perished")


ratio_df = pivot_df.withColumn("SurvivalRatio", col("Survived") / (col("Perished") + 0.0001))


ratio_df.select("Pclass", "Survived", "Perished", "SurvivalRatio").show()


+------+--------+--------+------------------+
|Pclass|Survived|Perished|     SurvivalRatio|
+------+--------+--------+------------------+
|     1|     136|      80| 1.699997875002656|
|     3|     119|     372|0.3198923871257024|
|     2|      87|      97|0.8969062918491836|
+------+--------+--------+------------------+



## Question 9:

Randomly split the data 70/30 into training/test data, and count the number if rows in each. Then build a Random Forrest Classifier on the features `pclass`,`sex`,`age`,`sibsp`, and `parch`.

In [71]:

train_data, test_data = titanic_df_with_port_name.randomSplit([0.7, 0.3])

train_count = train_data.count()
test_count = test_data.count()

print(f"Number of rows in training data: {train_count}")
print(f"Number of rows in test data: {test_count}")

Number of rows in training data: 595
Number of rows in test data: 296


In [72]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col


spark = SparkSession.builder.appName("Titanic Random Forest Example").getOrCreate()

median_age = titanic_df_with_port_name.approxQuantile("Age", [0.5], 0.0)[0]
titanic_df_filled = titanic_df_with_port_name.fillna({'Age': median_age, 'Embarked': 'S'})


sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndexed")
assembler = VectorAssembler(
    inputCols=["Pclass", "SexIndexed", "Age", "SibSp", "parents_children_onboard"],
    outputCol="features")


rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")


pipeline = Pipeline(stages=[sex_indexer, assembler, rf])


(train_data, test_data) = titanic_df_filled.randomSplit([0.7, 0.3])


model = pipeline.fit(train_data)


predictions = model.transform(test_data)


predictions.select("prediction", "Survived", "features").show(5)




+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[1.0,1.0,35.0,1.0...|
|       1.0|       1|[2.0,1.0,14.0,1.0...|
|       1.0|       1|[3.0,1.0,4.0,1.0,...|
|       1.0|       1|[2.0,1.0,55.0,0.0...|
|       0.0|       0|[3.0,1.0,31.0,1.0...|
+----------+--------+--------------------+
only showing top 5 rows



## Extra Credit Opportunity!

Use the test data to validate the model.

In [73]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

auc = binary_evaluator.evaluate(predictions)
print(f"Area under ROC curve: {auc}")


Area under ROC curve: 0.8672633495145631


## Question 10:

Show the probability of the following two passengers surviving the Titanic shipwreck:

---

| pclass | name                | sex    | age | sibsp | parch |
|--------|---------------------|--------|-----|-------|-------|
| 1      | Rose Dewitt Bukater | female | 17  | 0     | 1     |
| 3      | Jack Dawson         | male   | 21  | 0     | 0     |

In [75]:
from pyspark.sql import Row


new_passengers = spark.createDataFrame([
    Row(Pclass=1, Name="Rose Dewitt Bukater", Sex="female", Age=17, SibSp=0, parents_children_onboard=1),
    Row(Pclass=3, Name="Jack Dawson", Sex="male", Age=21, SibSp=0, parents_children_onboard=0)
])


new_passengers_transformed = model.transform(new_passengers)


new_passengers_transformed.select("Name", "prediction", "probability").show(truncate=False)


+-------------------+----------+----------------------------------------+
|Name               |prediction|probability                             |
+-------------------+----------+----------------------------------------+
|Rose Dewitt Bukater|1.0       |[0.04339944669657338,0.9566005533034266]|
|Jack Dawson        |0.0       |[0.8819617121468608,0.11803828785313915]|
+-------------------+----------+----------------------------------------+

