pip install pyspark
pip install shapely

In [1]:
#import spark functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col#, to_timestamp, lit, concat, udf, expr, round
from pyspark.sql.column import Column#, _to_java_column, _to_seq
from pyspark.sql.types import StringType, StructType, StructField, FloatType, IntegerType, BooleanType, TimestampType, DoubleType
from pyspark import SparkContext
import pandas as pd
# Import the storage module
from google.cloud import storage
import requests

In [2]:
# Create SparkContext
# getOrCreate() reuses the SparkContext that is already running, if any,
# instead of failing by trying to start a second one.
sc = SparkContext.getOrCreate()

# Set the log level to ERROR so that lower-severity messages (e.g. INFO)
# do not clutter the notebook output.
sc.setLogLevel("ERROR")

In [3]:
# Fix the formatting of wide show() output so long rows scroll horizontally instead of wrapping and overlapping.
def hscroll(activate=True):
    """Activate/deactivate horizontal scrolling for wide output cells.

    Injects a CSS rule into the notebook that sets the ``white-space``
    property of every ``<pre>`` element.

    Args:
        activate: Any truthy value selects ``white-space: pre`` (lines are
            not wrapped, so wide output scrolls); any falsy value selects
            ``white-space: pre-wrap`` (lines wrap).
    """
    # Imported locally so the dependency on IPython is only needed when the
    # function is actually called.
    from IPython.display import display, HTML

    # A conditional expression accepts any truthy/falsy value; indexing a
    # two-element tuple with the argument would fail for None or ints >= 2.
    style = 'pre' if activate else 'pre-wrap'
    display(HTML("<style>pre {white-space: %s !important}</style>" % style))


# Enable horizontal scrolling for the rest of the notebook.
hscroll()

In [4]:
def store_and_fwd_flag_filling(df):
    """Normalise the ``store_and_fwd_flag`` column to a boolean column.

    Mapping applied to each row:
      * "0" or "N"      -> False
      * "1" or "Y"      -> True
      * anything else   -> False (this includes nulls)

    Args:
        df: DataFrame containing a ``store_and_fwd_flag`` column.

    Returns:
        A new DataFrame in which ``store_and_fwd_flag`` has been replaced by
        the boolean value described above.
    """
    flag = df["store_and_fwd_flag"]

    # Conditions for the two recognised spellings of each state.
    means_false = (flag == "0") | (flag == "N")
    means_true = (flag == "1") | (flag == "Y")

    # Unrecognised or missing values fall through to False.
    cleaned_flag = (
        when(means_false, False)
        .when(means_true, True)
        .otherwise(False)
        .cast("boolean")
    )

    return df.withColumn("store_and_fwd_flag", cleaned_flag)

In [5]:
def month_year(blob):
    """Extract the year and two-digit month from a blob's object name.

    The third ``/``-separated component of ``blob.name`` is expected to look
    like ``<year>_<MonthName>.<ext>`` (for example ``2017_January.parquet``).

    Args:
        blob: Object exposing a ``name`` attribute holding the object path.

    Returns:
        A ``(year, month_number)`` tuple where ``year`` is an ``int`` and
        ``month_number`` is a zero-padded string ("01".."12"), or ``None``
        when the month name is not a capitalised English month name.

    Raises:
        IndexError: If the path has fewer than three components or the file
            name contains no ``_`` separator.
        ValueError: If the year part is not an integer.
    """
    # English month names in calendar order; position gives the month number.
    month_names = (
        "January", "February", "March", "April", "May", "June",
        "July", "August", "September", "October", "November", "December",
    )
    month_lookup = {
        name: f"{position:02d}"
        for position, name in enumerate(month_names, start=1)
    }

    # Third path component is the file name, e.g. "2017_January.parquet".
    file_name = blob.name.split('/')[2]
    name_parts = file_name.split('_')

    year = int(name_parts[0])
    # Everything before the first "." is the month name (drops the extension).
    month_name = name_parts[1].partition('.')[0]

    return year, month_lookup.get(month_name)

