In [9]:
# import dependencies
#  data packages
import pandas as pd
import numpy as np
#  web-scraping packages
from urllib.request import urlopen
from bs4 import BeautifulSoup
#  Spotify packages
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
#  misc processing packages
import re
import itertools
from pyspark.sql import SparkSession
#  config features
import config

In [10]:
# initialize spotify auth
# the client id / secret are read from the local `config` module, so no
# credentials are written in the notebook itself
client_credentials_manager = SpotifyClientCredentials(
    client_id = config.client_id, client_secret = config.client_secret)
# `sp` is the module-level Spotify client that the search and audio-feature
# functions defined further down call directly
sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)

In [None]:
# # setup deepnote spark integration
# ! sudo apt-get update
# ! sudo mkdir -p /usr/share/man/man1
# ! sudo apt-get install -y openjdk-11-jdk
# ! pip install pyspark

# # initialize spark session
# spark = SparkSession \
#     .builder \
#     .master("local[*]") \
#     .appName('sp_search') \
#     .getOrCreate() 
# sc = spark.sparkContext

Get:1 http://deb.debian.org/debian buster InRelease [122 kB]
Get:2 http://deb.debian.org/debian-security buster/updates InRelease [34.8 kB]
Get:3 http://deb.debian.org/debian buster-updates InRelease [56.6 kB]
Get:4 http://deb.debian.org/debian buster/main amd64 Packages [7,909 kB]
Get:5 http://deb.debian.org/debian-security buster/updates/main amd64 Packages [378 kB]
Get:6 http://deb.debian.org/debian buster-updates/main amd64 Packages [8,788 B]
Fetched 8,509 kB in 5s (1,722 kB/s)




The following additional packages will be installed:
  at-spi2-core ca-certificates-java dbus dbus-user-session
  dconf-gsettings-backend dconf-service dmsetup fonts-dejavu-extra
  glib-networking glib-networking-common glib-networking-services
  gsettings-desktop-schemas java-common libapparmor1 libargon2-1 libasound2
  libasound2-data libatk-bridge2.0-0 libatk-wrapper-java
  libatk-wrapper-java-jni libatspi2.0-0 libcap2 libcolord2 libcryptsetup12
  libdbus-1-3 libdconf1 libdevmapper1.02.1 libdrm-amdgpu

### Grab Wikipedia Data
We will use the Beautiful Soup library to scrape each year's top-100 track list. Wikipedia presents each year's top-100 list as a table in which the first column is the song's rank, the second column is the track name, and the last column lists the artists involved.

In [11]:
# grab first wikitab from wikipedia page
def grab_wikitab(url, cls):
    """Download a page and return its first table with the given CSS class.

    Parameters
    ----------
    url : str
        Address of the page to download.
    cls : str
        Value of the ``class`` attribute the table must carry.

    Returns
    -------
    The first matching ``<table>`` element found by BeautifulSoup, or
    ``None`` when the page has no such table.
    """
    # the context manager closes the HTTP response once the markup is parsed;
    # previously the connection was left open on every call
    with urlopen(url) as html:
        soup = BeautifulSoup(html, 'html.parser')
    return soup.find('table', {'class': cls})

In [12]:
# create function that uses artist href's to grab birth info
def birth_info(href):
    """Look up birthday / origin details in an artist's Wikipedia infobox.

    Parameters
    ----------
    href : str
        Site-relative link to the artist page (appended to the Wikipedia
        host name).

    Returns
    -------
    tuple
        ``(bday, origin)`` taken from the first of the inspected infobox
        cells that yields either value. ``bday`` is the ``<span class="bday">``
        element as returned by BeautifulSoup (not its text) and ``origin`` is
        the ``title`` attribute of the first link in that cell. Returns
        ``(None, None)`` when the page or infobox cannot be read or nothing
        is found.
    """
    wiki_home = 'https://en.wikipedia.org'
    try:
        infobox = grab_wikitab(wiki_home + href, 'infobox').select('td')[1:5]
    # catch Exception rather than everything so that Ctrl-C / kernel
    # interrupts are not silently swallowed while scraping
    except Exception:
        return None, None
    for cell in infobox:
        bday = cell.find('span', {'class': 'bday'})
        # find returns None when the cell has no link, so guard before .get
        link = cell.find('a')
        origin = link.get('title') if link is not None else None
        if bday is not None or origin is not None:
            return bday, origin
    return None, None

