# Data Collection Project - Heavy Metal data from Wikipedia and Spotify

#### Libraries and Packages

In [None]:
from bs4 import BeautifulSoup
import requests
import pandas as pd
import numpy as np

In [None]:
import re

#### Scraping Wikipedia Data: Genres and Band Names

In [None]:
# Getting URLs
#
# Every anchor found inside the first 'navbox-list navbox-odd' element of the
# genres article is turned into the slug of a "List_of_<genre>_bands" page.

source = 'https://en.wikipedia.org/wiki/Heavy_metal_genres'
response = requests.get(source)
soup = BeautifulSoup(response.text, 'html.parser')
navbox = soup.find(class_='navbox-list navbox-odd')
# find() returns None when nothing matches; fall back to an empty list
# instead of failing with AttributeError on the next call.
pages = navbox.find_all('a') if navbox is not None else []

links = [
    ('List_of_' + page.get('title').lower().replace(' ', '_') + '_bands').replace('_music', '')
    for page in pages
    if page.get('title')  # anchors without a title attribute cannot be mapped to a page
]

In [None]:
def string_ajustment(band):
    """Clean up a band name string retrieved from a page.

    Everything from the first '[' onwards is discarded, then everything
    from the first '(' onwards; the remainder is title-cased and trailing
    whitespace is stripped.
    """
    # partition() returns the whole string as its first element when the
    # separator is absent, so names without brackets pass through unchanged.
    without_brackets = band.partition('[')[0]
    without_parentheses = without_brackets.partition('(')[0]
    return without_parentheses.title().rstrip()

In [None]:
%%time

data = []    # rows of [band, genre]
genres = []  # genres for which at least one table/list was found

WIKI_BASE = 'https://en.wikipedia.org/wiki/'
# (old, new) pairs applied in order to turn a page slug into a genre label.
SLUG_REPLACEMENTS = [('List_of_', ''), ('_bands', ''), (',_!–K', ''), (',_L–Z', ''), ('_', ' ')]
# Suffixes of the pages tried when the main list page yields nothing.
SPLIT_PAGE_SUFFIXES = (',_!–K', ',_L–Z')


def _fetch_soup(url):
    """Download *url* and return its HTML parsed with html.parser."""
    return BeautifulSoup(requests.get(url).text, 'html.parser')


def _bands_from_tables(tables, genre):
    """Return [band, genre] rows taken from the first cell of each table row."""
    rows_out = []
    for table in tables:
        body = table.tbody or table  # tolerate tables without an explicit <tbody>
        for row in body.find_all('tr')[1:]:  # the first row is treated as the header
            tds = row.find_all('td')
            if not tds:  # rows made only of <th> cells carry no band name
                continue
            band = string_ajustment(tds[0].text.replace('\n', ''))
            rows_out.append([band, genre])
    return rows_out


def _bands_from_groups(groups, genre):
    """Return [band, genre] rows taken from the text lines of each group."""
    rows_out = []
    for group in groups:
        lines = group.text.split('\n')
        for band in lines[1:-1]:  # first and last line are dropped
            if band == '0-9':
                continue
            band = string_ajustment(band)
            # string_ajustment() title-cases the text, so the trailing
            # section headings must be matched in title case as well.
            if 'Reference' in band or 'See Also' in band:
                break  # stop reading this group: no band names follow
            if len(band) > 1:
                rows_out.append([band, genre])
    return rows_out


for link in links:
    genre = link.rsplit('/', 1)[-1]
    for old, new in SLUG_REPLACEMENTS:
        genre = genre.replace(old, new)
    genre = genre.title()

    soup = _fetch_soup(WIKI_BASE + link)

    # Table detection
    tables = soup.find_all('table', {'class': 'wikitable'})  # 1st attempt
    if not tables:
        tables = soup.find_all('table', {'class': 'wikitable sortable'})  # 2nd attempt

    if tables:  # pages with tables
        genres.append(genre)
        data.extend(_bands_from_tables(tables, genre))
    else:  # pages with plain lists of bands
        groups = soup.find_all('div', class_='div-col columns column-width')  # 1st attempt
        if not groups:
            groups = soup.find_all('table', {'class': 'multicol'})  # 2nd attempt
        genres.extend([genre] * len(groups))
        data.extend(_bands_from_groups(groups, genre))

    # Two possibilities: either data in multiple urls or no data available (non-relevant genre)
    if genre not in genres:
        for suffix in SPLIT_PAGE_SUFFIXES:
            soup = _fetch_soup(WIKI_BASE + link + suffix)
            groups = soup.find_all('table', {'class': 'multicol'})
            genres.extend([genre] * len(groups))
            data.extend(_bands_from_groups(groups, genre))

