In [None]:
%pip install -r C:\Users\ezrag\OneDrive\Documents\GitHub\spotify-listening-data\requirements.txt



In [None]:
import os
from dotenv import load_dotenv # type: ignore

# Load environment variables from .env file
load_dotenv()

SPOTIFY_CLIENT_ID = os.getenv('SPOTIFY_CLIENT_ID')
SPOTIFY_CLIENT_SECRET = os.getenv('SPOTIFY_CLIENT_SECRET')

In [3]:
# Import necessary libraries
import pandas as pd
import json
import os
import random
from datetime import datetime, timedelta
from dotenv import load_dotenv
import threading
import queue
import requests
import time

In [4]:
# Function to get user ID from input
def get_user_id():
    user_id = input("Enter the user's ID: ").lower()
    return user_id


In [5]:
# Function to get the number of data chunks from input
def get_num_chunks():
    num_chunks = int(input("Enter the number of chunks: "))
    return num_chunks

In [6]:
# Function to read and process data from multiple JSON files
def read_and_process_data(user_id, num_chunks, base_path='wrapped_files/'):
    all_data = []
    
    for i in range(num_chunks):
        json_file = os.path.join(base_path, f'{user_id}_music_{i}.json')
        print(f"Checking for file: {json_file}")
        
        if not os.path.exists(json_file):
            print(f"File not found: {json_file}")
            continue
        
        print(f"Reading data from {json_file}")
        with open(json_file, 'r', encoding='utf-8') as file:
            data_list = json.load(file)
            all_data.extend(data_list)
    
    if not all_data:
        raise ValueError("No data files were found or all were empty.")
    
    df = pd.DataFrame(all_data)
    df['user_id'] = user_id
    df['endTime'] = pd.to_datetime(df['endTime'])
    
    print(f"Data read successfully for {len(df)} records.")
    return df


In [7]:
# Function to export data to a CSV file
def export_to_csv(df, user_id):
    csv_file = f'{user_id}_listening_data.csv'
    df.to_csv(csv_file, index=False)
    print(f"Data exported to {csv_file}")


In [8]:
def track_unique_songs(df, unique_songs_file):
    # Ensure DataFrame includes necessary columns
    required_columns = ['trackName', 'artistName', 'external_urls']
    for col in required_columns:
        if col not in df.columns:
            df[col] = None
    
    # Drop duplicates within the current DataFrame
    new_unique_songs = df[required_columns].drop_duplicates()
    print(f"Tracking {len(new_unique_songs)} unique songs.")
    
    try:
        # Attempt to load existing unique songs from the CSV file
        existing_unique_songs = pd.read_csv(unique_songs_file)
        print(f"Loaded {len(existing_unique_songs)} existing unique songs.")
    except FileNotFoundError:
        # If the file does not exist, start with an empty DataFrame
        existing_unique_songs = pd.DataFrame(columns=required_columns)
        print("No existing unique songs file found. Starting fresh.")
    
    # Combine new and existing unique songs
    combined_unique_songs = pd.concat([existing_unique_songs, new_unique_songs]).drop_duplicates()
    
    # Save the combined DataFrame to the CSV file
    combined_unique_songs.to_csv(unique_songs_file, index=False)
    print(f"Updated unique songs saved to {unique_songs_file}.")


In [9]:
# Function to get Spotify access token using client credentials
def get_spotify_access_token(client_id, client_secret):
    auth_url = 'https://accounts.spotify.com/api/token'
    auth_response = requests.post(auth_url, {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
    })
    
    # Parse the authentication response and extract access token
    auth_response_data = auth_response.json()
    return auth_response_data['access_token']

In [10]:
import requests
import time

