# Theory

## Why Spark
### Big data processing
<img src="pics/global-data-generated-annually.webp" alt="The rate of data accumulation" width="800"/>

We will produce more data next year than all data produced prior to 2020.

At some point the data that needs processing exceeds the size of RAM on a single machine.

At this stage most in-memory processing tools, including Pandas, run out of memory (in Python this typically surfaces as a `MemoryError`).

Some packages can spill data between RAM and disk, but doing so slows the workflow down considerably.

In addition, most queries can be parallelized, but the parallelism of a single machine is limited by its number of cores, even with multithreading.

One solution would be to beef up the data processing server (vertical scaling), but the price grows faster than the specs, and a single machine can only be upgraded up to a certain limit.

Another solution is to connect a number of cheap machines together and distribute the data processing tasks among them.

This approach turned out to scale well and gave rise to Spark: a system that distributes the computational load across a cluster of machines and collects the results.

## When Spark

1. A lot of rows (more than ~8M). Spark can process terabytes of data and, with some effort, scale up to petabytes (https://www.databricks.com/blog/2014/10/10/spark-petabyte-sort.html). The flip side is that Spark is not well suited for small data (up to a few million rows): it will work, but the overhead will be large. Pandas, Polars, or plain Python are better suited for analyzing small datasets.
2. Column-wise processing. Better database solutions exist for fast retrieval and saving of individual rows. Spark shines when column-wise operations such as filtering, aggregation, and transformation are required.


## How Spark

### Distributed computing framework
Distributed computing is the method of making multiple computers work together to solve a common problem.

For example, suppose we need to perform some operations on a 100 GB file. Processing it on a single machine can take hours or even days, depending on the operation.
However, if the same file is split into 100 files of 1 GB each that are processed in parallel on 100 workers, the total time drops to roughly 1/100th of the original (ignoring the overhead of distributing the work and combining the results).
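The split-and-combine idea can be sketched in plain Python. This only illustrates the pattern (split the data into chunks, process each chunk independently, combine the partial results), not how Spark is implemented; the helper names are hypothetical, and threads are used just to show the structure, since Python threads do not speed up CPU-bound work.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(values, n_chunks):
    """Split a list into n_chunks contiguous, roughly equal parts."""
    size, remainder = divmod(len(values), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(values[start:end])
        start = end
    return chunks


def partial_sum(chunk):
    """Work done independently on one chunk, as a worker node would."""
    return sum(chunk)


def distributed_sum(values, n_chunks=4):
    chunks = split_into_chunks(values, n_chunks)
    # Each chunk is processed independently of the others
    with ThreadPoolExecutor(max_workers=n_chunks) as pool:
        partial_results = list(pool.map(partial_sum, chunks))
    # The partial results are combined in one place, like on the driver
    return sum(partial_results)


data = list(range(1, 101))
print(distributed_sum(data, n_chunks=4))  # 5050
```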

A Spark cluster consists of the following components shown in the image below:
<img src="pics/cluster-overview.png" alt="Spark cluster overview: driver program, cluster manager, and worker nodes" width="800"/>

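Plain Python code in the program (everything that is not a Spark operation) runs on the driver.
The driver turns DataFrame operations into an execution plan and instructs the executors on the worker nodes to load partitions of the dataset and perform the computations (Python UDFs are run by Python worker processes next to the executors).
Only the results that are explicitly requested, for example by `count()`, `collect()`, or `show()`, are sent back to the driver.
Transformations are lazy: they are executed only when the result needs to be materialized by an action (displayed, counted, stored, etc.).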

For more background information, see: https://spark.apache.org/docs/latest/cluster-overview.html

# PySpark application
PySpark is the Python API for Spark, which itself is written in Scala: PySpark calls are translated into Spark operations that run on the JVM.

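Download the NYC taxi data from https://www.kaggle.com/datasets/neilclack/nyc-taxi-trip-data-google-public-data to practice PySpark with the exercises below. The code assumes a Databricks notebook, where the `spark` session and the `display()` function are predefined and the files live under `dbfs:/`.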

In [0]:
import pyspark.sql.functions as F

# 1. Basic operations

## 1.1 Read

**Example**: reading the trips dataset `taxi_trip_data` from the path `dbfs:/FileStore/tables/nyctaxi-training`

In [0]:
LOAD_PATH = "dbfs:/FileStore/tables/nyctaxi-training"

In [0]:
sdf_trips = spark.read.load(f"{LOAD_PATH}/taxi_trip_data")

**Task**: read the geo zones dataset `taxi_zone_geo` from the same folder

In [0]:
sdf_geo = spark.read.load(f"{LOAD_PATH}/taxi_zone_geo")

## 1.2 Display rows

**Example**: show a sample of 5 records from sdf_trips

In [0]:
sdf_trips.limit(5).toPandas()

**Task**: show a sample of 3 records from sdf_geo

In [0]:
sdf_geo.show(3)

## 1.3 Aggregate

**Example**: show the top 3 most used payment types

In [0]:
(
    sdf_trips
    .groupBy('payment_type')
    .count()
    .sort(F.desc('count'))
).show(3)

**Task**: show the top 3 borough names with the most zone ids

Extra credit for displaying the percentage of total zone ids

In [0]:
geo_row_count = sdf_geo.count()

(
    sdf_geo
    .groupBy('borough')
    .count()
    .sort(F.desc('count'))
    .withColumn('percentage', F.format_string('%.2f%%', 100*F.col('count')/geo_row_count))
).show(3)

## 1.4 Filter

**Example**: count the rides with more than 3 passengers in May 2018

In [0]:
n_full_car_rides = (
    sdf_trips
    .filter(F.date_format(F.col('pickup_datetime'), "MMyyyy") == "052018")
    .filter(F.col('passenger_count') > 3)
).count()

print(f"{n_full_car_rides:,}")

**Task**: find the cost of the most expensive ride that ended between 5pm and 6pm and was paid with payment type 1

In [0]:
cost_evening_ride = (
    sdf_trips
    .filter(F.date_format(F.col('dropoff_datetime'), "HH") == "17")
    .filter(F.col('payment_type') == 1)
    .agg(F.max("fare_amount"))
).collect()[0][0]

print(f"${cost_evening_ride:,.2f}")

## 1.5 Deduplicate

**Example**: keep only one trip for each unique combination of passenger_count and rate_code

In [0]:
(
    sdf_trips
    .dropDuplicates(["passenger_count", "rate_code"])
).limit(5).toPandas()

**Task**: keep only one zone name for each zone id in the `geo` dataset

In [0]:
(
    sdf_geo
    .dropDuplicates(["zone_id"])
).show(5)

## 1.6 Change conditionally

**Example**: reduce the `tolls_amount` by half for the trips that started and ended in different zones

In [0]:
(
    sdf_trips
    .withColumn(
        "new_toll",
        F.when(F.col("pickup_location_id") != F.col("dropoff_location_id"), F.col("tolls_amount") * 0.5)
        .otherwise(F.col("tolls_amount"))
    )
).limit(5).toPandas()

**Task**: set the fare_amount to 0 for the negative fares and cap the positive ones at $10

In [0]:
(
    sdf_trips
    .withColumn(
        "fare_amount",
        F.when(F.col("fare_amount") < 0, F.lit(0))
        .when(F.col("fare_amount") > 10, F.lit(10))
        .otherwise(F.col("fare_amount"))
    )
).limit(5).toPandas()

## 1.7 Join

**Example**: how many single passengers were picked up in the Bronx borough

In [0]:
n_single_bronx_rides = (
    sdf_trips
    .filter(F.col("passenger_count") == 1)
    .join(
        sdf_geo
        .filter(F.col("borough") == "Bronx"),
        sdf_trips.pickup_location_id == sdf_geo.zone_id,
        how="inner",
    )
).count()

print(f"{n_single_bronx_rides:,} single Bronx rides")

**Task**: what is the longest ride to the JFK Airport zone that cost less than $20?

In [0]:
longest_cheap_trip = (
    sdf_trips
    .join(
        sdf_geo
        .filter(F.col("zone_name") == "JFK Airport"),
        sdf_trips.dropoff_location_id == sdf_geo.zone_id,
        how="inner",
    )
    .filter(F.col("fare_amount") < 20)
    .sort(F.desc("trip_distance"))
    .limit(1)
).collect()[0]["trip_distance"]

print(f"Longest trip to JFK Airport under $20: {longest_cheap_trip:,.2f} miles")

# 2. User defined functions
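Use them only when you have run out of native PySpark options: a Python UDF is usually much slower than a built-in function, because every row has to be sent from the JVM to a Python worker process and back.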

In [0]:
import pyspark.sql.types as T

**Example**: round up the tip amount

In [0]:
from math import ceil

def spark_ceil(x):
    # UDFs receive None for nulls; math.ceil(None) would raise a TypeError
    return ceil(x) if x is not None else None

spark_ceil_udf = F.udf(spark_ceil, T.IntegerType())

In [0]:
(
    sdf_trips
    .withColumn("tip_amount_rounded_up", spark_ceil_udf(F.col("tip_amount")))
    .select("tip_amount_rounded_up", "tip_amount")
    .sample(0.001)
).show(5)

**Task**: calculate the cosine of the tolls amount

In [0]:
from math import cos

def spark_cos(x):
    # pass nulls through instead of failing on math.cos(None)
    return cos(x) if x is not None else None

spark_cos_udf = F.udf(spark_cos, T.DoubleType())

In [0]:
(
    sdf_trips
    .withColumn("cos_tolls_amount", spark_cos_udf(F.col("tolls_amount")))
    .select("cos_tolls_amount", "tolls_amount")
).show(5)

# 3. Transformations within slices/windows

In [0]:
from pyspark.sql import Window

**Example**: keep the 3 latest rides from each pickup zone

In [0]:
(
    sdf_trips
    .withColumn(
        "row_number",
        F.row_number()
        .over(
            Window
            .partitionBy("pickup_location_id")
            .orderBy(F.desc("pickup_datetime"))
        )
    )
    .filter(F.col("row_number") < 4)
    .drop("row_number")
).limit(5).toPandas()

**Task**: calculate a cumulative sum of mta_tax starting from the earliest for each payment type

Extra credit: how many trips did it take to accumulate $5 in mta_tax for each payment type?

In [0]:
TO_ACCUMULATE = 5

(
    sdf_trips
    .withColumn(
        "cumulative_mta_tax",
        F.sum("mta_tax")
        .over(
            Window
            .partitionBy("payment_type")
            .orderBy(F.col("pickup_datetime"))
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
    .filter(F.col("cumulative_mta_tax") <= TO_ACCUMULATE)
    .groupBy("payment_type")
    .agg(
        F.count("*").alias("trips_count"),
        F.max("cumulative_mta_tax").alias("max_cum")
    )
    .select(
        "payment_type",
        F.when(F.col("max_cum") >= TO_ACCUMULATE, F.col("trips_count"))
        .otherwise(F.lit(None))
        .alias("trips_count")
    )
).show(50)

# 4. Structures and Arrays

## 4.1 Structures

**Example**: put the pickup and dropoff locations into the `zone_ids` structure

In [0]:
(
    sdf_trips
    .withColumn(
        "zone_ids",
        F.struct('pickup_location_id','dropoff_location_id')
    )
).printSchema()

**Task 1**: create a column `payments` holding a structure with the total amount and a substructure with all the contributions to this payment

**Task 2**: create a dataframe `sdf_structured` with the column above and with the contributing columns dropped

In [0]:
sdf_structured = (
    sdf_trips
    .withColumn(
        "contributions",
        F.struct(*sdf_trips.columns[8:12])
    )
    .withColumn(
        'payments',
        F.struct("contributions", "total_amount")
    )
    .drop(*sdf_trips.columns[8:13])
    .drop("contributions")
)

sdf_structured.printSchema()

## 4.2 Arrays

**Example**: collect an array of trip distances for each `passenger_count`, `rate_code`, and `payment_type` combination

In [0]:
(
    sdf_trips
    .groupBy(
        "passenger_count",
        "rate_code",
        "payment_type",
    )
    .agg(
        F.collect_list('trip_distance').alias('distances')
    )
).show(5)

**Task**: create a column that holds the 5 latest payments structures from `sdf_structured` for each pickup location and store the resulting data frame as `sdf_structured_arrayed`

**Extra Credit**: extract the 3rd tip amount from the array for each location

In [0]:
sdf_structured_arrayed = (
    sdf_structured
    .withColumn(
        "row_number",
        F.row_number()
        .over(
            Window
            .partitionBy("pickup_location_id")
            .orderBy(F.desc("pickup_datetime"))
        )
    )
    .filter(F.col("row_number") < 6)
    .groupBy("pickup_location_id")
    .agg(
        F.collect_list('payments').alias('payments')
    )
    .withColumnRenamed("pickup_location_id", "zone_id")
    .join(
        sdf_geo,
        on="zone_id",
        how="left",
    )
)
sdf_structured_arrayed.printSchema()

(
    sdf_structured_arrayed
    .select(
        "zone_name",
        "payments",
        (F.col("payments.contributions.tip_amount")[2]).alias('the_third_tip')
    )
).limit(5).display()

## 4.3 Extract data

**Example**: Extract the total amount from `sdf_structured` into an individual column

In [0]:
(
    sdf_structured
    .select("*", "payments.total_amount")
).limit(5).display()

**Task**: Extract contributions from `sdf_structured_arrayed` into individual rows

In [0]:
(
    sdf_structured_arrayed
    .withColumn('payments', F.explode('payments'))
    .select("*", "payments.contributions.*")
).show(5)

# 5. Time

## 5.1 Format time

**Example**: keep only the rides that spanned at least one midnight and change the pickup and dropoff times to a format like `April 27, 1967`

Make sure to exclude the ones traveling back in time (dropped off before picked up).

In [0]:
(
    sdf_trips
    .withColumn(
        "date_diff",
        F.datediff('dropoff_datetime', 'pickup_datetime')
    )
    .filter(F.col("date_diff") > 0)
    .withColumn('pickup_date', F.date_format(F.col("pickup_datetime"), 'MMMM d, yyyy'))
    .withColumn('dropoff_date', F.date_format(F.col("dropoff_datetime"), 'MMMM d, yyyy'))
).limit(5).display()

**Task**: keep only the weekend rides (started on either Saturday or Sunday) and specify which day it was in the corresponding column

In [0]:
(
    sdf_trips
    .withColumn(
        "day_of_week",
        F.dayofweek("pickup_datetime")
    )
    .filter(F.col("day_of_week").isin([1, 7]))
    .withColumn(
        "day_of_week",
        F.when(F.col("day_of_week") == 1, F.lit("Sunday"))
        .otherwise(F.lit("Saturday"))
    )
    .select("pickup_datetime", "day_of_week")
    .sample(0.001)
).show(5)

## 5.2 Unix time

**Example**: keep only the rides that ended on the day corresponding to the following unix timestamp `1521552311`

In [0]:
(
    sdf_trips
    .filter(F.to_date("dropoff_datetime") == F.to_date(F.from_unixtime(F.lit(1521552311))))
    .select("dropoff_datetime", F.to_unix_timestamp("pickup_datetime").alias("uts"))
    .sample(0.001)
).show(5)

**Task**: find the date corresponding to twice the Unix timestamp of the pickup for each ride

In [0]:
(
    sdf_trips
    .withColumn(
        "ts_doubled",
        2*F.unix_timestamp("pickup_datetime")
    )
    .withColumn(
        "doubled_date",
        F.to_date(F.from_unixtime(F.col("ts_doubled")))
    )
    .select("pickup_datetime", "ts_doubled", "doubled_date")
    .sample(0.01)
).show(5)

# 6. Keeping the code DRY (Don't Repeat Yourself)

## 6.1 Windows

**Example**: rewrite the example in part 3 with window defined in a separate variable

In [0]:
window_pickup = Window.partitionBy("pickup_location_id").orderBy(F.desc("pickup_datetime"))

(
    sdf_trips
    .withColumn("row_number", F.row_number().over(window_pickup))
    .filter(F.col("row_number") < 4)
).limit(5).display()

**Task**: rewrite the task from part 3 with window defined in a separate variable

**Extra credit**: rewrite the filtering condition as a separate variable

In [0]:
TO_ACCUMULATE = 5
window_payment_type = (
    Window
    .partitionBy("payment_type")
    .orderBy(F.col("pickup_datetime"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
limit_cum_mta = F.col("cumulative_mta_tax") <= TO_ACCUMULATE

(
    sdf_trips
    .withColumn(
        "cumulative_mta_tax",
        F.sum("mta_tax")
        .over(window_payment_type)
    )
    .filter(limit_cum_mta)
    .groupBy("payment_type")
    .agg(
        F.count("*").alias("trips_count"),
        F.max("cumulative_mta_tax").alias("max_cum")
    )
    .select(
        "payment_type",
        F.when(F.col("max_cum") >= TO_ACCUMULATE, F.col("trips_count"))
        .otherwise(F.lit(None))
        .alias("trips_count")
    )
).show(50)

## 6.2 Parameterization

**Example**: multiply each of the monetary columns by the number of characters in the column name, and include this number in the name of the column holding the result

In [0]:
(
    sdf_trips
    .select(
        *sdf_trips.columns[:8],
        *[(F.col(c)*len(c)).alias(f"{c} * {len(c)}") for c in sdf_trips.columns[8:-2]],
        *sdf_trips.columns[8:-2],
        *sdf_trips.columns[-2:],
    )
).limit(5).display()

**Task**: rewrite the example from 5.1 by creating a function that takes the date time column name and returns the formatted output

**Extra credit**: make the function work in a loop by providing a list of columns to transform

In [0]:
def transform_date(col_name):
    return F.date_format(F.col(col_name), 'MMMM d, yyyy')

(
    sdf_trips
    .withColumn(
        "date_diff",
        F.datediff('dropoff_datetime', 'pickup_datetime')
    )
    .filter(F.col("date_diff") > 0)
    .select(
        "*",
        *[transform_date(col).alias(col[:-4])
          for col in sdf_trips.columns
          if "datetime" in col]
    )
).limit(5).display()

# 7. Prototyping

## 7.1 Sampling

**Example**: count the pairs of rides where the drop-off location of one ride is the pickup location of the other

**Hint**: if the execution takes more than 5 minutes, stop it and proceed to the next task

In [0]:
sdf_pairs = (
    sdf_trips
    .select(
        F.col("pickup_location_id").alias("pickup_location_id_a"),
        F.col("dropoff_location_id").alias("dropoff_location_id_a"),
        F.col("dropoff_location_id").alias("to_join"),
    )
    .join(
        sdf_trips
        .select(
            F.col("pickup_location_id").alias("pickup_location_id_b"),
            F.col("dropoff_location_id").alias("dropoff_location_id_b"),
            F.col("pickup_location_id").alias("to_join"),
        ),
        on="to_join",
        how='inner',
    )
)

In [0]:
# counting the combinations (uncomment to run; this can take a long time)
# print(f"{sdf_pairs.count()=:,}")

**Task**: prototype the code from the example for the 0.1% sample of the rides

In [0]:
sdf_trips_sample = sdf_trips.sample(0.001)

sdf_sampled_pairs = (
    sdf_trips_sample
    .select(
        F.col("pickup_location_id").alias("pickup_location_id_a"),
        F.col("dropoff_location_id").alias("dropoff_location_id_a"),
        F.col("dropoff_location_id").alias("to_join"),
    )
    .join(
        sdf_trips_sample
        .select(
            F.col("pickup_location_id").alias("pickup_location_id_b"),
            F.col("dropoff_location_id").alias("dropoff_location_id_b"),
            F.col("pickup_location_id").alias("to_join"),
        ),
        on="to_join",
        how='inner',
    )
)

print(f"{sdf_sampled_pairs.count()=:,}")
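A count obtained on a sample has to be scaled back up. Since a pair survives only if both of its rides are sampled, the pair count shrinks roughly by the square of the fraction (about 1,000,000 times for 0.1%), not by the fraction itself. A plain-Python sketch on hypothetical rides that compares the exact count with the scaled estimate; the helper names are illustrative:

```python
import random
from collections import Counter


def count_pairs(rides):
    """Exact number of ordered pairs (a, b) where a's dropoff zone is b's pickup zone.

    Mirrors the inner join above: each zone contributes n_dropoffs * n_pickups pairs.
    """
    dropoffs = Counter(dropoff for _, dropoff in rides)
    pickups = Counter(pickup for pickup, _ in rides)
    return sum(n * pickups[zone] for zone, n in dropoffs.items())


def estimate_pairs_from_sample(rides, fraction, seed=0):
    """Estimate the full pair count from a Bernoulli sample of the rides.

    A pair survives only if both rides are sampled (probability about fraction**2),
    so the sampled count is scaled by 1 / fraction**2. Pairs of a ride with itself
    shrink only by `fraction`, a small bias ignored here.
    """
    rng = random.Random(seed)
    sample = [ride for ride in rides if rng.random() < fraction]
    return count_pairs(sample) / fraction ** 2


rng = random.Random(42)
# Hypothetical rides: (pickup_zone, dropoff_zone) over 10 zones
rides = [(rng.randrange(10), rng.randrange(10)) for _ in range(20_000)]

exact = count_pairs(rides)
estimate = estimate_pairs_from_sample(rides, fraction=0.1)
print(exact, round(estimate))
```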

## 7.2 Saving intermediate results

**Example**: for the task in 7.1, save a random 0.1% sample of the rides before joining it to the whole dataset

In [0]:
sdf_trips_sample = sdf_trips.sample(0.001)

my_name = "nicolas_bouvier"
path_to_save = f"dbfs:/FileStore/tables/{my_name}_sdf_trips_sample"

(
    sdf_trips_sample
    .write.mode("overwrite")
    .option("overwriteSchema", "True")
    .saveAsTable(f"sdf_trips_sample_{my_name}", path=path_to_save)
)

sdf_trips_saved_sample = spark.read.load(path_to_save)


**Task**: process at least 10 batches of 0.1% samples to get the corresponding pair counts

In [0]:
# TODO

## 7.3 Monkey patching

**Example**: add a method `.prc()` that prints the row count of a dataframe, and test it on the `sdf_trips` dataset

In [0]:
from pyspark.sql.dataframe import DataFrame

def _prc(self):
    print(f"{self.count():,}")

DataFrame.prc = _prc

sdf_trips.prc()

**Task**: add a method `.pvc(col)` that returns another dataframe with the row count for each unique value in the column `col`, and test it on the `borough` column of the `sdf_geo` dataset

**Extra credit**: the dataframe should also contain a column with the percentage of each unique value

In [0]:
def _pvc(self, col):
    return (
        self
        .withColumn('total_count', F.count("*").over(Window.partitionBy(F.lit(1))))
        .groupBy(col)
        .agg(
            F.count("*").alias("count"),
            F.format_string('%.1f%%', 100*F.count("*")/F.max('total_count')).alias("percentage"),
        )
        .sort(F.desc("count"))
    )

DataFrame.pvc = _pvc

sdf_geo.pvc('borough').show()
