In [1]:
import glob
import json
import time
from SpotifyClass import SpotifyAPI
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import pandas as pd
from DBClass import DBconnector
from sqlalchemy import text

In [None]:
# Combine every JSON export of the extended streaming history into one list
# and write that list back out as a single file.
data = []
file = r'C:\Users\fabio\Downloads\my_spotify_data\Spotify Extended Streaming History\*.json'
# glob.glob returns matches in arbitrary order; sorting keeps the combined
# output identical from one run to the next.
matched_files = sorted(glob.glob(file))
if not matched_files:
    # Make an empty match visible instead of silently producing an empty file
    print(f"No JSON files matched {file}")
for filename in matched_files:
    with open(filename, 'r', encoding='utf-8') as f:
        # Load the JSON data from the file and extend the data list
        data.extend(json.load(f))

with open('spotify_data.json', 'w', encoding='utf-8') as out_f:
    # Write the combined data to a new JSON file
    json.dump(data, out_f, indent=2, ensure_ascii=False)
# Report only after the output file has been closed
print(f"Combined {len(data)} records into spotify_data.json")


In [None]:
# Build the project's Spotify wrapper and keep the object returned by
# authentication(); later cells call aut.search(...) and aut.artist(...) on it.
sp = SpotifyAPI()
aut = sp.authentication()
def get_track_data(track_name, artist_name):
    """Search Spotify for one track and flatten the first hit.

    The query ``track:<track_name> artist:<artist_name>`` is sent to
    ``aut.search`` with ``type='track'`` and ``limit=1``.

    Args:
        track_name: Track title to search for.
        artist_name: Artist name to search for.

    Returns:
        A dict with the keys ``song_id``, ``artist_ids``, ``artist_names``,
        ``album_id``, ``song_url``, ``song_uri``, ``artist_url``,
        ``artist_uri``, ``album_name``, ``album_url`` and ``song_image``,
        or ``None`` when the search returns no items. ``artist_url`` and
        ``artist_uri`` describe the first listed artist only.
    """
    response = aut.search(q=f'track:{track_name} artist:{artist_name}', type='track', limit=1)
    hits = response['tracks']['items']
    if not hits:
        return None

    hit = hits[0]
    song_id = hit['id']

    performers = hit['artists']
    performer_ids = [performer['id'] for performer in performers]
    performer_names = [performer['name'] for performer in performers]

    album = hit['album']
    # First image in the list, when the album has any
    cover = album['images'][0]['url'] if album['images'] else None
    album_id = album['id']
    song_url = hit['external_urls']['spotify']
    song_uri = hit['uri']
    album_name = album['name']

    # Only the first listed artist contributes a URL/URI
    if performers:
        lead_url = performers[0]['external_urls']['spotify']
        lead_uri = performers[0]['uri']
    else:
        lead_url = None
        lead_uri = None

    album_url = album['external_urls']['spotify'] if album else None

    return {
        'song_id': song_id,
        'artist_ids': performer_ids,
        'artist_names': performer_names,
        'album_id': album_id,
        'song_url': song_url,
        'song_uri': song_uri,
        'artist_url': lead_url,
        'artist_uri': lead_uri,
        'album_name': album_name,
        'album_url': album_url,
        'song_image': cover,
    }

In [None]:
# Cache of previously fetched track data, keyed by (track_name, artist_name).
# Values are the dicts returned by get_track_data, or None while a lookup is
# pending / when nothing was found.
# NOTE(review): presumably kept in its own cell so re-running the fetch cell
# does not reset it - confirm.
cache = {}

In [None]:
lock = Lock()
# Search queries longer than this many characters are skipped
MAX_QUERY_LENGTH = 250
# Grab all unique track and artist combinations from the JSON file to use them as keys for the API lookup.
# Names are stripped as well as lower-cased so the keys match the normalisation
# applied to the fact table's join columns (fillna('').str.strip().str.lower()).
unique_keys = {
    (track, artist)
    for track, artist in (
        (
            e['master_metadata_track_name'].strip().lower(),
            e['master_metadata_album_artist_name'].strip().lower(),
        )
        for e in data
        if e.get('master_metadata_track_name') and e.get('master_metadata_album_artist_name')
    )
    # Whitespace-only names become empty after stripping and cannot be searched
    if track and artist and len(f'track:{track} artist:{artist}') <= MAX_QUERY_LENGTH
}

