## First Look

Video: [DE Zoomcamp 5.3.1 - First Look at Spark/PySpark](https://www.youtube.com/watch?v=r_Sf6fCB40c&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=54)


In [1]:
import pyspark

pyspark.__file__, pyspark.__version__

('/home/sugab/spark/spark-3.5.1-bin-hadoop3/python/pyspark/__init__.py',
 '3.5.1')

### Setup Spark Session


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

24/03/02 12:23:56 WARN Utils: Your hostname, sugab-archlinux resolves to a loopback address: 127.0.1.1; using 192.168.81.206 instead (on interface wlan0)
24/03/02 12:23:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/02 12:23:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2021-01.csv.gz
!gunzip fhv_tripdata_2021-01.csv.gz


In [3]:
# read csv using spark
df = spark.read.option("header", True).csv("fhv_tripdata_2021-01.csv")


In [4]:
# print schema
df.schema


```
StructType(
    [
        StructField('dispatching_base_num', StringType(), True),
        StructField('pickup_datetime', StringType(), True),
        StructField('dropoff_datetime', StringType(), True),
        StructField('PULocationID', StringType(), True),
        StructField('DOLocationID', StringType(), True),
        StructField('SR_Flag', StringType(), True),
        StructField('Affiliated_base_number', StringType(), True)
    ]
)
```

By default, Spark's CSV reader doesn't infer types, so every column comes back as `StringType`. We'll work out the types ourselves and give Spark an explicit schema.


### Infer Datatypes with Pandas


In [10]:
# make a smaller file (header + first 1000 rows) to read with pandas
!head -n 1001 fhv_tripdata_2021-01.csv > head.csv

In [5]:
# read head csv using pandas

import pandas as pd

df_pandas = pd.read_csv("head.csv")
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropoff_datetime           object
PULocationID              float64
DOLocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object
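If you'd rather not create `head.csv` with the shell, pandas can stop parsing early with `nrows`. It counts data rows only, so `nrows=1000` corresponds to `head -n 1001`. Below is a sketch that uses an in-memory CSV of hypothetical data in place of the real file. With the real file, the call would be `pd.read_csv("fhv_tripdata_2021-01.csv", nrows=1000)`.

```python
import io

import pandas as pd

# Hypothetical stand-in for fhv_tripdata_2021-01.csv: a header plus 5000 rows
lines = ["dispatching_base_num,PULocationID"]
lines += [f"B{i:05d},{i % 265 + 1}" for i in range(5000)]
csv_text = "\n".join(lines) + "\n"

# nrows limits the number of data rows parsed; the header line is not counted
df_head = pd.read_csv(io.StringIO(csv_text), nrows=1000)
print(len(df_head))
print(df_head.dtypes)
```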

In [6]:
# convert pandas dataframe to spark dataframe
spark.createDataFrame(df_pandas).schema




```
StructType(
    [
        StructField('dispatching_base_num', StringType(), True),
        StructField('pickup_datetime', StringType(), True),
        StructField('dropoff_datetime', StringType(), True),
        StructField('PULocationID', DoubleType(), True),
        StructField('DOLocationID', DoubleType(), True),
        StructField('SR_Flag', DoubleType(), True),
        StructField('Affiliated_base_number', StringType(), True)
    ]
)
```

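Pandas gets us closer: `PULocationID`, `DOLocationID` and `SR_Flag` are now numeric. They come out as `DoubleType`, though, even though the location IDs are whole numbers, and `pickup_datetime` and `dropoff_datetime` are still strings. So in the final schema we'll use `IntegerType` for the IDs and change the two datetime columns to `TimestampType`.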
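The `DoubleType` comes from pandas. An integer column with at least one missing value is stored as `float64`, because NumPy integer arrays have no way to represent NaN. The small illustration below uses made-up rows. It also shows pandas' nullable `Int64` dtype as one way to keep integers alongside missing values.

```python
import io

import pandas as pd

# Made-up sample: PULocationID is empty in the second row, DOLocationID never is
csv_text = "PULocationID,DOLocationID\n265,75\n,92\n"
sample = pd.read_csv(io.StringIO(csv_text))

# The column with a gap becomes float64; the complete one stays int64
print(sample.dtypes)

# Nullable integer dtype: whole numbers plus <NA> for the missing value
pu_ids = sample["PULocationID"].astype("Int64")
print(pu_ids.tolist())
```

In the Spark schema the IDs use `IntegerType` instead, where a missing value is simply NULL.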


### Import Data using Spark with the Correct Schema


In [49]:
from pyspark.sql import types

schema = types.StructType(
    [
        types.StructField("dispatching_base_num", types.StringType(), True),
        types.StructField("pickup_datetime", types.TimestampType(), True),
        types.StructField("dropoff_datetime", types.TimestampType(), True),
        types.StructField("PULocationID", types.IntegerType(), True),
        types.StructField("DOLocationID", types.IntegerType(), True),
        types.StructField("SR_Flag", types.StringType(), True),
    ]
)

In [50]:
df = spark.read.option("header", "true").schema(schema).csv("fhv_tripdata_2021-01.csv")
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [51]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+-------------------+-------------------+------------+------------+-------+
|              B00009|2021-01-01 00:27:00|2021-01-01 00:44:00|        NULL|        NULL|   NULL|
|              B00009|2021-01-01 00:50:00|2021-01-01 01:07:00|        NULL|        NULL|   NULL|
|              B00013|2021-01-01 00:01:00|2021-01-01 01:51:00|        NULL|        NULL|   NULL|
|              B00037|2021-01-01 00:13:09|2021-01-01 00:21:26|        NULL|          72|   NULL|
|              B00037|2021-01-01 00:38:31|2021-01-01 00:53:44|        NULL|          61|   NULL|
|              B00037|2021-01-01 00:59:02|2021-01-01 01:08:05|        NULL|          71|   NULL|
|              B00037|2021-01-01 00:18:12|2021-01-01 00:30:04|        NULL|          91|   NULL|
|              B00037|2021-01-

24/03/02 12:59:17 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/sugab/workspace/de-zoomcamp/05-batch/code/fhv_tripdata_2021-01.csv


### Partitions


In [52]:
df = df.repartition(24)

In [54]:
df.write.parquet("fhv/2021/01/", mode="overwrite")

24/03/02 12:59:47 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/sugab/workspace/de-zoomcamp/05-batch/code/fhv_tripdata_2021-01.csv

## Spark DataFrames

Video: [DE Zoomcamp 5.3.2 - Spark DataFrames](https://www.youtube.com/watch?v=ti3aC1m3rE8&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=55)


In [55]:
df = spark.read.parquet("fhv/2021/01/")

# Unlike CSV files, parquet files contain the schema of the dataset
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [56]:
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+-------------------+-------------------+------------+------------+-------+
|              B01349|2021-01-06 07:20:00|2021-01-06 10:27:00|        NULL|        NULL|   NULL|
|              B02947|2021-01-09 12:00:00|2021-01-09 12:23:36|         265|          75|   NULL|
|              B01667|2021-01-09 11:24:22|2021-01-09 11:41:45|        NULL|          92|   NULL|
|              B03002|2021-01-07 10:21:03|2021-01-07 10:39:49|        NULL|         209|   NULL|
|              B01984|2021-01-08 18:31:00|2021-01-08 18:39:00|        NULL|          16|   NULL|
+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 5 rows



In [57]:
new_df = df.select(
    "pickup_datetime", "dropoff_datetime", "PULocationID", "DOLocationID"
)
new_df.show(5)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-06 07:20:00|2021-01-06 10:27:00|        NULL|        NULL|
|2021-01-09 12:00:00|2021-01-09 12:23:36|         265|          75|
|2021-01-09 11:24:22|2021-01-09 11:41:45|        NULL|          92|
|2021-01-07 10:21:03|2021-01-07 10:39:49|        NULL|         209|
|2021-01-08 18:31:00|2021-01-08 18:39:00|        NULL|          16|
+-------------------+-------------------+------------+------------+
only showing top 5 rows



In [60]:
new_df = df.filter(df.dispatching_base_num == "B00856").select(
    "pickup_datetime", "dropoff_datetime", "PULocationID", "DOLocationID"
)
new_df.show(5)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-08 17:36:24|2021-01-08 17:43:56|        NULL|         197|
|2021-01-04 07:45:30|2021-01-04 08:30:44|        NULL|          89|
|2021-01-07 12:49:26|2021-01-07 13:09:14|        NULL|         188|
|2021-01-01 00:37:06|2021-01-01 00:40:16|        NULL|          85|
|2021-01-06 00:30:35|2021-01-06 00:34:14|        NULL|          76|
+-------------------+-------------------+------------+------------+
only showing top 5 rows



### Functions


In [61]:
from pyspark.sql import functions as F

In [63]:
df \
    .withColumn("pickup_date", F.to_date(df.pickup_datetime)) \
    .withColumn("dropoff_date", F.to_date(df.dropoff_datetime)) \
    .select("pickup_date", "dropoff_date", "PULocationID", "DOLocationID") \
    .show(5)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-06|  2021-01-06|        NULL|        NULL|
| 2021-01-09|  2021-01-09|         265|          75|
| 2021-01-09|  2021-01-09|        NULL|          92|
| 2021-01-07|  2021-01-07|        NULL|         209|
| 2021-01-08|  2021-01-08|        NULL|          16|
+-----------+------------+------------+------------+
only showing top 5 rows



### User Defined Functions (UDF)

In [64]:
# A toy function: take the digits after the first character, format them as zero-padded hex,
# and prefix with s/ (divisible by 7), a/ (divisible by 3) or e/ (neither)
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f"s/{num:03x}"
    elif num % 3 == 0:
        return f"a/{num:03x}"
    else:
        return f"e/{num:03x}"


# Creating the actual UDF
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
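Because the UDF wraps an ordinary Python function, its logic can be tested without Spark. The first two checks below match base numbers and `base_id` values that appear in the outputs of this notebook. One caveat: if `dispatching_base_num` were NULL, Spark would pass `None`, and `base_num[1:]` would raise a `TypeError` inside the task. `crazy_stuff_safe` is a hypothetical guard for that case.

```python
def crazy_stuff(base_num):
    # Same logic as the notebook cell: digits after the first character, shown as hex
    num = int(base_num[1:])
    if num % 7 == 0:
        return f"s/{num:03x}"
    elif num % 3 == 0:
        return f"a/{num:03x}"
    else:
        return f"e/{num:03x}"


def crazy_stuff_safe(base_num):
    # Hypothetical null-safe variant: Spark passes SQL NULL to a Python UDF as None
    if base_num is None:
        return None
    return crazy_stuff(base_num)


print(crazy_stuff("B01349"))  # e/545
print(crazy_stuff("B02947"))  # s/b83
print(crazy_stuff("B00009"))  # a/009
print(crazy_stuff_safe(None))  # None
```

To use the guarded version, pass it to `F.udf` in the same way: `F.udf(crazy_stuff_safe, returnType=types.StringType())`. Python UDFs also ship every row between the JVM and a Python worker, so when a built-in function from `F` can express the same logic, it is usually the faster choice.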

In [65]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/545| 2021-01-06|  2021-01-06|        NULL|        NULL|
|  s/b83| 2021-01-09|  2021-01-09|         265|          75|
|  e/683| 2021-01-09|  2021-01-09|        NULL|          92|
|  e/bba| 2021-01-07|  2021-01-07|        NULL|         209|
|  e/7c0| 2021-01-08|  2021-01-08|        NULL|          16|
|  a/315| 2021-01-07|  2021-01-07|        NULL|        NULL|
|  a/76b| 2021-01-08|  2021-01-08|        NULL|        NULL|
|  e/479| 2021-01-09|  2021-01-09|        NULL|         247|
|  a/06f| 2021-01-08|  2021-01-08|        NULL|        NULL|
|  s/4a6| 2021-01-06|  2021-01-06|        NULL|          49|
|  e/3c5| 2021-01-08|  2021-01-08|         221|         245|
|  e/479| 2021-01-08|  2021-01-08|        NULL|         127|
|  e/67f| 2021-01-01|  2021-01-01|        NULL|          70|
|  s/c5c| 2021-01-02|  2