# Function to get song details from Spotify API using search query
def get_song_details(artist_name, track_name, access_token):
    search_url = 'https://api.spotify.com/v1/search'
    headers = {
        'Authorization': f'Bearer {access_token}'
    }
    params = {
        'q': f'artist:{artist_name} track:{track_name}',
        'type': 'track',
        'limit': 1
    }
    
    # Send request to Spotify API to search for the track
    response = requests.get(search_url, headers=headers, params=params)
    response_data = response.json()
    
    print(f"Search query: artist:{artist_name} track:{track_name}")
    print("Search response data:", response_data)
    
    if response_data['tracks']['items']:
        track_info = response_data['tracks']['items'][0]
        
        # Get artist details to fetch genres in batches
        artist_ids = [artist['id'] for artist in track_info['artists']]
        artist_genres = []

        batch_size = 50
        for i in range(0, len(artist_ids), batch_size):
            batch_ids = artist_ids[i:i + batch_size]
            artist_url = f"https://api.spotify.com/v1/artists?ids={','.join(batch_ids)}"
            retries = 5
            delay = 1
            
            while retries > 0:
                artist_response = requests.get(artist_url, headers=headers)
                
                if artist_response.status_code == 200:
                    try:
                        artist_data = artist_response.json()['artists']
                        for artist in artist_data:
                            if 'genres' in artist:
                                artist_genres.extend(artist['genres'])
                        break  # Exit the retry loop if successful
                    except ValueError as e:
                        print(f"Error decoding JSON for batch {batch_ids}: {e}")
                elif artist_response.status_code == 429:
                    retry_after = int(artist_response.headers.get('Retry-After', delay))
                    print(f"Rate limited. Retrying after {retry_after} seconds.")
                    time.sleep(retry_after)
                else:
                    print(f"Request failed with status code {artist_response.status_code}")
                
                retries -= 1
                time.sleep(delay)
                delay *= 2  # Exponential backoff
            
            if retries == 0:
                print(f"Failed to fetch genres for batch {batch_ids} after {retries} attempts.")
        
        # Ensure the genres list is ordered and unique
        artist_genres = sorted(set(artist_genres))
        
        # Check if album images and external URLs are present
        album_artwork = track_info['album']['images'][0]['url'] if 'images' in track_info['album'] and track_info['album']['images'] else None
        external_urls = track_info['external_urls']['spotify'] if 'external_urls' in track_info else None
        
        song_details = {
            'spotify_id': track_info['id'],
            'album': track_info['album']['name'],
            'release_date': track_info['album']['release_date'],
            'popularity': track_info['popularity'],
            'duration_ms': track_info['duration_ms'],
            'track_number': track_info['track_number'],
            'album_artwork': album_artwork,
            'external_urls': external_urls,
            'artists_involved': [artist['name'] for artist in track_info['artists']],
            'genres': artist_genres
        }
        
        print("Song details:", song_details)
        return song_details
    else:
        print("No tracks found for the given query.")
        return None


In [11]:
# Worker function to process each song in the queue
def worker_thread(queue, unique_songs, access_token, export_interval, lock, start_time):
    while not queue.empty():
        index, row = queue.get()
        if pd.notna(row['spotify_id']):
            print(f"Skipping already updated song at index {index}.")
            queue.task_done()
            continue
        
        artist_name = row['artistName']
        track_name = row['trackName']
        song_details = get_song_details(artist_name, track_name, access_token)
        
        if song_details:
            with lock:
                unique_songs.at[index, 'spotify_id'] = song_details['spotify_id']
                unique_songs.at[index, 'album'] = song_details['album']
                unique_songs.at[index, 'release_date'] = song_details['release_date']
                unique_songs.at[index, 'popularity'] = song_details['popularity']
                unique_songs.at[index, 'duration_ms'] = song_details['duration_ms']
                unique_songs.at[index, 'track_number'] = song_details['track_number']
                unique_songs.at[index, 'album_artwork'] = song_details['album_artwork']
                unique_songs.at[index, 'external_urls'] = song_details['external_urls']
                unique_songs.at[index, 'artists_involved'] = song_details['artists_involved']
                unique_songs.at[index, 'genres'] = song_details['genres']
        
        if (index + 1) % export_interval == 0:
            with lock:
                print(f"Exporting data at index {index}. Elapsed time: {time.time() - start_time:.2f} seconds.")
                unique_songs.to_csv(unique_songs_file, index=False)
        
        queue.task_done()
        print(f"Processed index {index}")

