<a href="https://colab.research.google.com/github/ciaraadkins/my-projects/blob/main/Spotify_Top_Genres_of_2023.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data source: [Kaggle | Most Streamed Spotify Songs 2023](https://www.kaggle.com/datasets/nelgiriyewithana/top-spotify-songs-2023)

My GDrive: [GSheets | spotify-2023](https://docs.google.com/spreadsheets/d/16ndebWq3v0DqX1BtKimhL8zqLt7WtXCbY82PQZOjcqE/edit?usp=sharing)

# **Initialization**

Initialize all the things.

In [None]:
# !pip install spotipy
!pip install unidecode



In [None]:
import time
import math
import random as rand
import numpy as np
import pandas as pd
import os
import zipfile
import re
import urllib.parse
import spotipy
import requests
import json

In [None]:
# from spotipy.oauth2 import SpotifyClientCredentials
from unidecode import unidecode
from tenacity import (retry, stop_after_attempt, wait_fixed,
                      retry_if_exception_type)
from bs4 import BeautifulSoup

In [None]:
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default

In [None]:
# musixmatch api key
# Read back with os.getenv('musixmatch_apikey') when the Musixmatch request
# URL is built. Left blank here; supply your own key before calling the API.
os.environ['musixmatch_apikey'] = ''

In [None]:
# # https://developer.spotify.com/
# os.environ['spotify_client_id'] = ''
# os.environ['spotify_client_secret'] = ''

# # spotify credentials
# grant_type = 'client_credentials'
# body_params = {'grant_type' : grant_type}

# CLIENT_ID = os.getenv('spotify_client_id')
# CLIENT_SECRET = os.getenv('spotify_client_secret')

# auth_url = url='https://accounts.spotify.com/api/token'
# response = requests.post(auth_url, data=body_params, auth = (CLIENT_ID, CLIENT_SECRET))

# token_raw = json.loads(response.text)
# token = token_raw["access_token"]

# spotify_headers = {"Authorization": "Bearer {}".format(token)}

# client_credentials_manager = SpotifyClientCredentials(client_id=os.getenv('spotify_client_id'), client_secret=os.getenv('spotify_client_secret'))
# sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)

# **Functions**

Some helpful functions to use instead of having to repeat the same code over and over.

In [None]:
# loads one worksheet of a gsheets file into a dataframe
def sheetsToDataFrame(fileName,sheetName):
  """Return worksheet ``sheetName`` of Google Sheets file ``fileName`` as a DataFrame.

  The first row of the sheet becomes the column labels and is dropped from
  the data. The remaining rows keep their positional labels, so the index
  of the returned frame starts at 1, not 0.
  """
  credentials, _ = default()
  client = gspread.authorize(credentials)
  all_rows = client.open(fileName).worksheet(sheetName).get_all_values()

  frame = pd.DataFrame.from_records(all_rows)
  frame.columns = frame.iloc[0]  # header row supplies the column labels
  return frame.iloc[1:]

In [None]:
# takes a dataframe and adds it to a gsheets file
def dataFrameToSheets(df,fileName,sheetName):
  """Write ``df`` (column labels plus values) to a new worksheet of a Google Sheets file.

  If a worksheet titled ``sheetName`` already exists, the user is asked
  interactively whether to overwrite it, write to a differently named sheet,
  or abort.

  Args:
    df: DataFrame to upload.
    fileName: Title of the Google Sheets file to open.
    sheetName: Title of the worksheet to create.

  Returns:
    The title of the worksheet that was written, or ``None`` when the user
    chose not to write anything.
  """
  creds, _ = default()
  gc = gspread.authorize(creds)
  sheets_file = gc.open(fileName)

  # Check if a sheet with the same name already exists
  sheet_exists = any(sheet.title == sheetName for sheet in sheets_file.worksheets())

  if sheet_exists:
    # Prompt user for confirmation
    confirm = input(f"A sheet with the name '{sheetName}' already exists. Are you sure you want to overwrite it? (Y/N): ")

    # wants to overwrite existing sheet
    if confirm.lower() == 'y':
      sheets_file.del_worksheet(sheets_file.worksheet(sheetName))
      print(f"Sheet '{sheetName}' successfully overwritten. Exiting...")

    # doesn't want to overwrite existing sheet
    else:
      confirm2 = input("Would you like to add the data to a new sheet? (Y/N): ")

      if confirm2.lower() == "y":
        sheetName = input("Enter a new sheet name: ")
        print(f"New sheet '{sheetName}' successfully created. Exiting...")

      else:
        print(f"No data written to GoogleSheets file {fileName}. Exiting...")
        return None

  # get the count of rows and columns
  num_rows, num_cols = df.shape

  # create a new sheet in the file and set it as the active worksheet.
  # The grid needs one row more than the data because the column labels are
  # written as the first row; keep at least one column for an empty frame.
  worksheet = sheets_file.add_worksheet(title=sheetName, rows=num_rows + 1, cols=max(num_cols, 1))

  # write the DataFrame (header row first) to the active worksheet
  worksheet.update([df.columns.values.tolist()] + df.values.tolist())

  # print the contents of the new sheet
  print(worksheet.get_all_values())

  return sheetName

# dataFrameToSheets(df,fileName,sheetName)

In [None]:
# hit api
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.1),
  retry=retry_if_exception_type(requests.HTTPError))
