## Importing Relevant Libraries

In [25]:
import boto3
import pandas as pd
import psycopg2
from io import StringIO
import glob
import os
from dotenv import load_dotenv

## Service Configuration

In [26]:
# Load environment variables from the .env file *before* creating any client.
# boto3 resolves AWS credentials and region when the client is constructed, so
# AWS_* variables defined in .env would be ignored if load_dotenv() ran later.
load_dotenv()

# AWS S3 Configuration
s3_client = boto3.client('s3')
bucket_name = 'ridewise'
prefix = 'datasets/'  # key prefix ("folder") for the CSVs; keys are built as f"{prefix}{file_name}", so keep the trailing '/'

# PostgreSQL Configuration
# Every value is read from the environment; os.getenv returns None for any
# variable that is not set.
db_params = {
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "sslmode": os.getenv("DB_SSLMODE")
}


## Push Datasets to S3

In [4]:
# Path to your local datasets folder (relative to the notebook's working directory)
local_folder = 'Datasets'

# Upload each CSV file in the folder. glob() returns matches in a
# filesystem-dependent order, so sort them to make the upload log reproducible.
csv_files = sorted(glob.glob(os.path.join(local_folder, '*.csv')))
if not csv_files:
    # Surface an empty or misnamed folder instead of silently uploading nothing.
    print(f"⚠️ No CSV files found in {os.path.abspath(local_folder)}")

for file_path in csv_files:
    file_name = os.path.basename(file_path)
    # S3 keys always use '/', so the key is built by concatenation, not os.path.join.
    s3_key = f"{prefix}{file_name}"

    s3_client.upload_file(file_path, bucket_name, s3_key)
    print(f"✅ Uploaded: {file_name} to s3://{bucket_name}/{s3_key}")

✅ Uploaded: drivers.csv to s3://ridewise/datasets/drivers.csv
✅ Uploaded: promotions.csv to s3://ridewise/datasets/promotions.csv
✅ Uploaded: riders.csv to s3://ridewise/datasets/riders.csv
✅ Uploaded: sessions.csv to s3://ridewise/datasets/sessions.csv
✅ Uploaded: trips.csv to s3://ridewise/datasets/trips.csv


## Create Tables in PostgreSQL and Load the Datasets into Them

In [14]:
# --- Connect to PostgreSQL ---
# A single connection and cursor are opened here as module-level globals;
# the helper functions defined below read `conn` and `cursor` implicitly.
conn = psycopg2.connect(**db_params)  # settings come from the environment-backed db_params
cursor = conn.cursor()

