We will load the input data as dataframe:

In [2]:
import os
from sqlalchemy import create_engine
from tqdm import tqdm
import psycopg2
import json
import pandas as pd
import numpy as np

with open('../credentials.json') as f:
    data = json.load(f)
    psql_config = {
    'dbname': data['db_name'],
    'user': data['db_user'],
    'password': data['db_pwd'],
    'host': data['db_host'],
    'port': 5432
}


def get_psql_connection():
    try:
        conn = psycopg2.connect(**psql_config)
    except Exception as e:
        print("Error connecting to the database:", e)
    return conn

In [3]:
engine = create_engine(f'postgresql://{psql_config["user"]}:{psql_config["password"]}@{psql_config["host"]}/{psql_config["dbname"]}')

df = pd.read_sql("SELECT * FROM m024.citi_bike_data LIMIT 1000;", engine)

df.head()

Unnamed: 0,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender,processed,id
0,468,2018-06-03 09:25:25.025,2018-06-03 09:33:13.241,496,E 16 St & 5 Ave,40.737262,-73.99239,490,8 Ave & W 33 St,40.751551,-73.993934,32381,Subscriber,1970,0,False,8846088
1,1409,2018-06-03 19:02:14.385,2018-06-03 19:25:43.571,494,W 26 St & 8 Ave,40.747348,-73.997236,377,6 Ave & Canal St,40.722438,-74.005664,28065,Customer,1969,0,False,8834652
2,1201,2018-06-21 05:13:47.122,2018-06-21 05:33:48.125,487,E 20 St & FDR Drive,40.733143,-73.975739,3163,Central Park West & W 68 St,40.773407,-73.977825,31059,Subscriber,1989,1,False,8806410
3,755,2018-06-02 11:39:15.912,2018-06-02 11:51:51.167,497,E 17 St & Broadway,40.73705,-73.990093,303,Mercer St & Spring St,40.723627,-73.999496,19635,Subscriber,1985,2,False,8852254
4,230,2018-06-20 20:58:08.255,2018-06-20 21:01:59.219,494,W 26 St & 8 Ave,40.747348,-73.997236,3258,W 27 St & 10 Ave,40.750182,-74.002184,18669,Subscriber,1993,2,False,8838835


Now that we have the data, we will start processing it in bunch.
For each chunk, we will apply some validation and transform the source data into fact and dimension data

In [5]:
from math import radians, cos, sin, sqrt, atan2
from psycopg2.extras import execute_values
BATCH_SIZE = 100000

# Function to calculate distance using Haversine formula
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in km
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    return R * 2 * atan2(sqrt(a), sqrt(1 - a))

# Extract Data
def extract_data(last_id):
    query = f"""
    SELECT * FROM m024.citi_bike_data 
    WHERE processed = FALSE AND id > {last_id}
    ORDER BY id
    LIMIT {BATCH_SIZE}
    """
    return pd.read_sql(query, engine)

# Load data into dimension tables
def load_dimension_data(df, table_name, cols, db_cols):
    df = df[cols].astype('str').drop_duplicates()
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                statement = f"""
                INSERT INTO m024.p_{table_name} ({', '.join(db_cols)})
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in {table_name}")
    except Exception as e:
        print(f"Error in inserting/updating data for table {table_name}- {e}")

# Load dimension tables and return mapping IDs
def get_dimension_id(df, table_name, lookup_col, db_lookup_col,db_return_col):
    lookup_values = df[lookup_col].drop_duplicates().tolist()
    query = f"SELECT {db_lookup_col}, {db_return_col} FROM m024.p_{table_name} WHERE {db_lookup_col} IN %s"
    mapping = pd.read_sql(query, engine, params=(tuple(lookup_values),))
    return dict(zip(mapping[db_lookup_col], mapping[db_return_col]))

def get_time_dimension_id(df, table_name, lookup_col, db_lookup_cols, db_return_col):
    lookup_values = df[lookup_col].drop_duplicates().tolist()

    # Build query based on date components to avoid precision issues with timestamps
    query = f"""
    SELECT {', '.join(db_lookup_cols)}, {db_return_col}
    FROM m024.p_{table_name}
    WHERE ({', '.join(db_lookup_cols)}) IN %s
    """
    lookup_tuples = [
        (row['year'], row['month'], row['day'], row['hour']) for _, row in df.iterrows()
    ]
    
    mapping = pd.read_sql(query, engine, params=(tuple(lookup_tuples),))
    return dict(zip(mapping[db_lookup_cols], mapping[db_return_col]))


In [11]:
# Create and update the gender dimension
gender_map = {1:"Male",2:"Female"}
def initUpdateGenderDimension():
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                select_st = f"""
                SELECT DISTINCT gender from m024.citi_bike_data where processed = false;
                """
                df = pd.read_sql(select_st, engine)
                print(df)
                df.gender = df.gender.map(gender_map).fillna("Unknown")
                statement = f"""
                INSERT INTO m024.p_gender_dimension (gender_type)
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in p_gender_dimension")
    except Exception as e:
        print(f"Error in inserting/updating data for table p_gender_dimension- {e}")
        conn.rollback()

