# Pull Spotify History

> Consolidate history JSON files and gather metadata from the Spotify API

In [None]:
# | default_exp core

In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export
import pandas as pd
import re
import time
import requests
import json
import spotipy


from pathlib import Path
from typing import List, Dict, Optional
from spotipy.oauth2 import SpotifyClientCredentials
from dotenv import load_dotenv

In [None]:
# Raise pandas' display limits (rows, columns, total width) for notebook output.
for option_name, option_value in (
    ("display.max_rows", 500),
    ("display.max_columns", 500),
    ("display.width", 1000),
):
    pd.set_option(option_name, option_value)

In [None]:
def get_spotipy_obj():
    """
    Build a Spotify API client that authenticates with client credentials.

    `load_dotenv()` runs first, so variables defined in a `.env` file are in
    the environment before `SpotifyClientCredentials()` is constructed with
    no explicit arguments.
    """
    load_dotenv()
    credentials = SpotifyClientCredentials()
    client = spotipy.Spotify(auth_manager=credentials)
    return client

## Extract Streaming History

> Converting History JSON Files into Pandas DF

Spotify provides a user's history in a series of JSON Files. Some years have multiple files. I wrote a simple function to consolidate this history into a Dict with each year as the key. See below


In [None]:
# | export
# |hide
def extract_streaming_history(
    data_folder: Path,  # Path to the folder containing the streaming history files
) -> Dict[str, pd.DataFrame]:  # Dictionary containing DataFrames for each year
    """
    Read every `.json` file directly inside `data_folder` and group the rows by year.

    The year is the first run of four digits found in the file name, so a
    range such as `2021-2022` is filed under `2021`. Files whose name holds
    no four-digit run are skipped. When several files share a year their
    rows are concatenated, in sorted file-path order, into one DataFrame.
    """

    def get_json_files(data_folder: Path) -> List[Path]:
        """
        Get all the json files in the streaming_history folder, sorted by path
        so the row order of the result does not depend on directory listing order.
        """
        return sorted(file for file in data_folder.iterdir() if file.suffix == ".json")

    def extract_year_from_filename(filename: str) -> Optional[str]:
        """
        Extract the year from a filename. The year should be a single year,
        not a range of years. For example, 2021-2022 should be 2021.
        Returns None when the filename contains no four-digit run.
        """
        match = re.search(r"\d{4}", filename)
        return match.group() if match else None

    frames_by_year: Dict[str, List[pd.DataFrame]] = {}

    for path in get_json_files(data_folder):
        # Skip files that carry no year in their name
        year = extract_year_from_filename(path.name)
        if year is None:
            continue

        # Collect the frames per year; they are concatenated once at the end
        frames_by_year.setdefault(year, []).append(pd.read_json(path))

    return {
        year: frames[0] if len(frames) == 1 else pd.concat(frames, ignore_index=True)
        for year, frames in frames_by_year.items()
    }

In [None]:
# Load the JSON exports found in ./streaming_history, keyed by the year in each file name.
streaming_history = extract_streaming_history(Path("streaming_history"))

For this exercise I'm only going to include music from my history that I consider 'played'. To do so, I'll filter the data here instead of adding a column in my database that differentiates between played and unplayed tracks.
<br>
<br>
In a real-life scenario I'd be more hesitant to throw away information, but I didn't intend to look at unplayed tracks, so keeping them would just have been wasted space in my DB :)

