# ETL - Database Connection

## Dependencies

In [1]:
!pip install sqlalchemy

Defaulting to user installation because normal site-packages is not writeable


In [2]:
!pip install pymysql

Defaulting to user installation because normal site-packages is not writeable


In [1]:
!pip install python-dotenv

Defaulting to user installation because normal site-packages is not writeable


In [2]:
### Library imports
import pandas as pd
import ast
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import pymysql

In [5]:
### Set pandas settings
pd.set_option('display.max_columns', None)

## Replicate Data Warehouse Schema from `final_df`

In [10]:
import os

# Specify the path to the 'data' subfolder
data_folder = 'data'

# Get a list of all CSV files in the 'data' subfolder
csv_files = [os.path.join(data_folder, file) for file in os.listdir(data_folder) if file.endswith('.csv') and file.startswith('bulk_data_')]

# Check if any CSV files exist in the 'data' subfolder
if not csv_files:
    raise FileNotFoundError("No CSV files found in the 'data' subfolder with the specified prefix.")

# Sort CSV files by modification time (most recent first)
csv_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)

# Select the most recent CSV file
latest_csv_file = csv_files[0]

# Read in the most recent CSV file
final_df = pd.read_csv(latest_csv_file)

# Print the name of the file being read
print(f"Reading DataFrame from CSV file: {latest_csv_file}")

Reading DataFrame from CSV file: data/bulk_data_20240410_083343.csv


In [8]:
### Create dimension tables
# dim_chart_month = final_df[['chart_month', 'track_id']].drop_duplicates().reset_index(drop=True)
dim_album = final_df[['album', 'main_artist', 'album_type']].drop_duplicates().reset_index(drop=True)
dim_pitch_class = final_df[['pitch_key', 'tonal_counterparts', 'solfege']].drop_duplicates().reset_index(drop=True)

## Create dim_artist
max_followers = final_df.groupby('main_artist')['followers'].idxmax()
dim_artist = final_df.loc[max_followers][[
    'main_artist', 'followers']].drop_duplicates().reset_index(drop=True)

## Create dim_song
max_weeks_index = final_df.groupby('track_id')['weeks_on_chart'].idxmax()
dim_song = final_df.loc[max_weeks_index][[
    'track_id', 'track_name', 'all_artists', 'album', 'release_date',
    'weeks_on_chart', 'preview_url', 'cover_art_url', 'external_url',
    'analysis_url', 'explicit', 'track_number', 'available_markets',
    'uri', 'track_href'
]].drop_duplicates().reset_index(drop=True)

## Create dim_genre
dim_genre_data = []
for index, row in final_df.iterrows():
    main_artist = row['main_artist']
    genres_list_str = row['genres']
    # convert the string representation of list into a list object
    genres_list = ast.literal_eval(genres_list_str)
    for genre in genres_list:
        dim_genre_data.append({'main_artist': main_artist, 'genre': genre})
dim_genres = pd.DataFrame(dim_genre_data).drop_duplicates()

### Create fact table
avg_popularity_index = final_df.groupby('track_id')['popularity'].idxavg()
fact_table = final_df[[
    'track_id', 'main_artist', avg_popularity_index, 'danceability', 'energy', 'pitch_key',
    'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness',
    'liveness', 'valence', 'tempo', 'time_signature', 'duration_sec'
]].drop_duplicates().reset_index(drop=True)

In [9]:
### Verify df column names and datatypes match that of database schema
print('Verify df column names and datatypes match that of database schema:')
print()
print('dim_album')
print(dim_album.dtypes)
print()
print('dim_artist')
print(dim_artist.dtypes)
print()
print('dim_pitch_class')
print(dim_pitch_class.dtypes)
print()
print('dim_song')
print(dim_song.dtypes)
print()
print('dim_genres')
print(dim_genres.dtypes)
print()
print('fact_table')
print(fact_table.dtypes)
print()

Verify df column names and datatypes match that of database schema:

dim_album
album          object
main_artist    object
album_type     object
dtype: object

dim_artist
main_artist    object
followers       int64
dtype: object

dim_pitch_class
pitch_key              int64
tonal_counterparts    object
solfege               object
dtype: object

dim_song
track_id             object
track_name           object
all_artists          object
album                object
release_date         object
popularity            int64
weeks_on_chart        int64
preview_url          object
cover_art_url        object
external_url         object
analysis_url         object
explicit               bool
track_number          int64
available_markets    object
uri                  object
track_href           object
dtype: object

dim_genres
main_artist    object
genre          object
dtype: object

fact_table
track_id             object
main_artist          object
danceability        float64
energy         

## Ingestion to Data Warehouse

In [12]:
### Data Ingestion

## Load environment variables from .env file
load_dotenv('config.env')

## 1. Connect to your RDS database
# Replace 'username', 'password', 'host', 'port', and 'database_name' with your credentials

username = os.environ.get('RDS_USERNAME')
password = os.environ.get('RDS_PASSWORD')
host = os.environ.get('RDS_HOST')
port = os.environ.get('RDS_PORT')
database_name = os.environ.get('RDS_DATABASE')

## 2. Create SQLAlchemy engine for MySQL
engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database_name}")

## 3. Read your Pandas DataFrame (assuming you already have it loaded)
# Replace 'df' with the name of your DataFrame
# Replace 'table_name' with the name of the table you want to create in the database
# Replace 'if_exists' parameter with 'replace' if you want to replace the table if it already exists
dim_artist.to_sql('dim_artist', engine, if_exists='append', index=False)
dim_album.to_sql('dim_album', engine, if_exists='append', index=False)
dim_genres.to_sql('dim_genres', engine, if_exists='append', index=False)
dim_pitch_class.to_sql('dim_pitch_class', engine, if_exists='append', index=False)
dim_song.to_sql('dim_song', engine, if_exists='append', index=False)
fact_table.to_sql('fact_table', engine, if_exists='append', index=False)

## Close the engine connection
engine.dispose()

worked fine breh
