In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Get or create the Spark session named 'test' on the local[*] master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# Load the parquet data under data/raw/green/2020/01/ into a pandas DataFrame.
df_pd = pd.read_parquet(path="data/raw/green/2020/01/")

In [4]:
# Save the first 1000 rows as a CSV sample, leaving out the index column.
df_pd.head(n=1000).to_csv(path_or_buf="df_green_pd.csv", index=False)

In [5]:
# Reload the CSV sample with pandas' default type inference (no date parsing requested).
df_pd = pd.read_csv(filepath_or_buffer="df_green_pd.csv")

In [7]:
# Reload the CSV sample, this time parsing the pickup/dropoff columns as datetimes.
df_pd = pd.read_csv(
    "df_green_pd.csv",
    parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
)

In [13]:
# Print a summary of df_pd: index range, column names, non-null counts and dtypes.
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               1000 non-null   int64         
 1   lpep_pickup_datetime   1000 non-null   datetime64[ns]
 2   lpep_dropoff_datetime  1000 non-null   datetime64[ns]
 3   store_and_fwd_flag     1000 non-null   object        
 4   RatecodeID             1000 non-null   float64       
 5   PULocationID           1000 non-null   int64         
 6   DOLocationID           1000 non-null   int64         
 7   passenger_count        1000 non-null   float64       
 8   trip_distance          1000 non-null   float64       
 9   fare_amount            1000 non-null   float64       
 10  extra                  1000 non-null   float64       
 11  mta_tax                1000 non-null   float64       
 12  tip_amount             1000 non-null   float64       
 13  toll

In [8]:
# Build a Spark DataFrame from the pandas sample and display the schema Spark infers for it.
spark.createDataFrame(data=df_pd).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', TimestampType(), True), StructField('lpep_dropoff_datetime', TimestampType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', DoubleType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', DoubleType(), True), StructField('trip_type', DoubleType(), True), StructField('congestion_surcharge', DoubleType()

In [10]:
from pyspark.sql import types
from pyspark.sql.functions import col

In [19]:
year = 2020
color = "yellow"

# Source-column prefix of the pickup/dropoff timestamps for each supported
# taxi service; every other cast below is shared by both services.
datetime_prefix_by_color = {"yellow": "tpep", "green": "lpep"}

# Fail fast instead of silently rewriting un-cast data for an unknown service.
if color not in datetime_prefix_by_color:
    raise ValueError(
        f"unsupported color {color!r}; expected one of {sorted(datetime_prefix_by_color)}"
    )
prefix = datetime_prefix_by_color[color]

# (target column, source column, Spark SQL type), applied in this order.
# The two *_datetime targets are new column names, so the original prefixed
# timestamp columns are kept alongside them.
# NOTE(review): confirm keeping both the prefixed and unprefixed timestamp
# columns in the output is intended.
column_casts = [
    ("VendorID", "VendorID", "integer"),
    ("pickup_datetime", f"{prefix}_pickup_datetime", "timestamp"),
    ("dropoff_datetime", f"{prefix}_dropoff_datetime", "timestamp"),
    ("passenger_count", "passenger_count", "integer"),
    ("trip_distance", "trip_distance", "double"),
    ("RatecodeID", "RatecodeID", "integer"),
    ("store_and_fwd_flag", "store_and_fwd_flag", "string"),
    ("PULocationID", "PULocationID", "integer"),
    ("DOLocationID", "DOLocationID", "integer"),
    ("payment_type", "payment_type", "integer"),
    ("fare_amount", "fare_amount", "double"),
    ("extra", "extra", "double"),
    ("mta_tax", "mta_tax", "double"),
    ("tip_amount", "tip_amount", "double"),
    ("tolls_amount", "tolls_amount", "double"),
    ("improvement_surcharge", "improvement_surcharge", "double"),
    ("total_amount", "total_amount", "double"),
    ("congestion_surcharge", "congestion_surcharge", "double"),
]
if color == "yellow":
    # airport_fee is cast for the yellow service only.
    column_casts.append(("airport_fee", "airport_fee", "double"))

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/{color}/{year}/{month:02d}/'
    output_path = f'data/pq/{color}/{year}/{month:02d}/'

    df = spark.read.parquet(input_path)

    # Apply the casts one by one, in the order listed above.
    for target, source, spark_type in column_casts:
        df = df.withColumn(target, col(source).cast(spark_type))

    # Repartition to 4 partitions and overwrite any existing output at output_path.
    df.repartition(4).write.parquet(output_path, mode="overwrite")

processing data for 2020/1
processing data for 2020/2
processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [20]:
# Read everything matching data/raw/{color}/{year}/* into a single Spark DataFrame.
# NOTE(review): this reads the raw input, not the data/pq output written by the
# conversion loop — confirm that is intended.
df = spark.read.parquet(f"data/raw/{color}/{year}/*")

In [21]:
# Print the leading rows of df as a text table.
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|            1.0|          1.2|       1.0|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.4