In [None]:
# One row per scraped (band, genre) pair, exact repeats removed.
bands_frame = pd.DataFrame(data)
bands_frame.columns = ['Band', 'Genre']
df_bands = bands_frame.drop_duplicates()

df_bands

In [None]:
# Number of bands listed under each genre
df_bands.groupby('Genre').count()

In [None]:
# Bands that appear under two or more genres, with their genre count
genres_per_band = df_bands.groupby('Band').count()
genres_per_band[genres_per_band['Genre'] >= 2]

In [None]:
# Amount of bands with multiple genres
genres_per_band = df_bands.groupby(['Band']).count()  # computed once, reused below
multi_genre_count = int((genres_per_band['Genre'] >= 2).sum())
print('The number of bands with multiple genres is', multi_genre_count)
# The share is taken over the rows of df_bands and formatted as an actual
# percentage, matching the wording of the message.
print('This is equivalent of the following percentage of the data:', f'{multi_genre_count / len(df_bands):.2%}')

In [None]:
# Auxiliary dataframe containing only band names (each distinct name once)
df_unique = df_bands['Band'].drop_duplicates().to_frame()
df_unique

#### Spotify API Authorization

In [None]:
!pip install requests

In [None]:
import base64
import requests
import datetime
from urllib.parse import urlencode

In [None]:
import os

# Spotify application credentials. They are read from the environment so the
# real values do not have to be saved inside the notebook; the placeholders
# are kept as fallbacks when the variables are not set.
client_id = os.environ.get('SPOTIFY_CLIENT_ID', 'YOUR_CLIENT_ID')
client_secret = os.environ.get('SPOTIFY_CLIENT_SECRET', 'YOUR_CLIENT_SECRET')

