# Spark Tutorial for Coralian

> This tutorial will cover commands to manipulate PySpark DataFrame and SparkSQL

## Use `findspark` to make PySpark is importable

In [1]:
import findspark

findspark.init('/home/artthanan/spark-3.0.1-bin-hadoop2.7')

## Start SparkSession

In [2]:
from pyspark.sql import SparkSession

app_name = "SparkApplication"

spark_con = SparkSession.builder.appName(app_name).getOrCreate()

## Read CSV File

In [3]:
csv_file_path = './dataset/yellow_tripdata/'

df = spark_con.read.csv(csv_file_path, header=True)

## Show columns and schema in DataFrame

In [4]:
df.columns

['vendor_id',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'pickup_longtitude',
 'pickup_latitude',
 'pu_location_id',
 'rate_code_id',
 'store_and_fwd_flag',
 'dropoff_longtitude',
 'dropoff_latitude',
 'do_location_id',
 'payment_type',
 'fare_amount',
 'extra',
 'improvement_surcharge',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'consgestion_surcharge']

In [5]:
df.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longtitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pu_location_id: string (nullable = true)
 |-- rate_code_id: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longtitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- do_location_id: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- consgestion_surcharge: string (nulla

## Show DataFrame

In [6]:
df.show()

+---------+-------------------+-------------------+---------------+-------------+-----------------+---------------+--------------+------------+------------------+------------------+----------------+--------------+------------+-----------+-----+---------------------+-------+----------+------------+------------+---------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longtitude|pickup_latitude|pu_location_id|rate_code_id|store_and_fwd_flag|dropoff_longtitude|dropoff_latitude|do_location_id|payment_type|fare_amount|extra|improvement_surcharge|mta_tax|tip_amount|tolls_amount|total_amount|consgestion_surcharge|
+---------+-------------------+-------------------+---------------+-------------+-----------------+---------------+--------------+------------+------------------+------------------+----------------+--------------+------------+-----------+-----+---------------------+-------+----------+------------+------------+---------------------

In [7]:
df.head(5)

[Row(vendor_id='4', pickup_datetime='2018-07-30 14:24:04', dropoff_datetime='2018-07-30 14:44:31', passenger_count='1', trip_distance='6.31', pickup_longtitude=None, pickup_latitude=None, pu_location_id='87', rate_code_id='1', store_and_fwd_flag='false', dropoff_longtitude=None, dropoff_latitude=None, do_location_id='237', payment_type='1', fare_amount='22', extra=None, improvement_surcharge='0.3', mta_tax='0.5', tip_amount='4.56', tolls_amount='0', total_amount='27.36', consgestion_surcharge=None),
 Row(vendor_id='4', pickup_datetime='2018-10-07 18:26:38', dropoff_datetime='2018-10-07 19:00:11', passenger_count='1', trip_distance='4.69', pickup_longtitude=None, pickup_latitude=None, pu_location_id='88', rate_code_id='1', store_and_fwd_flag='false', dropoff_longtitude=None, dropoff_latitude=None, do_location_id='50', payment_type='1', fare_amount='23.5', extra=None, improvement_surcharge='0.3', mta_tax='0.5', tip_amount='4', tolls_amount='0', total_amount='28.3', consgestion_surcharge=

## Summary DataFrame

In [8]:
df.summary().show()

+-------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----+---------------------+-------------------+------------------+------------------+------------------+---------------------+
|summary|         vendor_id|    pickup_datetime|   dropoff_datetime|   passenger_count|     trip_distance| pickup_longtitude|  pickup_latitude|    pu_location_id|     rate_code_id|store_and_fwd_flag|dropoff_longtitude|  dropoff_latitude|    do_location_id|      payment_type|       fare_amount|extra|improvement_surcharge|            mta_tax|        tip_amount|      tolls_amount|      total_amount|consgestion_surcharge|
+-------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+-------------

In [9]:
df.count()

10110213

## DataFrame Manipulation Command: `SELECT` and `WHERE`

In [10]:
temp_df = df.select(["vendor_id", "pickup_datetime", "dropoff_datetime", "pu_location_id", "do_location_id", "passenger_count", "trip_distance", "total_amount"])

In [11]:
temp_df.show()

+---------+-------------------+-------------------+--------------+--------------+---------------+-------------+------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|pu_location_id|do_location_id|passenger_count|trip_distance|total_amount|
+---------+-------------------+-------------------+--------------+--------------+---------------+-------------+------------+
|        4|2018-07-30 14:24:04|2018-07-30 14:44:31|            87|           237|              1|         6.31|       27.36|
|        4|2018-10-07 18:26:38|2018-10-07 19:00:11|            88|            50|              1|         4.69|        28.3|
|        4|2018-11-17 10:24:50|2018-11-17 11:02:06|           138|           211|              1|         10.7|       50.47|
|        4|2018-10-25 23:24:00|2018-10-25 23:48:18|           138|           261|              1|        15.74|       59.47|
|        4|2018-09-09 01:34:52|2018-09-09 01:58:59|            79|           223|              1|         9.95|       38.16|


In [12]:
df.select(["vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", "total_amount"])\
.where((df['pickup_datetime'] >= '2018-10-01') & (df['passenger_count'] > 2)).show()

+---------+-------------------+-------------------+---------------+-------------+------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|total_amount|
+---------+-------------------+-------------------+---------------+-------------+------------+
|        4|2018-11-10 17:48:49|2018-11-10 18:23:19|              4|         3.41|       24.52|
|        4|2018-10-09 05:22:49|2018-10-09 05:24:50|              3|         0.18|         4.3|
|        4|2018-11-17 14:07:48|2018-11-17 14:33:29|              3|         8.01|       31.56|
|        1|2018-10-04 07:28:23|2018-10-04 08:06:10|              3|          7.4|       33.88|
|        1|2018-11-14 12:58:12|2018-11-14 13:41:06|              3|         12.5|          53|
|        1|2018-12-19 18:18:36|2018-12-19 18:48:10|              3|          6.5|       30.96|
|        1|2018-10-06 12:49:10|2018-10-06 13:52:59|              4|         33.8|      111.35|
|        1|2018-11-06 15:34:24|2018-11-06 15:34:57

## DataFrame Manipulation Command: Manipulate new column

In [13]:
new_df = df.withColumn('new_do_location_id', df['do_location_id'].substr(1, 1))

In [14]:
new_df.select(['do_location_id', 'new_do_location_id']).show()

+--------------+------------------+
|do_location_id|new_do_location_id|
+--------------+------------------+
|           237|                 2|
|            50|                 5|
|           211|                 2|
|           261|                 2|
|           223|                 2|
|           140|                 1|
|            79|                 7|
|            68|                 6|
|           181|                 1|
|           229|                 2|
|           200|                 2|
|            43|                 4|
|           161|                 1|
|           140|                 1|
|           170|                 1|
|           140|                 1|
|           158|                 1|
|           149|                 1|
|            80|                 8|
|           238|                 2|
+--------------+------------------+
only showing top 20 rows



## DataFrame Manipulation Command: `GROUP BY` and Aggregation Functions

In [15]:
from pyspark.sql.functions import mean, min, max, count, sum

df.groupBy("vendor_id", "passenger_count")\
.agg(max("trip_distance").alias("max_trip_distance"), \
     min("trip_distance").alias("min_trip_distance"), \
     mean("trip_distance").alias("avg_trip_distance"), \
     sum("total_amount").alias("total_amount"), \
     mean("total_amount").alias("avg_total_amount")).show()

+---------+---------------+-----------------+-----------------+------------------+--------------------+------------------+
|vendor_id|passenger_count|max_trip_distance|min_trip_distance| avg_trip_distance|        total_amount|  avg_total_amount|
+---------+---------------+-----------------+-----------------+------------------+--------------------+------------------+
|      DDS|              0|              5.7|              0.9|3.3000000000000003|                25.1|             12.55|
|        4|              1|             9.99|                0|  2.76998160073597|   52966.60000000008| 16.24244096902793|
|        4|              2|             6.14|             0.38| 2.701836734693878|              774.04| 15.79673469387755|
|      VTS|              5|             9.99|                0|  2.91708262452701|   6309126.489998391|12.862748095801782|
|        1|              4|              9.9|                0| 3.451417493686197|    431124.310000006|17.874883287035367|
|      VTS|     

## DataFrame Manipulation Command: `ORDER BY`

In [16]:
df.select(["vendor_id", "trip_distance"]).orderBy('vendor_id', ascending=True).show()

+---------+-------------+
|vendor_id|trip_distance|
+---------+-------------+
|        1|         11.7|
|        1|          6.7|
|        1|         10.3|
|        1|          4.3|
|        1|          6.7|
|        1|          9.1|
|        1|          8.5|
|        1|          9.3|
|        1|         11.2|
|        1|          9.6|
|        1|          6.9|
|        1|         12.7|
|        1|          9.3|
|        1|          7.6|
|        1|          7.3|
|        1|          6.3|
|        1|          6.8|
|        1|          5.6|
|        1|          5.8|
|        1|          7.2|
+---------+-------------+
only showing top 20 rows



## DataFrame Manipulation Command: JOIN DataFrame

In [17]:
fhv_bases_path = './dataset/fhv_bases.csv'

fhv_bases_df = spark_con.read.csv(fhv_bases_path, header=True)

In [18]:
fhv_bases_df.show()

+-----------+--------------------+--------------------+------------+
|base_number|           base_name|                 dba|dba_category|
+-----------+--------------------+--------------------+------------+
|     B02800|FLATIRON TRANSIT LLC|VIA-FLATIRON TRAN...|         via|
|     B03136|GREENPOINT TRANSI...|VIA-GREENPOINT TR...|         via|
|     B02907|        SABO ONE LLC|                JUNO|        juno|
|     B02908|        SABO TWO LLC|                JUNO|        juno|
|     B03035|           OMAHA LLC|                JUNO|        juno|
|     B02914|     VULCAN CARS LLC|                JUNO|        juno|
|     B02510|        TRI-CITY,LLC|       LYFT-TRI-CITY|        lyft|
|     B02844|ENDOR CAR & DRIVE...|LYFT-ENDOR CAR & ...|        lyft|
|     B02765|            GRUN LLC|       UBER-GRUN LLC|        uber|
|     B02512|           UNTER LLC|      UBER-UNTER LLC|        uber|
|     B02395|          ABATAR LLC|     UBER-ABATAR LLC|        uber|
|     B02878|          ELF-NY,LLC|

In [19]:
fhv_tripdata_path = './dataset/fhv_tripdata/'

fhv_tripdata_df = spark_con.read.csv(fhv_tripdata_path, header=True)

In [20]:
fhv_tripdata_df.show()

+--------------------+-------------------+-------------------+--------------+--------------+-------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pu_location_id|do_location_id|sr_flag|
+--------------------+-------------------+-------------------+--------------+--------------+-------+
|              B00429|2019-04-02 12:37:00|2019-04-02 12:44:00|           264|           264|   null|
|              B02800|2019-01-16 20:50:00|2019-01-16 21:09:52|            48|           262|   null|
|              B02060|2019-02-05 22:06:46|2019-02-05 22:27:32|           138|           162|   null|
|              B01065|2019-07-01 16:20:54|2019-07-01 16:27:48|           264|           265|   null|
|              B02877|2019-01-30 09:39:08|2019-01-30 09:52:39|            78|           174|   null|
|              B03055|2019-12-08 07:36:52|2019-12-08 07:59:32|           264|            17|   null|
|              B00887|2019-06-09 12:00:48|2019-06-09 12:29:10|           264|           265

In [21]:
t1 = fhv_bases_df.alias('t1')
t2 = fhv_tripdata_df.alias('t2')

In [22]:
# Two DataFrames with Inner Join

inner_join_df = t1.join(t2, t1.base_number == t2.dispatching_base_num)

In [23]:
inner_join_df.select(['dispatching_base_num', 'base_name', 'dba_category', 'pickup_datetime', 'dropoff_datetime']).show()

+--------------------+--------------------+------------+-------------------+-------------------+
|dispatching_base_num|           base_name|dba_category|    pickup_datetime|   dropoff_datetime|
+--------------------+--------------------+------------+-------------------+-------------------+
|              B00429|SURF CAR SYSTEMS INC|       other|2019-04-02 12:37:00|2019-04-02 12:44:00|
|              B02800|FLATIRON TRANSIT LLC|         via|2019-01-16 20:50:00|2019-01-16 21:09:52|
|              B02060|AMERICAN LIMOUSIN...|       other|2019-02-05 22:06:46|2019-02-05 22:27:32|
|              B01065|EL BARRIO'S CAR S...|       other|2019-07-01 16:20:54|2019-07-01 16:27:48|
|              B02877|       ZWOLF-NY, LLC|        uber|2019-01-30 09:39:08|2019-01-30 09:52:39|
|              B03055|EXPLORER EXPRESS INC|       other|2019-12-08 07:36:52|2019-12-08 07:59:32|
|              B00887|DIAL 7 CAR & LIMO...|       other|2019-06-09 12:00:48|2019-06-09 12:29:10|
|              B02879|    FUNF

In [24]:
# Two DataFrames with Left Join, Right Join, and Full Join
# Just add parameter `how` in Join with values ['left', 'right', 'full', 'full_outer']

t1.join(t2, t1.base_number == t2.dispatching_base_num, how="left").show()

+-----------+---------+----+------------+--------------------+-------------------+-------------------+--------------+--------------+-------+
|base_number|base_name| dba|dba_category|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pu_location_id|do_location_id|sr_flag|
+-----------+---------+----+------------+--------------------+-------------------+-------------------+--------------+--------------+-------+
|     B03035|OMAHA LLC|JUNO|        juno|              B03035|2019-01-06 02:49:33|2019-01-06 03:00:21|           264|           264|   null|
|     B03035|OMAHA LLC|JUNO|        juno|              B03035|2019-01-01 02:15:08|2019-01-01 02:36:18|          null|          null|   null|
|     B03035|OMAHA LLC|JUNO|        juno|              B03035|2019-01-08 18:37:52|2019-01-08 19:14:43|           264|           264|   null|
|     B03035|OMAHA LLC|JUNO|        juno|              B03035|2019-01-06 17:47:16|2019-01-06 17:58:41|           264|           264|   null|
|     B03035|

## Use SQL in PySpark

In [25]:
df.createTempView("yellow_tripdata")

In [26]:
sql_stmt = '''
    SELECT
        vendor_id,
        pickup_datetime,
        dropoff_datetime
    FROM yellow_tripdata
    LIMIT 10
'''

result_df = spark_con.sql(sql_stmt)

In [27]:
result_df.show()

+---------+-------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|
+---------+-------------------+-------------------+
|        4|2018-07-30 14:24:04|2018-07-30 14:44:31|
|        4|2018-10-07 18:26:38|2018-10-07 19:00:11|
|        4|2018-11-17 10:24:50|2018-11-17 11:02:06|
|        4|2018-10-25 23:24:00|2018-10-25 23:48:18|
|        4|2018-09-09 01:34:52|2018-09-09 01:58:59|
|        4|2018-11-15 07:45:38|2018-11-15 07:48:02|
|        4|2018-10-09 11:56:57|2018-10-09 12:38:42|
|        4|2018-10-17 20:03:19|2018-10-17 20:30:33|
|        4|2018-11-29 22:46:35|2018-11-29 23:17:38|
|        4|2018-10-05 09:15:01|2018-10-05 09:49:16|
+---------+-------------------+-------------------+



In [28]:
fhv_bases_df.createTempView("fhv_bases")

In [29]:
fhv_tripdata_df.createTempView("fhv_tripdata")

In [30]:
sql_stmt_two = '''
    SELECT
        t.dba_category,
        COUNT(*) AS total_count
    FROM (
            SELECT
                ft.dispatching_base_num,
                fb.dba_category
            FROM fhv_tripdata AS ft
            LEFT JOIN fhv_bases AS fb
            ON
                ft.dispatching_base_num = fb.base_number
    ) AS t
    WHERE
        t.dba_category IS NOT NULL
    GROUP BY
        t.dba_category
'''

spark_con.sql(sql_stmt_two).show()

+------------+-----------+
|dba_category|total_count|
+------------+-----------+
|        lyft|    1285023|
|       other|    2018225|
|        juno|     466501|
|        uber|    5633662|
|         via|     439202|
+------------+-----------+



## Write DataFrame into File

In [31]:
result_df.coalesce(1).write.format("csv").option("header", "true").save("./output/query_result")