def safe_fetch(key):
    """Fetch the track data for one (track_name, artist_name) key, using the cache.

    Args:
        key: ``(track_name, artist_name)`` tuple.

    Returns:
        ``(key, track_data)`` in every case, where ``track_data`` is the dict
        from ``get_track_data`` or ``None``. A key already present in
        ``cache`` is answered from the cache without calling the API.

    Raises:
        Whatever ``get_track_data`` raises. The cache placeholder is removed
        first, so the key is looked up again on a later run.
    """
    track_name, artist_name = key
    # Use a lock to ensure thread-safe reading of the cache
    with lock:
        if key in cache:
            # Same (key, data) order as the normal return below; the caller
            # unpacks the result as `key, track_data`.
            return key, cache[key]
        cache[key] = None  # Placeholder until the lookup finishes
    try:
        track_data = get_track_data(track_name, artist_name)
    except Exception:
        # Without this the placeholder would stay behind and a re-run would
        # report the key as "not found" instead of retrying it.
        with lock:
            cache.pop(key, None)
        raise
    # Use a lock to ensure thread-safe writing to the cache
    if track_data:
        with lock:
            cache[key] = track_data
    time.sleep(0.1)  # To avoid hitting API rate limits
    return key, track_data


# Run the lookups on a small worker pool and report each one as it completes
total_keys = len(unique_keys)
with ThreadPoolExecutor(max_workers=3) as executor:
    submitted_futures = dict(
        (executor.submit(safe_fetch, pending_key), pending_key)
        for pending_key in unique_keys
    )
    print(f'Number of unique keys: {total_keys}')
    for done_count, future in enumerate(as_completed(submitted_futures), 1):
        try:
            key, track_data = future.result()
            if track_data:
                status = f"Found {done_count}/{total_keys}: {key}"
            else:
                status = f"{done_count}/{total_keys} Not found: {key}"
            print(status)
        except Exception as e:
            # The future is the only handle left on the key when the worker failed
            print(f'{done_count}/{total_keys} Error for {submitted_futures[future]}: {e}')


In [None]:
# Convert the cache to a DataFrame (one row per track that was found)
JOIN_KEYS = ['track_name', 'artist_name']
# Fields copied unchanged from the cached track dict
SCALAR_FIELDS = [
    'song_id', 'album_id', 'song_url', 'song_uri', 'artist_url',
    'artist_uri', 'album_name', 'album_url', 'song_image',
]
DIM_COLUMNS = JOIN_KEYS + SCALAR_FIELDS + ['artist_ids', 'artist_names', 'artists_image']

rows = []
for (track_name, artist_name), track_data in cache.items():
    if not track_data:
        continue
    row = {'track_name': track_name, 'artist_name': artist_name}
    row.update({field: track_data[field] for field in SCALAR_FIELDS})
    # NOTE(review): lists are flattened with ','; an artist name that itself
    # contains a comma cannot be split back correctly later on.
    row['artist_ids'] = ','.join(track_data['artist_ids'])
    row['artist_names'] = ','.join(track_data['artist_names'])
    # get_track_data does not return this key, so read it leniently instead of
    # raising KeyError on the first cached track.
    row['artists_image'] = track_data.get('artists_image')
    rows.append(row)

# Explicit columns keep the frame usable even when nothing was found
df_dim = pd.DataFrame(rows, columns=DIM_COLUMNS)
print(f"Dimensions DataFrame created with {len(df_dim)} rows.")

df_fact = pd.DataFrame(data).rename(columns={
    'master_metadata_track_name': 'track_name',
    'master_metadata_album_artist_name': 'artist_name'
})
print(f"Fact DataFrame created with {len(df_fact)} rows.")

# Normalize join keys the same way on both sides
for key_column in JOIN_KEYS:
    df_fact[key_column] = df_fact[key_column].fillna('').str.strip().str.lower()
    df_dim[key_column] = df_dim[key_column].str.strip().str.lower()

# Normalising can collapse two cache keys into one; keep a single dimension
# row per key so the left join can never multiply play rows.
df_dim = df_dim.drop_duplicates(subset=JOIN_KEYS)

