In [1]:
from dotenv import load_dotenv
load_dotenv(override=True)

import os
from sqlalchemy import create_engine, text
from datetime import datetime
import pandas as pd
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

In [2]:
SPOTIPY_CLIENT_ID = os.getenv("SPOTIPY_CLIENT_ID")
SPOTIPY_CLIENT_SECRET = os.getenv("SPOTIPY_CLIENT_SECRET")
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')

In [3]:
# Extract function
def extract_spotify_data(playlist_id, SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET):
    credentials = SpotifyClientCredentials(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET)
    sp = spotipy.Spotify(client_credentials_manager=credentials)

    results = sp.playlist_items(playlist_id)
    hot_100 = []

    for i, item in enumerate(results['items'], start=1):
        track = item['track']
        if track is None:
            continue

        genre = 'Unknown'
        if track['artists']:
            artist_id = track['artists'][0]['id']
            if artist_id:
                artist_info = sp.artist(artist_id)
                genres = artist_info.get('genres')
                if genres:
                    genre = genres[0]
                    
        track_data = {
            'chart_position': i,
            'release_date': track['album'].get('release_date', 'N/A'),
            'track_name': track['name'],
            'track_id': track['id'],
            'popularity': track['popularity'],
            'duration_ms': track['duration_ms'],
            'track_number': track['track_number'],
            'artists': ', '.join([artist['name'] for artist in track['artists']]),
            'album_name': track['album']['name'],
            'album_url': track['album']['external_urls']['spotify'],
            'track_url': track['external_urls']['spotify'],
            'genre': genre
        }
        hot_100.append(track_data)

    return hot_100


# Transformation function
def transform_data(extracted_data):

    df = pd.DataFrame(extracted_data)
    
    df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')
    df['extraction_date'] = datetime.now().date()
    df.to_csv("spotify_billboard_100_data.csv", index=False)
    print(f"Transformed data with {len(df)} records")
    
    return df


# Load function
def load_data_to_sql(df, db_url, table_name='hot_100'):
    engine = create_engine(db_url)

    if df.empty:
        print("No data to load.")
        return df

    with engine.begin() as connection:
        connection.execute(text(f"DELETE FROM {table_name}"))

    df.to_sql(table_name, con=engine, if_exists='append', index=False)
    print(f"Replaced all rows in SQL table: {table_name} with {len(df)} new rows")

    return df

In [4]:
def etl_spotify_data_to_sql(playlist_id):
    db_url = f'mysql+pymysql://root:{MYSQL_PASSWORD}@127.0.0.1:3306/SpotifyHot100'
    
    extracted_data = extract_spotify_data(playlist_id, SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET)
    transformed_data = transform_data(extracted_data)
    load_data_to_sql(transformed_data, db_url)


playlist_id = "6UeSakyzhiEt4NB3UAd6NQ"
etl_spotify_data_to_sql(playlist_id)


Transformed data with 100 records
Replaced all rows in SQL table: hot_100 with 100 new rows
