### Analyzing, Cleaning and Manipulating Datasets


In [None]:
import pandas as pd
import numpy as np

### Data Visualization Libraries

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns


### Machine Learning Libraries

In [None]:
from sklearn.linear_model import LinearRegression

### Read the Dataset

In [None]:
# Load the track feature table. The path is an absolute location on the
# author's machine and has to be adjusted when running elsewhere.
dataset = pd.read_csv("/Users/sophiekersten/Desktop/thesis/tracks_features.csv")


# Show the first rows as a quick check of the import.
dataset.head()

In [None]:
# dropna() returns a new DataFrame instead of modifying the existing one, so
# the result has to be assigned back for the incomplete rows to be removed.
dataset = dataset.dropna()
dataset

In [None]:
dataset['artists'].nunique()

In [None]:
# Keep tracks that run longer than one minute and are not purely instrumental.
longer_than_one_minute = dataset['duration_ms'] > 60000
not_only_instrumental = dataset['instrumentalness'] < 0.9
dataset = dataset[longer_than_one_minute & not_only_instrumental]
dataset

### Get Country Data from MusicBrainz - Part 1


In [None]:
import requests
import time
import json
import os
from tqdm import tqdm


# Distinct values of the 'artists' column.
all_artists = dataset['artists'].unique()


# This part handles the first half of the artists and caches its results in
# its own JSON file.
artists_part1 = all_artists[:len(all_artists)//2]
CACHE_FILE = "artist_data_part1.json"


In [None]:
def get_artist_info(artist_name):
    """Look up an artist on MusicBrainz and return basic metadata.

    Sends one search request to the MusicBrainz artist endpoint and reads the
    first hit of the result list.

    Args:
        artist_name: Search term inserted into the ``artist:`` query field.

    Returns:
        A dict with the keys ``"country"``, ``"type"`` and ``"gender"``. Each
        value is ``"Unknown"`` when the request fails, the server answers with
        a non-200 status, no artist is found, or the field is missing or null.
    """
    base_url = "https://musicbrainz.org/ws/2/artist/"
    # NOTE(review): artist_name is passed into the query unescaped; if the
    # callers hand in list-like strings such as "['A', 'B']" the brackets and
    # quotes become part of the search syntax — confirm this is intended.
    params = {
        "query": f"artist:{artist_name}",
        "fmt": "json"
    }
    headers = {
        "User-Agent": "TrackExplorer/1.0 (sophiekersten@gmx.com)"
    }

    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=10)
        if response.status_code == 200:
            matches = response.json().get("artists")
            if matches:
                artist_info = matches[0]
                # "or" also replaces explicit nulls, so callers only ever see
                # a real value or the "Unknown" marker.
                return {
                    "country": artist_info.get("country") or "Unknown",
                    "type": artist_info.get("type") or "Unknown",
                    "gender": artist_info.get("gender") or "Unknown"
                }
        else:
            # Make rejected requests (e.g. rate limiting) visible instead of
            # silently recording the artist as unknown.
            print(f"HTTP {response.status_code} bei {artist_name}")
    except requests.exceptions.Timeout:
        print(f"⏰ Timeout bei {artist_name}")
    except Exception as e:
        print(f"Fehler bei {artist_name}: {e}")

    return {
        "country": "Unknown",
        "type": "Unknown",
        "gender": "Unknown"
    }


In [None]:

# Resume from an earlier run when a cache file exists, otherwise start with an
# empty dict. Without this step artist_data is undefined the first time the
# cell runs (or silently reuses a dict left behind by another cell).
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, "r") as f:
        artist_data = json.load(f)
else:
    artist_data = {}

for artist in tqdm(artists_part1):
    if artist not in artist_data:
        info = get_artist_info(artist)
        artist_data[artist] = info
        # Pause between requests to stay within the service's rate limit.
        time.sleep(1)

        # Write an intermediate checkpoint every 500 cached artists.
        if len(artist_data) % 500 == 0:
            with open(CACHE_FILE, "w") as f:
                json.dump(artist_data, f)


# Final save once every artist of this part has been processed.
with open(CACHE_FILE, "w") as f:
    json.dump(artist_data, f)

print("part1 done and safed: 'artist_data_part1.json'")


### Get Country Data from MusicBrainz - Part 2