In [12]:
# Main function to update unique songs table with Spotify info using threading
def update_unique_songs(unique_songs_file='unique_songs.csv', export_interval=50):
    # Load unique songs data from CSV file
    unique_songs = pd.read_csv(unique_songs_file)
    
    # Check if the columns already exist, if not, create them
    if 'spotify_id' not in unique_songs.columns:
        unique_songs['spotify_id'] = None
    if 'album' not in unique_songs.columns:
        unique_songs['album'] = None
    if 'release_date' not in unique_songs.columns:
        unique_songs['popularity'] = None
    if 'duration_ms' not in unique_songs.columns:
        unique_songs['duration_ms'] = None
    if 'track_number' not in unique_songs.columns:
        unique_songs['track_number'] = None
    if 'album_artwork' not in unique_songs.columns:
        unique_songs['album_artwork'] = None
    if 'external_urls' not in unique_songs.columns:
        unique_songs['external_urls'] = None
    if 'artists_involved' not in unique_songs.columns:
        unique_songs['artists_involved'] = None
    if 'genre' not in unique_songs.columns:
        unique_songs['genre'] = None

    # Get Spotify access token
    access_token = get_spotify_access_token(SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET)
    
    # Create a queue and add songs to be processed
    q = queue.Queue()
    for index, row in unique_songs.iterrows():
        q.put((index, row))

    # Create a lock for thread-safe operations
    lock = threading.Lock()
    start_time = time.time()
    threads = []
    for _ in range(10):  # Adjust number of threads as needed
        thread = threading.Thread(target=worker_thread, args=(q, unique_songs, access_token, export_interval, lock, start_time))
        thread.start()
        threads.append(thread)
    
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    
    # Remove duplicates based on 'external_urls'
    unique_songs = drop_duplicates_by_external_urls(unique_songs)
    
    # Final export
    print(f"Final export. Total time taken: {time.time() - start_time:.2f} seconds.")
    unique_songs.to_csv(unique_songs_file, index=False)
    print(f"Unique songs table updated with Spotify info and saved to {unique_songs_file}.")

# Function to drop duplicates based on external_urls
def drop_duplicates_by_external_urls(data):
    """
    This function drops duplicate rows based on the 'external_urls' column.
    
    Parameters:
    data (pd.DataFrame): DataFrame containing the song data with 'external_urls' column.
    
    Returns:
    pd.DataFrame: DataFrame with duplicates removed based on 'external_urls'.
    """
    data = data.drop_duplicates(subset=['external_urls'])
    return data


In [13]:
# Function to fill in song info from unique songs database
def fill_song_info(listening_data, unique_songs):
    # Filter out rows where artistName is 'unknown'
    listening_data_filtered = listening_data[~listening_data['artistName'].str.lower().isin(['unknown', 'unknown artist'])]
    # Merge listening data with unique songs data on 'artistName' and 'trackName'
    filled_data = pd.merge(listening_data_filtered, unique_songs, on=['artistName', 'trackName'], how='left')
    return filled_data


In [14]:
# Function to read processed listening data
def read_processed_data(user_id):
    csv_file = f'{user_id}_listening_data.csv'  # Example file path, adjust as needed
    listening_data = pd.read_csv(csv_file)
    return listening_data

In [15]:
# Function to export filled listening data to a CSV file
def export_filled_data(filled_data, user_id):
    filled_csv_file = f'{user_id}_listening_data.csv'
    filled_data.to_csv(filled_csv_file, index=False)
    print(f"Filled listening data exported to {filled_csv_file}")

In [None]:
# Execute the main steps to read data, export to CSV, and track unique songs
user_id = get_user_id()
num_chunks = get_num_chunks()
base_path = '../wrapped_files/'  # Adjusting the relative path based on the notebook location
unique_songs_file = 'unique_songs.csv'

try:
    df = read_and_process_data(user_id, num_chunks, base_path)
    export_to_csv(df, user_id)
    track_unique_songs(df, unique_songs_file)

    print("Data processing complete!")
except ValueError as e:
    print(e)

In [None]:
import requests
unique_songs_file = 'unique_songs.csv'
# Execute the function to update unique songs table with Spotify info
update_unique_songs('unique_songs.csv')