def get(url,headers=None,timeout=30):
  """Send a GET request and return the response, retrying on bad HTTP statuses.

  The call is attempted up to 3 times, 0.1 s apart, whenever it raises
  ``requests.HTTPError``. Other exceptions (connection errors, timeouts) are
  not retried and propagate immediately. Once the attempts are exhausted,
  tenacity raises its own ``RetryError`` (its default when ``reraise`` is not
  set).

  Args:
    url: Full request URL.
    headers: Optional dict of request headers.
    timeout: Seconds to wait for the server before giving up, so a stalled
      connection cannot block the notebook indefinitely.

  Returns:
    The ``requests.Response`` of a successful (non-4xx/5xx) request.
  """
  # headers=None is already requests' default, so no branching is needed.
  r = requests.get(url=url, headers=headers, timeout=timeout)
  try:
    r.raise_for_status()  # raise an error on a bad status
  except requests.HTTPError:
    print(r.status_code, r.reason)
    raise
  return r

In [None]:
# def getSpotifyDataFromID(song_id,headers):
#   # get song_title, artist_name, and song_art
#   spotify_api_url = f"https://api.spotify.com/v1/tracks/{song_id}"
#   spotify_response = get(spotify_api_url,headers)
#   spotify_data = spotify_response.json()
#   return spotify_data

In [None]:
# encode query parameters before they are added to the api string
def encodeQueryParameter(query_param):
    """Percent-encode ``query_param`` for use as a URL query-string value.

    Every character outside letters, digits and ``_.-~`` is escaped, so a
    space becomes ``%20`` and ``&``, ``=``, ``/`` and ``%`` are escaped too.
    Spaces must not be replaced with ``%20`` beforehand: ``quote`` would then
    escape the ``%`` again and produce ``%2520``.

    Args:
        query_param: Raw text, e.g. a track title or artist name.

    Returns:
        The percent-encoded string.
    """
    return urllib.parse.quote(query_param, safe='')

In [None]:
# hit the musixmatch api
def getFromMmApi(song_title,artist_name):
  """Search Musixmatch for a track by title and artist.

  Calls the ``track.search`` endpoint with the API key stored in the
  ``musixmatch_apikey`` environment variable.

  Args:
    song_title: Track title to search for (``q_track``).
    artist_name: Artist name to search for (``q_artist``).

  Returns:
    The decoded JSON body of the response.
  """
  encoded_song_title = encodeQueryParameter(song_title)
  encoded_artist_name = encodeQueryParameter(artist_name)
  endpoint = "https://api.musixmatch.com/ws/1.1/track.search"
  search_query = f"q_track={encoded_song_title}&q_artist={encoded_artist_name}"

  # Log the request with the key masked so the secret never ends up in the
  # notebook's saved output.
  print(f"mm_api_url: {endpoint}?apikey=***&{search_query}")

  mm_api_url = f"{endpoint}?apikey={os.getenv('musixmatch_apikey')}&{search_query}"
  mm_response = get(mm_api_url)
  mm_data = mm_response.json()
  return mm_data

In [None]:
# reformat api response
def getTrackInfo(mm_response):
  """Flatten the first hit of a Musixmatch track-search response into a dict.

  Args:
    mm_response: Decoded JSON response; ``message.header.status_code`` must
      be present.

  Returns:
    A dict with the keys ``track_id``, ``commontrack_id``, ``artist_id``,
    ``track_name``, ``artist_name``, ``album_name``, ``explicit``, ``genre``
    and ``vanity_genre``. The three ids are converted to ``str``; any field
    missing from the response is the string ``"NA"``. If the response holds
    no usable first track, every value is ``"NA"``. If the status code is not
    200, every value is ``"NA"`` and the track-name key is ``track_name_mm``
    instead of ``track_name``.
  """
  id_fields = ('track_id', 'commontrack_id', 'artist_id')
  plain_fields = ('track_name', 'artist_name', 'album_name', 'explicit')
  all_fields = id_fields + plain_fields + ('genre', 'vanity_genre')

  if mm_response['message']['header']['status_code'] != 200:
    # NOTE(review): only this branch names the track-name key 'track_name_mm';
    # the other return paths use 'track_name'. Kept unchanged because code
    # elsewhere in the notebook reads a 'track_name_mm' column -- confirm
    # which key is intended.
    return {('track_name_mm' if field == 'track_name' else field): "NA"
            for field in all_fields}

  try:
    first_track = mm_response['message']['body']['track_list'][0]['track']
  except (KeyError, IndexError, TypeError):
    # Missing body/track list, an empty track list, or a non-dict body.
    first_track = None
  if not isinstance(first_track, dict):
    return dict.fromkeys(all_fields, "NA")

  # The genre name and its vanity name live on the same nested entry; a null
  # or empty genre list simply yields "NA" for both.
  try:
    music_genre = first_track['primary_genres']['music_genre_list'][0]['music_genre']
  except (KeyError, IndexError, TypeError):
    music_genre = None
  if not isinstance(music_genre, dict):
    music_genre = {}

  track_info = {field: str(first_track.get(field, "NA")) for field in id_fields}
  track_info.update((field, first_track.get(field, "NA")) for field in plain_fields)
  track_info['genre'] = music_genre.get('music_genre_name', "NA")
  track_info['vanity_genre'] = music_genre.get('music_genre_vanity', "NA")

  return track_info