In [None]:
import requests
import time
import json
import os
from tqdm import tqdm


# Distinct values of the 'artists' column.
all_artists = dataset['artists'].unique()


# This part handles the second half of the artists and caches its results in
# its own JSON file.
artists_part2 = all_artists[len(all_artists)//2:]
CACHE_FILE = "artist_data_part2.json"


def get_artist_info(artist_name):
    """Look up an artist on MusicBrainz and return basic metadata.

    Sends one search request to the MusicBrainz artist endpoint and reads the
    first hit of the result list.

    Args:
        artist_name: Search term inserted into the ``artist:`` query field.

    Returns:
        A dict with the keys ``"country"``, ``"type"`` and ``"gender"``. Each
        value is ``"Unknown"`` when the request fails, the server answers with
        a non-200 status, no artist is found, or the field is missing or null.
    """
    base_url = "https://musicbrainz.org/ws/2/artist/"
    # NOTE(review): artist_name is passed into the query unescaped; if the
    # callers hand in list-like strings such as "['A', 'B']" the brackets and
    # quotes become part of the search syntax — confirm this is intended.
    params = {
        "query": f"artist:{artist_name}",
        "fmt": "json"
    }
    headers = {
        "User-Agent": "TrackExplorer/1.0 (sophiekersten@gmx.com)"
    }

    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=10)
        if response.status_code == 200:
            matches = response.json().get("artists")
            if matches:
                artist_info = matches[0]
                # "or" also replaces explicit nulls, so callers only ever see
                # a real value or the "Unknown" marker.
                return {
                    "country": artist_info.get("country") or "Unknown",
                    "type": artist_info.get("type") or "Unknown",
                    "gender": artist_info.get("gender") or "Unknown"
                }
        else:
            # Make rejected requests (e.g. rate limiting) visible instead of
            # silently recording the artist as unknown.
            print(f"HTTP {response.status_code} bei {artist_name}")
    except requests.exceptions.Timeout:
        print(f"Timeout bei {artist_name}")
    except Exception as e:
        print(f"Fehler bei {artist_name}: {e}")

    return {
        "country": "Unknown",
        "type": "Unknown",
        "gender": "Unknown"
    }


# Start empty and load the results of a previous run if a cache file exists.
artist_data = {}
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, "r") as f:
        artist_data = json.load(f)


for artist in tqdm(artists_part2):
    # Artists that are already cached need no new request.
    if artist in artist_data:
        continue

    artist_data[artist] = get_artist_info(artist)
    time.sleep(1)

    # Intermediate checkpoint whenever the cache size reaches a multiple of 500.
    if len(artist_data) % 500 == 0:
        with open(CACHE_FILE, "w") as f:
            json.dump(artist_data, f)


# Final save after the loop.
with open(CACHE_FILE, "w") as f:
    json.dump(artist_data, f)

print("part2 done and safed:  'artist_data_part2.json'")


## Combining part 1 & 2

In [None]:
import pandas as pd
import json
import ast


# Load the cached MusicBrainz results of part 1 (key -> info dict).
with open("artist_data_part1.json", "r") as f:
    data = json.load(f)


rows = []

for key, value in data.items():
    # Each key is expected to be the text form of a Python list of artist
    # names. Keys that cannot be parsed are reported and skipped, in the same
    # way as the part 2 cell, instead of aborting the whole conversion.
    try:
        artist_list = ast.literal_eval(key)
    except (ValueError, SyntaxError):
        print(f"mistake {key}")
        continue

    # A key that parses to something other than a list is treated as a single
    # artist name.
    if not isinstance(artist_list, (list, tuple)):
        artist_list = [key]

    # Only the first listed artist is kept for this part.
    artist_name = artist_list[0] if artist_list else "Unknown"

    rows.append({
        "artist": artist_name,
        "country": value.get("country", "Unknown"),
        "type": value.get("type", "Unknown"),
        "gender": value.get("gender", "Unknown")
    })


df_part1 = pd.DataFrame(rows)
df_part1.head()

In [None]:
import json

# Load the part 2 cache file; this rebinds the name `data` that the part 1
# conversion used before.
with open('artist_data_part2.json') as f:
    data = json.load(f)

In [None]:
import pandas as pd
import ast 

tidy_rows = []