Skipping already updated song at index 0.Skipping already updated song at index 1.
Skipping already updated song at index 2.
Skipping already updated song at index 3.
Skipping already updated song at index 4.
Skipping already updated song at index 5.
Skipping already updated song at index 6.
Skipping already updated song at index 7.
Skipping already updated song at index 8.
Skipping already updated song at index 9.
Skipping already updated song at index 10.
Skipping already updated song at index 11.
Skipping already updated song at index 12.
Skipping already updated song at index 13.
Skipping already updated song at index 14.
Skipping already updated song at index 15.
Skipping already updated song at index 16.
Skipping already updated song at index 17.
Skipping already updated song at index 18.
Skipping already updated song at index 19.
Skipping already updated song at index 20.
Skipping already updated song at index 21.
Skipping already updated song at index 22.
Skipping already updat

Exception in thread Thread-10 (worker_thread):
Traceback (most recent call last):
  File "C:\Users\ezrag\AppData\Local\Programs\Python\Python313\Lib\threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "c:\Users\ezrag\OneDrive\Documents\GitHub\spotify-listening-data\venv\Lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
    ~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "C:\Users\ezrag\AppData\Local\Programs\Python\Python313\Lib\threading.py", line 992, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\ezrag\AppData\Local\Temp\ipykernel_15740\664606600.py", line 12, in worker_thread
    song_details = get_song_details(artist_name, track_name, access_token)
  File "C:\Users\ezrag\AppData\Local\Temp\ipykernel_15740\3144728920.py", line 23, in get_song_details
    if response_data['tracks']['items']:
       ~~~~~~~~~~~~~^^^^^^^^^^
KeyError: 'tracks'