In [None]:
# | export
def clean_streaming_history(
    streaming_history,  # Dictionary containing DataFrames for each year
    # Minimum percentage of the song that must be played to be included in the analysis
    min_percent_played: float = 0.9,
) -> pd.DataFrame:  # Streaming History DataFrame
    """
    Clean the raw streaming history data, standardize column names,
    remove podcast data, remove songs that were not played to completion.

    Steps:
    1. Stack the per-year frames and sort them by the UTC timestamp `ts`.
    2. Add `month` / `year` columns and rename the metadata columns to
       `song`, `artist`, `album` and `URI`.
    3. Drop rows without a track URI and derive `track_id` from the URI.
    4. Approximate each track's `duration` as the most common `ms_played`
       among plays whose `reason_end` is "trackdone"; rows for tracks that
       have no such play are dropped.
    5. Compute `percent_played = ms_played / duration` (0 when the duration
       is 0) and keep rows at or above `min_percent_played`.

    An empty `streaming_history` yields an empty DataFrame.
    """
    yearly_frames = list(streaming_history.values())
    if not yearly_frames:
        # No input frames: return an empty frame instead of failing on missing columns
        return pd.DataFrame()

    # One concat over all years instead of growing a frame inside a loop
    history = pd.concat(yearly_frames, ignore_index=True)

    history["ts"] = pd.to_datetime(history["ts"], utc=True)
    history = history.sort_values("ts").reset_index(drop=True)

    # Adding Data Fields for ease of use
    history["month"] = history["ts"].dt.month
    history["year"] = history["ts"].dt.year

    history = history.rename(
        columns={
            "master_metadata_track_name": "song",
            "master_metadata_album_artist_name": "artist",
            "master_metadata_album_album_name": "album",
            "spotify_track_uri": "URI",
        }
    )

    # Remove anything that's not a song. The explicit copy makes the filtered
    # frame independent, so the column assignment below is not a chained
    # assignment on a slice.
    history = history[history["URI"].notna()].copy()

    # Extract the track_id
    history["track_id"] = [
        uri.replace("spotify:track:", "") for uri in history["URI"]
    ]

    # Approximate the song duration, add to the dataframe
    finished_plays = history.loc[
        history["reason_end"] == "trackdone", ["track_id", "ms_played"]
    ]
    approximate_durations = (
        finished_plays.groupby("track_id")["ms_played"]
        .agg(lambda played: played.mode()[0])
        .reset_index()
        .rename(columns={"ms_played": "duration"})
    )
    history = history.merge(approximate_durations, on="track_id", how="left")
    history = history[history["duration"].notna()].reset_index(drop=True)

    # Adding percent was played and filtering by the given value.
    # Vectorised division; rows with a zero duration are forced to 0.
    duration = history["duration"]
    history["percent_played"] = (history["ms_played"] / duration).where(
        duration != 0, 0.0
    )

    return history[history["percent_played"] >= min_percent_played].reset_index(
        drop=True
    )

I will be using a cutoff of 70% to differentiate between 'played' vs. 'unplayed' 
<br><br>
There wasn't a scientific approach to this; I just listen to a lot of music and trusted my intuition :)

In [None]:
# Keep only plays that reached at least 70% of the track's approximate duration.
clean_history = clean_streaming_history(streaming_history, 0.7)
clean_history.head(2)

## Exploring Spotify API Data

> Finding metadata that will enrich Spotify History

In [None]:
# |hide
# Empty dict stores for the exploration below. The following cells fill
# raw_track_metadata (keyed by track ID) and album_metadata (keyed by album ID).
raw_track_metadata = {}
raw_artist_metadata = {}
raw_audio_features = {}
track_metadata = {}
artist_metadata = {}
album_metadata = {}

In [None]:
# Spotify API client shared by the exploration cells below.
sp = get_spotipy_obj()

To explore the Spotify API I will be looking at metadata from 2 of my All-Time favorite songs: <br>
- Devil in A New Dress by Kanye West (1UGD3lW3tDmgZfAVDh6w7r) <br>
- 1 Train by A$AP Rocky (7AijU6oTPGmG64uWf63Qvc) <br>

In [None]:
# Look up both example tracks in a single request; the "1 Train" ID is listed first.
track_ids = ["7AijU6oTPGmG64uWf63Qvc", "1UGD3lW3tDmgZfAVDh6w7r"]
tracks = sp.tracks(track_ids)["tracks"]

In [None]:
# Position of the track to inspect within the `tracks` list.
current_track = 0
train_1 = tracks[current_track]
# Store the raw payload under its track ID, keeping any entry that is already there.
raw_track_metadata[train_1["id"]] = raw_track_metadata.get(train_1["id"], train_1)

