In [1]:
import requests
import base64
import json
import os
import pandas as pd
from dotenv import load_dotenv  # Load API keys securely

# Load environment variables
load_dotenv()
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")

# Function to get Spotify access token
def get_spotify_token(client_id, client_secret):
    url = "https://accounts.spotify.com/api/token"
    headers = {
        "Authorization": "Basic " + base64.b64encode(f"{client_id}:{client_secret}".encode()).decode(),
    }
    data = {"grant_type": "client_credentials"}

    response = requests.post(url, headers=headers, data=data)
    token_info = response.json()
    return token_info["access_token"]

# Function to get trending artists dynamically from Spotify charts
def get_top_artists(token, limit=10):
    url = f"https://api.spotify.com/v1/browse/new-releases?limit={limit}"
    headers = {"Authorization": f"Bearer {token}"}
    
    response = requests.get(url, headers=headers)
    albums = response.json().get("albums", {}).get("items", [])
    
    artists = []
    for album in albums:
        for artist in album["artists"]:
            artists.append(artist["name"])
    return list(set(artists))  # Remove duplicates

# Function to search for an artist
def search_artist(artist_name, token):
    url = f"https://api.spotify.com/v1/search?q={artist_name}&type=artist&limit=1"
    headers = {"Authorization": f"Bearer {token}"}
    
    response = requests.get(url, headers=headers)
    artist_data = response.json()
    
    if artist_data["artists"]["items"]:
        return artist_data["artists"]["items"][0]
    return None

# Function to get top tracks for an artist
def get_artist_top_tracks(artist_id, token, market="US"):
    url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks?market={market}"
    headers = {"Authorization": f"Bearer {token}"}
    
    response = requests.get(url, headers=headers)
    return response.json()["tracks"]

# Function to get audio features of a track
def get_audio_features(track_id, token):
    url = f"https://api.spotify.com/v1/audio-features/{track_id}"
    headers = {"Authorization": f"Bearer {token}"}
    
    response = requests.get(url, headers=headers)
    return response.json()

# Authenticate and get token
token = get_spotify_token(CLIENT_ID, CLIENT_SECRET)

# Get trending artists dynamically
artist_names = get_top_artists(token, limit=15)

# Data collection
spotify_data = []

for artist_name in artist_names:
    artist_info = search_artist(artist_name, token)
    
    if artist_info:
        artist_id = artist_info["id"]
        artist_popularity = artist_info["popularity"]
        artist_followers = artist_info["followers"]["total"]
        genres = ", ".join(artist_info["genres"])

        top_tracks = get_artist_top_tracks(artist_id, token)
        
        for track in top_tracks:
            # Get audio features for deeper insights
            audio_features = get_audio_features(track["id"], token)
            
            spotify_data.append({
                "artist": artist_name,
                "artist_id": artist_id,
                "artist_popularity": artist_popularity,
                "artist_followers": artist_followers,
                "genres": genres,
                "track_name": track["name"],
                "track_id": track["id"],
                "track_popularity": track["popularity"],
                "release_date": track["album"]["release_date"],
                "danceability": audio_features.get("danceability"),
                "energy": audio_features.get("energy"),
                "tempo": audio_features.get("tempo"),
            })

# Convert to DataFrame and save to CSV
df = pd.DataFrame(spotify_data)
df.to_csv("spotify_trending_artists.csv", index=False)

print("✅ Data saved to spotify_trending_artists.csv")


✅ Data saved to spotify_trending_artists.csv


In [2]:
import os
import pandas as pd
from io import StringIO
import psycopg2
from dotenv import load_dotenv

load_dotenv()

DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_PORT = os.getenv("DB_PORT")  # e.g., "5433"

def fix_spotify_release_date(date_str):
    """
    Convert Spotify's variable-length date string into a valid YYYY-MM-DD.
    - If '2020' => '2020-01-01'
    - If '2020-05' => '2020-05-01'
    - If '2020-05-25' => '2020-05-25'
    """
    if not date_str:
        return None
    parts = date_str.split('-')
    if len(parts) == 1:
        return date_str + '-01-01'  # e.g. '2011' -> '2011-01-01'
    elif len(parts) == 2:
        return date_str + '-01'     # e.g. '2011-06' -> '2011-06-01'
    else:
        return date_str            # e.g. '2011-06-24' stays the same

def copy_csv_to_db(csv_path, table_name):
    """
    1. Read CSV into a pandas DataFrame.
    2. Fix 'release_date' using fix_spotify_release_date().
    3. COPY the corrected data into PostgreSQL using psycopg2.
    """
    # 1. Read CSV
    df = pd.read_csv(csv_path)
    
    # 2. Fix the release_date column
    if 'release_date' in df.columns:
        df['release_date'] = df['release_date'].apply(fix_spotify_release_date)
    
    # 3. Write DataFrame to an in-memory CSV buffer
    buffer = StringIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)

    # 4. Connect to PostgreSQL
    conn = psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
        port=DB_PORT
    )
    cur = conn.cursor()

    # Optionally set the search path (if your table is in a custom schema):
    cur.execute("SET search_path TO spotify_trends")

    # 5. COPY from the in-memory CSV
    copy_sql = f"""
        COPY {table_name} (
            artist,
            artist_id,
            artist_popularity,
            artist_followers,
            genres,
            track_name,
            track_id,
            track_popularity,
            release_date,
            danceability,
            energy,
            tempo
        )
        FROM STDIN
        WITH CSV
        HEADER
    """
    cur.copy_expert(copy_sql, buffer)

    conn.commit()
    cur.close()
    conn.close()
    print(f"✅ CSV data from '{csv_path}' copied into '{table_name}' successfully, with corrected release dates!")

if __name__ == "__main__":
    csv_file_path = "spotify_trending_artists.csv"
    target_table = "music_trends"
    copy_csv_to_db(csv_file_path, target_table)


✅ CSV data from 'spotify_trending_artists.csv' copied into 'music_trends' successfully, with corrected release dates!
