In [30]:
import datetime
import pandas as pd
from smart_open import open
import pymysql.cursors
# import rds_config
import s3_file_operations as s3_ops

We save our RDS credentials via the following variables. please note this is not a safe way to access credentials, its advisable to use environmental variables instead

In [32]:
# RDS configurations
rds_host = "***********"  # RDS endpoint
rds_username = "*****"
rds_user_pwd = "*********"  # RDS password
rds_db_name = "Rick_And_Morty"
bucket_name = "de-masterclass-shisia"   # S3 bucket name

Reading one of our Transformed Datasets from s3

In [33]:
def read_data_from_s3(bucket_name, keys):
    dataframes = {}
    for key in keys:
        print(f"Reading {key} from S3...")
        df = s3_ops.read_csv_from_s3(bucket_name, key)
        if df is not None:
            dataframes[key] = df
            print(f"{key} DataFrame shape: {df.shape}")
        else:
            print(f"Error in loading {key} from S3")
    return dataframes


In [34]:
bucket = "de-masterclass-shisia"  # S3 bucket name

# Read data from S3
print("Reading Character data from S3...")
characters_df = s3_ops.read_csv_from_s3(bucket, 'Rick&Morty/Transformed/Character.csv')
print(f"Characters DataFrame shape: {characters_df.shape}")
print("Reading Episode data from S3...")
episodes_df = s3_ops.read_csv_from_s3(bucket, 'Rick&Morty/Transformed/Episode.csv')
print(f"Characters DataFrame shape: {episodes_df.shape}")
print("Reading Location data from S3...")
location_df = s3_ops.read_csv_from_s3(bucket, 'Rick&Morty/Transformed/Location.csv')
print(f"Locations DataFrame shape: {location_df.shape}")

# Check if data is loaded successfully
if characters_df is None or episodes_df is None or location_df is None:
    print("Error in loading data from S3")

print("Data loaded successfully from S3")



Reading Character data from S3...
Characters DataFrame shape: (826, 11)
Reading Episode data from S3...
Characters DataFrame shape: (51, 6)
Reading Location data from S3...
Locations DataFrame shape: (126, 6)
Data loaded successfully from S3


In [35]:
def create_tables(cursor):
    # SQL create table scripts
    create_character_table = """
        CREATE TABLE IF NOT EXISTS Character_Table (
            character_id INT NOT NULL PRIMARY KEY,
            name VARCHAR(255),
            status VARCHAR(255),
            species VARCHAR(255),
            type VARCHAR(255),
            gender VARCHAR(255),
            origin_id VARCHAR(255),
            location_id VARCHAR(255),
            image VARCHAR(255),
            url VARCHAR(255),
            created TIMESTAMP
        ) ENGINE=INNODB;
    """

    create_episode_table = """
        CREATE TABLE IF NOT EXISTS Episode_Table (
            episode_id INT NOT NULL PRIMARY KEY,
            name VARCHAR(255),
            air_date VARCHAR(255),
            episode VARCHAR(255),
            url VARCHAR(255),
            created TIMESTAMP
        ) ENGINE=INNODB;
    """

    create_appearance_table = """
        CREATE TABLE IF NOT EXISTS Appearance_Table (
            appearance_id INT NOT NULL PRIMARY KEY,
            episode_id INT,
            character_id INT
        ) ENGINE=INNODB;
    """

    create_location_table = """
        CREATE TABLE IF NOT EXISTS Location_Table (
            location_id INT NOT NULL PRIMARY KEY,
            name VARCHAR(255),
            type VARCHAR(255),
            dimension VARCHAR(255),
            url VARCHAR(255),
            created TIMESTAMP
        ) ENGINE=INNODB;
    """

    # Execute create table queries
    cursor.execute(create_character_table)
    cursor.execute(create_episode_table)
    cursor.execute(create_appearance_table)
    cursor.execute(create_location_table)

- Defining our connection to our remote database on AWS, creating our Destination Table and finally inserting the just transformed records to the database.

In [36]:
def bulk_insert_data(cursor, conn, df, table_name):
    column_names = list(df.columns)
    placeholders = ','.join(['%s'] * len(column_names))
    sql_insert = f"INSERT INTO {table_name} ({','.join(column_names)}) VALUES ({placeholders});"

    # Convert dataframe to list of tuples for batch insert
    data = [tuple(row[column] if pd.notna(row[column]) else None for column in column_names) for _, row in df.iterrows()]

    try:
        # Batch insert data using executemany for better performance
        cursor.executemany(sql_insert, data)
        conn.commit()
        print(f"Data inserted into {table_name}")
    except Exception as e:
        print(f"Error inserting data into {table_name}: {e}")
        conn.rollback()

In [37]:
# Establish RDS connection
def connect_to_rds():
    return pymysql.connect(host=rds_host,
                           user=rds_username,
                           password=rds_user_pwd,
                           port=3306,
                           database=rds_db_name,
                           cursorclass=pymysql.cursors.DictCursor)


In [38]:
# Main processing function
def process_data():
    try:
        # Read transformed data from S3
        keys = [
            'Rick&Morty/Transformed/Character.csv',
            'Rick&Morty/Transformed/Episode.csv',
            'Rick&Morty/Transformed/Appearance.csv',
            'Rick&Morty/Transformed/Location.csv'
        ]
        dataframes = read_data_from_s3(bucket_name, keys)

        if not dataframes:
            print("No data loaded from S3")
            return

        # Establish RDS connection
        conn = connect_to_rds()
        cursor = conn.cursor()

        # Create tables
        create_tables(cursor)

        # Insert data into tables
        bulk_insert_data(cursor, conn, dataframes['Rick&Morty/Transformed/Character.csv'], "Character_Table")
        bulk_insert_data(cursor, conn, dataframes['Rick&Morty/Transformed/Episode.csv'], "Episode_Table")
        bulk_insert_data(cursor, conn, dataframes['Rick&Morty/Transformed/Appearance.csv'], "Appearance_Table")
        bulk_insert_data(cursor, conn, dataframes['Rick&Morty/Transformed/Location.csv'], "Location_Table")

        print("Data insertion completed successfully")
    except Exception as e:
        print("Exception: ", e)

# Run the processing function
process_data()

Reading Rick&Morty/Transformed/Character.csv from S3...
Rick&Morty/Transformed/Character.csv DataFrame shape: (826, 11)
Reading Rick&Morty/Transformed/Episode.csv from S3...
Rick&Morty/Transformed/Episode.csv DataFrame shape: (51, 6)
Reading Rick&Morty/Transformed/Appearance.csv from S3...
Rick&Morty/Transformed/Appearance.csv DataFrame shape: (1266, 3)
Reading Rick&Morty/Transformed/Location.csv from S3...
Rick&Morty/Transformed/Location.csv DataFrame shape: (126, 6)
Error inserting data into Character_Table: (1062, "Duplicate entry '1' for key 'Character_Table.PRIMARY'")
Error inserting data into Episode_Table: (1062, "Duplicate entry '1' for key 'Episode_Table.PRIMARY'")
Error inserting data into Appearance_Table: (1062, "Duplicate entry '0' for key 'Appearance_Table.PRIMARY'")
Error inserting data into Location_Table: (1062, "Duplicate entry '1' for key 'Location_Table.PRIMARY'")
Data insertion completed successfully