merged_df = pd.merge(df_fact, df_dim, on=JOIN_KEYS, how='left', validate='many_to_one')

In [None]:
# Rename the export's column names to the ones used by the database tables
merged_df = merged_df.rename(columns={'ms_played': 'duration', 'ts': 'timestamp'})

merged_df['user_id'] = 'lw22n4qev3v2uc0ugjbs6r2v3'
# Convert only while the column still holds the raw strings: the .str accessor
# raises on a datetime column, which made a second run of this cell fail.
if not pd.api.types.is_datetime64_any_dtype(merged_df['timestamp']):
    # Remove 'T' and 'Z', then convert to datetime
    raw_timestamps = (
        merged_df['timestamp']
        .str.replace('T', ' ', regex=False)
        .str.replace('Z', '', regex=False)
    )
    merged_df['timestamp'] = pd.to_datetime(raw_timestamps, format='%Y-%m-%d %H:%M:%S')

In [None]:
# Song dimension: one row per song_id.
# NOTE(review): 'duration' is the renamed ms_played of whichever play row is
# kept for the song, not the track length - confirm this is intended.
d_songs_df = (
    merged_df[['song_id', 'track_name', 'album_url', 'duration', 'album_name',
               'song_image', 'song_url', 'song_uri', 'album_id']]
    .rename(columns={'track_name': 'song_name'})
    # Plays without a Spotify match have no song_id; without this filter one
    # such row survives drop_duplicates and is sent to the insert.
    .dropna(subset=['song_id'])
    .drop_duplicates(subset=['song_id'])
)


In [None]:
# Prepare the artist rows to insert into the database: one row per artist ID
# with its name, URL and URI.
# NOTE(review): artist_ids / artist_names are comma-joined strings, so a name
# that itself contains a comma will mis-align with its ID in the zip below.
ARTIST_COLUMNS = ['artist_id', 'artist_name', 'artist_url', 'artist_uri']
# Every play of the same track repeats the same four values, so iterate over
# the distinct combinations only; row order (and thus "first wins") is kept.
artist_source = merged_df[['artist_ids', 'artist_names', 'artist_url', 'artist_uri']].drop_duplicates()

artist_rows = []
for ids_csv, names_csv, lead_url, lead_uri in artist_source.itertuples(index=False, name=None):
    artist_ids = str(ids_csv).split(',') if pd.notnull(ids_csv) and ids_csv else []
    artist_names = str(names_csv).split(',') if pd.notnull(names_csv) and names_csv else []
    for position, (artist_id, artist_name) in enumerate(zip(artist_ids, artist_names)):
        artist_id = artist_id.strip()
        # artist_url / artist_uri on the row belong to the first listed artist.
        # Featured artists previously inherited them; build theirs from the ID.
        is_lead = position == 0
        artist_rows.append({
            'artist_id': artist_id,
            'artist_name': artist_name.strip(),
            'artist_url': lead_url if is_lead else f'https://open.spotify.com/artist/{artist_id}',
            'artist_uri': lead_uri if is_lead else f'spotify:artist:{artist_id}',
        })
# Explicit columns so an empty result still has an 'artist_id' column to dedupe on
d_artists_df = pd.DataFrame(artist_rows, columns=ARTIST_COLUMNS).drop_duplicates(subset=['artist_id'])
#This part of the code will fetch the artist images from the Spotify API and add them to the DataFrame
# It uses a thread pool to fetch images concurrently, which speeds up the process significantly, just like the track data fetching before it.
def get_artist_image(artist_id):
    """Return the URL of the first image Spotify lists for an artist.

    Args:
        artist_id: Spotify artist ID passed to ``aut.artist``.

    Returns:
        The ``url`` of the first entry in the artist's ``images`` list, or
        ``None`` when there is no artist payload, no ``images`` entry, an
        empty list, or the lookup raised (the error is printed, not re-raised).
    """
    try:
        profile = aut.artist(artist_id)
        if not profile or 'images' not in profile or not profile['images']:
            return None
        return profile['images'][0]['url']
    except Exception as e:
        print(f"Error fetching image for artist {artist_id}: {e}")
        return None