In [13]:
# create wikipedia scrape function that grabs artists' hrefs
def wiki_refs(wikitab):
    """Collect ``[title, href]`` pairs from every third cell of a table.

    Parameters
    ----------
    wikitab : sequence
        Table cells; the cells at positions 2, 5, 8, ... are inspected.

    Returns
    -------
    list
        One ``[title, href]`` pair for each inspected cell that contains
        a link, taken from the first link in that cell.
    """
    ### birth_info(href) is deliberately not called here: doing so issued
    ### too many url requests and the run ended in a keyboard interruption
    ### if we had more time we would explore this further
    first_links = (wikitab[pos].find('a') for pos in np.arange(2, len(wikitab), 3))
    return [[link.get('title'), link.get('href')]
            for link in first_links if link != None]

In [14]:
# create function that grabs top 100 for each year
def wiki_lists(url_prefix, min_yr, max_yr):
    """Scrape one chart table per year and stack them into a single frame.

    Parameters
    ----------
    url_prefix : str
        Page address without the trailing year; the year is appended to it.
    min_yr, max_yr : int
        First and last year to scrape (both inclusive).

    Returns
    -------
    pandas.DataFrame
        Columns ``rank``, ``track_nm``, ``artist_nms`` and ``year`` with a
        fresh 0..n-1 index. An empty frame is returned when the year range
        is empty.

    Raises
    ------
    ValueError
        If a year's page contains no table with class ``wikitable``.
    """
    # local import so the function does not rely on an extra top-level import
    from io import StringIO

    ### we were going to create a url list so we could capture
    ### artist origin and DOB info (see wiki_refs / birth_info)
    ### due to time constraints this has been left out of scope
    yearly_frames = []
    # for each year open the url and extract the table as a df
    for yr in np.arange(min_yr, max_yr + 1):
        # grab wikitab
        url = f'{url_prefix}{str(yr)}'
        wikitab = grab_wikitab(url, 'wikitable')
        if wikitab is None:
            raise ValueError(f'no wikitable found at {url}')
        # wrap the markup in StringIO: passing literal html to read_html
        # is deprecated in recent pandas versions
        df_yr = pd.read_html(StringIO(str(wikitab)))[0]
        # align columns
        df_yr.columns = ['rank','track_nm','artist_nms']
        # create year column
        df_yr['year'] = yr
        yearly_frames.append(df_yr)
    # concatenate once at the end instead of re-copying the frame every year
    if not yearly_frames:
        return pd.DataFrame()
    return pd.concat(yearly_frames, ignore_index = True)

### Grab Spotify Data
We will use the data scraped from Wikipedia to search Spotify for as many Billboard year-end top-100 tracks as we can find. Using regex and some text formatting, we will try to capture as many songs as possible. After finding these songs and retrieving their Spotify track IDs, we can use those IDs to request their track attributes.

