# Load data into RDS

Two datasets are loaded into RDS:
1. user metadata
2. song metadata

## Import necessary libraries

In [None]:
import pandas as pd
import psycopg2

from io import StringIO
from contextlib import contextmanager
import os

## Database connection

In [26]:
# Database connection configuration.
# Every setting can be overridden through an environment variable so that
# credentials do not have to live in the notebook. The literals below are only
# fallbacks that keep the existing lab setup working unchanged.
# NOTE(review): the fallback password is still stored here in plain text;
# rotate it and remove the default once MUSIC_DB_PASSWORD is provisioned.
DB_HOST = os.environ.get(
    "MUSIC_DB_HOST", "lab-db12.czaiaq68azf6.eu-west-1.rds.amazonaws.com"
)
DB_NAME = os.environ.get("MUSIC_DB_NAME", "music_db")
DB_USER = os.environ.get("MUSIC_DB_USER", "postgres")
DB_PASSWORD = os.environ.get("MUSIC_DB_PASSWORD", "lab-db12")

# Keyword arguments passed straight to psycopg2.connect()
DB_CONFIG = {
    "host": DB_HOST,
    "dbname": DB_NAME,
    "user": DB_USER,
    "password": DB_PASSWORD,
}

# File paths configuration
# The project root is derived from the current working directory, so this
# only resolves correctly when the notebook is started from
# dags/music_streaming/loaders (three levels below the root).
CURRENT_DIR = os.getcwd()  # Get current working directory
ROOT_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(CURRENT_DIR))
)  # Go up 3 levels
DATA_DIR = os.path.join(ROOT_DIR, "data")  # Path to data directory

# Debug print to verify paths
print(f"Current Directory: {CURRENT_DIR}")
print(f"Root Directory: {ROOT_DIR}")
print(f"Data Directory: {DATA_DIR}")

# CSV file paths relative to DATA_DIR
USER_CSV_FILE = "users/users.csv"
SONG_CSV_FILE = "songs/songs.csv"

# Database table names
USER_TABLE = "users"
SONG_TABLE = "songs"

# Batch processing size (rows read from the CSV and sent per COPY call)
CHUNK_SIZE = 50_000


Current Directory: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming\dags\music_streaming\loaders
Root Directory: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming
Data Directory: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming\data


## Create a context manager for PostgreSQL connection


In [27]:
@contextmanager
def postgres_connection():
    """
    Open a PostgreSQL connection for the duration of a ``with`` block.

    The connection is created from the module-level ``DB_CONFIG`` settings
    and is closed when the block exits, whether it completes normally or
    raises.

    Yields:
        connection: open PostgreSQL connection object
    """
    connection = psycopg2.connect(**DB_CONFIG)
    try:
        yield connection
    finally:
        # Runs on success and on error alike, so the connection never leaks.
        connection.close()


## Load CSV Data into RDS

In [31]:
def load_csv_to_table(csv_path: str, table_name: str):
    """
    Load data from a CSV file into a specified PostgreSQL table.

    The file is streamed in chunks of ``CHUNK_SIZE`` rows. Each chunk is sent
    with ``COPY ... FROM STDIN`` and committed on its own, so chunks that were
    loaded before a failure stay in the table.

    Errors are reported on stdout instead of being raised, so one failing
    file does not stop the caller from loading the next one.

    Args:
        csv_path (str): Full path to the CSV file
        table_name (str): Name of the target database table
    """
    # CSV header -> database column renames, keyed by target table
    column_mappings = {
        "songs": {
            "key": "song_key"  # Map 'key' from CSV to 'song_key' in database
        }
    }
    renames = column_mappings.get(table_name, {})

    try:
        with postgres_connection() as conn:
            try:
                # Read only the header row to build the COPY column list
                header = pd.read_csv(csv_path, nrows=0)
                columns = [renames.get(col, col) for col in header.columns]

                # The statement is the same for every chunk, so build it once
                copy_sql = f"""
                    COPY {table_name} ({",".join(columns)})
                    FROM STDIN WITH CSV
                """

                # Process in chunks
                for chunk in pd.read_csv(csv_path, chunksize=CHUNK_SIZE):
                    # Handle NaN values; missing values are written as empty
                    # fields, which COPY in CSV mode reads as NULL
                    chunk = chunk.where(pd.notnull(chunk), None)

                    with StringIO() as buffer:
                        # Column names are not written (header=False); the
                        # COPY column list above decides where values land
                        chunk.to_csv(buffer, index=False, header=False)
                        buffer.seek(0)

                        with conn.cursor() as cursor:
                            cursor.copy_expert(copy_sql, buffer)
                        conn.commit()
            except Exception:
                # Roll back while the connection is still open; once the
                # ``with`` block exits the connection is already closed.
                if not conn.closed:
                    conn.rollback()
                raise

            print(f"Successfully loaded {os.path.basename(csv_path)} → {table_name}")

    except Exception as e:
        print(f"Error loading {csv_path}: {str(e)}")


