## Setting up PySpark

In [1]:
# Import libraries
# Ensure that pyspark is available and that the environment variables are set.
# findspark.init() has to run BEFORE pyspark is imported: it locates the Spark
# installation (SPARK_HOME) and adds its pyspark to sys.path, which only helps
# if the pyspark imports below have not been resolved yet.
import findspark
findspark.init()

import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Build (or reuse) a local SparkSession running one worker thread per logical core
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [11]:
# Load the January 2021 FHVHV parquet files and print the inferred schema
fhvhv_path = 'fhvhv/2021/01/'
df = spark.read.parquet(fhvhv_path)
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [12]:
# A crazy function that changes values when they're divisible by 7 or 3
def crazy_stuff(base_num):
    """Toy UDF body: re-encode a dispatching base number as a tagged hex string.

    The first character of ``base_num`` is dropped and the rest parsed as an
    int. The result is ``'s/'`` if that number is divisible by 7, ``'a/'`` if
    divisible by 3 (but not 7), ``'e/'`` otherwise, followed by the number in
    lowercase hex padded to at least 3 digits.

    Returns None when ``base_num`` is None. ``dispatching_base_num`` is nullable,
    and Spark hands SQL NULL to a Python UDF as None. Without this guard,
    ``None[1:]`` would fail the task.
    """
    if base_num is None:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

# Wrap crazy_stuff as a Spark UDF that produces string values
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

# Derive pickup/dropoff dates plus the re-encoded base id, then preview them
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/acc| 2021-01-11|  2021-01-11|         262|         231|
|  e/a39| 2021-01-05|  2021-01-05|          61|         181|
|  e/9ce| 2021-01-02|  2021-01-02|         100|           1|
|  e/b42| 2021-01-31|  2021-01-31|         232|           4|
|  s/af0| 2021-01-05|  2021-01-05|         162|           1|
|  a/b43| 2021-01-27|  2021-01-27|          68|          68|
|  e/9ce| 2021-01-18|  2021-01-18|         205|         205|
|  e/b35| 2021-01-30|  2021-01-30|         256|         255|
|  e/b3b| 2021-01-16|  2021-01-16|          89|          91|
|  e/9ce| 2021-01-05|  2021-01-05|         132|         102|
|  e/acc| 2021-01-11|  2021-01-11|          97|          61|
|  e/9ce| 2021-01-22|  2021-01-22|          79|          37|
|  e/b32| 2021-01-03|  2021-01-03|          26|         178|
|  a/b49| 2021-01-14|  2

In [None]:
# Stop the SparkSession (and its SparkContext) before the next section builds a new one
spark.stop()

## PySpark SQL

In [4]:
# Load libraries and create SparkSession
# Ensure that pyspark is available and that the environment variables are set.
# findspark.init() must run before any pyspark import so those imports resolve
# against the Spark installation it locates (it extends sys.path / SPARK_HOME).
import findspark
findspark.init()

# Import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('pyspark-sql') \
    .getOrCreate()

In [5]:
# Read parquet files
df_green = spark.read.parquet("data/pq/green/*/*")
df_yellow = spark.read.parquet("data/pq/yellow/*/*")
# Print the green schema: the cell's saved output shows it, and the lpep_*
# timestamp columns it lists are the ones renamed in the next step
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [7]:
# Align the service-specific pickup/dropoff column names (lpep_* for green,
# tpep_* for yellow) on the shared names pickup_datetime / dropoff_datetime
df_green = (
    df_green
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
)

df_yellow = (
    df_yellow
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
)


In [10]:
# Columns present in both services, kept in df_green's original order
# (the set only speeds up the membership test; the list keeps the order)
yellow_columns = set(df_yellow.columns)
common_cols = [name for name in df_green.columns if name in yellow_columns]

# Project both frames onto the shared columns and tag each with its service type
df_green_mod = df_green.select(common_cols).withColumn("service_type", F.lit("green"))
df_yellow_mod = df_yellow.select(common_cols).withColumn("service_type", F.lit("yellow"))

# Stack the services; unionAll matches columns by position, which is safe here
# because both frames were built from the same column list
trips_data = df_green_mod.unionAll(df_yellow_mod)

# Count records by service type
trips_data.groupBy("service_type").count().show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [18]:
# Register the DataFrame as a temporary view for SQL queries.
# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView()
# is its replacement and also replaces an existing view on re-run.
trips_data.createOrReplaceTempView("trips_data")
# Run a SQL query
spark.sql("""
SELECT 
    service_type,
    count(1) as record_count
FROM
    trips_data
GROUP BY
    service_type
""").show()