# NOTE(review): this cell creates one row for every artist in a key, while the
# part 1 conversion keeps only the first artist of each key — confirm which of
# the two behaviours is intended.
for k, v in data.items():
    # Only the parsing step is guarded, and only against the errors
    # literal_eval raises for malformed text; a bare "except" would also hide
    # unrelated bugs and swallow KeyboardInterrupt.
    try:
        artists = ast.literal_eval(k)
    except (ValueError, SyntaxError):
        print(f"mistake {k}")
        continue

    # Anything that is not a list of names is reported like a malformed key.
    if not isinstance(artists, (list, tuple)):
        print(f"mistake {k}")
        continue

    for artist in artists:
        tidy_rows.append({
            "artist": artist,
            "gender": v.get("gender", "Unknown"),
            "country": v.get("country", "Unknown"),
            "type": v.get("type", "Unknown")
        })

df_part2 = pd.DataFrame(tidy_rows)

In [None]:
# Stack both artist tables and renumber the index from zero.
df_all = pd.concat([df_part1, df_part2], ignore_index=True)

In [None]:
df_all.head()

In [None]:
import ast

# dataset is the result of a boolean filter; take an explicit copy so that the
# column assignments below do not trigger SettingWithCopyWarning.
dataset = dataset.copy()

# Turn the text form of each artist list into a real Python list.
dataset["artists"] = dataset["artists"].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)


# Use the first listed artist as the merge key. An empty list yields None
# instead of raising IndexError; non-list values are passed through unchanged.
dataset["artist"] = dataset["artists"].apply(
    lambda x: (x[0] if x else None) if isinstance(x, list) else x
)


# NOTE(review): df_all can hold several rows for the same artist, in which
# case this left merge repeats the matching track rows — confirm that the
# later de-duplication is meant to take care of this.
df_merged = dataset.merge(df_all, on="artist", how="left")


In [None]:
df_merged

In [None]:
# Keep only rows with a known gender and country. Rows without a match in the
# left merge carry missing values rather than the text "Unknown", and a plain
# != "Unknown" test lets those through, so they are excluded explicitly.
has_gender = df_merged["gender"].notna() & (df_merged["gender"] != "Unknown")
has_country = df_merged["country"].notna() & (df_merged["country"] != "Unknown")
filtered_df = df_merged[has_gender & has_country]


In [None]:
# Keep one row per track name.
# NOTE(review): only the 'name' column is compared, so tracks by different
# artists that share a title are reduced to a single row — confirm this is
# intended.
df_cleaned = filtered_df.drop_duplicates(subset=["name"])

In [None]:
# An expression in the middle of a cell is not displayed, so the row count is
# printed explicitly.
print(len(df_cleaned))

# Number of tracks per country, limited to countries with more than 500 tracks.
country_counts = df_cleaned['country'].value_counts()
countries_over_500 = country_counts[country_counts > 500]
print(countries_over_500)

## Metadata

In [None]:
# Load the Spotify metadata table. The path is an absolute location on the
# author's machine and has to be adjusted when running elsewhere.
metadata = pd.read_csv("/Users/sophiekersten/Desktop/thesis/spotify_metadata.csv")


# Show the first rows as a quick check of the import.
metadata.head()

In [None]:
# NOTE: artist_results is filled by the Spotify API cell further down, so that
# cell has to be executed before this one.
df_artist_data = pd.DataFrame(artist_results)
# The list printed here holds the column names, so it is labelled accordingly.
print("columns", df_artist_data.columns.tolist())



## Get Popularity Score and Genre from the Spotify API

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import json
import os
import time
from tqdm import tqdm
import numpy as np
import ast

# Spotify API Setup
# The credentials are read from the environment so that no secret is stored in
# the notebook. Export SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET before
# running this cell.
client_id = os.environ["SPOTIPY_CLIENT_ID"]
client_secret = os.environ["SPOTIPY_CLIENT_SECRET"]
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=client_id, client_secret=client_secret))

# Columns that this cell adds per artist.
artist_columns = ['artist_popularity', 'artist_genres']

