In [19]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import logging
import numpy as np

In [6]:
logging.basicConfig(filename='artist_import.log', level=logging.INFO)


In [7]:
# Database connection parameters
DB_CONFIG = {
    'host': 'localhost',
    'port': '5432',
    'database': 'spotifypi',
    'user': 'postgres',
    'password': 'awssama69'
}


In [8]:
def truncate_string(value, max_length):
    """Truncate string to specified length if needed"""
    if value and len(value) > max_length:
        truncated = value[:max_length]
        logging.warning(f"Truncated value: {value} -> {truncated}")
        return truncated
    return value

In [10]:
# Load data from CSV files
artist_df = pd.read_csv('cleaned data\cleaned_artists.csv')
listeners_df = pd.read_csv('cleaned data\cleaned_listeners.csv')

In [11]:
# Clean and transform the data
merged_df = pd.merge(
    artist_df,
    listeners_df,
    left_on='name',
    right_on='Artist',
    how='left'
)


In [12]:
merged_df.head()

Unnamed: 0,id,followers,genres,name,popularity,genre_count,is_verified,Artist,Listeners,PkListeners,Peak,Trend_Value,Trend_Direction,At_Peak
0,1uNFoZAHBGtllmzznpCI3s,44606973,"['canadian pop', 'pop', 'post-teen pop']",Justin Bieber,100,3,True,Justin Bieber,72623228.0,75467229.0,6.0,34540.0,Flat,False
1,4q3ewBCX7sLwd24euuV69X,32244734,"['trap latino', 'latin', 'reggaeton']",Bad Bunny,98,3,True,Bad Bunny,76162057.0,83950570.0,3.0,-199052.0,Down,False
2,06HL4z0CvFAxyc27GXpf02,38869193,"['pop', 'post-teen pop']",Taylor Swift,98,2,True,Taylor Swift,101003302.0,101003302.0,2.0,889.0,Flat,False
3,3TVXtAsR1Inumwj472S9r4,54416812,"['canadian hip hop', 'toronto rap', 'pop rap',...",Drake,98,6,True,Drake,75371611.0,76391086.0,6.0,-67013.0,Down,False
4,4MCBfE4596Uoi2O4DtmEMz,16996777,"['melodic rap', 'chicago rap']",Juice Wrld,96,2,True,,,,,,,


In [13]:
# Select and rename columns
dim_artists_df = merged_df[[
    'id', 'name', 'followers', 'popularity', 'is_verified',
    'Listeners', 'PkListeners', 'Peak', 'At_Peak'
]].copy()

dim_artists_df = dim_artists_df.rename(columns={
    'id': 'artist_id',
    'Listeners': 'current_listeners',
    'PkListeners': 'peak_listeners',
    'Peak': 'peak_rank',
    'At_Peak': 'at_peak'
})

In [14]:
# Handle missing values
dim_artists_df['current_listeners'] = dim_artists_df['current_listeners'].fillna(0)
dim_artists_df['peak_listeners'] = dim_artists_df['peak_listeners'].fillna(0)
dim_artists_df['peak_rank'] = dim_artists_df['peak_rank'].fillna(0)
dim_artists_df['at_peak'] = dim_artists_df['at_peak'].fillna(False)

In [15]:
# Convert data types and truncate strings
dim_artists_df['name'] = dim_artists_df['name'].apply(lambda x: truncate_string(x, 255))
dim_artists_df['artist_id'] = dim_artists_df['artist_id'].apply(lambda x: truncate_string(x, 255))

dim_artists_df['followers'] = dim_artists_df['followers'].astype('int64')
dim_artists_df['popularity'] = dim_artists_df['popularity'].astype('int32')
dim_artists_df['current_listeners'] = dim_artists_df['current_listeners'].astype('int32')
dim_artists_df['peak_listeners'] = dim_artists_df['peak_listeners'].astype('int32')
dim_artists_df['peak_rank'] = dim_artists_df['peak_rank'].astype('int32')
dim_artists_df['is_verified'] = dim_artists_df['is_verified'].astype(bool)
dim_artists_df['at_peak'] = dim_artists_df['at_peak'].astype(bool)