In [None]:
# |echo: false
# Top-level fields present on the raw track object.
list(raw_track_metadata[train_1["id"]].keys())

#### Album Metadata

> Key Data Points: Label, Popularity, Album Tracks, Release Date


In [None]:
# |echo:false
# Fields present on the album object nested inside the track.
list(raw_track_metadata[train_1["id"]]["album"].keys())

Since I don't want all the metadata associated with the album, I'm going to extract only the metadata I want

In [None]:
def get_album_data(track, album_metadata: Dict[str, Dict]):
    """
    Return a trimmed album record for `track`, memoised in `album_metadata`.

    If the track's album ID is already a key of `album_metadata`, the stored
    record is returned unchanged. Otherwise a new record is built from
    `track["album"]`, stored under the album ID and returned. Multiple album
    artists are joined with ";;" and the image list is serialised to JSON.
    """
    source = track["album"]
    album_id = source["id"]
    if album_id in album_metadata:
        return album_metadata[album_id]

    credited_artists = source["artists"]
    album = {
        "name": source["name"],
        "id": album_id,
        "artist": ";;".join(credit["name"] for credit in credited_artists),
        "artist_id": ";;".join(credit["id"] for credit in credited_artists),
        "external_url": source["external_urls"].get("spotify"),
        "href": source["href"],
        "images": json.dumps(source["images"]),
    }
    # The remaining fields are copied over as-is, in this order.
    for field in (
        "release_date",
        "release_date_precision",
        "total_tracks",
        "type",
        "uri",
    ):
        album[field] = source[field]

    album_metadata[album_id] = album
    return album

In [None]:
# Build the trimmed album record for the example track and store it in album_metadata.
train_1_album = get_album_data(train_1, album_metadata)
train_1_album

#### Genres

> Standardizing Genres

Spotify provides genres at the Artist level and these genres can be all over the place. Let's take a look at some examples

> Jay-Z

In [None]:
# Fetch the Jay-Z artist object and show the genres Spotify lists for it.
jay = sp.artist("3nFkdlSjzX9mRTtwJOzDYB")
jay["genres"]

Jay-Z is associated with 5 genres! When I think about Jay-Z I think East Coast Hip Hop, but should that be his main genre?
<br><br>

I have 2 things to consider here:
- How to bucket artists into broad groups (rappers vs. rock stars)
- How to bucket them into the sub-genres that I associate with them (east coast hip hop vs. west coast hip hop)

<br>

Let's take a look at 2 more examples:
> Pink Floyd

In [None]:
# Fetch the Pink Floyd artist object and show its Spotify genres.
pink = sp.artist("0k17h0D3J5VfsdmQ1iZtE9")
pink["genres"]

> Khruangbin

In [None]:
# Fetch the Khruangbin artist object and show its Spotify genres.
khruangbin = sp.artist("2mVVjNmdjXZZDvhgQWiakk")
khruangbin["genres"]

In the case of Pink Floyd, the genres that I want to associate with them are in the Spotify response (Rock and Classic Rock). Khruangbin is a completely different story, however: I don't think they should be defined by either of their genres. In MY opinion they should be bucketed as indie.
<br><br>
What I noticed when looking through the data is that certain genres--like rap--came through well in the Spotify data while others--like indie--did not. Since my goal is to reflect my spotify history as I see it I have to map the data.
<br><br>
In an ideal world I would have created an AI model or something of the like to map the genres. However, I encountered this issue early in the development process, while I still had many other items to build (i.e. the website to display the data). Perfection is the enemy of progress; in the future I can/will create a more programmatic solution to the problem, but for now I just manually mapped the genres and stored the results in a CSV :)

In [None]:
# Manually curated lookup from Spotify genre to main / secondary genre.
genre_mapping = pd.read_csv("genre_matching.csv")

In [None]:
# |hide
# Lower-case and trim the three text columns so lookups against the table
# are not affected by casing or stray whitespace.
for column in ("genres", "main_genre", "secondary_genre"):
    genre_mapping[column] = genre_mapping[column].str.lower().str.strip()