def safe_fetch_image(artist_id):
    """Fetch one artist image, then pause briefly before returning.

    Args:
        artist_id: Spotify artist ID.

    Returns:
        ``(artist_id, image_url)`` where ``image_url`` is whatever
        ``get_artist_image`` returned for that ID.
    """
    outcome = (artist_id, get_artist_image(artist_id))
    time.sleep(0.3)  # To avoid hitting API rate limits
    return outcome

# The insert statement binds :artist_image for every row, so the column must
# exist even when not a single image is found.
if 'artist_image' not in d_artists_df.columns:
    d_artists_df['artist_image'] = None

# Collect the results first and write them to the frame in one pass, instead
# of scanning the whole column with a boolean mask for every artist.
artist_images = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(safe_fetch_image, artist_id): artist_id for artist_id in d_artists_df['artist_id']}
    for future in as_completed(futures):
        artist_id = futures[future]
        try:
            artist_id, image_url = future.result()
        except Exception as e:
            print(f"Error for artist {artist_id}: {e}")
            continue
        if image_url:
            print(f"Found image for artist {artist_id}")
            artist_images[artist_id] = image_url
        else:
            print(f"No image found for artist {artist_id}")

# Keep any value already in the column where this run found nothing
fetched_images = d_artists_df['artist_id'].map(artist_images)
d_artists_df['artist_image'] = fetched_images.where(fetched_images.notna(), d_artists_df['artist_image'])



In [None]:
# Prepare the junction table data for insertion in the DB:
# one (song_id, artist_id) pair per artist listed on each played track.
junction_rows = [
    {'song_id': song_id, 'artist_id': artist_id.strip()}
    for song_id, joined_ids in zip(merged_df['song_id'], merged_df['artist_ids'])
    if pd.notnull(joined_ids)
    for artist_id in (str(joined_ids).split(',') if str(joined_ids) else [])
]
d_songs_artists_df = pd.DataFrame(junction_rows).drop_duplicates()

In [None]:
# Fact table for the database: one row per play, plus the calendar date of
# the play as a string.
f_played_df = (
    merged_df.loc[:, ['song_id', 'track_name', 'user_id', 'timestamp', 'duration']]
    .rename(columns={'track_name': 'song_name'})
    .assign(dt_listened=lambda plays: plays['timestamp'].dt.date.astype(str))
)

In [None]:
# Queries to insert data into the database
d_songs_query = """INSERT IGNORE INTO d_songs (song_id, song_name, duration, album_name, song_image, song_url, song_uri, album_url, album_id)
            VALUES (:song_id, :song_name, :duration, :album_name, :song_image, :song_url, :song_uri, :album_url, :album_id)"""
d_artists_query = """INSERT IGNORE INTO d_artists (artist_id, artist_name, artist_url, artist_uri, artist_image)
            VALUES (:artist_id, :artist_name, :artist_url, :artist_uri, :artist_image)"""
d_songs_artists_query = """INSERT IGNORE INTO d_songs_artists (song_id, artist_id)
            VALUES (:song_id, :artist_id)"""
f_played_query = """INSERT IGNORE INTO f_played (song_id, song_name, user_id, timestamp, duration, dt_listened)
            VALUES (:song_id, :song_name, :user_id, :timestamp, :duration, :dt_listened)"""


def _nulls_to_none(frame):
    """Return a copy of ``frame`` in which every missing value is ``None``.

    The frame is cast to object dtype first: on numeric or datetime columns
    ``where(..., None)`` would otherwise keep NaN/NaT instead of ``None``.
    """
    boxed = frame.astype(object)
    return boxed.where(boxed.notna(), None)


DB = DBconnector()
conection = DB.get_connection()

# d_artists_query binds :artist_image, so the column has to exist
if 'artist_image' not in d_artists_df.columns:
    d_artists_df['artist_image'] = None

d_songs_df = _nulls_to_none(d_songs_df)
d_artists_df = _nulls_to_none(d_artists_df)
d_songs_artists_df = _nulls_to_none(d_songs_artists_df)
f_played_df = _nulls_to_none(f_played_df)

