# Spark Data Analysis

This tutorial walks you through a basic data analysis using Spark in Camber. You will:
1. Load the Titanic dataset, hosted on Camber's Open Stash, which you have access to by default.
2. Use Spark functionality to transform and aggregate the dataset.

## Loading the dataset
First, `import camber`. We also import the `functions` module from `pyspark.sql` (aliased as `sf`), which provides the column functions used in the analysis below.

In [1]:
import camber
from pyspark.sql import functions as sf

You can create a Spark session hassle-free with the `camber.spark.connect` method. Specify an `engine_size`, and Camber provisions a Spark cluster for you.

In [2]:
spark = camber.spark.connect(engine_size="XSMALL")

Spark initialized! Remember to stop the spark session when done: spark.stop()


The Open Stash is accessed through `camber.stash`, and you can use it to load a dataset into a Spark DataFrame.

In [3]:
titanic = camber.stash.open_stash.read_spark("datasets/tutorial/titanic.csv", spark, format="csv", header=True)

With the DataFrame loaded, let's print its schema first.

In [4]:
titanic.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



You can also view a sample of the DataFrame. Setting `truncate=False` prints the full value of every column instead of truncating long values.

In [5]:
titanic.show(10, truncate=False) 

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22  |1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38  |1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss Laina                              |female|26  |0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35  |1    |0    |113803          |53

You can reference a DataFrame column in three ways: by its name as a string, as an attribute of the DataFrame, or with `sf.col()`. The rest of the tutorial uses all three to show what's possible.

In [6]:
titanic.select("PassengerId", titanic.PassengerId, sf.col("PassengerId")).show(5)

+-----------+-----------+-----------+
|PassengerId|PassengerId|PassengerId|
+-----------+-----------+-----------+
|          1|          1|          1|
|          2|          2|          2|
|          3|          3|          3|
|          4|          4|          4|
|          5|          5|          5|
+-----------+-----------+-----------+
only showing top 5 rows



## Analyzing the dataset

Find the distinct values of the `Embarked` column and sort them in ascending order.

In [7]:
titanic.select(titanic.Embarked).distinct().orderBy(titanic.Embarked).show()

+--------+
|Embarked|
+--------+
|    null|
|       C|
|       Q|
|       S|
+--------+
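`null` comes first because Spark's ascending sort places nulls first by default; `sf.asc_nulls_last("Embarked")` (or `titanic.Embarked.asc_nulls_last()`) moves them to the end instead. The plain-Python sketch below mimics both orderings on a small illustrative list of `Embarked` values (not read from the dataset), using `None` for `null`. It only illustrates the ordering rule; it is not how Spark implements `distinct()` or `orderBy()`.

```python
# Small illustrative list of Embarked values; None stands in for Spark's null.
embarked = ["S", "C", "S", None, "Q", "S", "C", None]

# Like .distinct(): duplicates are removed and no order is guaranteed.
distinct_values = set(embarked)

# Ascending, nulls first (Spark's default for ascending sorts):
# None gets the key (False, ""), which sorts before (True, value).
nulls_first = sorted(distinct_values, key=lambda v: (v is not None, v or ""))

# Ascending, nulls last (like asc_nulls_last()).
nulls_last = sorted(distinct_values, key=lambda v: (v is None, v or ""))

print(nulls_first)  # [None, 'C', 'Q', 'S']
print(nulls_last)   # ['C', 'Q', 'S', None]
```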



Notice how often we call `DataFrame.show()`. This is because Spark evaluates lazily; you can find more on this [here](https://medium.com/@roshmitadey/pyspark-transformations-v-s-actions-797fc8ad16ea). In short, transformations such as `select`, `filter`, and `groupBy` only build up an execution plan, while actions such as `show`, `count`, and `collect` force Spark to run it. Let's filter for all passengers who survived.

In [8]:
survivors = titanic.filter(sf.col("Survived") == "1")
survivors.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss L...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female| 27|    0|    2|          347742|11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female| 14|    1|    0|          237736|30.0708| null|       C|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------
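In the cell above, `filter()` on its own returns a new DataFrame without reading any data; it is `show()` that makes Spark run a job. The toy class below is a hypothetical, heavily simplified analogy, not Spark's API or implementation: transformations only append steps to a plan, and nothing is evaluated until the action `collect()` is called. The three input rows reuse the `PassengerId` and `Survived` values of the first passengers shown earlier.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: records steps, runs them only on collect()."""

    def __init__(self, rows, plan=None):
        self._rows = rows
        self._plan = plan or []  # list of (kind, function) steps

    def filter(self, predicate):
        # Transformation: return a new LazyTable with one more step; no rows touched.
        return LazyTable(self._rows, self._plan + [("filter", predicate)])

    def select(self, *columns):
        # Transformation: the projection is recorded, not executed.
        def project(row):
            return {c: row[c] for c in columns}
        return LazyTable(self._rows, self._plan + [("map", project)])

    def collect(self):
        # Action: only now is the recorded plan applied to the data, step by step.
        result = []
        for row in self._rows:
            keep = True
            for kind, fn in self._plan:
                if kind == "filter" and not fn(row):
                    keep = False
                    break
                if kind == "map":
                    row = fn(row)
            if keep:
                result.append(row)
        return result


calls = []


def survived(row):
    calls.append(row["PassengerId"])  # log every evaluation of the predicate
    return row["Survived"] == "1"


rows = [
    {"PassengerId": "1", "Survived": "0"},
    {"PassengerId": "2", "Survived": "1"},
    {"PassengerId": "3", "Survived": "1"},
]

survivors_toy = LazyTable(rows).filter(survived).select("PassengerId")
print(len(calls))               # 0: building the plan evaluated nothing
print(survivors_toy.collect())  # [{'PassengerId': '2'}, {'PassengerId': '3'}]
print(len(calls))               # 3: the action ran the predicate once per row
```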

Now let's count the number of passengers in each `Pclass` (passenger class).

In [9]:
classes = titanic.groupBy("Pclass").agg(sf.count("*").alias("Pcount")).orderBy("Pclass")
classes.show()

+------+------+
|Pclass|Pcount|
+------+------+
|     1|   216|
|     2|   184|
|     3|   491|
+------+------+
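`groupBy(...).agg(sf.count("*"))` counts the rows for each distinct key, much like `collections.Counter` does in plain Python. The sketch below applies that idea to the `Pclass` values of the five survivor rows shown earlier and sorts by key to mirror `orderBy("Pclass")`. It is an analogy for checking the logic, not a replacement for the distributed aggregation. Because `Pclass` is a string column, both Spark and this sketch sort it lexicographically, which happens to match numeric order for single-digit class labels.

```python
from collections import Counter

# Pclass values of the five survivor rows shown in the filter output above.
survivor_classes = ["1", "3", "1", "3", "2"]

# Counter plays the role of groupBy("Pclass").agg(count("*")).
counts = Counter(survivor_classes)

# sorted() on the string keys mirrors orderBy("Pclass").
for pclass, pcount in sorted(counts.items()):
    print(pclass, pcount)
```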



With the two DataFrames created, we can compute survival rates by passenger class by joining them on the `Pclass` column.

In [10]:
survival_rates_by_class = survivors.groupBy(
    sf.col("Pclass"),
).agg(
    sf.count("*").alias("SurvivorCount"),
).join(
    classes, on="Pclass",
).select(
    sf.col("Pclass"),
    (sf.round(sf.col("SurvivorCount") / sf.col("Pcount") * 100, 2)).alias("survivalRate")
).orderBy(sf.col("survivalRate").desc())
survival_rates_by_class.show()

+------+------------+
|Pclass|survivalRate|
+------+------------+
|     1|       62.96|
|     2|       47.28|
|     3|       24.24|
+------+------------+
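The rate column is `SurvivorCount / Pcount * 100`, rounded to two decimals. As an arithmetic check, the plain-Python sketch below joins two dictionaries on the class key the way the inner join does. The passenger counts come from the `classes` output above; the survivor counts (136, 87, 119) are an assumption, back-calculated from the printed rates and class sizes rather than read from the notebook. Because `DataFrame.join` defaults to an inner join, a class with no survivors would be absent from the grouped `survivors` DataFrame and would silently drop out of the result; joining from `classes` with a left join would keep it, with a `null` rate unless the missing count is filled with 0.

```python
# Passenger counts per class, from the groupBy output above.
class_counts = {"1": 216, "2": 184, "3": 491}

# Survivor counts per class, back-calculated from the printed rates (assumption).
survivor_counts = {"1": 136, "2": 87, "3": 119}

# Inner join on the class key: only classes present in both inputs are kept.
joined = {
    pclass: (survivor_counts[pclass], class_counts[pclass])
    for pclass in survivor_counts.keys() & class_counts.keys()
}

# Same expression as the Spark select: round(SurvivorCount / Pcount * 100, 2).
survival_rate = {
    pclass: round(survived / total * 100, 2)
    for pclass, (survived, total) in joined.items()
}

# Mirrors orderBy(survivalRate desc).
for pclass, rate in sorted(survival_rate.items(), key=lambda kv: kv[1], reverse=True):
    print(pclass, rate)
```

Python's `round` and Spark's `round` differ on exact ties (Python rounds half to even, Spark rounds half up), which does not affect these values.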



Congratulations! You just ran a simple data analysis in Spark. Remember to call `spark.stop()` to end your Spark session.

In [11]:
spark.stop()
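If a cell raises an exception midway, a trailing `spark.stop()` never runs. One defensive pattern, sketched below under the assumption that the object returned by `camber.spark.connect` exposes `stop()` as this tutorial shows, is to wrap the work in `try`/`finally` through a small context manager. `stopping` is a hypothetical helper, and `DummySession` is a hypothetical stand-in for a real session so the sketch runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def stopping(session):
    """Yield the session and call session.stop() even if the block raises."""
    try:
        yield session
    finally:
        session.stop()


class DummySession:
    """Hypothetical stand-in for a Spark session; records whether stop() ran."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


session = DummySession()
try:
    with stopping(session):
        raise RuntimeError("simulated failure inside the analysis")
except RuntimeError:
    pass

print(session.stopped)  # True: stop() ran despite the exception
```

With a real cluster, the same idea would read `with stopping(camber.spark.connect(engine_size="XSMALL")) as spark: ...`, again assuming the returned object has a `stop()` method.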