In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

# Local Spark session using every available core ("local[*]").
# getOrCreate() returns the already-running session if this kernel has one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)


25/03/05 22:18:30 WARN Utils: Your hostname, aguan-pc2 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/05 22:18:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/05 22:18:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/05 22:18:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/05 22:18:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Load every green parquet file two directory levels below data/raw/green
# (one sub-directory level, then the files) and show the inferred schema.
df_green = spark.read.format('parquet').load('data/raw/green/*/*')
df_green.printSchema()

                                                                                

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
# Load every yellow parquet file two directory levels below data/raw/yellow
# (one sub-directory level, then the files) and show the inferred schema.
df_yellow = spark.read.format('parquet').load('data/raw/yellow/*/*')
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [10]:
# Drop the lpep_ prefix from the green timestamp columns so they share names
# with the yellow data (whose tpep_ columns are renamed the same way).
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [11]:
# Drop the tpep_ prefix from the yellow timestamp columns so they share names
# with the green data (whose lpep_ columns are renamed the same way).
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [12]:
# Compare the schemas as (column name, Spark data type) pairs, so a column that
# exists in both frames but with a different type (e.g. payment_type) is caught.
yellow_cols = set((field.name, field.dataType) for field in df_yellow.schema.fields)
green_cols = set((field.name, field.dataType) for field in df_green.schema.fields)

diff = yellow_cols.symmetric_difference(green_cols)  # pairs present in only one schema

if diff:
    # Report each side separately: the symmetric difference on its own does not
    # say which schema a pair came from. Sort the entries because set iteration
    # order can change between kernel sessions (string hashing is randomized).
    print("Differences found:")
    print("  only in yellow:",
          sorted(f"{name}: {dtype.simpleString()}" for name, dtype in yellow_cols - green_cols))
    print("  only in green: ",
          sorted(f"{name}: {dtype.simpleString()}" for name, dtype in green_cols - yellow_cols))
else:
    print("Schemas are identical.")


Differences found: {('payment_type', DoubleType()), ('ehail_fee', IntegerType()), ('payment_type', LongType()), ('trip_type', DoubleType()), ('airport_fee', IntegerType())}


In [15]:
# Names of the columns that appear in both datasets, in df_green's column order.
# Note: this rebinds yellow_cols to plain column names; the schema-comparison
# cell uses the same name for (name, type) pairs.
yellow_cols = set(df_yellow.columns)
common_cols = [col_name for col_name in df_green.columns if col_name in yellow_cols]

In [16]:
# Show the shared column list (the bare last expression is rendered as cell output).
common_cols

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [17]:
from pyspark.sql import functions as F

In [24]:
# Keep only the columns shared with the green data and tag every row as 'yellow'.
# In the raw schemas payment_type is LongType in yellow but DoubleType in green
# (see the schema comparison above). Cast it explicitly so the positional union
# with the green frame does not depend on Spark's implicit type coercion.
# withColumn on an existing name replaces that column in place, so the column
# order still matches common_cols.
df_yellow_servicetype = (
    df_yellow
    .select(*common_cols)
    .withColumn('payment_type', F.col('payment_type').cast('double'))
    .withColumn('service_type', F.lit('yellow'))
)

In [25]:
# Keep only the columns shared with the yellow data and tag every row as 'green'.
df_green_servicetype = (
    df_green
    .select(*common_cols)
    .withColumn('service_type', F.lit('green'))
)

In [26]:
# Stack green and yellow rows (UNION ALL semantics, no de-duplication).
# union() matches columns by position (unionAll is an alias for it); both inputs
# were selected with the same common_cols list, so the positions line up.
# NOTE(review): in the raw schemas payment_type is double in green and long in
# yellow; unless it is cast upstream, the union relies on Spark's type coercion.
df_trips_data = df_green_servicetype.union(df_yellow_servicetype)

In [27]:
# Row count per service type. groupBy output order is not guaranteed, so sort
# by service_type to keep the displayed table stable between runs.
df_trips_data \
    .groupBy('service_type') \
    .count() \
    .orderBy('service_type') \
    .show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2802931|
|      yellow|96723120|
+------------+--------+



                                                                                

In [29]:
# Register the combined DataFrame as a temporary view named 'trips_data' so the
# following cells can query it with spark.sql(). The view lives only as long as
# this SparkSession.

df_trips_data.createOrReplaceTempView(name='trips_data')

In [33]:
# Same per-service row count as the DataFrame API cell above, expressed in SQL
# against the 'trips_data' temp view. ORDER BY makes the row order deterministic;
# GROUP BY alone does not guarantee it.
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY
    service_type
ORDER BY
    service_type
""").show()



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2802931|
|      yellow|96723120|
+------------+--------+



                                                                                