# Dimensions first, then the junction table, then the facts
pending_inserts = [
    (d_songs_query, d_songs_df),
    (d_artists_query, d_artists_df),
    (d_songs_artists_query, d_songs_artists_df),
    (f_played_query, f_played_df),
]
with conection.begin() as conn:
    for insert_query, frame in pending_inserts:
        records = frame.to_dict(orient='records')
        # Skip empty frames: there is nothing to insert, and an empty
        # parameter list would not supply the statement's bind values.
        if records:
            conn.execute(text(insert_query), records)

In [None]:
# Reconnect and list the artists that have no image stored yet
DB = DBconnector()
conection = DB.get_connection()
with conection.begin() as conn:
    result = conn.execute(text("SELECT artist_id from d_artists WHERE artist_image IS NULL"))
    result_df = pd.DataFrame(result.fetchall(), columns=['artist_id']).astype({'artist_id': str})

# Spotify client used by the image lookups in this cell
sp = SpotifyAPI()
aut = sp.authentication()
def get_artist_image(artist_id):
    """Return the URL of the first image Spotify lists for an artist.

    Args:
        artist_id: Spotify artist ID passed to ``aut.artist``.

    Returns:
        The ``url`` of the first entry in the artist's ``images`` list, or
        ``None`` when there is no artist payload, no ``images`` entry, an
        empty list, or the lookup raised (the error is printed, not re-raised).
    """
    try:
        profile = aut.artist(artist_id)
        if not profile or 'images' not in profile or not profile['images']:
            return None
        return profile['images'][0]['url']
    except Exception as e:
        print(f"Error fetching image for artist {artist_id}: {e}")
        return None
def safe_fetch_image(artist_id):
    """Fetch one artist image, then pause briefly before returning.

    Args:
        artist_id: Spotify artist ID.

    Returns:
        ``(artist_id, image_url)`` where ``image_url`` is whatever
        ``get_artist_image`` returned for that ID.
    """
    outcome = (artist_id, get_artist_image(artist_id))
    time.sleep(0.3)  # To avoid hitting API rate limits
    return outcome

# Image URLs found in this run, keyed by artist_id. Previously each URL was
# printed and then dropped, leaving nothing to work with after the loop.
# NOTE(review): nothing here writes the URLs back to d_artists - confirm
# whether an UPDATE step is meant to follow.
missing_artist_images = {}
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(safe_fetch_image, artist_id): artist_id for artist_id in result_df['artist_id']}
    for future in as_completed(futures):
        artist_id = futures[future]
        try:
            artist_id, image_url = future.result()
            if image_url:
                print(f"Found image for artist {artist_id}")
                missing_artist_images[artist_id] = image_url
            else:
                print(f"No image found for artist {artist_id}")
        except Exception as e:
            print(f"Error for artist {artist_id}: {e}")
            
    

Found image for artist 03hSZAtyBlgYcfb02Nhh6q
Found image for artist 03T8GHHcCEtcfnjbP5aHLg
Found image for artist 03uMw43UVu9MsQCcHVSGjX
Found image for artist 03wlYZ0ebYVTEunPQXmfMv
Found image for artist 048LktY5zMnakWq7PTtFrz
Found image for artist 04a6mc5vUtj2pMzteToeH7
Found image for artist 04ei5kNgmDuNAydFhhIHnD
Found image for artist 04B7eODlw6ec2aFdTYzXvV
Found image for artist 04qByqUf8qEushGa1aQQ8V
Found image for artist 04gDigrS5kc9YWfZHwBETP
Found image for artist 04ujAxwTIuRd5Unjsf6nIw
Found image for artist 04svf3dGgcwEVgJ0MlV0M2
Found image for artist 05aVtfDzBvg9eVu9MAZPGD
Found image for artist 04VwrPirvx6CXRzbEjofQP
Found image for artist 05bZ3QOlK7vfflzspO0Jk5
Found image for artist 05fG473iIaoy82BF1aGhL8
Found image for artist 05mc76JwnbXcKEfgBNmBoI
Found image for artist 05mFGgWzooDQMWGNcNsM05
Found image for artist 05qCf6M7E7AxizHVmrcPqh
Found image for artist 06CW6ZcdRZc14gC6wr3bt6
Found image for artist 06dPHHGcFjSDdj2ybbf7vD
Found image for artist 06e0gXtUpvg