Search query: artist:405Okced track:Lifelight (Super Smash Bros. Ultimate Main Theme)
Search response data: {'tracks': {'href': 'https://api.spotify.com/v1/search?offset=0&limit=1&query=artist%3A405Okced%20track%3ALifelight%20%28Super%20Smash%20Bros.%20Ultimate%20Main%20Theme%29&type=track', 'limit': 1, 'next': None, 'offset': 0, 'previous': None, 'total': 0, 'items': []}}
No tracks found for the given query.
Processed index 476
Skipping already updated song at index 1338.
Skipping already updated song at index 1339.
Skipping already updated song at index 1340.
Search query: artist:Crystal Waters track:100% Pure Love
Search response data: {'error': {'status': 400, 'message': 'Invalid string'}}
Search query: artist:遠藤正明 track:英雄
Search response data: {'tracks': {'href': 'https://api.spotify.com/v1/search?offset=0&limit=1&query=artist%3A%E9%81%A0%E8%97%A4%E6%AD%A3%E6%98%8E%20track%3A%E8%8B%B1%E9%9B%84&type=track', 'limit': 1, 'next': None, 'offset': 0, 'previous': None, 'total': 0, 'item

In [None]:
import pandas as pd

# Load the unique songs database
unique_songs_file = 'unique_songs.csv'
unique_songs = pd.read_csv(unique_songs_file)

# Sort the database by artistName
sorted_unique_songs = unique_songs.sort_values(by='artistName')

# Save the sorted database to a new CSV file
sorted_unique_songs_file = 'sorted_unique_songs.csv'
sorted_unique_songs.to_csv(sorted_unique_songs_file, index=False)

print(f"Sorted unique songs database saved to {sorted_unique_songs_file}.")

In [None]:
# Load unique songs data
unique_songs_file = 'unique_songs.csv'
unique_songs = pd.read_csv(unique_songs_file)

# Get user ID and read processed listening data
user_id = get_user_id()
try:
    listening_data = read_processed_data(user_id)
    
    # Fill in song info from unique songs database
    filled_listening_data = fill_song_info(listening_data, unique_songs)
    
    # Export the filled listening data to a new CSV file
    export_filled_data(filled_listening_data, user_id)

    print("Data processing complete!")
except FileNotFoundError:
    print(f"Processed data file not found for user ID: {user_id}")


<h1>Analysis of Filled Listening Data</h1>




**Imports and Setup**


In [None]:
import os
import random
from io import BytesIO

import matplotlib.pyplot as plt
import noise
import numpy as np
import pandas as pd
import requests
from PIL import Image, ImageDraw, ImageFont

# Ensure all necessary packages are installed
# If not, you can install them using pip
# pip install pandas requests matplotlib Pillow noise

In [None]:
import pandas as pd # type: ignore

# Function to read filled listening data
def read_filled_listening_data(file_path):
    df = pd.read_csv(file_path)
    return df



**Color Generation Functions**


In [None]:
# Function to generate a random color
def generate_random_color():
    color = (random.randint(100, 255), random.randint(100, 255), random.randint(100, 255))
    print(f"Generated random color: {color}")
    return color

# Function to generate a color close to a given color
def generate_similar_color(color, variance=50):
    r = min(max(color[0] + random.randint(-variance, variance), 0), 255)
    g = min(max(color[1] + random.randint(-variance, variance), 0), 255)
    b = min(max(color[2] + random.randint(-variance, variance), 0), 255)
    similar_color = (r, g, b)
    print(f"Generated color similar to {color}: {similar_color}")
    return similar_color



**Abstract Background Generation**


In [None]:
# Function to generate an abstract background with dynamic colors
def generate_abstract_background(width=1080, height=1920):
    print(f"Generating abstract background of size {width}x{height}")
    start_color = generate_random_color()
    end_color = generate_similar_color(start_color)
    
    # Create a gradient based on the generated colors
    gradient = np.linspace(start_color, end_color, width).astype(int)
    gradient_cmap = plt.cm.colors.ListedColormap(gradient / 255.0)

    x = np.linspace(-5, 5, width)
    y = np.linspace(-5, 5, height)
    X, Y = np.meshgrid(x, y)
    
    Z = np.sin(X**2 + Y**2) * np.cos(Y**2 - X**2)
    
    plt.figure(figsize=(width / 100, height / 100), dpi=100)
    plt.imshow(Z, cmap=gradient_cmap, interpolation='bilinear')
    plt.axis('off')
    plt.savefig('abstract_background.png', bbox_inches='tight', pad_inches=0)
    plt.close()

    background = Image.open('abstract_background.png')
    background = background.resize((width, height))
    print("Abstract background generated and saved as 'abstract_background.png'")
    return background

# Function to generate Perlin noise
def generate_perlin_noise(width, height, scale=100, seed=random.randint(0,500)):
    print(f"Generating Perlin noise of size {width}x{height} with scale {scale} and seed {seed}")
    shape = (width, height)
    world = np.zeros(shape)
    for i in range(shape[0]):
        for j in range(shape[1]):
            world[i][j] = noise.pnoise2(i / scale, j / scale, octaves=6, persistence=0.5, lacunarity=2.0, repeatx=1024, repeaty=1024, base=seed)
    
    norm_world = (world - np.min(world)) / (np.max(world) - np.min(world))
    print("Perlin noise generated")
    return norm_world

# Function to generate an abstract background with Perlin noise
def generate_abstract_background_with_noise(width=1080, height=1920):
    print(f"Generating abstract background with Perlin noise of size {width}x{height}")
    noise_pattern = generate_perlin_noise(width, height)
    
    start_color = generate_random_color()
    end_color = generate_similar_color(start_color)
    gradient = np.linspace(start_color, end_color, width).astype(int)
    gradient_cmap = plt.cm.colors.ListedColormap(gradient / 255.0)

    plt.figure(figsize=(width / 100, height / 100), dpi=100)
    plt.imshow(noise_pattern, cmap=gradient_cmap, interpolation='bilinear')
    plt.axis('off')
    plt.savefig('abstract_background_with_noise.png', bbox_inches='tight', pad_inches=0)
    plt.close()

    background = Image.open('abstract_background_with_noise.png')
    background = background.resize((width, height))
    print("Abstract background with Perlin noise generated and saved as 'abstract_background_with_noise.png'")
    return background



**Text Drawing Function**


In [None]:
# Function to draw wrapped text
def draw_wrapped_text(draw, text, position, font, max_width, fill):
    print(f"Drawing wrapped text: {text}")
    lines = []
    words = text.split()
    while words:
        line = ''
        while words and font.getbbox(line + words[0])[2] <= max_width:
            line += (words.pop(0) + ' ')
        lines.append(line)
    y_offset = position[1]
    for line in lines:
        draw.text((position[0], y_offset), line, font=font, fill=fill)
        y_offset += font.getbbox(line)[3]  # Use getbbox for line height
    print(f"Wrapped text drawn at position {position}")
    return y_offset



**Album Art Download!**


In [None]:
# Function to fetch and download all popular album art for each artist
def download_all_album_art(df, top_artists):
    print("Downloading album art for top artists")
    album_art = {}
    for artist in top_artists:
        artist_data = df[df['artistName'] == artist]
        if artist_data.empty:
            continue
        
        art_urls = artist_data['album_artwork'].value_counts().index.tolist()
        downloaded = False
        for art_url in art_urls:
            try:
                response = requests.get(art_url)
                img = Image.open(BytesIO(response.content))
                
                img_path = os.path.join("albums", f'{artist}_album_art.jpg')
                img.save(img_path)
                
                album_art[artist] = img_path
                downloaded = True
                print(f"Downloaded album art for {artist}: {img_path}")
                break
            except Exception as e:
                print(f"Error downloading {art_url} for {artist}: {e}")
                continue
        if not downloaded:
            print(f"Could not download album art for {artist}")
    return album_art

In [None]:
from PIL import Image, ImageDraw, ImageFont

# Function to create and save layout images
def create_layout_image(title, top_artists, album_art, file_name, user_id, background):
    print(f"Creating layout image: {file_name}")
    width, height = background.size
    image = background.copy()
    draw = ImageDraw.Draw(image)

    # Define fonts
    try:
        font = ImageFont.truetype("arial.ttf", 40)
        title_font = ImageFont.truetype("arial.ttf", 60)
        user_id_font = ImageFont.truetype("arial.ttf", 30)
    except IOError:
        # In case the fonts are not available on the system
        font = ImageFont.load_default()
        title_font = ImageFont.load_default()
        user_id_font = ImageFont.load_default()
    
    # Draw title and user ID
    draw.text((width / 2, 50), title, font=title_font, fill="white", anchor="mm")
    draw.text((width / 2, 150), f"User: {user_id}", font=user_id_font, fill="white", anchor="mm")

    y_offset = 250
    x_offset = 50

    for rank, (artist, value) in enumerate(top_artists.items(), start=1):
        if artist not in album_art:
            continue
        
        art = Image.open(album_art[artist]).resize((100, 100))
        image.paste(art, (x_offset, y_offset))
        
        text = f"{rank}. {artist}: {value}"
        draw.text((x_offset + 120, y_offset + 30), text, font=font, fill="white")
        y_offset += 120

    image.save(file_name)
    print(f"Layout image saved as {file_name}")




**<h2>Data Processing Functions</h2>**


<h3>Data Preparation and Calculation Functions</h3>

In [None]:
import pandas as pd

# Function to read filled listening data
def read_filled_listening_data(file_path):
    print(f"Reading filled listening data from {file_path}")
    df = pd.read_csv(file_path)
    print("Filled listening data read successfully")
    return df

# Function to calculate percentage listened for each track
def calculate_percentage_listened(df):
    print("Calculating percentage listened for each track")
    df['percentage_listened'] = df['msPlayed'] / df['duration_ms']
    print("Percentage listened calculated")
    return df


<h3>Artist Analysis Functions</h3>

In [None]:
import pandas as pd

# Helper function to expand artist information for each row
def expand_artists_involved(df):
    print("Expanding artists involved in each track")
    artist_expanded_df = df.explode('artists_involved')
    print("Artists involved expanded successfully")
    return artist_expanded_df

# Function to calculate top listened-to artists by listening time
def top_artists_by_time(df, top_n=10):
    print(f"Calculating top {top_n} artists by listening time")
    df_expanded = expand_artists_involved(df)
    artist_time = df_expanded.groupby('artists_involved')['msPlayed'].sum().sort_values(ascending=False).head(top_n)
    artist_time_seconds = artist_time / 1000  # Convert milliseconds to seconds
    print("Top artists by listening time calculated")
    return artist_time_seconds

# Function to calculate top listened-to artists by count
def top_artists_by_count(df, top_n=10):
    print(f"Calculating top {top_n} artists by count")
    df_expanded = expand_artists_involved(df)
    artist_count = df_expanded['artists_involved'].value_counts().head(top_n)
    print("Top artists by count calculated")
    return artist_count

# Function to calculate top listened-to artists by weighted listening time
def top_artists_by_weighted_time(df, top_n=10):
    print(f"Calculating top {top_n} artists by weighted listening time")
    df_expanded = expand_artists_involved(df)
    artist_weighted_time = df_expanded.groupby('artists_involved')['percentage_listened'].sum().sort_values(ascending=False).head(top_n)
    print("Top artists by weighted listening time calculated")
    return artist_weighted_time

# Function to calculate total listening time per artist
def total_listening_time_per_artist(df, top_n=10):
    df_expanded = expand_artists_involved(df)
    artist_time = df_expanded.groupby('artists_involved')['msPlayed'].sum().sort_values(ascending=False).head(top_n)
    return artist_time

<h3>Song Analysis Functions</h3>

In [None]:
# Function to calculate top listened-to songs by play count
def top_songs_by_count(df, top_n=10):
    print(f"Calculating top {top_n} songs by count")
    song_count = df['trackName'].value_counts().head(top_n)
    print("Top songs by count calculated")
    return song_count

# Function to calculate average listening time per song
def average_listening_time_per_song(df, top_n=10):
    print(f"Calculating average listening time per song")
    song_time = df.groupby('trackName')['msPlayed'].mean().sort_values(ascending=False).head(top_n)
    print("Average listening time per song calculated")
    return song_time


<h3>Album Analysis Functions</h3>

In [None]:
# Function to calculate total listening time per album
def total_listening_time_per_album(df, top_n=10):
    print(f"Calculating total listening time per album")
    album_time = df.groupby('album')['msPlayed'].sum().sort_values(ascending=False).head(top_n)
    print("Total listening time per album calculated")
    return album_time


In [None]:
# Function to print column names
def print_column_names(df):
    print("Column names in the filled listening data:")
    for column in df.columns:
        print(column)




In [None]:
def main():
    # Generate Instagram story-sized abstract background with Perlin noise and custom colormap
    background = generate_abstract_background_with_noise(1080, 1920)

    # Load unique songs data
    unique_songs_file = 'unique_songs.csv'
    unique_songs = pd.read_csv(unique_songs_file)

    # Get user ID and construct the file path
    user_name = get_user_id()
    file_path = f'{user_name}_listening_data.csv'

    try:
        # Read the filled listening data
        filled_listening_data = read_filled_listening_data(file_path)
        
        # Calculate percentage listened for each track
        filled_listening_data = calculate_percentage_listened(filled_listening_data)
        
        # Calculate top listened-to artists
        top_artists_count = top_artists_by_count(filled_listening_data).head(5)
        top_artists_time = top_artists_by_time(filled_listening_data).head(5)
        top_artists_weighted_time = top_artists_by_weighted_time(filled_listening_data).head(5)
        
        # Combine all top artists to ensure all album art is downloaded
        all_top_artists = top_artists_count.index.union(top_artists_time.index).union(top_artists_weighted_time.index)
        
        # Download the most common album art for each artist
        album_art = download_all_album_art(filled_listening_data, all_top_artists)
        
        # Create layout images with user ID in the file name and abstract background
        create_layout_image("Top Artists by Count", top_artists_count, album_art, f"{user_name}_spotify_wrapped_top_artists_count.png", user_name, background)
        create_layout_image("Top Artists by Listening Time (minutes)", {k: v / 60 for k, v in top_artists_time.items()}, album_art, f"{user_name}_spotify_wrapped_top_artists_time.png", user_name, background)
        create_layout_image("Top Artists by Weighted Listening Time", top_artists_weighted_time, album_art, f"{user_name}_spotify_wrapped_top_artists_weighted_time.png", user_name, background)
        
        print("Data processing and layout creation complete!")
    except FileNotFoundError:
        print(f"File not found: {file_path}")

# Run the main function
if __name__ == '__main__':
    main()