In [74]:
def search_term(df, artist_col_nm, track_col_nm):
    """Build cleaned artist and track search strings from two text columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the raw artist and track text.
    artist_col_nm : str
        Name of the artist column.
    track_col_nm : str
        Name of the track column.

    Returns
    -------
    tuple of pandas.Series
        ``(artist_search, track_search)``. Artists are lower-cased, stripped
        of quote characters and cut at the first " feat", " (", " and ",
        " featuring", " ,", " with " or " & ". Tracks are stripped of quotes
        and periods and cut at the first "/" (surrounding whitespace is kept).
    """
    # raw strings: the patterns are unchanged, but sequences such as \W or \s
    # inside a normal string literal are invalid escapes and warn on newer
    # Python versions
    artist_search = df[artist_col_nm].str.lower() \
        .str.replace(r'''["']''', '', regex = True) \
        .str.replace(r'( (feat\W|\(|and\s|featuring|,|with\s|&\s).*)', '', regex = True)
    # define track terms
    track_search = df[track_col_nm].str.replace(r'''["'\.]''', '', regex = True) \
        .str.split(pat = "/").str[0]
    return artist_search, track_search

In [104]:
# create function that iterates through artists for matching
def artist_searcher(output, artist):
    """Pick the first search hit credited to the wanted artist.

    Parameters
    ----------
    output : list of dict
        Track search hits; each hit is read through its ``'id'`` key and its
        ``'artists'`` list (entries with ``'name'`` and ``'id'``).
    artist : str
        Cleaned, lower-case artist search string.

    Returns
    -------
    tuple
        ``(track_id, first_artist_id)`` of the first hit where ``artist``
        equals one of the hit's artist names (lower-cased, quotes removed)
        or where any whitespace-separated word of ``artist`` equals one of
        those names. ``(None, None)`` when no hit qualifies.
    """
    artist_words = set(artist.split())
    for hit in output:
        names = [re.sub('[\"\']', '', a['name'].lower()) for a in hit['artists']]
        # exact name match, or a single word of the search string matching a name
        if artist in names or artist_words.intersection(names):
            try:
                return hit['id'], hit['artists'][0]['id']
            # a malformed hit is skipped; only lookup errors are ignored so
            # interrupts and unrelated failures still surface
            except (KeyError, IndexError, TypeError):
                continue
    return None, None

In [99]:
# create search rules to optimize accuracy of spotify search selection
def search_rule(query, artist, track, year):
    """Resolve one song to Spotify ids using progressively looser searches.

    The steps are tried in order and the first one that yields a match wins:

    1. search with the full ``query`` and take the first hit;
    2. search on the track title only and pick the hit credited to ``artist``;
    3. search on each word of the title in turn, again picking the hit
       credited to ``artist``.

    Parameters
    ----------
    query : str
        Search string passed to ``sp.search`` as-is.
    artist : str
        Cleaned artist string used to choose among title-only hits.
    track : str
        Cleaned track title.
    year : int
        Currently unused. Limiting the search by year was tried and dropped
        because it lost records; the parameter is kept for callers.

    Returns
    -------
    tuple
        ``(track_id, artist_id)`` or ``(None, None)`` when nothing matched.
    """
    output = sp.search(q = query, type = 'track')['tracks']['items']
    # if condition is met we skip all other steps
    if len(output) >= 1:
        return output[0]['id'], output[0]['artists'][0]['id']
    # otherwise search on song title only
    output = sp.search(q = 'track:' + track, type = 'track')['tracks']['items']
    # use artist name to select appropriate title
    track_id, artist_id = artist_searcher(output, artist)
    if track_id is not None:
        return track_id, artist_id
    # last resort: try the title word by word. each word's results are
    # checked before moving on; previously only the final word's results
    # were ever inspected
    for word in track.split():
        output = sp.search(q = 'track:' + word, type = 'track')['tracks']['items']
        track_id, artist_id = artist_searcher(output, artist)
        if track_id is not None:
            return track_id, artist_id
    return None, None