In [None]:
class SpotifyAPI(object):
    """Small Spotify Web API client using the client-credentials grant.

    A bearer token is requested from ``token_url`` the first time it is
    needed and again once its expiry time has passed. The lookup and search
    helpers return the decoded JSON body of the response, or an empty dict
    when the response status is not 2xx.
    """
    access_token = None
    access_token_expires = datetime.datetime.now()
    access_token_did_expire = True
    client_id = None
    client_secret = None
    token_url = 'https://accounts.spotify.com/api/token'

    # URL templates for the resource_type values that do not follow the
    # generic "/{version}/{resource_type}/{lookup_id}" layout.
    _SPECIAL_ENDPOINTS = {
        'tracks': 'https://api.spotify.com/{version}/albums/{lookup_id}/tracks',
        'features': 'https://api.spotify.com/{version}/audio-features/{lookup_id}',
        'analysis': 'https://api.spotify.com/{version}/audio-analysis/{lookup_id}',
        'popularity': 'https://api.spotify.com/{version}/tracks/{lookup_id}',
        'albums': 'https://api.spotify.com/{version}/artists/{lookup_id}/albums',  # Get an Artist's Albums
    }
    _GENERIC_ENDPOINT = 'https://api.spotify.com/{version}/{resource_type}/{lookup_id}'

    def __init__(self, client_id, client_secret, *args, **kwargs):
        """Store the application credentials used to request tokens."""
        super().__init__(*args, **kwargs)
        self.client_id = client_id
        self.client_secret = client_secret

    @staticmethod
    def _is_success(status_code):
        """Return True for any 2xx HTTP status code."""
        return 200 <= status_code < 300

    @classmethod
    def _endpoint_for(cls, lookup_id, resource_type, version):
        """Build the request URL for *resource_type* and *lookup_id*."""
        template = cls._SPECIAL_ENDPOINTS.get(resource_type, cls._GENERIC_ENDPOINT)
        return template.format(version=version, lookup_id=lookup_id, resource_type=resource_type)

    def _get_json(self, url):
        """GET *url* with the bearer header; return the JSON body or {} on a non-2xx status."""
        headers = self.get_resource_header()
        r = requests.get(url, headers=headers)
        if not self._is_success(r.status_code):
            return {}
        return r.json()

    def get_client_credentials(self):
        """
        Returns a base64 encoded string of "client_id:client_secret".

        Raises Exception when either credential is None.
        """
        client_id = self.client_id
        client_secret = self.client_secret

        if client_id is None or client_secret is None:
            raise Exception('You must set client_id and client secret')
        client_creds = f'{client_id}:{client_secret}'
        client_creds_b64 = base64.b64encode(client_creds.encode())
        return client_creds_b64.decode()

    def get_token_headers(self):
        """Return the Basic authorization header sent to the token endpoint."""
        client_creds_b64 = self.get_client_credentials()
        return {
            'Authorization': f'Basic {client_creds_b64}'  # <base64 encoded client_id:client_secret>
        }

    def get_token_data(self):
        """Return the form data sent to the token endpoint."""
        return {
            'grant_type': 'client_credentials'
        }

    def perform_auth(self):
        """Request a new access token and store it with its expiry time.

        Returns True on success; raises Exception when the token endpoint
        answers with a non-2xx status.
        """
        r = requests.post(self.token_url, data=self.get_token_data(), headers=self.get_token_headers())
        if not self._is_success(r.status_code):
            raise Exception('Could not authenticate client.')
        data = r.json()
        now = datetime.datetime.now()
        expires = now + datetime.timedelta(seconds=data['expires_in'])  # expires_in is in seconds
        self.access_token = data['access_token']
        self.access_token_expires = expires
        self.access_token_did_expire = expires < now
        return True

    def get_access_token(self):
        """Return the current access token, authenticating first if it is missing or expired."""
        if self.access_token is None or self.access_token_expires < datetime.datetime.now():
            self.perform_auth()
        return self.access_token

    def get_resource_header(self):
        """Return the Bearer authorization header used for API requests."""
        access_token = self.get_access_token()
        return {
            'Authorization': f'Bearer {access_token}'
        }

    def get_resource(self, lookup_id, resource_type='albums', version='v1'):
        """Fetch one resource.

        resource_type selects the endpoint: 'albums' (albums of an artist),
        'tracks' (tracks of an album), 'features', 'analysis', 'popularity'
        (the track object), or any other value, which is used verbatim as
        the path segment before lookup_id. Returns the JSON body, or {} on
        a non-2xx status.
        """
        return self._get_json(self._endpoint_for(lookup_id, resource_type, version))

    def get_artist(self, _id):
        return self.get_resource(_id, resource_type='artists')

    def get_albums(self, _id):
        return self.get_resource(_id, resource_type='albums')

    def get_album_tracks(self, _id):
        return self.get_resource(_id, resource_type='tracks')

    def get_track_features(self, _id):
        return self.get_resource(_id, resource_type='features')

    def get_track_analysis(self, _id):
        return self.get_resource(_id, resource_type='analysis')

    def get_track_popularity(self, _id):
        return self.get_resource(_id, resource_type='popularity')

    def get_next(self, result):
        """ returns the next result given a paged result
            Parameters:
            - result - a previously returned paged result

            Returns None when there is no next page. An empty result (the {}
            returned after a failed request) or None is treated the same way
            instead of raising.
        """
        next_url = result.get('next') if result else None
        if next_url:
            return self.get_next_resource(next_url)
        return None

    def get_next_resource(self, url):
        """Fetch *url* (a 'next' link); return the JSON body or {} on a non-2xx status."""
        return self._get_json(url)

    def base_search(self, query_params):  # search_type = spotify's type
        """Run a search with an already url-encoded query string; {} on a non-2xx status."""
        endpoint = 'https://api.spotify.com/v1/search'
        return self._get_json(f'{endpoint}?{query_params}')

    def search(self, query=None, operator=None, operator_query=None, search_type='artist'):
        """Search Spotify.

        query may be a string or a dict, which is flattened to
        "key:value key:value". When operator is 'or' or 'not' (any case) and
        operator_query is a string, "<OPERATOR> <operator_query>" is appended
        to the query; any other operator is ignored. Raises Exception when
        query is None.
        """
        if query is None:
            raise Exception('A query is required.')
        if isinstance(query, dict):
            query = ' '.join([f'{k}:{v}' for k, v in query.items()])
        if operator is not None and operator_query is not None:
            if operator.lower() in ('or', 'not'):  # Operators can only be OR or NOT
                operator = operator.upper()
                if isinstance(operator_query, str):
                    query = f'{query} {operator} {operator_query}'
        query_params = urlencode({'q': query, 'type': search_type.lower()})
        return self.base_search(query_params)

#### Spotify Data Retrieval: Bands

In [None]:
# API client shared by the Spotify retrieval cells; built from the
# client_id / client_secret variables.
spotify = SpotifyAPI(client_id, client_secret)

In [None]:
%%time

bands_id = []
bands_popularity = []

for band in df_unique['Band']:
    result = spotify.search(query=band, search_type='artist')
    # search() returns {} when the request fails, so the nested keys may be
    # missing; treat that like a search without hits.
    items = result.get('artists', {}).get('items', [])

    # Keep the first hit whose name equals the band name, ignoring case
    wanted = band.lower()
    match = next((artist for artist in items if artist['name'].lower() == wanted), None)

    if match is None:  # If band not found
        bands_id.append(np.nan)
        bands_popularity.append(np.nan)
    else:
        bands_id.append(match['id'])
        bands_popularity.append(match['popularity'])

In [None]:
df_unique['Band ID'] = bands_id
df_unique['Band Popularity'] = bands_popularity
df_unique = df_unique.dropna() # Dropping bands with uri not found
# sort_values() returns a new frame, so the result has to be assigned for the
# ordering to take effect.
df_unique = df_unique.sort_values('Band')
df_unique

