# Loading initial CSV files
* Since there are ~320 CSV files, we first check whether all of them share the same schema before loading them into a single DataFrame
* We enable eager evaluation in the Spark session config so that DataFrames are displayed in a readable format in the notebook, as described here - https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
* The Dataproc cluster is configured with 5 workers, so we set `spark.sql.shuffle.partitions` to 20 to make better use of its resources

In [1]:
%%time

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Build (or reuse) the notebook's Spark session, applying each setting in turn.
builder = SparkSession.builder.appName("DetectSchemaChangesByMonth")
for conf_key, conf_value in (
    # Render DataFrames eagerly in the notebook, capped at 10 rows.
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.repl.eagerEval.maxNumRows", 10),
    # Number of partitions used for shuffles.
    ("spark.sql.shuffle.partitions", 20),
    ("spark.sql.sources.partitionColumnTypeInference.enabled", True),
):
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

# Root location of the CSV files in the GCS bucket
gcs_path = "gs://bucket121024/csv"

# Every YYYYMM prefix covered by the dataset: January 2013 through December 2024
years = range(2013, 2025)
months = range(1, 13)  # 1..12 inclusive
dates = ["%d%02d" % (year, month) for year in years for month in months]


# Filled in by the loading loop below: YYYYMM prefix -> schema description
schemas = dict()

def get_schema(df):
    """Describe a DataFrame's schema as a list of ``"name:type"`` strings.

    Args:
        df: Object exposing ``schema.fields``, where every field has a
            ``name`` and a ``dataType`` attribute.

    Returns:
        One ``"<name>:<dataType>"`` string per field, in field order.
    """
    described = []
    for field in df.schema.fields:
        described.append(f"{field.name}:{field.dataType}")
    return described

# Record the schema seen for each month; months that cannot be read are
# reported and left out of `schemas`.
for month_key in dates:
    # Wildcard matching every CSV whose name starts with (year)(month)
    month_glob = os.path.join(gcs_path, f"{month_key}*.csv")
    print(f"Processing date: {month_key}")
    try:
        month_df = spark.read.option("header", "true").csv(month_glob)
        schemas[month_key] = get_schema(month_df)
    except Exception as err:
        print(f"No data or error processing {month_key}: {err!s}")

# Walk the months in chronological order (YYYYMM keys sort correctly as
# strings) and report every month whose schema differs from the month
# that was loaded just before it.
previous_schema = None
previous_date = None

for date, schema in sorted(schemas.items()):
    # Compare against None explicitly: an empty schema list is falsy, and a
    # plain truthiness check would silently skip the comparison that follows it.
    if previous_schema is not None and schema != previous_schema:
        print(f"-------> Schema change detected in {date}")
        print(f"Previous Schema ({previous_date}): {previous_schema}")
        print(f"Current Schema ({date}): {schema}")
    previous_schema = schema
    previous_date = date

print("Schema comparison completed.")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/20 22:08:06 INFO SparkEnv: Registering MapOutputTracker
24/12/20 22:08:06 INFO SparkEnv: Registering BlockManagerMaster
24/12/20 22:08:06 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/12/20 22:08:06 INFO SparkEnv: Registering OutputCommitCoordinator


Processing date: 201301
No data or error processing 201301: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201301*.csv.
Processing date: 201302
No data or error processing 201302: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201302*.csv.
Processing date: 201303
No data or error processing 201303: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201303*.csv.
Processing date: 201304
No data or error processing 201304: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201304*.csv.
Processing date: 201305
No data or error processing 201305: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201305*.csv.
Processing date: 201306


                                                                                

Processing date: 201307
Processing date: 201308
Processing date: 201309
Processing date: 201310


                                                                                

