# Data Cleaning Pipeline

This notebook executes the data cleaning and merging process using the `data_cleaning` module.

In [1]:
import os 
import pandas as pd

import pathlib
import numpy as np

from tqdm import tqdm

In [2]:
from data_cleaning.artist_mapping import (
    get_unique_ids_from_column,
    get_all_combinations,
    get_artist_to_id,
    update_id_artists_with_mapping,
)
from data_cleaning.explicit_enrichment import (
    gemini_check_if_explicit,
    enrich_explicit_via_gemini,
)



In [None]:
from data_cleaning.process_charts import process_all_charts
from data_cleaning.merge import merge_data
from data_cleaning.clean_songs import (
    list_weekly_chart_files,
    extract_dates_from_filenames,
    summarize_weekly_date_gaps,
    create_song_dict,
    update_song_rows_with_dict,
    fill_with_proxy_dict_compat,
    fill_missing_from_dfs,
    prepare_df_for_parquet,
)

# Root folder for every dataset used in this notebook.
DATA_DIR = "data"

# Inputs: files created and collected beforehand (weekly charts + tracks table).
weekly_charts_path = os.path.join(DATA_DIR, "bronze", "data")
tracks_path = os.path.join(DATA_DIR, "bronze", "tracks.csv")

# Outputs: files created by the processing and merging steps below.
songs_path = os.path.join(DATA_DIR, "silver", "combined_songs.parquet")
output_path = os.path.join(DATA_DIR, "silver", "songs_with_features.parquet")

# Step 1: combine the weekly chart files into a single parquet file at songs_path.
print("Starting data processing...")
process_all_charts(weekly_charts_path, songs_path)
print("Data processing complete.")

# Step 2: merge the combined charts with the tracks table into output_path.
print("Starting data merging...")
merge_data(tracks_path, songs_path, output_path)
print("Data merging complete.")

# Fail fast and name the missing file so the error is actionable.
if not os.path.exists(output_path):
    raise FileNotFoundError("Error: Output path does not exist: {}".format(output_path))

# Step 3: load the merged result; `songs` is the working frame for the rest of the notebook.
songs = pd.read_parquet(output_path)
print("Songs loaded successfully. ({:_} rows)".format(songs.shape[0]))



Starting data processing...
✓ File saved as: data/silver/combined_songs.parquet
Data processing complete.
Starting data merging...
✓ Merged data saved to: data/silver/songs_with_features.parquet
Data merging complete.
Songs loaded successfully. (41_995 rows)


## Verification
Check that no week was skipped during web scraping.

In [7]:
# Verify the weekly charts are complete, using the helpers from data_cleaning.clean_songs:
# list the chart files, extract the date from each filename, then print a summary
# (first/last date, file count vs. expected weeks, missing weeks, unexpected extra dates).
files = list_weekly_chart_files(weekly_charts_path)
dates = extract_dates_from_filenames(files)
summarize_weekly_date_gaps(dates)

First date: 2016-12-29
Last date: 2020-12-31
Total files: 210
Expected weeks: 210

Missing weeks:

Unexpected extra dates:


In [None]:
# Count missing values per column. Merging with the Kaggle data introduces NaN
# values for every row whose track id is not present in the Kaggle dataframe.
print(songs.isna().sum())


track_id                0
artist_names            0
track_name              0
source                  0
streams                 0
week_date               0
name                10631
popularity          10631
duration_ms         10631
explicit            10631
artists             10631
id_artists          10631
release_date        10631
danceability        10631
energy              10631
key                 10631
loudness            10631
mode                10631
speechiness         10631
acousticness        10631
instrumentalness    10631
liveness            10631
valence             10631
tempo               10631
time_signature      10631
dtype: int64


In [8]:
if "name" in songs.columns:
    # Sanity check before dropping: count rows where `name` is set but `track_name`
    # is missing, i.e. rows where dropping `name` would lose information.
    name_only_rows = int((songs["name"].notna() & songs["track_name"].isna()).sum())
    print("Rows where name is not included in track_name: {}/{:_}".format(name_only_rows, songs.shape[0]))

    # Drop the "name" column as it is included in "track_name".
    # Rebind to a new frame instead of mutating in place (no inplace=True).
    songs = songs.drop(columns=["name"])
    print("Column 'name' dropped successfully.")

Rows where name is not included in track_name: 0/41_995
Column 'name' dropped successfully.


## Clean songs

In [9]:
# Display the column index and the (rows, columns) shape of `songs` before cleaning.
songs.columns, songs.shape