In [16]:
# Connect to database and truncate the table
try:
    engine = create_engine(
        f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    
    # Clear existing data
    with engine.connect() as conn:
        conn.execute("TRUNCATE TABLE public.dim_artists RESTART IDENTITY;")
        
except Exception as error:
    print(f"Error during connection/truncation: {error}")

In [17]:
# Insert data with error handling for each row
try:
    success_count = 0
    with engine.begin() as connection:
        for _, row in dim_artists_df.iterrows():
            try:
                connection.execute(
                    """INSERT INTO public.dim_artists 
                    (artist_id, name, followers, popularity, is_verified, 
                     current_listeners, peak_listeners, peak_rank, at_peak)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                    (
                        row['artist_id'],
                        row['name'],
                        row['followers'],
                        row['popularity'],
                        row['is_verified'],
                        row['current_listeners'],
                        row['peak_listeners'],
                        row['peak_rank'],
                        row['at_peak']
                    )
                )
                success_count += 1
            except Exception as e:
                logging.error(f"Failed to insert artist {row['artist_id']} - {row['name']}: {str(e)}")
                continue

    print(f"Successfully loaded {success_count} of {len(dim_artists_df)} records into public.dim_artists table!")
    print(f"Check artist_import.log for details on any truncated or skipped records.")

except Exception as error:
    print(f"Error during data insertion: {error}")
finally:
    if 'engine' in locals():
        engine.dispose()


Successfully loaded 1048575 of 1048575 records into public.dim_artists table!
Check artist_import.log for details on any truncated or skipped records.


# bridge table

In [21]:
def extract_and_load_genres():

    all_genres = set()
    for genres_str in artist_df['genres']:
        if isinstance(genres_str, str) and genres_str != '[]':
            # Clean and split genre strings like "['pop', 'rock']"
            genres = genres_str.strip("[]").replace("'", "").replace('"', '').split(",")
            all_genres.update([g.strip() for g in genres if g.strip()])
    
    print(f"Found {len(all_genres)} unique genres")
    
    # Step 3: Connect to database
    engine = create_engine(
        f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    
    try:
        with engine.begin() as conn:
            # Step 4: Clear existing tables (in correct order due to FK constraints)
            conn.execute("DROP TABLE IF EXISTS public.bridge_artist_genres;")
            conn.execute("TRUNCATE TABLE public.dim_genres RESTART IDENTITY CASCADE;")
            
            # Recreate bridge table with proper schema
            conn.execute("""
                CREATE TABLE IF NOT EXISTS public.bridge_artist_genres (
                    artist_id VARCHAR(255) REFERENCES dim_artists(artist_id),
                    genre_id INTEGER REFERENCES dim_genres(genre_id),
                    PRIMARY KEY (artist_id, genre_id)
                )
            """)
            
            # Step 5: Insert all genres
            for genre in sorted(all_genres):
                conn.execute(
                    "INSERT INTO public.dim_genres (genre_name) VALUES (%s)",
                    (genre,)
                )
            
            print("Genre dimension populated successfully")
            
            # Step 6: Create artist-genre relationships
            print("Creating artist-genre relationships...")
            
            # Get genre mapping
            genre_map = pd.read_sql("SELECT genre_id, genre_name FROM public.dim_genres", conn)
            genre_dict = dict(zip(genre_map['genre_name'], genre_map['genre_id']))
            
            # Process each artist's genres
            for _, artist in artist_df.iterrows():
                if isinstance(artist['genres'], str) and artist['genres'] != '[]':
                    genres = artist['genres'].strip("[]").replace("'", "").replace('"', '').split(",")
                    for genre in genres:
                        genre = genre.strip()
                        if genre in genre_dict:
                            try:
                                conn.execute(
                                    """INSERT INTO public.bridge_artist_genres
                                    (artist_id, genre_id)
                                    VALUES (%s, %s)
                                    ON CONFLICT DO NOTHING""",
                                    (artist['id'], genre_dict[genre])
                                )
                            except Exception as e:
                                print(f"Error inserting {artist['id']}, {genre}: {e}")
                                continue
            
            print("Artist-genre relationships created successfully")
    
    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        engine.dispose()

if __name__ == "__main__":
    extract_and_load_genres()

Found 5371 unique genres
Genre dimension populated successfully
Creating artist-genre relationships...
Artist-genre relationships created successfully


In [22]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import re

# Database configuration
DB_CONFIG = {
    'host': 'localhost',
    'port': '5432',
    'database': 'spotifypi',
    'user': 'postgres',
    'password': 'awssama69'
}

def clean_genre_name(genre):
    """Clean and validate genre names"""
    if not isinstance(genre, str):
        return None
        
    genre = genre.strip()
    genre = re.sub(r'[^\w\s-]', '', genre)  # Remove special chars except spaces and hyphens
    
    # Skip invalid/empty genres
    if not genre or genre.lower() in ['unknown', 'none', 'null', '']:
        return None
        
    return genre[:255]  # Truncate to max length

def clean_artist_name(name):
    """Clean artist names and handle encoding issues"""
    if not isinstance(name, str):
        return "Unknown Artist"
    
    # Fix encoding issues
    name = name.encode('ascii', 'ignore').decode('ascii')
    name = re.sub(r'[^\w\s-]', '', name)
    
    return name[:255] or "Unknown Artist"

def extract_and_load_genres():
    # Step 1: Load artist data from CSV with proper encoding
    try:
        artist_df = pd.read_csv('artists.csv', encoding='utf-8')
    except UnicodeDecodeError:
        artist_df = pd.read_csv('artists.csv', encoding='latin1')
    
    # Clean artist names
    artist_df['name'] = artist_df['name'].apply(clean_artist_name)
    
    # Step 2: Extract and clean unique genres
    all_genres = set()
    for genres_str in artist_df['genres']:
        if isinstance(genres_str, str) and genres_str.strip() not in ['[]', '']:
            # Clean and split genre strings
            genres = re.findall(r"['\"]([^'\"]+)['\"]", genres_str)
            for genre in genres:
                cleaned_genre = clean_genre_name(genre)
                if cleaned_genre:
                    all_genres.add(cleaned_genre)
    
    print(f"Found {len(all_genres)} valid genres after cleaning")
    
    # Step 3: Connect to database
    engine = create_engine(
        f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    
    try:
        with engine.begin() as conn:
            # Step 4: Clear existing tables
            conn.execute("TRUNCATE TABLE public.bridge_artist_genres;")
            conn.execute("TRUNCATE TABLE public.dim_genres RESTART IDENTITY CASCADE;")
            
            # Step 5: Insert all valid genres
            for genre in sorted(all_genres):
                conn.execute(
                    "INSERT INTO public.dim_genres (genre_name) VALUES (%s)",
                    (genre,)
                )
            
            print("Genre dimension populated with clean data")
            
            # Step 6: Create artist-genre relationships
            print("Creating artist-genre relationships...")
            
            # Get genre mapping
            genre_map = pd.read_sql("SELECT genre_id, genre_name FROM public.dim_genres", conn)
            genre_dict = dict(zip(genre_map['genre_name'], genre_map['genre_id']))
            
            # Process each artist's genres
            inserted_relations = 0
            for _, artist in artist_df.iterrows():
                if isinstance(artist['genres'], str) and artist['genres'].strip() not in ['[]', '']:
                    genres = re.findall(r"['\"]([^'\"]+)['\"]", artist['genres'])
                    for genre in genres:
                        cleaned_genre = clean_genre_name(genre)
                        if cleaned_genre and cleaned_genre in genre_dict:
                            try:
                                conn.execute(
                                    """INSERT INTO public.bridge_artist_genres
                                    (artist_id, genre_id)
                                    VALUES (%s, %s)
                                    ON CONFLICT DO NOTHING""",
                                    (artist['id'], genre_dict[cleaned_genre])
                                )
                                inserted_relations += 1
                            except Exception as e:
                                print(f"Error inserting {artist['id']}, {cleaned_genre}: {e}")
                                continue
            
            print(f"Created {inserted_relations} artist-genre relationships")
    
    except Exception as e:
        print(f"Error: {e}")
        raise
    finally:
        engine.dispose()

if __name__ == "__main__":
    extract_and_load_genres()

Found 5364 valid genres after cleaning
Genre dimension populated with clean data
Creating artist-genre relationships...
Created 432215 artist-genre relationships


# fact table

In [9]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import numpy as np

DB_CONFIG = {
    'host': 'localhost',
    'port': '5432',
    'database': 'spotifypi',
    'user': 'postgres',
    'password': 'awssama69'
}

def calculate_artist_performance():
    """Calculate and load artist performance metrics into fact table"""
    engine = create_engine(
        f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    
    try:
        with engine.begin() as conn:
            # Clear existing fact data
            conn.execute("TRUNCATE TABLE public.fact_artistperformance;")
            
            # Get artist performance metrics with genre arrays
            query = """
            WITH artist_stats AS (
                SELECT 
                    a.artist_id,
                    a.name,
                    a.popularity,
                    a.is_verified,
                    a.at_peak,
                    COUNT(DISTINCT g.genre_id) AS genre_count,
                    ARRAY_AGG(DISTINCT g.genre_name) AS genre_names,
                    ARRAY_AGG(DISTINCT g.genre_id) AS genre_ids
                FROM 
                    public.dim_artists a
                LEFT JOIN 
                    public.bridge_artist_genres ag ON a.artist_id = ag.artist_id
                LEFT JOIN 
                    public.dim_genres g ON ag.genre_id = g.genre_id
                GROUP BY 
                    a.artist_id, a.name, a.popularity, a.is_verified, a.at_peak
            )
            SELECT 
                'fact_' || artist_id AS fact_id,
                artist_id,
                -- Genre diversity (normalized 0-1 scale)
                LEAST(genre_count / 10.0, 1.0) AS genre_diversity_score,
                -- Average popularity (0-100 scale)
                popularity AS avg_artist_popularity,
                -- Verified ratio (1 if verified, 0 otherwise)
                CASE WHEN is_verified THEN 1.0 ELSE 0.0 END AS verified_artist_ratio,
                -- Placeholder prediction accuracy (replace with real calculation)
                0.7 + RANDOM() * 0.25 AS listen_prediction_accuracy,
                -- Count of peak performers (1 if at peak, 0 otherwise)
                CASE WHEN at_peak THEN 1 ELSE 0 END AS artists_at_peak_count,
                -- Array of genre names
                genre_names AS genres,
                -- Array of genre IDs
                genre_ids AS genre_ids
            FROM 
                artist_stats
            """
            
            # Execute and fetch all rows
            result = conn.execute(query)
            rows = result.fetchall()
            
            # Insert into fact table
            for row in rows:
                conn.execute(
                    """INSERT INTO public.fact_artistperformance
                    (fact_id, artist_id, genre_diversity_score, avg_artist_popularity,
                     verified_artist_ratio, listen_prediction_accuracy,
                     artists_at_peak_count, genres)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    row[:8]  # Exclude genre_ids which we're not inserting
                )
            
            print(f"Inserted {len(rows)} records into fact_artistperformance")
            
            # Update with real metrics if available (optional)
            update_real_metrics(conn)
    
    except Exception as e:
        print(f"Error populating fact table: {e}")
        raise
    finally:
        engine.dispose()


In [10]:
def update_real_metrics(conn):
    """Optional: Update with real metrics from tracks/streams if available"""
    try:
        # Example: Update prediction accuracy based on actual streams
        conn.execute("""
            UPDATE public.fact_artistperformance f
            SET listen_prediction_accuracy = subq.accuracy_score
            FROM (
                SELECT 
                    a.artist_id,
                    -- Replace with your actual accuracy calculation
                    CASE 
                        WHEN AVG(s.streams) > 1000000 THEN 0.9
                        WHEN AVG(s.streams) > 100000 THEN 0.8
                        ELSE 0.6
                    END AS accuracy_score
                FROM 
                    public.dim_artists a
                JOIN 
                    public.fact_streams s ON a.artist_id = s.artist_id
                GROUP BY 
                    a.artist_id
            ) subq
            WHERE f.artist_id = subq.artist_id
        """)
        print("Updated fact table with real metrics")
    except Exception as e:
        print(f"Couldn't update real metrics (maybe tables don't exist yet): {e}")

if __name__ == "__main__":
    calculate_artist_performance()

Inserted 1048575 records into fact_artistperformance
Couldn't update real metrics (maybe tables don't exist yet): (psycopg2.errors.UndefinedTable) relation "public.fact_streams" does not exist
LINE 16:                     public.fact_streams s ON a.artist_id = s...
                             ^