+------------+------------+
|service_type|record_count|
+------------+------------+
|       green|     2304517|
|      yellow|    39649199|
+------------+------------+



In [19]:
# Run query used in dbt modeling: monthly revenue aggregates per pickup zone,
# month and service type over the trips_data view
monthly_revenue_sql = """
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
"""

df_result = spark.sql(monthly_revenue_sql)

# Show query results
df_result.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         127|2020-

In [20]:
# Write results to parquet using 1 partition: coalesce(1) merges partitions
# without a full shuffle, so the report is written by a single task.
# NOTE(review): mode='overwrite' on data/report/revenue/ replaces the whole
# directory, including the green/, yellow/ and total/ sub-folders that later
# cells write under the same prefix. Consider a dedicated sub-path here.
(
    df_result
    .coalesce(1)
    .write
    .parquet('data/report/revenue/', mode='overwrite')
)

## Groupby in Spark

In [22]:
# Load libraries and create SparkSession
# Ensure that pyspark is available and that the environment variables are set.
# findspark.init() must run before any pyspark import so those imports resolve
# against the Spark installation it locates (it extends sys.path / SPARK_HOME).
import findspark
findspark.init()

# Import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('pyspark-sql') \
    .getOrCreate()

In [24]:
# Read green data and register it as a temporary SQL view named 'green'
# (createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0)
df_green = spark.read.parquet("data/pq/green/*/*")
df_green.createOrReplaceTempView('green')



In [26]:
# Hourly revenue and trip count per pickup zone for green taxis, from 2020 on
green_hourly_sql = """
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
"""
df_green_revenue = spark.sql(green_hourly_sql)

In [27]:
# Shuffle the hourly green revenue into 20 partitions before writing it out
(
    df_green_revenue
    .repartition(20)
    .write
    .parquet('data/report/revenue/green', mode='overwrite')
)

In [28]:
# Read yellow data and register it as a temporary SQL view named 'yellow'
# (createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0)
df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow.createOrReplaceTempView('yellow')

In [29]:
# Hourly revenue and trip count per pickup zone for yellow taxis, from 2020 on
yellow_hourly_sql = """
SELECT 
    date_trunc('hour', tpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
"""
df_yellow_revenue = spark.sql(yellow_hourly_sql)

In [30]:
# Shuffle the hourly yellow revenue into 20 partitions and persist it,
# replacing any previous run's output
df_yellow_revenue.repartition(20).write.mode('overwrite').parquet('data/report/revenue/yellow')

In [31]:
# Reload the persisted hourly reports
df_green_revenue = spark.read.parquet('data/report/revenue/green')
df_yellow_revenue = spark.read.parquet('data/report/revenue/yellow')

# Prefix the metric columns with the service name so both sets can sit side by
# side after joining on (hour, zone)
df_green_revenue_tmp = (
    df_green_revenue
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_number_records')
)

df_yellow_revenue_tmp = (
    df_yellow_revenue
    .withColumnRenamed('amount', 'yellow_amount')
    .withColumnRenamed('number_records', 'yellow_number_records')
)

In [32]:
# Full outer join on (hour, zone): keeps slots served by only one service,
# with nulls in the other service's columns
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp,
    on=['hour', 'zone'],
    how='outer',
)

In [33]:
# Preview the joined report; show() prints the first 20 rows by default
df_join.show()

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  22|              15.8|                   1|              null|                 null|
|2020-01-01 00:00:00|  25|             531.0|                  26|            324.35|                   16|
|2020-01-01 00:00:00|  55|129.29000000000002|                   4|              null|                 null|
|2020-01-01 00:00:00|  56|             99.69|                   3|              18.1|                    2|
|2020-01-01 00:00:00|  60|            160.04|                   6|57.620000000000005|                    2|
|2020-01-01 00:00:00|  61|            526.71|                  17|            146.64|                    3|
|2020-01-01 00:00:00|  65|  

In [34]:
# Persist the joined green/yellow hourly revenue, replacing any earlier output
df_join.write.mode('overwrite').parquet('data/report/revenue/total')

In [35]:
# Read zones dataframe. The header row supplies column names. No schema or
# inferSchema is given, so every column (including LocationID) is a string.
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')
# Cache the lookup as parquet. The default write mode fails if 'zones' already
# exists, which broke re-running the notebook, so overwrite instead.
df_zones.write.parquet('zones', mode='overwrite')
df_zones = spark.read.parquet('zones')