In [94]:
# for each song in the wiki df search spotify for track id's
# this method leverages pyspark and RDD's
# we did this to assess data ingestion time difference
def track_id_search_rdd(df, sp, artist_col, track_col, year):
    """Spark/RDD variant of the Spotify id lookup.

    Adds a ``'query'`` column to ``df`` in place, copies the search columns
    into a Spark DataFrame, maps ``search_rule`` over its rows and returns
    the result as a pandas frame with columns ``track_ids`` and
    ``artist_ids``.

    Relies on a module-level ``spark`` session being defined. The ``sp``
    argument is not used inside this function.
    """
    df['query'] = 'artist:' + df[artist_col] + ' track:' + df[track_col]
    search_cols = ['query', artist_col, track_col, year]
    spark_frame = spark.createDataFrame(df[search_cols])

    def _lookup(row):
        # one search_rule call per row
        return search_rule(row.query, row[artist_col], row[track_col], row[year])

    id_pairs = spark_frame.rdd.map(_lookup)
    return id_pairs.toDF(['track_ids', 'artist_ids']).toPandas()

In [78]:
# for each song in the wiki df search spotify for track id's
# this method is only slightly less efficient than the pyspark + RDD method
# so we have decided to use this as it is somewhat more digestible code to read
def track_id_search(df, sp, artist_col, track_col, year):
    """Look up Spotify ids for every row of ``df`` with ``search_rule``.

    Adds a ``'query'`` column (``artist:<artist> track:<track>``) to ``df``
    in place, then applies ``search_rule`` row by row.

    Returns
    -------
    tuple
        ``(track_ids, artist_ids)``: two tuples aligned with the rows of
        ``df``. The ``sp`` argument is not used inside this function.
    """
    df['query'] = 'artist:' + df[artist_col] + ' track:' + df[track_col]

    def _lookup(row):
        # search_rule walks through its rules until it finds a match or gives up
        return search_rule(row['query'], row[artist_col], row[track_col], row[year])

    id_pairs = df[['query', artist_col, track_col, year]].apply(_lookup, axis = 1)
    # split the per-row (track_id, artist_id) pairs into two parallel tuples
    track_ids, artist_ids = zip(*id_pairs)
    return track_ids, artist_ids

In [79]:
# create function that uses track_ids to find audio features
# and uses artist_ids to grab genres
def audio_feature(df, af_id_col, genre_id_col):
    """Fetch audio features per track id and one genre per artist id.

    Ids are sent to the Spotify client in batches of 50.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the id columns.
    af_id_col : str
        Column with track ids, passed to ``sp.audio_features``.
    genre_id_col : str
        Column with artist ids, passed to ``sp.artists``.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(af_df, genre_df)``. ``af_df`` has one row per distinct feature
        record returned. ``genre_df`` has columns ``artist_ids`` and
        ``genre`` holding the first listed genre of each artist that has
        any; it is returned with those columns even when no genre was found.
    """
    af_ser = df[af_id_col]
    genre_ser = df[genre_id_col]
    # collect plain records and build each frame once, instead of growing
    # the frames with concat on every batch
    af_rows = []
    genre_rows = []
    for start in np.arange(0, df.shape[0], 50):
        stop = start + 50
        # start with audio features; missing results are skipped
        af_output = sp.audio_features(list(af_ser.iloc[start:stop]))
        af_rows.extend(row for row in (af_output or []) if row is not None)
        # follow with genres; keep only artists that list at least one
        genre_output = sp.artists(list(genre_ser.iloc[start:stop]))['artists']
        genre_rows.extend(
            [a['id'], a['genres'][0]]
            for a in genre_output
            if a is not None and len(a['genres']) > 0)
    # dedup df's to avoid duplication when joining
    af_df = pd.DataFrame(af_rows).drop_duplicates()
    # naming the columns at construction also works when nothing was found
    genre_df = pd.DataFrame(genre_rows, columns = ['artist_ids', 'genre']).drop_duplicates()
    return af_df, genre_df

In [80]:
# average by year function
def avger(df, col_list, id_col):
    """Return the mean of ``col_list`` for each distinct value of ``id_col``.

    The grouping column is kept as a regular column of the result.
    """
    grouped = df.groupby(id_col, as_index = False)
    return grouped[col_list].mean()