(Index(['track_id', 'artist_names', 'track_name', 'source', 'streams',
        'week_date', 'popularity', 'duration_ms', 'explicit', 'artists',
        'id_artists', 'release_date', 'danceability', 'energy', 'key',
        'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness',
        'liveness', 'valence', 'tempo', 'time_signature'],
       dtype='object'),
 (41995, 24))

In [10]:
# Create a dictionary of canonical song IDs using helper from data_cleaning.clean_songs
song_dict = create_song_dict(songs)
print(song_dict[("The Weeknd", "Blinding Lights")])

Processing rows: 100%|██████████| 41995/41995 [00:00<00:00, 56948.54it/s]

['0VjIjW4GlUZAMYd2vXMi3b', 'Republic Records', Timestamp('2020-03-20 00:00:00')]





In [11]:
# Apply the update function from data_cleaning.clean_songs, passing the song_dict
# built above. `songs` is rebound to the returned frame.
# NOTE(review): presumably this aligns each row with its canonical entry — confirm in the helper.
songs = update_song_rows_with_dict(songs, song_dict)

Updating songs: 100%|██████████| 41995/41995 [00:08<00:00, 4871.60it/s]

Number of songs updated: 6_458/41_995





In [None]:
# Fill missing values in columns of interest using helpers from data_cleaning.clean_songs
columns_to_fill = [
    'artist_names', 'track_name', 'source', 'duration_ms', 'explicit', 
    'popularity', 'artists', 'id_artists', 'release_date', 'danceability', 'energy', 
    'key', 'loudness', 'mode', 'speechiness', 'acousticness', 
    'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature'
]

# If we already have values for a track somewhere and in another orw it is NaN, we fill it with what we have
songs = fill_with_proxy_dict_compat(songs, columns_to_fill)


Number of rows filled: 11_069


In [13]:
# Count the rows that still contain at least one missing value.
rows_with_nan = int(songs.isna().any(axis=1).sum())
print("There are still {:_} rows with NaN values".format(rows_with_nan))

There are still 7_404 rows with NaN values


## Enrich still missing values

In [14]:
# Second enrichment source: a CSV of audio features stored in the bronze layer.
df_enrichment2_path = os.path.join(DATA_DIR, "bronze", "spotify_top_songs_audio_features.csv")

# Replace "id" with "track_id" so the key column matches `songs`.
# rename() silently ignores labels that are absent, so no `if` guard is needed,
# and chaining it returns a new frame instead of mutating one in place.
df_enrichment2 = pd.read_csv(df_enrichment2_path).rename(columns={"id": "track_id"})
df_enrichment2.shape, df_enrichment2.columns

((6513, 19),
 Index(['track_id', 'artist_names', 'track_name', 'source', 'key', 'mode',
        'time_signature', 'danceability', 'energy', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'loudness',
        'tempo', 'duration_ms', 'weeks_on_chart', 'streams'],
       dtype='object'))

In [15]:
# List all dataframes in a kaggle_enrichment3_dir, then add them together to it is one big dataframe
import glob

# Third enrichment source: every CSV in kaggle_enrichment3_dir, combined into one big DataFrame.
kaggle_enrichment3_dir = os.path.join(DATA_DIR, "bronze", "kaggle_enrichment3")

# glob returns matches in arbitrary (filesystem-dependent) order. Sort them so the
# row order of the concatenated frame is identical on every run and machine.
csv_files = sorted(glob.glob(os.path.join(kaggle_enrichment3_dir, "*.csv")))
if not csv_files:
    # Without this guard pd.concat fails with an opaque "No objects to concatenate".
    raise FileNotFoundError("No CSV files found in {}".format(kaggle_enrichment3_dir))

# Read each CSV file into a DataFrame and collect them in a list
df_list = [pd.read_csv(f) for f in csv_files]

# Concatenate all DataFrames into a single big DataFrame with a fresh index
df_enrichment3 = pd.concat(df_list, ignore_index=True)

# Show shape and columns to confirm final structure
df_enrichment3.shape, df_enrichment3.columns


((247035, 17),
 Index(['artist_name', 'track_id', 'track_name', 'acousticness', 'danceability',
        'duration_ms', 'energy', 'instrumentalness', 'key', 'liveness',
        'loudness', 'mode', 'speechiness', 'tempo', 'time_signature', 'valence',
        'popularity'],
       dtype='object'))