initUpdateGenderDimension()

   gender
0       0
1       1
2       2
Data inserted/updated successfully in p_gender_dimension


In [12]:
# Create and update the user type dimension
user_type_map = {'Subscriber', 'Customer'}
def initUpdateUserTypeDimension():
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                select_st = f"""
                SELECT DISTINCT usertype from m024.citi_bike_data where processed = false;
                """
                df = pd.read_sql(select_st, engine)
                print(df)
                df.usertype = df.usertype.apply(lambda x: x if x in user_type_map else 'Unknown')
                statement = f"""
                INSERT INTO m024.p_user_type_dimension (user_type)
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in p_user_type_dimension")
    except Exception as e:
        print(f"Error in inserting/updating data for table p_user_type_dimension- {e}")

initUpdateUserTypeDimension()

     usertype
0    Customer
1  Subscriber
Data inserted/updated successfully in p_user_type_dimension


In [None]:
# Create and update the user birth year dimension
def initUpdateBirthYearDimension():
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                select_st = f"""
                SELECT DISTINCT birth_year from m024.citi_bike_data where processed = false;
                """
                df = pd.read_sql(select_st, engine)
                print(df)
                df.birth_year = df.birth_year.apply(lambda x: x if x > 1940 and x <2013 else 0).astype("str").drop_duplicates() # Assuming you need to be atleast 5 to ride the bike
                statement = f"""
                INSERT INTO m024.p_user_birthyear_dimension (user_birthyear)
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in p_user_birthyear_dimension")
    except Exception as e:
        print(f"Error in inserting/updating data for table p_user_birthyear_dimension- {e}")

initUpdateBirthYearDimension()

     birth_year
0          1885
1          1886
2          1887
3          1888
4          1889
..          ...
100        1998
101        1999
102        2000
103        2001
104        2002

[105 rows x 1 columns]
Data inserted/updated successfully in p_user_birthyear_dimension