# --- Create table if it doesn't exist ---
def _quote_ident(name):
    """Return ``name`` as a double-quoted PostgreSQL identifier (embedded quotes doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _pg_type_for(dtype):
    """Map a pandas dtype to the PostgreSQL column type used when creating a table."""
    # Check bool first: it is a distinct dtype family from integers in pandas.
    if pd.api.types.is_bool_dtype(dtype):
        return 'BOOLEAN'
    if pd.api.types.is_integer_dtype(dtype):
        # pandas integers are 64-bit by default; PostgreSQL INTEGER is 32-bit and
        # would reject values above 2,147,483,647 during COPY.
        return 'BIGINT'
    if pd.api.types.is_float_dtype(dtype):
        return 'NUMERIC'
    return 'TEXT'


def create_table_if_not_exists(df, table_name):
    """Create ``table_name`` with columns inferred from ``df`` unless it already exists.

    Column names are normalized (stripped, lower-cased, spaces -> underscores)
    and typed from the pandas dtypes: integer -> BIGINT, float -> NUMERIC,
    bool -> BOOLEAN, everything else -> TEXT. Nullable extension dtypes
    (``Int64``, ``Float64``, ``boolean``) are recognized as well.

    An existing table is left untouched; its schema is NOT reconciled with ``df``.
    Executes on the module-level ``cursor`` and commits on ``conn``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns and dtypes define the table schema.
    table_name : str
        Unqualified table name; it is quoted (and escaped) as an identifier.
    """
    columns = []
    for raw_col, dtype in zip(df.columns, df.dtypes):
        # str() so non-string labels (e.g. integer headers) don't crash .strip().
        col = str(raw_col).strip().lower().replace(" ", "_")
        columns.append(f"{_quote_ident(col)} {_pg_type_for(dtype)}")
    create_sql = f"CREATE TABLE IF NOT EXISTS {_quote_ident(table_name)} ({', '.join(columns)});"
    cursor.execute(create_sql)
    conn.commit()

# --- Load CSV into PostgreSQL ---
def load_csv_to_postgres(file_obj, table_name):
    """Read a CSV from ``file_obj`` and bulk-append its rows to ``table_name``.

    The table is created first via ``create_table_if_not_exists`` (a no-op if it
    already exists). The rows are then streamed with ``COPY ... FROM STDIN`` on
    the module-level ``cursor`` and committed on ``conn``.

    NOTE: rows are *appended*, so re-running this against an existing table
    duplicates its data. COPY is given no column list, so columns are matched by
    position and an existing table must have the CSV's column order.

    Parameters
    ----------
    file_obj : str, path, or file-like
        Anything ``pandas.read_csv`` accepts (e.g. an S3 ``StreamingBody``).
    table_name : str
        Target table; quoted (and escaped) as an identifier.

    Returns
    -------
    bool
        True if the data was committed. False if any step failed; in that case
        the error is printed and the transaction rolled back instead of raising.
    """
    # Double any embedded quotes so the name stays a single SQL identifier.
    quoted_table = '"' + str(table_name).replace('"', '""') + '"'
    try:
        df = pd.read_csv(file_obj)

        # Ensure the table exists with proper schema
        create_table_if_not_exists(df, table_name)

        # Re-serialize without index/header: COPY expects data rows only.
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False, header=False)
        csv_buffer.seek(0)

        # Load into PostgreSQL; empty unquoted fields (how pandas writes NaN) become NULL.
        copy_sql = f"COPY {quoted_table} FROM STDIN WITH CSV DELIMITER ',' NULL ''"
        cursor.copy_expert(sql=copy_sql, file=csv_buffer)
        conn.commit()
        print(f"✅ Loaded data into table: {table_name}")
        return True
    except Exception as e:
        # Best-effort: report the failure and roll back the aborted transaction
        # so the shared connection stays usable for the next file.
        print(f"❌ Error loading {table_name}: {e}")
        conn.rollback()
        return False

# --- Process CSV files in S3 ---
# A single list_objects_v2 call returns at most 1,000 keys, so page through
# the listing. try/finally guarantees the DB handles are closed even if an S3
# call raises part-way through.
try:
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            file_key = obj['Key']
            if not file_key.endswith('.csv'):
                continue
            # splitext strips only the trailing extension; str.replace would
            # also remove '.csv' appearing elsewhere in the file name.
            table_name = os.path.splitext(file_key.split('/')[-1])[0].lower()
            print(f"📥 Processing: {file_key} ➜ Table: {table_name}")
            file_obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)['Body']
            try:
                load_csv_to_postgres(file_obj, table_name)
            finally:
                # Release the HTTP stream backing the S3 object body.
                file_obj.close()
finally:
    # --- Cleanup ---
    cursor.close()
    conn.close()
print("🚀 All done!")


📥 Processing: datasets/drivers.csv ➜ Table: drivers
✅ Loaded data into table: drivers
📥 Processing: datasets/promotions.csv ➜ Table: promotions
✅ Loaded data into table: promotions
📥 Processing: datasets/riders.csv ➜ Table: riders
✅ Loaded data into table: riders
📥 Processing: datasets/sessions.csv ➜ Table: sessions
✅ Loaded data into table: sessions
📥 Processing: datasets/trips.csv ➜ Table: trips
✅ Loaded data into table: trips
🚀 All done!