In [None]:
# getTrackInfo variant for the retry pass: returns None instead of an
# all-"NA" dict when the response has no usable track
def getTrackInfoNAs(mm_response):
  """Flatten the first hit of a Musixmatch track-search response, or return None.

  Args:
    mm_response: Decoded JSON response; ``message.header.status_code`` must
      be present.

  Returns:
    ``None`` when the status code is not 200 or the response holds no usable
    first track. Otherwise a dict with the keys ``track_id``,
    ``commontrack_id``, ``artist_id``, ``track_name``, ``artist_name``,
    ``album_name``, ``explicit``, ``genre`` and ``vanity_genre``. The three
    ids are converted to ``str`` and any missing field is the string ``"NA"``.
  """
  if mm_response['message']['header']['status_code'] != 200:
    return None

  try:
    first_track = mm_response['message']['body']['track_list'][0]['track']
  except (KeyError, IndexError, TypeError):
    # Missing body/track list, an empty track list, or a non-dict body.
    return None
  if not isinstance(first_track, dict):
    return None

  # The genre name and its vanity name live on the same nested entry; a null
  # or empty genre list simply yields "NA" for both.
  try:
    music_genre = first_track['primary_genres']['music_genre_list'][0]['music_genre']
  except (KeyError, IndexError, TypeError):
    music_genre = None
  if not isinstance(music_genre, dict):
    music_genre = {}

  track_info = {
    field: str(first_track.get(field, "NA"))
    for field in ('track_id', 'commontrack_id', 'artist_id')
  }
  for field in ('track_name', 'artist_name', 'album_name', 'explicit'):
    track_info[field] = first_track.get(field, "NA")
  track_info['genre'] = music_genre.get('music_genre_name', "NA")
  track_info['vanity_genre'] = music_genre.get('music_genre_vanity', "NA")

  return track_info

# **Data Augmentation**

Pulling my Kaggle data set from my Google Sheets file and augmenting it with data from the Musixmatch API.

In [None]:
# Title of the Google Sheets file that the sheet helpers open
file_name = "spotify-2023"

In [None]:
# Load the "original" worksheet into a DataFrame
data_df = sheetsToDataFrame(file_name,"original")

In [None]:
# Preview the first rows of the loaded sheet
data_df.head()

In [None]:
# Single-row trial: query Musixmatch for the row labelled 13 and flatten the response
api_resp = getFromMmApi(data_df["track_name"][13],data_df["artist(s)_name"][13])
track_obj = getTrackInfo(api_resp)

In [None]:
# Create an empty list to store all the track info dictionaries
# NOTE(review): no cell shown in this notebook appends to this list -- confirm
# whether it is still needed.
all_track_info = []

In [None]:
# Get total number of rows
total_rows = data_df.shape[0]

# Loop through each row in the dataframe.
# The progress counter comes from enumerate() rather than the index label:
# sheetsToDataFrame returns labels that start at 1, so `index+1` ran from 2 to
# total_rows+1 and reported more than 100%.
for row_number, (index, row) in enumerate(data_df.iterrows(), start=1):
    # Print the current row number out of total rows
    print(f"Processing row {row_number} out of {total_rows}. {(row_number/total_rows)*100:.2f}% complete.")

    # Get the track name and artist name from the current row
    track_name = row['track_name']
    artist_name = row['artist(s)_name']
    print("track_name: ",track_name)
    print("artist_name: ",artist_name)

    # Use these values to get data from the API
    api_resp = getFromMmApi(track_name, artist_name)

    # Get track info from the API response
    track_obj = getTrackInfo(api_resp)
    print(track_obj)
    print("*************************************************")

    # If the track object is not None, set its values as new columns in the current row
    if track_obj:
        for key, value in track_obj.items():
            data_df.at[index, key] = value

    # Pause between requests to stay clear of API rate limits (adjust as needed)
    time.sleep(2)