In [16]:
# For each enrichment frame (df_enrichment2 first, then df_enrichment3), show the
# columns it has that are not among the columns we want to fill.
tuple(
    set(frame.columns).difference(columns_to_fill)
    for frame in (df_enrichment2, df_enrichment3)
)

({'streams', 'track_id', 'weeks_on_chart'}, {'artist_name', 'track_id'})

In [17]:
# Enrich missing values from external enrichment DataFrames using helper from data_cleaning.clean_songs
songs_gold = fill_missing_from_dfs(songs, columns_to_fill, "track_id", df_enrichment2, df_enrichment3)

Total missing values *before* processing DF: 125_826
Available columns: ['artist_names', 'track_name', 'source', 'duration_ms', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']
Size of lookup dictionary : 6513


Enriching songs: 100%|██████████| 41995/41995 [00:13<00:00, 3199.04it/s]


Available columns: ['track_name', 'duration_ms', 'popularity', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']
Size of lookup dictionary : 130989


Enriching songs: 100%|██████████| 41995/41995 [00:03<00:00, 12373.35it/s]


Total missing values *after* processing DF: 33_889


In [18]:
# save songs_gold
if "popularity" in songs_gold.columns:
    songs_gold.drop(columns=["popularity"], inplace=True)


In [20]:
# Display the number of NaN values for each column in songs_gold
songs_gold_witouht_nan = songs_gold.dropna()

print(songs_gold.isna().sum())
print(songs_gold.shape[0] - songs_gold_witouht_nan.shape[0])
print("{}/{}".format(songs_gold_witouht_nan.shape[0], songs_gold.shape[0]))


track_id               0
artist_names           0
track_name             0
source                 0
streams                0
week_date              0
duration_ms            0
explicit            6966
artists             6966
id_artists          6966
release_date        7404
danceability           0
energy                 0
key                    0
loudness               0
mode                   0
speechiness            0
acousticness           0
instrumentalness       0
liveness               0
valence                0
tempo                  0
time_signature         0
dtype: int64
7404
34591/41995


In [21]:
# Get rows with NaN values in any column for songs_gold
songs_gold_with_nan = songs_gold[songs_gold.isna().any(axis=1)][["track_id", "week_date", "explicit", "artists", "id_artists", "release_date"]]
songs_gold_with_nan


Unnamed: 0,track_id,week_date,explicit,artists,id_artists,release_date
5,25sgk305KZfyuqVBQIahim,2019-01-17,,,,
11,7wFybC8jBH3zE139OpCtpG,2019-01-17,,,,
56,0jAfdqv18goRTUxm3ilRjb,2019-01-17,,,,
57,13hvHEstJ4sNbzdroPrPI3,2019-01-17,0.0,"['Dua Lipa', 'BLACKPINK']","['6M2wZ9GZgrQXHCFfjv46we', '41MozSoPIsD1dJM0CL...",
62,2FUNBaa5DwItJtYEBgAblU,2019-01-17,,,,
...,...,...,...,...,...,...
41943,7K7MUBCnzgBAvMVW2RTWNs,2019-06-13,,,,
41951,6fxVffaTuwjgEk5h9QyRjy,2019-06-13,0.0,['Ed Sheeran'],['6eUKZXaKkcviH0Ku9w2n3V'],
41978,5s8LepdwU0THzpd0M7nLsa,2019-06-13,,,,
41986,4Sokm1cWK36H2WctWWRGf1,2019-06-13,,,,


## Save new gold

In [22]:
# We drop every row that is missing a release date: these 7,404 rows include all rows that are also missing explicit, artists and id_artists, so dropping them removes every remaining NaN.

In [23]:
# Confirm that the frame produced by dropna() has no missing values left in any column.
print(songs_gold_witouht_nan.isna().sum())

track_id            0
artist_names        0
track_name          0
source              0
streams             0
week_date           0
duration_ms         0
explicit            0
artists             0
id_artists          0
release_date        0
danceability        0
energy              0
key                 0
loudness            0
mode                0
speechiness         0
acousticness        0
instrumentalness    0
liveness            0
valence             0
tempo               0
time_signature      0
dtype: int64


In [24]:
# Prepare dataframe for parquet (handles type conversions)
songs_gold_witouht_nan_parquet = prepare_df_for_parquet(songs_gold_witouht_nan)

songs_gold_witouht_nan_parquet.to_parquet(os.path.join(DATA_DIR, "gold", "songs_with_features.parquet"), index=False)


In [26]:
# Functions moved to data_cleaning.artist_mapping module
# (get_all_combinations, get_artist_to_id, update_id_artists_with_mapping)