[SQL: 
            UPDATE public.fact_artistperformance f
            SET listen_prediction_accuracy = subq.accuracy_score
            FROM (
                SELECT 
                    a.artist_id,
                    -- Replace with your actual accuracy calculation
                    CASE 
                        WHEN AVG(s.streams) > 1000000 THEN 0.9
                        WHEN AVG(s.streams) > 100000 THEN 0.8
                        ELSE 0.6
                    END AS accuracy_score
                FROM 
                    public.dim_artists a
                JOIN 
                    public.fact_streams s ON a.artist_id = s.artist_id
                GROUP BY 
                    a.arti

In [17]:
streams_df = artist_df = pd.read_csv('cleaned data/cleaned_spotify_streams.csv', encoding='latin-1')
streams_df.head()

Unnamed: 0,Artist,Title,Streams,Daily
0,The Weeknd,Blinding Lights,3783983806,1736378
1,Ed Sheeran,Shape of You,3616649759,1135140
2,Lewis Capaldi,Someone You Loved,2958875491,1522093
3,Post Malone,Sunflower,2898311408,1871177
4,Tones And I,Dance Monkey,2896024418,666818


In [22]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

def calculate_artist_performance():
    """Calculate and load artist performance metrics with actual stream data"""
    engine = create_engine(
        f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    
    try:        
        # Step 2: Clean artist names in streams data to match dim_artists
        def clean_name(name):
            if pd.isna(name):
                return None
            return str(name).strip().lower()
        
        streams_df['Artist'] = streams_df['Artist'].apply(clean_name)
        
        # Step 3: Aggregate stream data by artist
        artist_streams = streams_df.groupby('Artist').agg({
            'Streams': 'sum',
            'Daily': 'mean'
        }).reset_index()
        
        # Step 4: Connect to database and process
        with engine.begin() as conn:
            # Clear existing fact data
            conn.execute("TRUNCATE TABLE public.fact_artistperformance;")
            
            # Get artist metrics with artist names cleaned
            query = """
            WITH artist_stats AS (
                SELECT 
                    a.artist_id,
                    LOWER(TRIM(a.name)) AS clean_name,
                    a.popularity,
                    a.is_verified,
                    a.at_peak,
                    COUNT(DISTINCT g.genre_id) AS genre_count,
                    ARRAY_AGG(DISTINCT g.genre_name) AS genre_names
                FROM 
                    public.dim_artists a
                LEFT JOIN 
                    public.bridge_artist_genres ag ON a.artist_id = ag.artist_id
                LEFT JOIN 
                    public.dim_genres g ON ag.genre_id = g.genre_id
                GROUP BY 
                    a.artist_id, a.name, a.popularity, a.is_verified, a.at_peak
            )
            SELECT 
                'fact_' || artist_id AS fact_id,
                artist_id,
                clean_name,
                -- Genre diversity (normalized 0-1 scale)
                LEAST(genre_count / 10.0, 1.0) AS genre_diversity_score,
                -- Average popularity (0-100 scale)
                popularity AS avg_artist_popularity,
                -- Verified ratio (1 if verified, 0 otherwise)
                CASE WHEN is_verified THEN 1.0 ELSE 0.0 END AS verified_artist_ratio,
                -- Placeholder prediction accuracy (will update later)
                0.0 AS listen_prediction_accuracy,
                -- Count of peak performers (1 if at peak, 0 otherwise)
                CASE WHEN at_peak THEN 1 ELSE 0 END AS artists_at_peak_count,
                -- Array of genre names
                genre_names AS genres
            FROM 
                artist_stats
            """
            
            # Create temporary table with all artist metrics
            fact_df = pd.read_sql(query, conn)
            
            # Merge with stream data on cleaned names
            fact_df = fact_df.merge(
                artist_streams,
                left_on='clean_name',
                right_on='Artist',
                how='left'
            )
            
            # Calculate prediction accuracy based on streams
            def calculate_accuracy(row):
                if pd.isna(row['Streams']) or pd.isna(row['Daily']):
                    return 0.7  # Default for artists without stream data
                
                # Normalize streams (example calculation)
                stream_score = min(row['Streams'] / 1000000000, 1.0)  # Cap at 1B streams
                daily_score = min(row['Daily'] / 1000000, 1.0)        # Cap at 1M daily
                
                # Weighted accuracy score
                return 0.6 + (0.3 * stream_score) + (0.1 * daily_score)
            
            fact_df['listen_prediction_accuracy'] = fact_df.apply(calculate_accuracy, axis=1)
            
            # Insert into fact table
            for _, row in fact_df.iterrows():
                conn.execute(
                    """INSERT INTO public.fact_artistperformance
                    (fact_id, artist_id, genre_diversity_score, avg_artist_popularity,
                     verified_artist_ratio, listen_prediction_accuracy,
                     artists_at_peak_count, genres)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    (
                        row['fact_id'],
                        row['artist_id'],
                        row['genre_diversity_score'],
                        row['avg_artist_popularity'],
                        row['verified_artist_ratio'],
                        row['listen_prediction_accuracy'],
                        row['artists_at_peak_count'],
                        row['genres']
                    )
                )
            
            print(f"Successfully loaded {len(fact_df)} records into fact_artistperformance")
            print(f"Artists with stream data: {fact_df['Streams'].notna().sum()}")
    
    except Exception as e:
        print(f"Error populating fact table: {e}")
        raise
    finally:
        engine.dispose()

if __name__ == "__main__":
    calculate_artist_performance()

Successfully loaded 1048575 records into fact_artistperformance
Artists with stream data: 1071
