# Last.fm ETL Pipeline

This notebook implements an Extract, Transform, Load (ETL) pipeline for Last.fm listening data.
It collects data from the Last.fm API and stores it in a structured SQLite database for analysis.

## Overview
- **Extract**: Pull data from Last.fm API (listening history, artists, albums, tracks, tags)
- **Transform**: Normalize data and prepare relationships
- **Load**: Store in a relational database with proper schema
- **Analyze**: Query the database for previews of created tables



## Configuration & Dependencies

This section sets up the necessary libraries, API credentials, and configuration parameters. If required, change configuration constants (recent_tracks_limit, top_items_limit,recent_pages,time_periods) in the `config.py` file.


In [1]:
# Standard library imports
import os
import sys
import json
import time
import logging
from datetime import datetime

# Third-party imports
import importlib
import matplotlib.pyplot as plt
import pandas as pd
import requests
import seaborn as sns
import sqlite3
from tqdm.notebook import tqdm
from google.colab import drive

In [2]:
# Mount Google Drive (must happen before path setup)
drive.mount('/content/drive', force_remount=True)

# Path setup
drive_path = "/content/drive/MyDrive/Colab-Notebooks/last-fm-data"  # **REPLACE WITH YOUR ACTUAL PATH**
os.makedirs(drive_path, exist_ok=True)  # Create the directory if it doesn't exist
print(f"Drive path: {drive_path}")

# Create a data subfolder for database and logs
data_path = os.path.join(drive_path, "data")
os.makedirs(data_path, exist_ok=True)  # Create the data directory if it doesn't exist
print(f"Data path: {data_path}")

# Create config folder if it doesn't exist
config_path = os.path.join(drive_path, "config")
os.makedirs(config_path, exist_ok=True)
print(f"Config path: {config_path}")

# Add drive_path to system path for module imports
sys.path.append(drive_path)

# Set a default value for BACKUP_TO_DRIVE
BACKUP_TO_DRIVE = True

Mounted at /content/drive
Drive path: /content/drive/MyDrive/Colab-Notebooks/last-fm-data
Data path: /content/drive/MyDrive/Colab-Notebooks/last-fm-data/data
Config path: /content/drive/MyDrive/Colab-Notebooks/last-fm-data/config


In [3]:
# Define file paths
DB_PATH = os.path.join(data_path, "lastfm_data.db")
LOGS_PATH = os.path.join(data_path, "lastfm_etl.log")
SCHEMA_PATH = os.path.join(config_path, "schema.sql")

# Local module imports - must come after path setup
try:
    from utils.lastfm_api import LastFMAPI
    from utils.data_collector import DataCollector
    from utils.database_helper import DatabaseHelper
    # import lastfm username and API key
    from config.config import LASTFM_API_KEY, USERNAME, COLLECTION_SETTINGS
    print("Successfully imported all required modules")
except ImportError as e:
    print(f"Error importing modules: {e}")
    print("Make sure the python files are in the correct location!")

Successfully imported all required modules