# Take an explicit copy before adding columns: df_cleaned was derived from
# another frame, and writing to it directly can raise SettingWithCopyWarning.
df_cleaned = df_cleaned.copy()
df_cleaned['artist_ids'] = df_cleaned['artist_ids'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

# The first id of each list is used as the main artist. The column is created
# on df_cleaned as well, because the write-back at the end of this cell looks
# artists up by it; rows without a usable id list get None.
df_cleaned['main_artist_id'] = df_cleaned['artist_ids'].apply(
    lambda x: x[0] if isinstance(x, list) and len(x) > 0 else None
)

# Only rows with a non-empty id list take part in the API lookups.
df_valid = df_cleaned[df_cleaned['artist_ids'].apply(lambda x: isinstance(x, list) and len(x) > 0)].copy()

# Split the rows into four parts by position. The row positions are split
# rather than the frame itself, because passing a DataFrame to np.array_split
# is deprecated.
chunks = [
    df_valid.iloc[positions]
    for positions in np.array_split(np.arange(len(df_valid)), 4)
]
part_index = 3
# Artist columns left over from an earlier run of this cell are dropped so the
# merge below does not create suffixed duplicates.
df_part = chunks[part_index].drop(columns=artist_columns, errors="ignore").copy()

# Results of earlier runs for this part, keyed by artist id.
artist_cache_file = f"artist_data_cache_part{part_index+1}.json"
if os.path.exists(artist_cache_file):
    with open(artist_cache_file, "r") as f:
        artist_cache = json.load(f)
else:
    artist_cache = {}

# Only artists that are not cached yet are requested from the API.
unique_artist_ids = df_part['main_artist_id'].dropna().unique().tolist()
artist_ids_to_query = [aid for aid in unique_artist_ids if aid not in artist_cache]


artist_results = []
for idx, artist_id in enumerate(tqdm(artist_ids_to_query, desc=f"Part {part_index+1}")):
    artist_data = {
        "artist_id": artist_id,
        "artist_popularity": None,
        "artist_genres": []
    }

    # Up to five attempts per artist.
    for attempt in range(5):
        try:
            artist = sp.artist(artist_id)
            time.sleep(1.2)
            artist_data["artist_popularity"] = artist.get("popularity", None)
            artist_data["artist_genres"] = artist.get("genres", [])
            artist_cache[artist_id] = {
                "artist_popularity": artist_data["artist_popularity"],
                "artist_genres": artist_data["artist_genres"]
            }
            break
        except spotipy.exceptions.SpotifyException as e:
            if e.http_status == 429:
                # headers can be None on the exception; fall back to 10 s.
                wait_time = int((e.headers or {}).get("Retry-After", 10))
                print(f"🔁 Rate Limit {wait_time} Sek")
                time.sleep(wait_time)
            else:
                # Other API errors are not retried.
                print(f"mistake {artist_id}: {e}")
                break
        except Exception as e:
            print(f" mistake ({attempt+1}/5) bei {artist_id}: {e}")
            time.sleep(10)
    else:
        # Reached only when all five attempts ended without a break.
        print(f" {artist_id} mistake.")

    artist_results.append(artist_data)

    # Checkpoint the cache and pause after every 100 artists.
    if idx % 100 == 0 and idx > 0:
        print(f"💾 save after {idx} Artists...")
        with open(artist_cache_file, "w") as f:
            json.dump(artist_cache, f, indent=2)
        print("break 30s")
        time.sleep(30)


# Final save of the cache.
with open(artist_cache_file, "w") as f:
    json.dump(artist_cache, f, indent=2)


all_artist_data = {
    aid: {
        "artist_popularity": data.get("artist_popularity"),
        "artist_genres": data.get("artist_genres", [])
    } for aid, data in artist_cache.items()
}

# Explicit columns keep the frame usable even when the cache is still empty.
df_artist_data = pd.DataFrame(
    [{"main_artist_id": aid, **data} for aid, data in all_artist_data.items()],
    columns=["main_artist_id", *artist_columns],
)


# Artists whose lookup failed are not in the cache and get missing values here.
df_part = df_part.merge(df_artist_data, on='main_artist_id', how='left')


# Write the artist values back to df_cleaned. A lookup by artist id is used
# instead of merging df_part: df_part has one row per track, so merging it
# would repeat rows for every artist with more than one track. Values written
# by an earlier run (another part) are kept where they exist.
artist_lookup = df_artist_data.set_index('main_artist_id')
for column in artist_columns:
    fetched = df_cleaned['main_artist_id'].map(artist_lookup[column])
    if column in df_cleaned.columns:
        fetched = df_cleaned[column].where(df_cleaned[column].notna(), fetched)
    df_cleaned[column] = fetched


print(df_part[['name', 'main_artist_id', 'artist_popularity', 'artist_genres']].head())



In [None]:
# Export the current part; the file name carries the 1-based part number.
df_part.to_csv(f"df_part_{part_index+1}_with_artist_data.csv", index=False)



In [None]:
# Read the part 4 CSV back in (the file name follows the export pattern used
# in this notebook) and display it.
df_part4 = pd.read_csv("df_part_4_with_artist_data.csv")

df_part4

In [None]:
# Read the part 3 CSV back in and preview its first rows.
df_part3 = pd.read_csv("df_part_3_with_artist_data.csv")

df_part3.head()

In [None]:
# Read the part 1 CSV back in and preview its first rows. This rebinds
# df_part1, which earlier held the MusicBrainz artist table.
df_part1 = pd.read_csv("df_part_1_with_artist_data.csv")

df_part1.head()

In [None]:
# Read the part 2 CSV back in and preview its first rows. This rebinds
# df_part2, which earlier held the MusicBrainz artist table.
df_part2 = pd.read_csv("df_part_2_with_artist_data.csv")

df_part2.head()

## Getting the Lyrics from Genius API

In [None]:
import lyricsgenius

import os

# The Genius access token is read from the environment so that the secret is
# not stored in the notebook. Export GENIUS_ACCESS_TOKEN before running.
genius = lyricsgenius.Genius(os.environ["GENIUS_ACCESS_TOKEN"])
# Client options: skip results that are not songs, exclude remix and live
# versions by title, and silence the client's own status output.
genius.skip_non_songs = True
genius.excluded_terms = ["(Remix)", "(Live)"]
genius.verbose = False



In [None]:
def get_lyrics(title, artist):
    """Search Genius for a song and return its lyrics.

    Args:
        title: Song title to search for.
        artist: Artist name to search for.

    Returns:
        The lyrics text, or None when nothing was found, the hit has no
        lyrics, or the search raised an error (which is printed).
    """
    try:
        hit = genius.search_song(title, artist)
        if not hit:
            return None
        if hit.lyrics:
            return hit.lyrics
    except Exception as e:
        print(f"Fehler bei {title} - {artist}: {e}")
    return None



In [None]:
from tqdm import tqdm
import time  

# Lyrics per (track title, first artist); filled once per distinct pair.
lyrics_dict = {}

for i, row in tqdm(full_dataset.iterrows(), total=len(full_dataset)):
    track = row['name']
    # Use the first entry when 'artists' is a list, otherwise the value itself.
    artist = row['artists'][0] if isinstance(row['artists'], list) else row['artists']

    # Pairs that were looked up before are not requested again.
    if (track, artist) in lyrics_dict:
        continue

    lyrics_dict[(track, artist)] = get_lyrics(track, artist)
        



In [None]:
def get_cached_lyrics(row):
    """Return the lyrics stored in ``lyrics_dict`` for a row, or None.

    The lookup key is the pair of the row's 'name' and its first artist (or
    the 'artists' value itself when that is not a list).
    """
    performers = row['artists']
    first_artist = performers[0] if isinstance(performers, list) else performers
    return lyrics_dict.get((row['name'], first_artist))

# Add the lyrics as a new column, one lookup per row.
full_dataset['lyrics'] = full_dataset.apply(get_cached_lyrics, axis=1)


In [None]:
full_dataset

In [None]:
import lyricsgenius
import pandas as pd
from tqdm import tqdm
import time
import json
import os


# The Genius access token is read from the environment so that the secret is
# not stored in the notebook. Export GENIUS_ACCESS_TOKEN before running.
genius = lyricsgenius.Genius(os.environ["GENIUS_ACCESS_TOKEN"])
# Client options: skip results that are not songs, exclude remix and live
# versions by title, and silence the client's own status output.
genius.skip_non_songs = True
genius.excluded_terms = ["(Remix)", "(Live)"]
genius.verbose = False


import ast

CACHE_FILE = "lyrics_cache.json"

lyrics_dict = {}
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, "r") as f:
        stored_cache = json.load(f)

    # The keys were written as str((track, artist)). ast.literal_eval turns
    # them back into tuples without executing the file content as code, which
    # eval() would do. Keys that cannot be parsed are reported and skipped, so
    # the affected songs are simply looked up again.
    for k, v in stored_cache.items():
        try:
            lyrics_dict[tuple(ast.literal_eval(k))] = v
        except (ValueError, SyntaxError, TypeError):
            print(f"mistake {k}")