In [6]:
def transform_schema(df):
    """Cast the trip columns to their target Spark data types.

    Each listed column is replaced, in order, by itself cast to the type
    given in the table below. Columns not listed are left untouched.

    Args:
        df: DataFrame containing every column named in the table.

    Returns:
        A new DataFrame with the listed columns cast to the target types.
    """
    # (column name, target type) pairs, applied in this order.
    target_types = (
        ("store_and_fwd_flag", BooleanType()),
        ("tpep_pickup_datetime", TimestampType()),
        ("tpep_dropoff_datetime", TimestampType()),
        ("VendorID", IntegerType()),
        ("PULocationID", IntegerType()),
        ("DOLocationID", IntegerType()),
        ("payment_type", IntegerType()),
        ("passenger_count", IntegerType()),
        ("RatecodeID", IntegerType()),
        ("fare_amount", DoubleType()),
        ("extra", DoubleType()),
        ("mta_tax", DoubleType()),
        ("tip_amount", DoubleType()),
        ("tolls_amount", DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount", DoubleType()),
        ("congestion_surcharge", DoubleType()),
        ("airport_fee", DoubleType()),
    )

    df_transformed = df
    for column_name, spark_type in target_types:
        df_transformed = df_transformed.withColumn(
            column_name, col(column_name).cast(spark_type)
        )

    return df_transformed

In [None]:
# Bucket holding both the raw input ("landing/") and the cleaned output ("cleaned/").
BUCKET_NAME = 'my-bigdata-project-cm'

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

try:
    # Create a client object that points to GCS
    storage_client = storage.Client()

    # Get a list of the 'blobs' (objects or files) under the landing/ prefix
    blobs = storage_client.list_blobs(BUCKET_NAME, prefix="landing/")

    # Clean every Parquet file found under landing/ and write it to cleaned/.
    for blob in blobs:
        file_path = f'gs://{BUCKET_NAME}/{blob.name}'

        # Only Parquet files are processed. This also skips the bare
        # "landing/" entry, which does not end with ".parquet".
        if not blob.name.endswith('.parquet'):
            print(f"Skipping file {blob.name}")
            continue

        # month_year() raises when the object name does not follow the
        # "<prefix>/<folder>/<year>_<Month>.parquet" layout; skip such files
        # instead of aborting the whole run.
        try:
            year, month = month_year(blob)
        except (ValueError, IndexError) as e:
            print(f"Skipping file {blob.name}: could not parse year/month from its name:", str(e))
            continue

        if year < 2017:  # years before 2017 were handled in a previous batch
            print(f"Skipping file {blob.name} since it's already completed")
            continue

        # An unrecognised month name comes back as None; writing it would
        # produce an output path ending in "-None.parquet".
        if month is None:
            print(f"Skipping file {blob.name}: unrecognised month name")
            continue

        try:
            # Read Parquet file from Google Cloud Storage
            df = spark.read.parquet(file_path)
            print(f"Processing {blob.name}:")
        except Exception as e:
            # Best effort: report the unreadable file and move on to the next one.
            print(f"An error occurred on {blob.name}:", str(e))
            continue

        # Drop duplicate rows
        df = df.dropDuplicates()

        # Clean store_and_fwd_flag
        df = store_and_fwd_flag_filling(df)

        # Change passenger_count to int
        df = df.withColumn("passenger_count", col("passenger_count").cast("int"))

        # Change RatecodeID to int
        df = df.withColumn("RatecodeID", col("RatecodeID").cast("int"))

        # Replace null values in the "congestion_surcharge" and "Airport_fee"
        # columns with 0 so those rows survive the dropna() below.
        # NOTE(review): transform_schema() refers to this column as
        # "airport_fee" (lowercase); presumably this relies on Spark's
        # case-insensitive column resolution - confirm.
        df = df.fillna({'congestion_surcharge': 0.0, 'Airport_fee': 0.0})

        # NOTE: rows with trip_distance == 0.0 are kept on purpose, in case
        # cancelled trips need to be analysed later.

        # Drop every row that still contains a null in any column.
        df = df.dropna()

        # Enforce the final schema
        df = transform_schema(df)

        # Save the cleaned dataframe as Parquet
        output_file_path = f"gs://{BUCKET_NAME}/cleaned/yellow_tripdata_{year}-{month}.parquet"
        df.write.parquet(output_file_path)
        print("File successfully processed and uploaded.\n")

    print("All files finished processing.")
finally:
    # Always release the Spark session, even if a file fails mid-run.
    spark.stop()