#### Spotify Data Retrieval: Albums

In [None]:
%%time

# Retrieve album data

data = []            # rows of [band id, album name, album id, release date]
album_names = set()  # (band id, normalised title) pairs already stored


def _album_key(name):
    """Lower-case *name* with every run of non-word characters removed."""
    return re.sub(r'\W+', '', str(name)).lower()


for band_id in df_unique['Band ID'].values:
    try:
        page = spotify.get_albums(band_id)
    except Exception as exc:  # best effort: one failing band must not stop the run
        print(f'Skipping band {band_id}: {exc}')
        continue

    # Walk through every result page; a failed request yields {} (or None
    # when there is no further page), which ends the loop.
    while page:
        for item in page.get('items', []):
            key = (band_id, _album_key(item['name']))
            # Avoid adding the same album twice for a band, and skip
            # non-albums listed as albums (such as compilations).
            if key in album_names:
                continue
            if item['album_type'] != 'album':
                continue
            if item['artists'][0]['name'] == 'Various Artists':
                continue
            album_names.add(key)
            data.append([band_id, item['name'], item['id'], item['release_date']])

        if not page.get('next'):
            break
        try:
            page = spotify.get_next(page)
        except Exception as exc:  # give up on the remaining pages of this band
            print(f'Stopping album paging for band {band_id}: {exc}')
            break

In [None]:
# One row per retrieved album, exact repeats removed.
albums_frame = pd.DataFrame(data)
albums_frame.columns = ['Band ID', 'Album Name', 'Album ID', 'Release Date']
df_albums = albums_frame.drop_duplicates() # Dropping duplicate albums
df_albums

In [None]:
# Attach the band columns of df_unique to every album through the shared 'Band ID'
df_albums = df_albums.merge(df_unique, on='Band ID')
df_albums

In [None]:
# Spot check: albums stored for one band
is_parkway_drive = df_albums['Band'] == 'Parkway Drive'
df_albums.loc[is_parkway_drive]

#### Spotify Data Retrieval: Tracks & Features

In [None]:
%%time

# Retrieve track data

data = []
base_names = ['Album ID', 'Track Name', 'Track ID']
feature_keys = []                 # keys of the first non-empty features response
features_names = list(base_names)  # column labels for the rows stored in data

for album_id in df_albums['Album ID'].values:
    try:
        page = spotify.get_album_tracks(album_id)
    except Exception as exc:  # best effort: one failing album must not stop the run
        print(f'Skipping album {album_id}: {exc}')
        continue

    # Walk through every result page; a failed request yields {} (or None
    # when there is no further page), which ends the loop.
    while page:
        for track in page.get('items', []):
            # Track Features
            try:
                track_features_dict = spotify.get_track_features(track['id']) or {}
            except Exception:
                # Keep the track with missing feature values instead of
                # retrying the same request forever.
                track_features_dict = {}

            if not feature_keys and track_features_dict:
                feature_keys = list(track_features_dict.keys())
                features_names = base_names + feature_keys

            # Values are looked up by key so that every row lines up with
            # features_names regardless of the key order of a response.
            track_features = [track_features_dict.get(key, np.nan) for key in feature_keys]
            values = [album_id, track['name'], track['id']] + track_features
            data.append(pd.Series(values))

        if not page.get('next'):
            break
        try:
            page = spotify.get_next(page)
        except Exception as exc:  # give up on the remaining pages of this album
            print(f'Stopping track paging for album {album_id}: {exc}')
            break

In [None]:
# Build the track table from the collected rows; the column labels are the
# names gathered in features_names during retrieval.
df_tracks = pd.DataFrame(data)
df_tracks.columns = features_names
df_tracks

#### Spotify Data Retrieval: Track Popularity 

In [None]:
tracks_popularity = []

for track_id in df_tracks['Track ID']:
    try:
        track = spotify.get_track_popularity(track_id) or {}
    except Exception:  # not a bare except, so the loop can still be interrupted
        track = {}
    # A failed lookup returns {} and is recorded as NaN
    tracks_popularity.append(track.get('popularity', np.nan))

df_tracks['Track Popularity'] = tracks_popularity

In [None]:
# Join albums with their tracks, then add the genre rows of each band
albums_with_tracks = df_albums.merge(df_tracks, on='Album ID')
df = albums_with_tracks.merge(df_bands, on='Band')

#### Creating a backup file

In [None]:
# Write the merged dataframe to a semicolon-separated CSV file at the given
# path. NOTE(review): the path is absolute and Windows-style — confirm it is
# writable on the machine running the notebook.
df.to_csv('C:/Users/spotifydata.csv', sep=';')