In [39]:
# Merge large dataframe with small dataframe
# Merge total revenue with zone data (inner join: revenue rows whose zone has no
# lookup entry are dropped).
# NOTE(review): LocationID is a string column from the CSV. Matching it against
# the numeric `zone` relies on Spark's implicit cast.
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)
# Write merged results to a parquet file; overwrite so re-running the cell does
# not fail on the existing output directory
df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones', mode='overwrite')

## RDD operations - Map and Reduce

In [1]:
# Load libraries and create SparkSession
# Ensure that pyspark is available and that the environment variables are set.
# findspark.init() must run before any pyspark import so those imports resolve
# against the Spark installation it locates (it extends sys.path / SPARK_HOME).
import findspark
findspark.init()

# Import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
from datetime import datetime
from collections import namedtuple

# Create a SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('rdd') \
    .getOrCreate()

In [3]:
# Load the green trips and expose only the three columns the RDD pipeline uses
df_green = spark.read.parquet('data/pq/green/*/*')
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

In [6]:
# Define functions to be used in the rdd methods

def filter_outliers(row):
    """Keep only trips picked up on or after 2020-01-01.

    Rows with a null pickup timestamp are rejected (False) instead of raising
    ``TypeError`` on ``None >= datetime``. This agrees with the SQL filter
    ``lpep_pickup_datetime >= '2020-01-01 00:00:00'``, which also discards NULLs.
    """
    pickup = row.lpep_pickup_datetime
    return pickup is not None and pickup >= datetime(year=2020, month=1, day=1)

def prepare_for_grouping(row):
    """Map a trip row to a ``((hour, zone), (amount, 1))`` key/value pair.

    The pickup timestamp is truncated to the start of its hour, so
    ``reduceByKey`` aggregates every trip sharing an hour and a pickup zone.
    """
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (pickup_hour, row.PULocationID), (row.total_amount, 1)

def calculate_revenue(left_value, right_value):
    """Reducer for ``reduceByKey``: add two ``(amount, count)`` pairs element-wise."""
    (amount_a, count_a), (amount_b, count_b) = left_value, right_value
    return (amount_a + amount_b, count_a + count_b)

# Output record of the RDD pipeline. Defined once at module level: creating the
# namedtuple inside unwrap() built a brand-new class for every processed row.
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

def unwrap(row):
    """Flatten a reduced ``((hour, zone), (revenue, count))`` pair into a RevenueRow."""
    return RevenueRow(
        hour=row[0][0],
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [11]:
# Schema for the (hour, zone, revenue, count) rows produced by unwrap();
# every field is nullable
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    )
])

In [13]:
# Apply RDD methods and write results to a dataframe
df_result = (
    rdd
    .filter(filter_outliers)          # keep pickups on/after 2020-01-01
    .map(prepare_for_grouping)        # -> ((hour, zone), (amount, 1))
    .reduceByKey(calculate_revenue)   # sum amounts and counts per (hour, zone)
    .map(unwrap)                      # -> RevenueRow(hour, zone, revenue, count)
    .toDF(result_schema)
)

# The default write mode errors if the directory exists, so overwrite to keep
# the notebook re-runnable
df_result.write.parquet('tmp/green-revenue', mode='overwrite')

## RDD operations - mapPartitions

In [14]:
import pandas as pd

# Trip features fed to the duration "model"; apply_model_in_batch also uses this
# list to name the pandas columns it builds from each partition
columns = [
    'VendorID',
    'lpep_pickup_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
]

duration_rdd = df_green.select(columns).rdd

In [18]:
# Define ML simple model
def model_predict(df):
    """Placeholder model: predicted duration is 5 x ``trip_distance``.

    Stands in for a real estimator's ``model.predict(df)``. Returns a pandas
    Series aligned with ``df``.
    """
    distance = df.trip_distance
    return distance * 5

