## Data discovery: SQL join examples
> Download the dataset from [the official TLC Trip Record Data website](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

### This cell only shows how to document code (it is not executed)
```python
# Load file
local_file = 'datasets/your-downloaded-from-TLC-taxis-file-here.parquet'

# Show data
spark.read.parquet(local_file).show()
```

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

### What is `master("local[N]")`?
The `--master` option specifies the master URL for a distributed cluster, `local` to run locally with one thread, or `local[N]` to run locally with N threads. The session below uses `local[*]`, which runs locally with as many worker threads as there are logical cores.

<b>Source</b>: See the Spark [docs here](https://spark.apache.org/docs/latest/). See all [options here](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls)
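
As a rough illustration of how the `local`, `local[N]`, and `local[*]` forms map to thread counts, here is a minimal sketch in plain Python. The helper `local_threads` is hypothetical and is not part of Spark; it only mirrors the rule described above and ignores every other master URL form.

```python
import os
import re


def local_threads(master):
    """Return the thread count implied by a local master URL, or None.

    Hypothetical helper for illustration only; Spark parses master URLs itself.
    """
    if master == "local":
        return 1  # plain "local": a single thread
    match = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if match is None:
        return None  # not a local[N] / local[*] URL (e.g. a cluster URL)
    value = match.group(1)
    if value == "*":
        return os.cpu_count() or 1  # "*": one thread per logical core
    return int(value)


print(local_threads("local"))     # 1
print(local_threads("local[4]"))  # 4
print(local_threads("local[*]"))  # depends on the machine
```

On a laptop, `local[*]` is usually the convenient choice for exploration, while a fixed `local[N]` makes runs easier to compare across machines.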

In [2]:
# Create SparkSession
spark = SparkSession.builder\
             .master("local[*]")\
             .appName("spark-app-version-x")\
             .getOrCreate()

In [3]:
# Read taxi data
local_file = 'datasets/parquet/'
df = spark.read.parquet(local_file)

In [4]:
# A DataFrame is like a relational table in memory. Let's see the columns
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [5]:
# Register the DataFrame as a temp view so it can be queried with Spark SQL
df.createOrReplaceTempView('tbl_raw_yellow_taxis')

In [6]:
# SQL statement: preview the first 5 rows
spark.sql('''
          select *
          from tbl_raw_yellow_taxis
          ''').show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-02-01 00:32:53|  2023-02-01 00:34:34|              2|          0.3|         1|                 N|         142|         163|           2|        4.4|  3.5|    0.5|       0.

### Let's create a real "dimension" table for our RatecodeID
1. Standard rate
2. JFK
3. Newark
4. Nassau or Westchester
5. Negotiated fare
6. Group ride

In [7]:
# Rate code IDs and names (IDs are strings here, matching the StringType schema below)
data = [("1", "Standard rate"), ("2", "JFK"), ("3", "Newark"),
        ("4", "Nassau or Westchester"), ("5", "Negotiated fare"), ("6", "Group ride")]

In [8]:
# Define the schema explicitly, to ensure data types
schema = StructType([
    StructField("RatecodeID", StringType(), True),
    StructField("RatecodeName", StringType(), True),
])

In [9]:
# Create a DataFrame for the rate codes
df_rate_codes = spark.createDataFrame(data=data, schema=schema)

In [10]:
# Show rates
df_rate_codes.show()

+----------+--------------------+
|RatecodeID|        RatecodeName|
+----------+--------------------+
|         1|       Standard rate|
|         2|                 JFK|
|         3|              Newark|
|         4|Nassau or Westche...|
|         5|     Negotiated fare|
|         6|          Group ride|
+----------+--------------------+



In [11]:
# Inner join example:
df.join(df_rate_codes, df["RatecodeID"] == df_rate_codes["RatecodeID"], "inner").show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+---------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|RatecodeID|   RatecodeName|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+---------------+
|       2| 2023-02-01 00:34:08|  2023-02-01 01:31:55|              1|        50.52|         5|            

In [12]:
# Left join example. Dropping the right-hand RatecodeID to avoid duplicating columns from both tables
# - for example, see above how RatecodeID shows twice (once for each table joined)
df.join(df_rate_codes, df["RatecodeID"] == df_rate_codes["RatecodeID"], "left").drop(df_rate_codes["RatecodeID"]).show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee| RatecodeName|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------+
|       1| 2023-02-01 00:32:53|  2023-02-01 00:34:34|              2|          0.3|         1|                 N|         142|         163|      

In [14]:
# Left join example, showing the trips whose RatecodeID is NULL (no rate code):
df.join(df_rate_codes, df["RatecodeID"] == df_rate_codes["RatecodeID"], "left").where(df["RatecodeID"].isNull()).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|RatecodeID|RatecodeName|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+------------+
|       2| 2023-02-01 00:18:00|  2023-02-01 00:25:00|           NULL|         1.16|      NULL|              NULL|  
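
The difference between an inner join and a left join, and between a NULL key and a key that simply has no match, is easier to see on a tiny table. The following is a minimal sketch using Python's built-in `sqlite3` module rather than Spark, so it runs without a session. The `trips` rows and the rate code `99` are made up for illustration and are not taken from the TLC data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE trips (trip_id INTEGER, RatecodeID INTEGER);
    CREATE TABLE rate_codes (RatecodeID INTEGER, RatecodeName TEXT);
    -- trip 3 has a NULL key, trip 4 has a key with no matching rate code
    INSERT INTO trips VALUES (1, 1), (2, 2), (3, NULL), (4, 99);
    INSERT INTO rate_codes VALUES (1, 'Standard rate'), (2, 'JFK');
""")

# Inner join: only trips whose key matches a rate code survive
inner_rows = conn.execute("""
    SELECT t.trip_id, r.RatecodeName
    FROM trips t JOIN rate_codes r ON t.RatecodeID = r.RatecodeID
    ORDER BY t.trip_id
""").fetchall()

# Left join: every trip survives; unmatched trips get NULL on the right side
left_rows = conn.execute("""
    SELECT t.trip_id, r.RatecodeName
    FROM trips t LEFT JOIN rate_codes r ON t.RatecodeID = r.RatecodeID
    ORDER BY t.trip_id
""").fetchall()

# Trips without a rate code name: both the NULL key and the unknown key 99
unmatched = [trip_id for trip_id, name in left_rows if name is None]

print(inner_rows)  # [(1, 'Standard rate'), (2, 'JFK')]
print(left_rows)   # [(1, 'Standard rate'), (2, 'JFK'), (3, None), (4, None)]
print(unmatched)   # [3, 4]
```

In PySpark terms, filtering the left join on a right-hand column (for example `df_rate_codes["RatecodeName"].isNull()`) should likewise return every unmatched trip, whereas filtering on `df["RatecodeID"].isNull()` returns only the trips whose key is NULL.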

In [18]:
# SQL statement: count trips per RatecodeID (NULL shows up as its own group)
spark.sql('''
          select RatecodeID, count(1)
          from tbl_raw_yellow_taxis
          group by RatecodeID
          order by RatecodeID
          ''').show(n=5)

+----------+--------+
|RatecodeID|count(1)|
+----------+--------+
|      NULL|  255126|
|         1| 8849004|
|         2|  361464|
|         3|   30360|
|         4|   15839|
+----------+--------+
only showing top 5 rows



In [25]:
# If the user doesn't want to see NULL values, replace them with 0
df_na_rate_codes = df.na.fill(value=0,subset=["RatecodeID"])
df_na_rate_codes.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-02-01 00:32:53|  2023-02-01 00:34:34|              2|          0.3|         1|                 N|         142|         163|           2|        4.4|  3.5|    0.5|       0.

In [29]:
# Check that no NULL rate codes remain after the fill (an empty result is expected):
df_na_rate_codes.where(df_na_rate_codes["RatecodeID"].isNull()).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+---------

In [30]:
# Register a temp view to confirm the counts with SQL
df_na_rate_codes.createOrReplaceTempView('tbl_na_rate_codes')

In [31]:
# SQL statement: the 255126 trips that were NULL are now counted under RatecodeID 0
spark.sql('''
          select RatecodeID, count(1)
          from tbl_na_rate_codes
          group by RatecodeID
          ''').show(n=5)

+----------+--------+
|RatecodeID|count(1)|
+----------+--------+
|         0|  255126|
|         6|      14|
|         5|   45262|
|         1| 8849004|
|         3|   30360|
+----------+--------+
only showing top 5 rows
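
The same NULL-to-0 replacement and count check can be expressed in plain SQL with `COALESCE`, which Spark SQL also supports. The sketch below again uses Python's built-in `sqlite3` module on a small made-up `trips` table, so the numbers are illustrative only and do not come from the taxi dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE trips (trip_id INTEGER, RatecodeID INTEGER);
    INSERT INTO trips VALUES (1, 1), (2, NULL), (3, 1), (4, NULL), (5, 2);
""")

# Counts per rate code before the fill: NULL forms its own group
before = dict(conn.execute(
    "SELECT RatecodeID, COUNT(1) FROM trips GROUP BY RatecodeID"
).fetchall())

# Counts after replacing NULL with 0: the NULL group moves to key 0
after = dict(conn.execute("""
    SELECT COALESCE(RatecodeID, 0) AS RatecodeID, COUNT(1)
    FROM trips
    GROUP BY COALESCE(RatecodeID, 0)
""").fetchall())

print(before[None], after[0])                        # 2 2
print(sum(before.values()) == sum(after.values()))   # True
```

Choosing 0 as the placeholder works here only because 0 is not already a rate code; a value that collides with an existing code would merge two groups.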



In [32]:
# Stop the session
spark.stop()