In [16]:
# Create and update the bikeid dimension
def initUpdateBikeIDDimension():
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                select_st = f"""
                SELECT DISTINCT bikeid from m024.citi_bike_data where processed = false;
                """
                df = pd.read_sql(select_st, engine)
                print(df)
                df.bikeid = df.bikeid.astype("str").drop_duplicates()
                statement = f"""
                INSERT INTO m024.p_bike_dimension (bike_id)
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in p_bike_dimension")
    except Exception as e:
        print(f"Error in inserting/updating data for table p_bike_dimension- {e}")
    
initUpdateBikeIDDimension()

       bikeid
0       14529
1       14530
2       14532
3       14533
4       14534
...       ...
13091   33727
13092   33728
13093   33729
13094   33730
13095   33733

[13096 rows x 1 columns]
Data inserted/updated successfully in p_bike_dimension


In [None]:
# Create and update the station dimension
start_station_id, start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude
def initUpdateStationDimension():
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                select_st = f"""
                SELECT start_station_id, start_station_name, start_station_latitude, start_station_longitude,
                end_station_id, end_station_name, end_station_latitude, end_station_longitude 
                from m024.citi_bike_data where processed = false;
                """
                df = pd.read_sql(select_st, engine)
                print(df)
                df.bikeid = df.bikeid.astype("str").drop_duplicates()
                statement = f"""
                INSERT INTO m024.p_station_dimension (bike_id)
                VALUES %s 
                ON CONFLICT DO NOTHING;"""
                data_tuples = [tuple(row) for row in df.to_numpy()]
                    
                # Execute batch insert
                execute_values(cur, statement, data_tuples)
                
                # Commit changes
                conn.commit() 
                print(f"Data inserted/updated successfully in p_bike_dimension")
    except Exception as e:
        print(f"Error in inserting/updating data for table p_bike_dimension- {e}")
    
initUpdateStationDimension()

In [17]:
# Transform Data
def transform_data(df):

    # transform gender and load the dimension
    df.gender = df.gender.map(dict(zip([1, 2],['Male','Female']))).fillna('Unknown')
    #load_dimension_data(df, 'gender_dimension', ['gender'],['gender_type'])

    # transform user type and load the dimension
    df.usertype = df.usertype.apply(lambda x: x if x in ['Subscriber', 'Customer'] else 'Unknown')
    #load_dimension_data(df, 'user_type_dimension', ['usertype'],['user_type'])

    # transform birth year and load the dimension
    df.birth_year = df.birth_year.apply(lambda x: x if x > 1940 and x <2013 else 0).astype(int) # Assuming you need to be atleast 5 to ride the bike
    #load_dimension_data(df, 'user_birthyear_dimension', ['birth_year'],['user_birthyear'])

    # Clean station names
    df['start_station_name'] = df['start_station_name'].str.strip().fillna('Unknown')
    df['end_station_name'] = df['end_station_name'].str.strip().fillna('Unknown')

    # Validate Latitude and Longitude
    df['start_station_latitude'] = df['start_station_latitude'].apply(
        lambda x: x if -90 <= x <= 90 else None
    )
    df['start_station_longitude'] = df['start_station_longitude'].apply(
        lambda x: x if -180 <= x <= 180 else None
    )
    df['end_station_latitude'] = df['end_station_latitude'].apply(
        lambda x: x if -90 <= x <= 90 else None
    )
    df['end_station_longitude'] = df['end_station_longitude'].apply(
        lambda x: x if -180 <= x <= 180 else None
    )

    # For missing latitude/longitude values
    df['start_station_latitude'] = df['start_station_latitude'].fillna('Unknown')
    df['start_station_longitude'] = df['start_station_longitude'].fillna('Unknown')
    df['end_station_latitude'] = df['end_station_latitude'].fillna('Unknown')
    df['end_station_longitude'] = df['end_station_longitude'].fillna('Unknown')
    load_dimension_data(df, 'station_dimension',
                         ['start_station_id', 'start_station_name', 'start_station_latitude', 'start_station_longitude'],
                           ['station_key', 'station_name', 'latitude', 'longitude'])
    load_dimension_data(df, 'station_dimension',
                         ['end_station_id', 'end_station_name', 'end_station_latitude', 'end_station_longitude'],
                           ['station_key', 'station_name', 'latitude', 'longitude'])
    
    # Fill missing values
    df['starttime'] = df['starttime'].fillna('1.1.1900')
    df['stoptime'] = df['stoptime'].fillna('1.1.1900')

    # Convert to datetime format
    df['starttime_dt'] = pd.to_datetime(df['starttime'])
    df['stoptime_dt'] = pd.to_datetime(df['stoptime'])

    # Extract fields for start time
    df['start_date'] = df['starttime_dt'].dt.date
    df['start_year'] = df['starttime_dt'].dt.year
    df['start_month'] = df['starttime_dt'].dt.month
    df['start_day'] = df['starttime_dt'].dt.day
    df['start_hour'] = df['starttime_dt'].dt.hour

    # Extract fields for stop time
    df['stop_date'] = df['stoptime_dt'].dt.date
    df['stop_year'] = df['stoptime_dt'].dt.year
    df['stop_month'] = df['stoptime_dt'].dt.month
    df['stop_day'] = df['stoptime_dt'].dt.day
    df['stop_hour'] = df['stoptime_dt'].dt.hour

    load_dimension_data(df, 'time_dimension',
                         ['starttime', 'start_date', 'start_year', 'start_month', 'start_day', 'start_hour'],
                         ['time', 'date', 'year', 'month', 'day', 'hour'])
    
    load_dimension_data(df, 'time_dimension',
                         ['stoptime', 'stop_date', 'stop_year', 'stop_month', 'stop_day', 'stop_hour'],
                         ['time', 'date', 'year', 'month', 'day', 'hour'])
    
    #load_dimension_data(df, 'bike_dimension', ['bikeid'], ['bike_id'])

    df['starttime_dt'] = pd.to_datetime(df['starttime'])
    df['stoptime_dt'] = pd.to_datetime(df['stoptime'])

    # Extract fields for start time
    df['start_date'] = df['starttime_dt'].dt.date
    df['start_year'] = df['starttime_dt'].dt.year
    df['start_month'] = df['starttime_dt'].dt.month
    df['start_day'] = df['starttime_dt'].dt.day
    df['start_hour'] = df['starttime_dt'].dt.hour

    # Extract fields for stop time
    df['stop_date'] = df['stoptime_dt'].dt.date
    df['stop_year'] = df['stoptime_dt'].dt.year
    df['stop_month'] = df['stoptime_dt'].dt.month
    df['stop_day'] = df['stoptime_dt'].dt.day
    df['stop_hour'] = df['stoptime_dt'].dt.hour

    load_dimension_data(df, 'time_dimension',
                         ['starttime', 'start_date', 'start_year', 'start_month', 'start_day', 'start_hour'],
                         ['time', 'date', 'year', 'month', 'day', 'hour'])

    load_dimension_data(df, 'time_dimension',
                         ['stoptime', 'stop_date', 'stop_year', 'stop_month', 'stop_day', 'stop_hour'],
                         ['time', 'date', 'year', 'month', 'day', 'hour'])

    # Calculate trip distance
    df['distance'] = df.apply(lambda row: haversine(
        row['start_station_latitude'], row['start_station_longitude'], 
        row['end_station_latitude'], row['end_station_longitude']
    ), axis=1)

    # Get dimension table mappings
    station_map = get_dimension_id(df, 'station_dimension', 'start_station_id', 'station_key','station_id')
    end_station_map = get_dimension_id(df, 'station_dimension', 'end_station_id', 'station_key','station_id')
    start_time_map = get_dimension_id(df, 'time_dimension', 'starttime', 'time', 'time_id')
    stop_time_map = get_dimension_id(df, 'time_dimension', 'stoptime', 'time', 'time_id')
    user_type_map = get_dimension_id(df, 'user_type_dimension', 'usertype', 'user_type','user_type_id')
    gender_map = get_dimension_id(df, 'gender_dimension', 'gender', 'gender_type','gender_id')
    birth_year_map = get_dimension_id(df, 'user_birthyear_dimension', 'birth_year', 'user_birthyear', 'user_birthyear_id')

    # Map dimension table IDs
    df['start_time_id'] = df['starttime'].map(start_time_map)
    df['end_time_id'] = df['stoptime'].map(stop_time_map)
    df['start_station_id'] = df['start_station_id'].map(station_map)
    df['end_station_id'] = df['end_station_id'].map(end_station_map)
    df['bike_id'] = df['bikeid']
    df['user_type_id'] = df['usertype'].map(user_type_map)
    df['gender_type_id'] = df['gender'].map(gender_map)
    df['user_birthyear_id'] = df['birth_year'].map(birth_year_map)
    df['duration'] = df['tripduration']

    return df[['id','duration', 'distance', 'start_time_id', 'end_time_id', 'start_station_id', 'end_station_id', 'bike_id', 'user_type_id', 'gender_type_id', 'user_birthyear_id']]

# Update processed records
def update_processed(ids):
    if not ids:
        return
    query = f"""
    UPDATE m024.citi_bike_data
    SET processed = TRUE
    WHERE id IN (SELECT UNNEST(%s));
    """
    
    try:
        with get_psql_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (ids,))
                conn.commit()
                print(f"Successfully updated the processed flag for {BATCH_SIZE} rows).")
    except Exception as e:
        print(f"Error in updating processed flag: {e}")

# Process batch
def process_batch(last_id):
    df = extract_data(last_id)
    if df.empty:
        print("No more records to process.")
        return None
    
    df_transformed = transform_data(df)
    #Extract the ids 
    processed_ids = df_transformed['id'].tolist()

    # Drop the ID column before loading fact table
    df_transformed = df_transformed.drop(columns=['id'])

    load_dimension_data(df_transformed, 'trip_fact', df_transformed.columns, df_transformed.columns)
    
    # Update processed flag for processed IDs
    update_processed(processed_ids)
    
    # Get the max ID from the batch
    max_id = max(processed_ids)
    print(f"Processed batch up to ID: {max_id}")
    return max_id


last_id = 0
limit_record_id = 9999
while last_id < limit_record_id:
    last_id = process_batch(last_id)


Data inserted/updated successfully in station_dimension
Data inserted/updated successfully in station_dimension
Data inserted/updated successfully in time_dimension
Data inserted/updated successfully in time_dimension
Data inserted/updated successfully in time_dimension
Data inserted/updated successfully in time_dimension
Data inserted/updated successfully in trip_fact
Successfully updated the processed flag for 100000 rows).
Processed batch up to ID: 100000