Since I didn't want to go through all the genres (there are ≈ 1800), I went through the top 300 or so and wrote a regex to match the others based on certain keywords

In [None]:
# Ordered (pattern, main genre) rules; the first pattern found in the genre wins.
# Every pattern matches whole words only (`\b` boundaries).
_MAIN_GENRE_RULES = (
    (r"\brap\b", "rap"),
    (r"\bhip hop\b", "rap"),
    (r"\brock\b", "rock"),
    (r"\bsoul\b", "soul"),
    (r"\bpop\b", "pop"),
    (r"\bcountry\b", "country"),
    # jazz, instrumental, blues, classical, lo-fi, lofi and ambient all map to "focus"
    (
        r"\bjazz\b|\binstrumental\b|\bblues\b|\bclassical\b|\blo-fi\b|\blofi\b|\bambient\b",
        "focus",
    ),
)


def consolidate_main_genre(genre: str):
    """
    Map a Spotify genre string onto a broad main genre by keyword.

    The rules are tried in order (rap / hip hop, rock, soul, pop, country,
    focus), so "pop rock" resolves to "rock". Returns None when no keyword
    is found.
    """
    for pattern, main_genre in _MAIN_GENRE_RULES:
        if re.search(pattern, genre):
            return main_genre
    return None

In [None]:
# Rows of the mapping table whose genre is one of Jay-Z's Spotify genres.
possible_genres = genre_mapping[genre_mapping.genres.isin(jay["genres"])]
possible_genres

In addition to mapping genres to a respective 'main_genre' I also added / neglected certain genres as a proper 'secondary_genre' depending on how descriptive I found them. 
<br><br>
Putting it all together

In [None]:
def consolidate_genres(genres: List[str]) -> Dict:
    """
    Reduce a list of Spotify genres to one main and one secondary genre.

    Rows of the module-level `genre_mapping` table whose `genres` value is in
    `genres` are considered, in table order. The main genre is the first
    non-null `main_genre` among them; if none is mapped, the keyword rules of
    `consolidate_main_genre` are applied to the first matching genre. The
    secondary genre is the first non-null `secondary_genre`. Both stay ""
    when no row matches. `genres` in the result is the input joined with ";;".
    """
    matches = genre_mapping[genre_mapping.genres.isin(genres)]
    main_genre, secondary_genre = "", ""

    if len(matches) > 0:
        mapped_main = matches["main_genre"].dropna()
        if len(mapped_main) > 0:
            main_genre = mapped_main.iloc[0]
        else:
            # No curated main genre: fall back to the keyword rules
            first_matching_genre = matches["genres"].dropna().iloc[0]
            main_genre = consolidate_main_genre(first_matching_genre)

        mapped_secondary = matches["secondary_genre"].dropna()
        if len(mapped_secondary) > 0:
            secondary_genre = mapped_secondary.iloc[0]

    return {
        "main_genre": main_genre,
        "secondary_genre": secondary_genre,
        "genres": ";;".join(genres),
    }

In [None]:
# Apply the mapping to Jay-Z's genres.
consolidate_genres(jay["genres"])

While this is a good start, there's still one scenario that isn't being accounted for:
- What if the artist API request doesn't return ANY genres?

<br>
Let's take a look at Santa Esmeralda

In [None]:
# Fetch the artist object and show its (empty) genre list.
santa = sp.artist("0iGmfKLgK5eSMgHp8YgLnS")
santa["genres"]

Santa doesn't have any genres!!! 
<br><br>
The solution here is simple. While Santa doesn't have any genres HIMSELF, Spotify does provide an endpoint for RELATED artists. I can find the genres of his related artists and use them to approximate Santa's genre

In [None]:
# Tally how often each genre appears among the related artists.
# The tally starts empty so the display line at the end also works when the
# artist already has genres and the related-artist lookup is skipped.
related_genres = {}
if not santa.get("genres"):
    related_artists = sp.artist_related_artists("0iGmfKLgK5eSMgHp8YgLnS")
    for art in related_artists["artists"]:
        # Related artists without genres contribute nothing
        for genre in art.get("genres") or []:
            related_genres[genre] = related_genres.get(genre, 0) + 1
