# Spotify ETL Pipeline

This notebook implements an ETL (Extract, Transform, Load) pipeline for Spotify data.

- **Extract**: Fetch data from Spotify API
- **Transform**: Clean and structure the data using Pandas
- **Load**: Insert data into PostgreSQL data warehouse

## 1. Setup and Imports

In [None]:
# Install required packages if not already installed
# !pip install spotipy pandas psycopg2-binary python-dotenv

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv
import json
from datetime import datetime

## 2. Configuration

Load environment variables for API keys and database credentials.

In [None]:
# Load environment variables
load_dotenv()

# Spotify API credentials
SPOTIFY_CLIENT_ID = os.getenv('SPOTIFY_CLIENT_ID')
SPOTIFY_CLIENT_SECRET = os.getenv('SPOTIFY_CLIENT_SECRET')

# PostgreSQL credentials
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = os.getenv('DB_PORT', '5432')
DB_NAME = os.getenv('DB_NAME', 'spotify_dw')
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD')

# Check if credentials are loaded
if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET:
    raise ValueError("Spotify API credentials not found. Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET in .env file")

if not DB_PASSWORD:
    raise ValueError("Database password not found. Please set DB_PASSWORD in .env file")

## 3. Extract Data from Spotify API

Authenticate with Spotify API and extract data for a specific playlist or artist.

In [None]:
# Authenticate with Spotify
client_credentials_manager = SpotifyClientCredentials(client_id=SPOTIFY_CLIENT_ID, client_secret=SPOTIFY_CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

# Example: Extract tracks from a playlist
# Replace with your playlist ID
playlist_id = '37i9dQZF1DXcBWIGoYBM5M'  # Today's Top Hits (example)

def extract_playlist_tracks(playlist_id):
    results = sp.playlist_tracks(playlist_id)
    tracks = []
    
    while results:
        for item in results['items']:
            track = item['track']
            if track:
                track_data = {
                    'track_id': track['id'],
                    'track_name': track['name'],
                    'artist_id': track['artists'][0]['id'] if track['artists'] else None,
                    'artist_name': track['artists'][0]['name'] if track['artists'] else None,
                    'album_id': track['album']['id'],
                    'album_name': track['album']['name'],
                    'duration_ms': track['duration_ms'],
                    'popularity': track['popularity'],
                    'external_urls': track['external_urls']['spotify'],
                    'extracted_at': datetime.now().isoformat()
                }
                tracks.append(track_data)
        
        # Get next page
        results = sp.next(results) if results['next'] else None
    
    return tracks

# Extract data
raw_tracks = extract_playlist_tracks(playlist_id)
print(f"Extracted {len(raw_tracks)} tracks")

# Save raw data to JSON for inspection
with open('../data/raw_tracks.json', 'w') as f:
    json.dump(raw_tracks, f, indent=2)

## 4. Transform Data

Clean and structure the extracted data using Pandas.

In [None]:
# Load raw data into DataFrame
df_tracks = pd.DataFrame(raw_tracks)

# Data cleaning
# Remove duplicates
df_tracks = df_tracks.drop_duplicates(subset=['track_id'])

# Handle missing values
df_tracks = df_tracks.dropna(subset=['track_id', 'track_name'])

# Convert duration from ms to seconds
df_tracks['duration_sec'] = df_tracks['duration_ms'] / 1000
df_tracks = df_tracks.drop('duration_ms', axis=1)

# Convert extracted_at to datetime
df_tracks['extracted_at'] = pd.to_datetime(df_tracks['extracted_at'])

# Create separate DataFrames for artists and albums
df_artists = df_tracks[['artist_id', 'artist_name']].drop_duplicates().dropna()
df_albums = df_tracks[['album_id', 'album_name']].drop_duplicates().dropna()

# Keep only necessary columns for tracks
df_tracks_clean = df_tracks[['track_id', 'track_name', 'artist_id', 'album_id', 'duration_sec', 'popularity', 'external_urls', 'extracted_at']]

print("Transformed data shapes:")
print(f"Tracks: {df_tracks_clean.shape}")
print(f"Artists: {df_artists.shape}")
print(f"Albums: {df_albums.shape}")

# Save transformed data
df_tracks_clean.to_csv('../data/transformed_tracks.csv', index=False)
df_artists.to_csv('../data/transformed_artists.csv', index=False)
df_albums.to_csv('../data/transformed_albums.csv', index=False)

## 5. Load Data to PostgreSQL

Connect to PostgreSQL and load the transformed data.

In [None]:
# Connect to PostgreSQL
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD
)
cursor = conn.cursor()

# Create tables if they don't exist
create_artists_table = """
CREATE TABLE IF NOT EXISTS artists (
    artist_id VARCHAR(50) PRIMARY KEY,
    artist_name VARCHAR(255) NOT NULL
);
"""

create_albums_table = """
CREATE TABLE IF NOT EXISTS albums (
    album_id VARCHAR(50) PRIMARY KEY,
    album_name VARCHAR(255) NOT NULL
);
"""

create_tracks_table = """
CREATE TABLE IF NOT EXISTS tracks (
    track_id VARCHAR(50) PRIMARY KEY,
    track_name VARCHAR(255) NOT NULL,
    artist_id VARCHAR(50) REFERENCES artists(artist_id),
    album_id VARCHAR(50) REFERENCES albums(album_id),
    duration_sec FLOAT,
    popularity INTEGER,
    external_urls VARCHAR(500),
    extracted_at TIMESTAMP
);
"""

cursor.execute(create_artists_table)
cursor.execute(create_albums_table)
cursor.execute(create_tracks_table)
conn.commit()

# Load data
# Insert artists
for _, row in df_artists.iterrows():
    cursor.execute(
        "INSERT INTO artists (artist_id, artist_name) VALUES (%s, %s) ON CONFLICT (artist_id) DO NOTHING",
        (row['artist_id'], row['artist_name'])
    )

# Insert albums
for _, row in df_albums.iterrows():
    cursor.execute(
        "INSERT INTO albums (album_id, album_name) VALUES (%s, %s) ON CONFLICT (album_id) DO NOTHING",
        (row['album_id'], row['album_name'])
    )

# Insert tracks
for _, row in df_tracks_clean.iterrows():
    cursor.execute(
        """INSERT INTO tracks (track_id, track_name, artist_id, album_id, duration_sec, popularity, external_urls, extracted_at) 
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (track_id) DO NOTHING""",
        (row['track_id'], row['track_name'], row['artist_id'], row['album_id'], 
         row['duration_sec'], row['popularity'], row['external_urls'], row['extracted_at'])
    )

conn.commit()
print("Data loaded successfully")

# Close connection
cursor.close()
conn.close()

## 6. Verification

Verify that the data has been loaded correctly.

In [None]:
# Reconnect to verify
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD
)

# Query sample data
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM tracks")
track_count = cursor.fetchone()[0]
print(f"Total tracks in database: {track_count}")

cursor.execute("SELECT track_name, artist_name, popularity FROM tracks t JOIN artists a ON t.artist_id = a.artist_id ORDER BY popularity DESC LIMIT 5")
top_tracks = cursor.fetchall()
print("\nTop 5 tracks by popularity:")
for track in top_tracks:
    print(f"{track[0]} by {track[1]} (Popularity: {track[2]})")

cursor.close()
conn.close()