def get_lyrics(title, artist, retries=3, wait_time=60):
    """Search Genius for a song and return its lyrics, retrying on errors.

    Args:
        title: Song title to search for.
        artist: Artist name to search for.
        retries: Maximum number of search attempts.
        wait_time: Seconds to wait between two attempts.

    Returns:
        The lyrics text, or None when the search finds nothing, the hit has
        no lyrics, or every attempt raised an error.
    """
    for attempt in range(1, retries + 1):
        try:
            song = genius.search_song(title, artist)
            # A completed search ends the loop whether or not it found lyrics.
            return song.lyrics if song and song.lyrics else None
        except Exception as e:
            print(f"⚡ Fehler bei {title} - {artist}: {e}")
            if attempt == retries:
                # No attempts left: report the song and give up.
                print(f" {title} - {artist}.")
                return None
            print(f"new try {wait_time} seconds...")
            time.sleep(wait_time)
    return None


# Counts the lookups done in this run and drives the checkpoints below.
new_lookups = 0

for i, row in tqdm(df_cleaned.iterrows(), total=df_cleaned.shape[0]):
    track = row['name']
    # Use the first entry when 'artists' is a list, otherwise the value itself.
    artist = row['artists'][0] if isinstance(row['artists'], list) else row['artists']

    key = (track, artist)

    if key not in lyrics_dict:
        lyrics = get_lyrics(track, artist)
        lyrics_dict[key] = lyrics
        new_lookups += 1

        time.sleep(1)

        # Checkpoint on the number of lookups, not on the index label i: the
        # labels of df_cleaned are not consecutive after filtering, and rows
        # that are already cached are skipped, so testing i could leave long
        # stretches of work unsaved.
        if new_lookups % 500 == 0:
            print(f"safe {new_lookups} Songs...")
            save_cache = {str(k): v for k, v in lyrics_dict.items()}
            with open(CACHE_FILE, "w") as f:
                json.dump(save_cache, f, indent=2)