related_genres

Consolidating the genres for Santa

In [None]:
# Genre names ordered from most to least frequent among the related artists;
# the sort is stable, so equally frequent genres keep their first-seen order.
related_genres_list = sorted(related_genres, key=related_genres.get, reverse=True)
# Map the five most frequent genres onto a main / secondary genre.
consolidate_genres(related_genres_list[:5])

> Proceduralizing

In [None]:
def get_artist_genres(artist) -> Dict[str, str]:
    """
    Return the `genres`, `main_genre` and `secondary_genre` for an artist object.

    When the artist has genres of its own they are consolidated with
    `consolidate_genres`. If that yields no main genre, the artist's last
    genre is used; if it yields no secondary genre, the artist's first genre
    is used.

    When the artist has no genres, the genres of its related artists are
    counted and the five most frequent ones are consolidated instead (ties
    keep first-seen order). That branch makes a Spotify API request.
    """
    own_genres = artist.get("genres")
    if own_genres:
        consolidated_genres = consolidate_genres(own_genres)
        return {
            "genres": consolidated_genres["genres"],
            # Fall back to a whole genre name. `consolidated_genres["genres"]`
            # is a ";;"-joined string, so indexing it would give one character.
            "main_genre": consolidated_genres["main_genre"] or own_genres[-1],
            "secondary_genre": consolidated_genres["secondary_genre"]
            or own_genres[0],
        }

    from collections import Counter

    sp = get_spotipy_obj()
    related_artists = sp.artist_related_artists(artist["id"])
    # Count every genre occurrence across the related artists
    related_genres = Counter(
        genre
        for art in related_artists["artists"]
        for genre in art.get("genres") or []
    )
    most_common_genres = [genre for genre, _ in related_genres.most_common(5)]
    return consolidate_genres(most_common_genres)

In [None]:
# Jay-Z has genres of his own, so this takes the direct consolidation path.
get_artist_genres(jay)

#### Artist

In [None]:
# |echo: false
# Top-level fields present on an artist object.
list(jay.keys())

In [None]:
def get_artist_data(artist_id: str, raw_artist_metadata: Optional[Dict] = None) -> Dict:
    """
    Fetch an artist from the Spotify API and return a flat record for it.

    The record holds the artist's id, name, Spotify URL, follower count,
    href, images (JSON string), popularity, type and uri, together with the
    `genres`, `main_genre` and `secondary_genre` from `get_artist_genres`.

    `raw_artist_metadata`, when given, is updated in place: the raw API
    response is stored under the artist's id unless that id is already
    present. It defaults to None (a fresh dict per call) rather than a
    mutable `{}` default, which would be shared silently between calls.
    """
    if raw_artist_metadata is None:
        raw_artist_metadata = {}

    sp = get_spotipy_obj()
    artist = sp.artist(artist_id)
    artist_genres = get_artist_genres(artist)

    # Keep the first raw response seen for this artist
    if artist["id"] not in raw_artist_metadata:
        raw_artist_metadata[artist["id"]] = artist

    return {
        "id": artist["id"],
        "name": artist["name"],
        "external_url": artist["external_urls"].get("spotify"),
        "followers": artist["followers"]["total"],
        "genres": artist_genres["genres"],
        "href": artist["href"],
        "images": json.dumps(artist["images"]),
        "popularity": artist["popularity"],
        "type": artist["type"],
        "uri": artist["uri"],
        "main_genre": artist_genres["main_genre"],
        "secondary_genre": artist_genres["secondary_genre"],
    }

Getting Rocky's Record

In [None]:
# Spotify artist ID used for the lookup below.
rocky_id = "13ubrt8QOOCPljQ2FL1Kca"

In [None]:
# Build the flat artist record; no cache dict is passed, so the default is used.
rocky_record = get_artist_data(rocky_id)
rocky_record

In [None]:
# | hide
# Run nbdev's notebook export.
import nbdev

nbdev.nbdev_export()