# Define function to apply the model in batches (each Spark partition). Partitions
# might not be balanced, so slicing a partition into fixed-size chunks
# (itertools.islice) can be necessary to bound memory.
def apply_model_in_batch(rows, batch_size=None):
    """Score a Spark partition with ``model_predict`` and yield the scored rows.

    Parameters
    ----------
    rows : iterable
        The partition's rows as passed by ``RDD.mapPartitions``. Each row must
        line up with the module-level ``columns`` list.
    batch_size : int or None, optional
        Maximum number of rows loaded into one pandas DataFrame. ``None`` (the
        default, used by ``mapPartitions(apply_model_in_batch)``) loads the
        whole partition at once, as before.

    Yields
    ------
    The ``DataFrame.itertuples()`` records of each scored batch: an ``Index``
    field, the input columns and ``predicted_duration``.

    Raises
    ------
    ValueError
        If ``batch_size`` is given but is smaller than 1. Zero would otherwise
        silently drop every row.
    """
    from itertools import islice

    if batch_size is not None and batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer or None, got {batch_size!r}")

    row_iter = iter(rows)
    while True:
        # islice(it, None) drains the iterator, i.e. one whole-partition batch
        batch = list(islice(row_iter, batch_size))
        if not batch:
            return
        df = pd.DataFrame(batch, columns=columns)
        df['predicted_duration'] = model_predict(df)
        # It returns a generator that gathers the rows of the dataframe
        yield from df.itertuples()

In [19]:
# Score each Spark partition with apply_model_in_batch, turn the yielded pandas
# records back into a Spark DataFrame and drop the pandas 'Index' field
df_predicts = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .drop('Index')
)

df_predicts.select('predicted_duration').show()

+------------------+
|predicted_duration|
+------------------+
|3.9000000000000004|
|               4.9|
|              13.5|
|               4.0|
|             11.65|
|13.100000000000001|
|5.6499999999999995|
| 6.800000000000001|
|             55.75|
|               8.9|
|               5.0|
|             13.75|
|               5.5|
|             19.05|
|              9.25|
|              45.7|
|               5.2|
| 5.699999999999999|
|              5.75|
|4.6000000000000005|
+------------------+
only showing top 20 rows



In [20]:
# Stop the local SparkSession (and its SparkContext) before configuring Spark for GCS
spark.stop()

## Spark in the Cloud

In [21]:
# Upload data to GCS
# -r copies the whole local data/pq tree into the bucket; -m runs the copies in parallel
!gsutil -m cp -r data/pq gs://dtc_data_lake_dtc-data-engineering-377412

Copying file://data\pq\green\2020\02\.part-00002-1775226c-d7af-47a0-9cf5-4953c84fb628-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
/ [0/380 files][    0.0 B/  1.1 GiB]   0% Done                                  
Copying file://data\pq\green\2020\02\part-00001-1775226c-d7af-47a0-9cf5-4953c84fb628-c000.snappy.parquet [Content-Type=application/octet-stream]...
/ [0/380 files][    0.0 B/  1.1 GiB]   0% Done                                  
Copying file://data\pq\green\2020\02\._SUCCESS.crc [Content-Type=application/octet-stream]...
/ [0/380 files][    0.0 B/  1.1 GiB]   0% Done                                  
Copying file://data\pq\green\2020\01\part-00001-4c35630f-8821-4e51-bbab-7b18067752cd-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://data\pq\green\2020\01\part-00003-4c35630f-8821-4e51-bbab-7b18067752cd-c000.snappy.parquet [Content-Type=application/octet-stream]...
/ [0/380 files][    0.0 B/  1.1 GiB]   0% Done                    

In [22]:
# 1) Download the GCS connector for Hadoop -
# we need to tell Spark how to access GCS
# The destination must match the jar path later passed to spark.jars in the SparkConf
!gsutil cp gs://hadoop-lib/gcs/gcs-connector-hadoop3-2.2.5.jar C:\tools\gcs-connector-hadoop3-2.2.5.jar

Copying gs://hadoop-lib/gcs/gcs-connector-hadoop3-2.2.5.jar...
/ [0 files][    0.0 B/ 30.1 MiB]                                                
-
- [0 files][  9.4 MiB/ 30.1 MiB]                                                
\
\ [1 files][ 30.1 MiB/ 30.1 MiB]                                                

Operation completed over 1 objects/30.1 MiB.                                     


In [1]:
# 3) Configure Spark with the GCS connector jar and the service-account credentials
# findspark.init() must run before any pyspark import so those imports resolve
# against the Spark installation it locates (it extends sys.path / SPARK_HOME).
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Setting the credentials for GCS and the GCS connector
credentials_location = ".\\.gc\\ny-rides.json"

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "C:\\tools\\gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [2]:
# 4) Set the Spark Context: tell Hadoop to use the GCS connector and the credentials
sc = SparkContext(conf=conf)

# Hadoop settings that route gs:// paths through the GCS connector classes and
# authenticate with the service-account JSON key; applied in this order
gcs_hadoop_settings = [
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
]

hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_key, setting_value in gcs_hadoop_settings:
    hadoop_conf.set(setting_key, setting_value)