# Final save; tuple keys are stored in their text form because JSON object
# keys must be strings.
save_cache = {str(k): v for k, v in lyrics_dict.items()}
with open(CACHE_FILE, "w") as f:
    json.dump(save_cache, f, indent=2)


def get_cached_lyrics(row):
    """Return the lyrics stored in ``lyrics_dict`` for a row, or None.

    The lookup key is the pair of the row's 'name' and its first artist (or
    the 'artists' value itself when that is not a list).
    """
    performers = row['artists']
    first_artist = performers[0] if isinstance(performers, list) else performers
    return lyrics_dict.get((row['name'], first_artist))

# Add the lyrics as a new column, one lookup per row.
df_cleaned['lyrics'] = df_cleaned.apply(get_cached_lyrics, axis=1)

print("lyrices succesfull")



In [None]:
# Read the four exported part files back in, in order of their part number.
df_part1, df_part2, df_part3, df_part4 = (
    pd.read_csv(f"df_part_{part_no}_with_artist_data.csv")
    for part_no in range(1, 5)
)



In [None]:
# Stack the four parts into one table and renumber the index from zero.
df_artist_all = pd.concat([df_part1, df_part2, df_part3, df_part4], ignore_index=True)


In [None]:

import ast

# The genre lists come back from the CSV files as text. ast.literal_eval
# parses that text into a list without executing it, which eval() would do.
df_artist_all['artist_genres'] = df_artist_all['artist_genres'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)

# Keep rows with at least one genre. The isinstance check also covers rows
# whose genre value is missing: those hold a float NaN, for which len() would
# raise a TypeError.
df_with_genres = df_artist_all[
    df_artist_all['artist_genres'].apply(lambda x: isinstance(x, list) and len(x) > 0)
]

print(f"amount artist with genre {df_with_genres.shape[0]}")


In [None]:
df_with_genres.head()