Processing date: 201311
Processing date: 201312
Processing date: 201401
Processing date: 201402
Processing date: 201403
Processing date: 201404
Processing date: 201405
Processing date: 201406
Processing date: 201407
Processing date: 201408
Processing date: 201409
Processing date: 201410
Processing date: 201411
Processing date: 201412
Processing date: 201501
Processing date: 201502
Processing date: 201503
Processing date: 201504
Processing date: 201505
Processing date: 201506
Processing date: 201507
Processing date: 201508
Processing date: 201509
Processing date: 201510
Processing date: 201511
Processing date: 201512
Processing date: 201601
Processing date: 201602
Processing date: 201603
Processing date: 201604
Processing date: 201605
Processing date: 201606
Processing date: 201607
Processing date: 201608
Processing date: 201609
Processing date: 201610
Processing date: 201611
Processing date: 201612
Processing date: 201701
Processing date: 201702
Processing date: 201703
Processing date:

In [2]:
# Column layout shared by every standardized month, in output order.
target_schema = [
    "bikeid",
    "birthyear",
    "endlatitude",
    "endlongitude",
    "endstationid",
    "endstationname",
    "gender",
    "membercasual",
    "rideabletype",
    "rideabletypeduplicatecolumnname1",
    "rideid",
    "startlatitude",
    "startlongitude",
    "startstationid",
    "startstationname",
    "starttime",
    "stoptime",
    "tripduration",
    "unnamed:0",
    "usertype",
]

def standardize_schema(file_path, target_schema):
    """Load the CSVs matching a file-name prefix and project them onto ``target_schema``.

    Args:
        file_path: File-name prefix (the notebook passes ``YYYYMM`` strings).
            Every ``<gcs_path>/<file_path>*.csv`` is read with a header row and
            ``inferSchema=True``.
        target_schema: Ordered list of standardized output column names.

    Returns:
        A DataFrame containing exactly the columns of ``target_schema``, in
        that order. A target column with no counterpart in the files is filled
        with nulls. Returns ``None`` (after printing the error) if reading or
        projecting the data raises; no exception propagates to the caller.
    """
    # Known header spellings -> standardized column name.
    column_mapping = {
        "Trip Duration": "tripduration", "tripduration": "tripduration",
        "Start Time": "starttime", "started_at": "starttime",
        "Stop Time": "stoptime", "ended_at": "stoptime",
        "Start Station ID": "startstationid", "start station id": "startstationid",
        "start_station_id": "startstationid",
        "Start Station Name": "startstationname", "start station name": "startstationname",
        "start_station_name": "startstationname",
        "Start Station Latitude": "startlatitude", "start station latitude": "startlatitude",
        "start_lat": "startlatitude",
        "Start Station Longitude": "startlongitude", "start station longitude": "startlongitude",
        "start_lng": "startlongitude",
        "End Station ID": "endstationid", "end station id": "endstationid",
        "end_station_id": "endstationid",
        "End Station Name": "endstationname", "end station name": "endstationname",
        "end_station_name": "endstationname",
        "End Station Latitude": "endlatitude", "end station latitude": "endlatitude",
        "end_lat": "endlatitude",
        "End Station Longitude": "endlongitude", "end station longitude": "endlongitude",
        "end_lng": "endlongitude",
        "Bike ID": "bikeid", "bikeid": "bikeid",
        "User Type": "usertype", "usertype": "usertype",
        "Birth Year": "birthyear", "birth year": "birthyear",
        "Gender": "gender", "ride_id": "rideid",
        "rideable_type": "rideabletype", "rideable_type_duplicate_column_name_1": "rideabletypeduplicatecolumnname1",
        "Unnamed: 0": "unnamed:0", "member_casual": "membercasual"
    }

    try:
        df = spark.read.csv(os.path.join(gcs_path, f"{file_path}*.csv"), header=True, inferSchema=True)

        # Work out which source column feeds each requested target column.
        wanted = set(target_schema)
        source_for = {}
        for original_col in df.columns:
            target_col = column_mapping.get(original_col, original_col)
            if target_col not in wanted:
                continue
            if target_col in source_for:
                # Two source columns landing on one target is ambiguous;
                # treat the file set as unusable rather than pick one silently.
                raise ValueError(
                    f"columns {source_for[target_col]!r} and {original_col!r} "
                    f"both map to {target_col!r}"
                )
            source_for[target_col] = original_col

        # Rename, null-fill and reorder in ONE projection. Chaining
        # withColumnRenamed/withColumn in loops adds a projection per call and
        # inflates the query plan once many months are unioned together.
        projection = []
        for target_col in target_schema:
            source_col = source_for.get(target_col)
            if source_col is None:
                projection.append(F.lit(None).alias(target_col))
            else:
                # Backtick-quote so spaces, colons or dots in header names are
                # taken literally (embedded backticks are escaped by doubling).
                quoted = "`" + source_col.replace("`", "``") + "`"
                projection.append(F.col(quoted).alias(target_col))

        return df.select(projection)

    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return None