In [81]:
# create melt function...drop unnecessary id columns
def melter(df, id_list, col_list, drop_list):
    """Reshape a wide feature frame into long format.

    Parameters
    ----------
    df : pandas.DataFrame
        Wide frame with one column per feature.
    id_list : list
        Columns kept as identifiers on every output row.
    col_list : list
        Feature columns to unpivot.
    drop_list : list-like or None
        Columns removed before melting; ``None`` skips the drop.

    Returns
    -------
    pandas.DataFrame
        The identifier columns plus ``audio_feature`` (the former column
        name) and ``value``.
    """
    # identity check: `!= None` is evaluated element-wise for array-like
    # label containers (numpy array, pandas Index) and then fails in `if`
    if drop_list is not None:
        melt_df = df.drop(labels = drop_list, axis = 1)
    else:
        melt_df = df
    return melt_df.melt(id_vars = id_list, value_vars = col_list,
        var_name = 'audio_feature', value_name = 'value')

### Put it all together
We will be using the functions created above to create our data sources for the visualizations and analyses.

In [108]:
# first grab wikipedia data: one row per charted song with the columns
# rank, track_nm, artist_nms and year; the url prefix and the year range
# come from the local config module
df = wiki_lists(config.wiki_url_prefix, config.min_yr, config.max_yr)
# use function with regex to create search terms for spotify api
# (cleaned artist / track strings stored as two new columns on df)
df['artist_search'], df['track_search'] = search_term(df, 'artist_nms', 'track_nm')

In [109]:
# use search terms to look up track and artist id's
# note: track_id_search also adds a 'query' column to df as a side effect
df['track_ids'], df['artist_ids'] = track_id_search(df, sp, 'artist_search', 'track_search', 'year')

In [113]:
# run pyspark equivalent search function to compare time of run
### opted out of using the pandas version (above cell) for readability
# spark_tracks, spark_artists = track_id_search_rdd(df, sp, 'artist_search', 'track_search', 'year')

In [114]:
# use the track_id's to call audio features & genre
# dropna removes every row with a missing value in any column, which
# includes songs for which the spotify search returned no id's
df2 = df.dropna()
af_df, genre_df = audio_feature(df2, 'track_ids', 'artist_ids')
# inner joins: songs without audio features or without a genre are dropped
df3 = df2.merge(af_df, how = 'inner', left_on = 'track_ids', right_on = 'id').dropna()
df3 = df3.merge(genre_df, how = 'inner', on = 'artist_ids').dropna()

In [115]:
# make variations of df for usability in analysis and viz
# audio feature columns that get melted / averaged
col_list = ['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness',
            'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature']
# columns removed before melting the song-level frame
drop_list = ['type', 'id', 'uri', 'track_href', 'analysis_url', 'query']
# identifier columns repeated on every row of the long frame
id_list = ['rank', 'track_nm', 'artist_nms', 'year', 'genre', 'artist_search', 'track_search']
# long format: one row per song and audio feature
long_melt = melter(df3, id_list, col_list, drop_list)
# yearly mean of every audio feature, in wide and in long format
avg_df = avger(df3, col_list, 'year')
avg_melt = melter(avg_df, ['year'], col_list, None)

In [116]:
# send everything to csv's (row index is not written)
# NOTE(review): the output locations are absolute paths, so this cell only
# works where /work/billboard_music_analysis/src/data exists - consider a
# configurable data directory
df3.to_csv("/work/billboard_music_analysis/src/data/bb_100_feat.csv", index = False)
long_melt.to_csv("/work/billboard_music_analysis/src/data/bb_100_feat_melt.csv", index = False)
avg_df.to_csv("/work/billboard_music_analysis/src/data/avg_feat.csv", index = False)
avg_melt.to_csv("/work/billboard_music_analysis/src/data/avg_feat_melt.csv", index = False)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=3f060e00-a163-4cce-a4c0-34e77cfdc670' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>