In [3]:
# 5) Create a SparkSession on top of the SparkContext configured above
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [4]:
# Read the green parquet files straight from the GCS bucket and fetch 5 rows
green_gcs_path = 'gs://dtc_data_lake_dtc-data-engineering-377412/pq/green/*/*'
df_green = spark.read.parquet(green_gcs_path)
df_green.head(5)

In [6]:
# Stop the GCS-enabled session, which also stops the SparkContext `sc` it was built on
spark.stop()

## Creating a local Spark cluster - standalone mode

In [11]:
# Load libraries and create SparkSession
# Ensure that pyspark is available and that the environment variables are set.
# findspark.init() must run before any pyspark import so those imports resolve
# against the Spark installation it locates (it extends sys.path / SPARK_HOME).
import findspark
findspark.init()

# Import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Create a SparkSession connected to the standalone master (must already be running)
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName('standalone') \
    .getOrCreate()

In [2]:
# Smoke-test the standalone cluster: read the green parquet files and fetch one row
green_path = 'data/pq/green/*/*'
df_green = spark.read.parquet(green_path)
df_green.head()

Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), lpep_dropoff_datetime=datetime.datetime(2020, 1, 12, 18, 19, 52), store_and_fwd_flag='N', RatecodeID=1, PULocationID=41, DOLocationID=41, passenger_count=1, trip_distance=0.78, fare_amount=5.5, extra=0.0, mta_tax=0.5, tip_amount=1.58, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=7.88, payment_type=1, trip_type=1, congestion_surcharge=0.0)

In [4]:
# Run Python script in Spark Standalone
# Launches test_spark_sql.py as a separate Python process on the 2020 green/yellow
# data, writing the report to data/report-2020.
# NOTE(review): the master it connects to is decided inside the script (not shown
# here) — confirm it targets spark://localhost:7077. The SparkUI port-4040 warning
# in the output is most likely because this notebook's own session holds that port.
!python test_spark_sql.py \
    --input_green=data/pq/green/2020/*/ \
    --input_yellow=data/pq/yellow/2020/*/ \
    --output=data/report-2020

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

[Stage 2:>                                                         (0 + 8) / 17]

[Stage 2:>                                                         (0 + 9) / 17]










[Stage 4:>                                                          (0 + 1) / 1]

                                                                                


23/03/05 22:55:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
SUCCESS: The process with PID 16528 (child process of PID 1044) has been terminated.
SUCCESS: The process with PID 1044 (child process of PID 19472) has been terminated.
SUCCESS: The process with PID 19472 (child process of PID 16016) has been terminated.


In [12]:
# Run Python script in Spark Standalone
# Same script as above, on the 2021 green/yellow data, writing to data/report-2021.
# NOTE(review): the master is chosen inside test_spark_sql.py (not shown) — confirm.
!python test_spark_sql.py \
    --input_green=data/pq/green/2021/*/ \
    --input_yellow=data/pq/yellow/2021/*/ \
    --output=data/report-2021

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

[Stage 2:>                                                         (0 + 8) / 17]

[Stage 2:===>                                                      (1 + 8) / 17]















[Stage 4:>                                                          (0 + 1) / 1]

                                                                                


23/03/06 00:29:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
SUCCESS: The process with PID 16092 (child process of PID 1232) has been terminated.
SUCCESS: The process with PID 1232 (child process of PID 7100) has been terminated.
SUCCESS: The process with PID 7100 (child process of PID 8504) has been terminated.


Use `spark-submit` to run Python scripts on Spark Standalone.
Before running it, stop the standalone application created above (the `spark` session connected to `spark://localhost:7077`).
Run the following in CMD:
- Make sure the `PYSPARK_PYTHON` environment variable is set correctly.
    - Run `where python` to list the Python interpreters on the PATH.
    - Set the first path returned by `where python` (the Python of the active environment) as `PYSPARK_PYTHON`: `set PYSPARK_PYTHON=C:\Users\aarro\anaconda3\envs\de-zoomcamp2\python.exe`
- `cd` to the corresponding folder and run the `spark-submit` command below. The master and at least one worker must already be running.
```cmd
spark-submit ^
    --master="spark://localhost:7077" ^
    test_spark_sql.py ^
        --input_green=data/pq/green/2021/*/ ^
        --input_yellow=data/pq/yellow/2021/*/ ^
        --output=data/report-2021
```

In [13]:
# Stop this notebook's standalone application, as required before using spark-submit against the same cluster
spark.stop()