In [32]:
# def load_csv_to_table(csv_path: str, table_name: str):
#     """
#     Load data from a CSV file into a specified PostgreSQL table.

#     Args:
#         csv_path (str): Full path to the CSV file
#         table_name (str): Name of the target database table

#     Handles data in chunks to manage memory efficiently and properly handles NULL values.
#     """
#     try:
#         with postgres_connection() as conn:
#             # Get CSV headers for COPY command
#             with open(csv_path, "r") as f:
#                 columns = f.readline().strip().split(",")

#             # Process in chunks
#             for chunk in pd.read_csv(csv_path, chunksize=CHUNK_SIZE):
#                 # Handle NaN values by converting them to None (NULL in PostgreSQL)
#                 chunk = chunk.where(pd.notnull(chunk), None)

#                 with StringIO() as buffer:
#                     chunk.to_csv(buffer, index=False, header=False)
#                     buffer.seek(0)

#                     with conn.cursor() as cursor:
#                         copy_sql = f"""
#                             COPY {table_name} ({",".join(columns)})
#                             FROM STDIN WITH CSV
#                         """
#                         cursor.copy_expert(copy_sql, buffer)
#                         conn.commit()

#             print(f"Successfully loaded {os.path.basename(csv_path)} → {table_name}")

#     except Exception as e:
#         print(f"Error loading {csv_path}: {str(e)}")
#         if "conn" in locals():
#             conn.rollback()


In [None]:
def load_all_data():
    """
    Main function to process and load data into database tables.

    For every configured CSV file, builds its full path under ``DATA_DIR``,
    creates the containing directory if it does not exist, and loads the file
    into its target table with ``load_csv_to_table``. Missing files are
    reported on stdout and skipped.
    """
    file_table_mapping = {
        USER_CSV_FILE: USER_TABLE,
        SONG_CSV_FILE: SONG_TABLE,
    }

    print(f"Data directory path: {DATA_DIR}")  # Debug print

    for csv_file, table_name in file_table_mapping.items():
        # Create full path by joining DATA_DIR with the CSV file path
        csv_path = os.path.join(DATA_DIR, csv_file)

        # Debug print
        print(f"Attempting to load: {csv_path}")

        # Ensure directory exists
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)

        # isfile() rather than exists(): a directory at this path is not a
        # loadable CSV and should be reported as missing.
        if not os.path.isfile(csv_path):
            print(f"File not found: {csv_path}")
            print("Please ensure the file exists at the specified location")
            continue

        # Report the file before loading so the log reads in order and the
        # message does not follow a load error.
        print(f"File found: {csv_path}")
        load_csv_to_table(csv_path, table_name)


## Load data from CSV files into PostgreSQL tables


In [34]:
# Entry point: run the full load only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    load_all_data()


Data directory path: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming\data
Attempting to load: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming\data\users/users.csv


Successfully loaded users.csv → users
Attempting to load: c:\DOJO\VSCode Project\Notebooks\Labs\Phase_II_labs\1_music_streaming\data\songs/songs.csv
Successfully loaded songs.csv → songs