# Standardize every month; months that could not be processed come back as
# None from standardize_schema and are dropped here.
standardized_dfs = [
    df
    for df in (standardize_schema(file, target_schema) for file in dates)
    if df is not None
]

if standardized_dfs:
    # Stack all months into one DataFrame, matching columns by name.
    final_df = standardized_dfs[0]
    for df in standardized_dfs[1:]:
        final_df = final_df.unionByName(df)
    final_df.show()
else:
    # Define final_df even when nothing loaded, so later cells can test it
    # instead of failing with a NameError.
    final_df = None
    print("No valid data found.")


Error processing 201301: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201301*.csv.
Error processing 201302: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201302*.csv.
Error processing 201303: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201303*.csv.
Error processing 201304: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201304*.csv.
Error processing 201305: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/201305*.csv.


                                                                                

Error processing 202412: [PATH_NOT_FOUND] Path does not exist: gs://bucket121024/csv/202412*.csv.


24/12/20 22:15:41 WARN DAGScheduler: Broadcasting large task binary with size 1628.1 KiB


+------+---------+-----------------+------------------+------------+--------------------+------+------------+------------+--------------------------------+------+-------------+--------------+--------------+--------------------+-------------------+-------------------+------------+---------+----------+
|bikeid|birthyear|      endlatitude|      endlongitude|endstationid|      endstationname|gender|membercasual|rideabletype|rideabletypeduplicatecolumnname1|rideid|startlatitude|startlongitude|startstationid|    startstationname|          starttime|           stoptime|tripduration|unnamed:0|  usertype|
+------+---------+-----------------+------------------+------------+--------------------+------+------------+------------+--------------------------------+------+-------------+--------------+--------------+--------------------+-------------------+-------------------+------------+---------+----------+
| 19678|     1983|      40.74317449|      -74.00366443|         434|     9 Ave & W 18 St|     

In [3]:
%%time

# Total number of rows in the combined DataFrame; as the cell's last
# expression, the result is displayed as the cell output.
final_df.count()



CPU times: user 138 ms, sys: 4.45 ms, total: 142 ms
Wall time: 46.1 s


                                                                                

268834357

In [4]:
%%time

# Count the null values of every column in one aggregation and print them.
# Test against None explicitly instead of relying on DataFrame truthiness.
if final_df is not None:
    null_counts = final_df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in final_df.columns]
    ).collect()[0]

    for column_name, null_count in null_counts.asDict().items():
        print(f"{column_name}: {null_count}")


24/12/20 22:16:33 WARN DAGScheduler: Broadcasting large task binary with size 1522.9 KiB

bikeid: 153726709
birthyear: 158708540
endlatitude: 200109
endlongitude: 200109
endstationid: 448364
endstationname: 441675
gender: 153726709
membercasual: 115107648
rideabletype: 115107648
rideabletypeduplicatecolumnname1: 264230782
rideid: 115107648
startlatitude: 6
startlongitude: 6
startstationid: 50286
startstationname: 50286
starttime: 0
stoptime: 0
tripduration: 153726709
unnamed:0: 264111461
usertype: 153778489
CPU times: user 456 ms, sys: 129 ms, total: 585 ms
Wall time: 3min 49s


                                                                                

In [5]:
%%time
# Persist the combined dataset as Parquet, replacing any existing output
# at this location.
output_path = "gs://bucket121024/pipeline1/final_output.parquet"
final_df.write.parquet(output_path, mode="overwrite")

24/12/20 22:20:22 WARN DAGScheduler: Broadcasting large task binary with size 1724.3 KiB
                                                                                

CPU times: user 618 ms, sys: 170 ms, total: 788 ms
Wall time: 5min 51s