In [4]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,  #  Add force=True
    handlers=[
        logging.FileHandler(LOGS_PATH),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger('lastfm_etl')

logger.info("Logger is now active!")

2025-05-08 15:33:55,232 - INFO - Logger is now active!


In [5]:
# Function to recreate the database with the updated schema
def recreate_database():
    """Recreate the database with the updated schema."""
    logger.info("Recreating database with updated schema...")

    # Check if database exists and create a backup if it does
    if os.path.exists(DB_PATH):
        # Create a backup with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(data_path, f"lastfm_data_backup_{timestamp}.db")

        # Copy the existing database to backup
        try:
            import shutil
            shutil.copy2(DB_PATH, backup_path)
            logger.info(f"Created backup of existing database at {backup_path}")
        except Exception as e:
            logger.warning(f"Could not create database backup: {e}")

        # Delete the existing database
        try:
            os.remove(DB_PATH)
            logger.info(f"Deleted existing database: {DB_PATH}")
        except Exception as e:
            logger.error(f"Failed to delete existing database: {e}")
            return False

    # Initialize database with the updated schema
    db_helper = DatabaseHelper(DB_PATH)
    success = db_helper.initialize_database(schema_file=SCHEMA_PATH)

    if success:
        logger.info("Database successfully recreated with the new schema!")
    else:
        logger.error("Failed to recreate database.")

    return success

# Ask the user if they want to recreate the database
recreate_db = input("Do you want to recreate the database with the updated schema? (y/n): ")
if recreate_db.lower() == 'y':
    recreate_database()

Do you want to recreate the database with the updated schema? (y/n): y
2025-05-08 15:34:00,082 - INFO - Recreating database with updated schema...
Database initialized successfully
2025-05-08 15:34:00,765 - INFO - Database successfully recreated with the new schema!


In [6]:
def backup_to_drive():
    """Backup database and logs to Google Drive"""
    if not BACKUP_TO_DRIVE:
        return

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Copy database
        if os.path.exists(DB_PATH):
            backup_db_path = os.path.join(data_path, f"data_{timestamp}.db")
            os.system(f"cp {DB_PATH} '{backup_db_path}'")
            logger.info(f"Database backed up to: {backup_db_path}")

        # Copy logs
        if os.path.exists(LOGS_PATH):
            backup_log_path = os.path.join(data_path, f"lastfm_etl_{timestamp}.log")
            os.system(f"cp {LOGS_PATH} '{backup_log_path}'")
            logger.info(f"Logs backed up to: {backup_log_path}")

    except Exception as e:
        logger.error(f"Failed to backup files to Google Drive: {e}")

## Pipeline Execution

Running the complete ETL pipeline to collect Last.fm data.

In [7]:
def run_etl_pipeline():
    """Run the complete ETL pipeline"""
    start_time = time.time()
    logger.info("Starting Last.fm ETL pipeline")

    try:
        # Initialize API and collector
        lastfm_api = LastFMAPI(LASTFM_API_KEY)
        collector = DataCollector(lastfm_api, USERNAME)
        db_helper = DatabaseHelper(DB_PATH)
        print(db_helper)  # Print the object
        print(type(db_helper))

        # Initialize database with schema
        logger.info("Initializing database...")
        db_helper.initialize_database(schema_file=SCHEMA_PATH)

        # Collect data from Last.fm API
        logger.info("Collecting data from Last.fm API...")
        collected_data = collector.collect_library_data(
            recent_tracks_limit=COLLECTION_SETTINGS['recent_tracks_limit'],
            top_items_limit=COLLECTION_SETTINGS['top_items_limit'],
            recent_pages=COLLECTION_SETTINGS['recent_pages'],
            time_periods=COLLECTION_SETTINGS['time_periods']
        )
        #  Enhanced Debugging: Inspect collected_data
        logger.info("--- Debugging collected_data ---")
        for key, value in collected_data.items():
            logger.info(f"Key: {key}, Type: {type(value)}, Length: {len(value) if isinstance(value, list) else 'N/A'}")
            if key in ['albums', 'tracks']:  #  Focus on albums and tracks
                for item in value[:2]:  #  Print the first 2 items as examples
                    logger.info(f"  Example {key[:-1]}: {item}")


        # Process and load data into database
        logger.info("Inserting collected data into database...")
        stats = db_helper.process_collected_data(collected_data)

        if stats:
            logger.info("ETL pipeline completed successfully!")
            logger.info(f"Inserted: {stats['artists']} artists, {stats['albums']} albums, "
                        f"{stats['tracks']} tracks, {stats['tags']} tags, "
                        f"{stats['history_items']} listening history records")
        else:
            logger.error("Failed to process data")

        # Backup to Google Drive
        if BACKUP_TO_DRIVE:
            backup_to_drive()

        # Calculate runtime
        runtime = time.time() - start_time
        logger.info(f"Total runtime: {runtime:.2f} seconds ({runtime/60:.2f} minutes)")
        return True

    except Exception as e:
        logger.error(f"ETL pipeline failed: {e}")
        # Try to backup any data collected to this point
        try:
            if 'collected_data' in locals():
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                error_data_path = os.path.join(data_path, f"failed_run_data_{timestamp}.json")
                with open(error_data_path, 'w') as f:
                    json.dump(collected_data, f)
                logger.info(f"Partial data saved to {error_data_path}")
        except Exception as backup_error:
            logger.error(f"Failed to save partial data: {backup_error}")
        return False
    finally: # Ensure this block runs even if there's an error
        # Close the handlers
        for handler in logger.handlers[:]:  # Iterate over a copy of the handlers list
            handler.close()
            logger.removeHandler(handler)  # Clean up the logger as well

In [8]:
print("\n" + "="*60)
print("       LAST.FM DATA ETL PIPELINE - STARTING EXECUTION       ")
print("="*60 + "\n")

# Check if all required classes are defined
required_classes = ['LastFMAPI', 'DataCollector', 'DatabaseHelper']
missing_classes = [cls for cls in required_classes if cls not in globals()]

if missing_classes:
    print(f"ERROR: Missing required class definitions: {', '.join(missing_classes)}")
    print("Please make sure you've run the cells defining these classes first.")
else:
    # Execute the ETL pipeline
    success = run_etl_pipeline()

    # Show completion message
    print("\n" + "="*60)
    if success:
        print("       ETL PIPELINE EXECUTION COMPLETED SUCCESSFULLY       ")
    else:
        print("       ETL PIPELINE EXECUTION COMPLETED WITH ERRORS       ")
    print("="*60 + "\n")


       LAST.FM DATA ETL PIPELINE - STARTING EXECUTION       

2025-05-08 15:34:17,107 - INFO - Starting Last.fm ETL pipeline
<utils.database_helper.DatabaseHelper object at 0x7ac3a299c090>
<class 'utils.database_helper.DatabaseHelper'>
2025-05-08 15:34:17,109 - INFO - Initializing database...
Database initialized successfully
2025-05-08 15:34:17,123 - INFO - Collecting data from Last.fm API...


Fetching recent tracks:   0%|          | 0/10 [00:00<?, ?it/s]

Collected 2000 listening history records
Collecting details for 659 artists...


Artists:   0%|          | 0/659 [00:00<?, ?it/s]

2025-05-08 15:57:58,533 - INFO - Slow request to tag.getInfo: 3.71 seconds
2025-05-08 16:07:47,794 - INFO - Slow request to tag.getInfo: 4.55 seconds
Collecting details for 871 albums (first pass)...


Albums:   0%|          | 0/871 [00:00<?, ?it/s]

2025-05-08 16:25:46,422 - INFO - Resource not found for album.getInfo: 404 Client Error: Not Found for url: http://ws.audioscrobbler.com/2.0/?artist=William+Byrd&album=Peaceful+Voices&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
2025-05-08 16:26:45,664 - ERROR - HTTP error for album.getInfo: 500 Server Error: Internal Server Error for url: http://ws.audioscrobbler.com/2.0/?artist=Arny+Margret&album=Day+Old+Thoughts&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
Error calling get_album_info: 500 Server Error: Internal Server Error for url: http://ws.audioscrobbler.com/2.0/?artist=Arny+Margret&album=Day+Old+Thoughts&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
2025-05-08 16:27:10,884 - INFO - Resource not found for album.getInfo: 404 Client Error: Not Found for url: http://ws.audioscrobbler.com/2.0/?artist=Kylie+Minogue&album=Natale+Vintage&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&fo

Tracks:   0%|          | 0/1666 [00:00<?, ?it/s]

2025-05-08 16:36:07,966 - INFO - Slow request to track.getTopTags: 4.95 seconds
Found 348 album references from tracks that weren't collected in first pass
Collecting details for these missing albums (second pass)...


Missing Albums:   0%|          | 0/348 [00:00<?, ?it/s]

2025-05-08 17:17:09,774 - INFO - Resource not found for album.getInfo: 404 Client Error: Not Found for url: http://ws.audioscrobbler.com/2.0/?artist=Wuicho+kun&album=Navisad&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
2025-05-08 17:18:04,869 - INFO - Resource not found for album.getInfo: 404 Client Error: Not Found for url: http://ws.audioscrobbler.com/2.0/?artist=Lori+Mechem&album=Christmas+%26+Cocktails+-+An+Intoxicating+Collection+of+Jazz+for+Holiday+Entertaining&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
2025-05-08 17:19:39,334 - INFO - Resource not found for album.getInfo: 404 Client Error: Not Found for url: http://ws.audioscrobbler.com/2.0/?artist=Domenico+Scarlatti&album=The+Ghosts+of+Hamlet%3A+Lost+Arias+from+Italian+Baroque+Operas&method=album.getInfo&api_key=a10cb74425380c7637145ae910661728&format=json
2025-05-08 17:22:01,670 - INFO - Slow request to album.getTopTags: 3.31 seconds
2025-05-08 17:22:29,072 - INFO - R

## Data Analysis Examples

Just some code to check format of the tables.

In [9]:
# Connect to the database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# Check which tables exist in the database
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print("Available tables in the database:")
table_names = []
for table in tables:
    table_name = table[0]
    table_names.append(table_name)
    print(f"- {table_name}")

# Print the first 5 rows of each table
print("\n" + "="*50)
print("TABLE PREVIEWS")
print("="*50)

for table_name in table_names:
    try:
        # Get column names
        cursor.execute(f"PRAGMA table_info({table_name})")
        columns = [col[1] for col in cursor.fetchall()]

        # Get first 5 rows
        query = f"SELECT * FROM {table_name} LIMIT 5"
        df = pd.read_sql_query(query, conn)

        print(f"\n{'-'*50}")
        print(f"TABLE: {table_name}")
        print(f"Columns: {', '.join(columns)}")
        print(f"Row count: {pd.read_sql_query(f'SELECT COUNT(*) FROM {table_name}', conn).iloc[0, 0]}")
        print(f"{'-'*50}")

        if not df.empty:
            print(df.head())
        else:
            print("(Table is empty)")
    except Exception as e:
        print(f"Error reading table {table_name}: {e}")

# Close the connection
conn.close()
print("\nDatabase connection closed.")

Available tables in the database:
- artists
- albums
- tracks
- tags
- artist_tags
- album_tags
- track_tags
- artist_similar
- track_similar
- user_listening_history
- user_top_artists
- user_top_albums
- user_top_tracks

TABLE PREVIEWS

--------------------------------------------------
TABLE: artists
Columns: artist_id, name, mbid, url, image_small, image_medium, image_large, listeners, playcount, bio_summary, bio_content, created_at, updated_at
Row count: 659
--------------------------------------------------
   artist_id             name                                  mbid  \
0          1  exploding heart                                         
1          2            Nouri  66e87b9b-c7fd-49c3-ba0c-00d63e86e73a   
2          3      Sofie Royer                                         
3          4          Okonski                                         
4          5     Leoš Janáček  edfced8a-01bc-4c22-96b2-da58edd6b8af   

                                                 url  