# Convert the list of track info dictionaries to a dataframe.
# NOTE(review): the loop above writes straight into data_df and never appends
# to all_track_info, so unless that list was filled elsewhere this frame is
# empty and the concat below adds no columns.
track_info_df = pd.DataFrame(all_track_info)

# Merge the original dataframe with the track info dataframe
final_df = pd.concat([data_df, track_info_df], axis=1)

In [None]:
# Upload the augmented data to a new worksheet.
# NOTE(review): final_df_renamed is not defined in any cell shown here (the
# loop cell produces final_df) -- confirm it exists before running this.
# NOTE(review): the sheet is written as "plus musixmatch" but the later cell
# reads "plus-musixmatch" -- confirm the intended worksheet title.
dataFrameToSheets(final_df_renamed,file_name,"plus musixmatch")

[['track_name_mm', 'artist(s)_name', 'artist_count', 'released_year', 'released_month', 'released_day', 'in_spotify_playlists', 'in_spotify_charts', 'streams', 'in_apple_playlists', 'in_apple_charts', 'in_deezer_playlists', 'in_deezer_charts', 'in_shazam_charts', 'bpm', 'key', 'mode', 'danceability_%', 'valence_%', 'energy_%', 'acousticness_%', 'instrumentalness_%', 'liveness_%', 'speechiness_%', 'track_id', 'commontrack_id', 'artist_id', 'artist_name', 'album_name', 'explicit', 'genre', 'vanity_genre', 'track_name'], ['Seven', 'Latto, Jung Kook', '2', '2023', '7', '14', '553', '147', '141381703', '43', '263', '45', '10', '826', '125', 'B', 'Major', '80', '89', '83', '31', '0', '8', '4', '259290449', '161106040', '55671664', 'Jung Kook', 'Seven - Single', '0', 'Pop', 'Pop', 'Seven'], ['LALA', 'Myke Towers', '1', '2023', '3', '23', '1474', '48', '133716286', '48', '126', '58', '14', '382', '92', 'C#', 'Major', '71', '61', '74', '7', '0', '10', '4', '254749330', '158186413', '33349167', 

'plus musixmatch v2'

# **Handle NAs**

My attempt at handling the rows where the API didn't return a valid response. In a future iteration I will try to improve this section in particular.

In [None]:
# Load the "plus-musixmatch" worksheet into a DataFrame
data_df_2 = sheetsToDataFrame(file_name,"plus-musixmatch")

In [None]:
# Loop through each row in data_df_2 and retry the Musixmatch lookup for the
# rows whose 'track_name_mm' is still 'NA'
for index, row in data_df_2.iterrows():

    # Rows that already have a Musixmatch title need no retry
    if row['track_name_mm'] != 'NA':
        continue

    print("track_name: ",row['track_name'])
    print("artist_name: ",row['artist(s)_name'])

    # Normalize track name and artist name using unidecode to handle special characters
    normalized_track_name = unidecode(row['track_name'])
    normalized_artist_name = unidecode(row['artist(s)_name'])
    print("normalized_track_name: ",normalized_track_name)
    print("normalized_artist_name: ",normalized_artist_name)

    # First, try the entire artist name string
    api_resp = getFromMmApi(normalized_track_name, normalized_artist_name)
    track_obj = getTrackInfoNAs(api_resp)

    print("track_obj 1: ",track_obj)
    print("artist_count: ",row['artist_count'])

    # If this fails, split the artist name and try each artist separately
    if not track_obj and int(row['artist_count']) > 1:
        for artist in normalized_artist_name.split(","):
            cleaned_artist = artist.strip()  # Remove spaces
            print("cleaned_artist: ",cleaned_artist)

            api_resp = getFromMmApi(normalized_track_name, cleaned_artist)
            print(f"{cleaned_artist}-api_resp: ",api_resp)
            track_obj = getTrackInfoNAs(api_resp)

            # Break if we get a result for any artist
            if track_obj:
                break

    print("track_obj 2: ",track_obj)
    # If the track object is not None, set its values as new columns in the current row
    if track_obj:
        for key, value in track_obj.items():
            # getTrackInfoNAs reports the Musixmatch title under 'track_name'.
            # Store it in 'track_name_mm' so the 'NA' marker this loop keys on
            # is cleared and the source title in 'track_name' (used for the
            # lookups above) is not overwritten.
            column = 'track_name_mm' if key == 'track_name' else key
            data_df_2.at[index, column] = value

    print("*************************************************")
    # Optional: Sleep to prevent hitting API rate limits
    time.sleep(2)