# BLOCK 1: DIV PROJECT: <b><i>Eventful</i></b>

## Installation and credentials

### Required packages

In [1]:
%pip install -q spotipy
%pip install -q spacy
%pip install -q ratelimit
%pip install -q bs4
%pip install -q langdetect

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


### Credentials and data authorization

<b>Credentials</b>: we load the credentials needed to use the <i>APIs</i> and <i>web scrapers</i>. For security reasons, they are stored in a separate file kept in a safe location.

### Necessary imports throughout the code

In [2]:
import json
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request

import numpy as np
import requests
import spacy
import spotipy
from IPython.display import clear_output
from ratelimit import limits, sleep_and_retry
from spotipy import oauth2
from spotipy.oauth2 import SpotifyClientCredentials

from credentials import * # These are the credentials

## Data gathering

In [3]:
# Load the previously gathered artist data; the cells below enrich this dictionary.
final_json = {}
try:
    with open('data.json', 'r') as file:
        final_json = json.load(file)
except json.JSONDecodeError as e:
    # Chain explicitly so the traceback keeps the parser's original error as the cause.
    raise Exception(f'Error: {e}') from e

In [4]:
# The artist names are the top-level keys of the loaded data
TOP_ARTISTS = list(final_json)


### Function to limit calls per second

This function avoids the problems that arise when an API is called too many times per second.
It is set up to allow 1 call per second.

In [5]:
# Throttle settings passed to the decorators below: CALLS calls per RATE_LIMIT period
CALLS = 1
RATE_LIMIT = 1


@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
    """Do nothing; the decorators apply the call limit. Call it before each API request."""
    return None

### Ticketmaster

<b>Ticketmaster:</b> we take a list of the 50 artists who are currently the most listened to, and we check which of them have upcoming concerts. Then, we gather those concerts and their most important data.

In [6]:

# artists: Ticketmaster attraction name -> attraction id
# photos:  Ticketmaster attraction name -> list of image URLs (only needed for Part 2)
artists, photos = {}, {}

# This function will be used later. It calls the Seatgeek API to add alternative prices to an event, and enrich our original data
# To find the event, we make a call using the artist name and the date
def find_alternative_price(artist, date, time):
    """Look up the lowest Seatgeek price for one concert.

    Args:
        artist: Artist name; spaces become hyphens to build the performer slug.
        date: Local date of the concert.
        time: Local start time of the concert. The name shadows the `time`
            module inside this function only; it is kept so that existing
            callers keep working.

    Returns:
        The "lowest_price" reported by Seatgeek when the query matches exactly
        one event and a price is present; otherwise the string "Inexistent".
        Network, decoding and missing-field errors also yield "Inexistent".
    """
    no_price = "Inexistent"

    # We need to transform data in order to have the Seatgeek API understand it
    search_artist = artist.replace(" ", "-")
    search_date = date + "T" + time
    # We need to have the search terms be in ASCII code for the Seatgeek API to work
    if len(search_artist) != len(search_artist.encode()):
        return no_price

    # urlencode escapes reserved characters (e.g. the ':' of the time) in the values
    query = urllib.parse.urlencode({
        "performers.slug": search_artist,
        "datetime_local": search_date,
        "client_id": SEATGEEK_API_CLIENT_ID,
    })
    search = f"https://api.seatgeek.com/2/events?{query}"
    check_limit()
    try:
        with urllib.request.urlopen(search) as url:
            data = json.loads(url.read().decode())

        # We make sure that there is only one concert as result (more would mean we cannot find
        # the exact concert, less would mean that the concert is not on Seatgeek)
        if data["meta"]["total"] == 1:
            price = data["events"][0]["stats"]["lowest_price"]
            # A null price is treated the same as a missing one
            if price is not None:
                return price
    except (urllib.error.URLError, ValueError, KeyError, IndexError, TypeError):
        # HTTPError is a URLError; JSON/Unicode decoding errors are ValueErrors.
        # A failed enrichment must not abort the run nor discard the concert.
        return no_price

    # If something fails along the way, we return that there is no alternative price
    return no_price

# We will find the unique Ticketmaster API identifier of each artist using their name. We will store it in a dictionary
# We need to do this in order to find only the concerts of the original artist (not tribute concerts for example)
for keyword in TOP_ARTISTS:
    keyword = keyword.replace(" ", "")
    # We make a call to the Ticketmaster API and store the information, downloaded in json format, in a dictionary.
    # The keyword is percent-encoded: urlopen rejects URLs containing non-ASCII characters.
    search = (
        "https://app.ticketmaster.com/discovery/v2/attractions.json"
        f"?keyword={urllib.parse.quote(keyword)}&apikey={TICKETMASTER_API_KEY}"
    )
    check_limit()
    with urllib.request.urlopen(search) as url:
        data = json.loads(url.read().decode())

    # We access the section of the dictionary in which the id's of artists are stored, and retrieve it.
    # NOTE(review): "_embedded" is presumably absent when the search has no matches
    # (the events query further down guards for the same situation) — confirm.
    attractions = data.get("_embedded", {}).get("attractions", [])
    for attraction in attractions:
        # Keep only attractions whose name equals the artist name once spaces are ignored
        if attraction["name"].replace(" ", "") == keyword:
            artists[attraction["name"]] = attraction["id"]
            # Extra information, used only for Part 2
            photos[attraction["name"]] = [image["url"] for image in attraction.get("images", [])]

# Extra information, used only for Part 2
with open('photos.json', 'w') as file:
    json.dump(photos, file, indent=4)
                

def _offers_parking(products):
    """Return True when any entry of an event's "products" list refers to parking."""
    for product in products:
        if product == "Parking":
            return True
        # NOTE(review): Ticketmaster appears to return products as objects with
        # "name"/"type" fields rather than plain strings — confirm against the API docs.
        if isinstance(product, dict):
            label = f'{product.get("name", "")} {product.get("type", "")}'
            if "parking" in label.lower():
                return True
    return False


def _parse_concert(event, artist):
    """Build the concert record for one Ticketmaster event.

    Returns None when a mandatory field (name, url, date, time, timezone, price,
    venue) is missing, or when the cheapest price is 0. The record is only built
    once every mandatory field has been read, so no half-filled entry is stored.
    """
    try:
        name = event["name"]
        concert_url = event["url"]

        local_date = event["dates"]["start"]["localDate"]
        local_time = event["dates"]["start"]["localTime"]
        timezone = event["dates"]["timezone"]

        # We take the lowest price (remember that we want to find the cheapest, most convenient event).
        cheapest_range = min(event["priceRanges"], key=lambda price_range: price_range["min"])
        cheapest_price = cheapest_range["min"]
        currency = cheapest_range["currency"]

        venue = event["_embedded"]["venues"][0]
        venue_name = venue["name"]
    except (KeyError, IndexError, ValueError):
        # ValueError: empty "priceRanges"; IndexError: empty "venues"
        return None

    # We ignore concerts with a price of 0 (in this context it usually indicates
    # that prices have not been officially released yet)
    if cheapest_price == 0:
        return None

    # Using the information we have from Ticketmaster, we look for the price of the same concert on Seatgeek.
    # A failed lookup must not discard an otherwise valid concert.
    try:
        alternative_price = find_alternative_price(artist, local_date, local_time)
    except KeyError:
        alternative_price = "Inexistent"

    concert = {
        "Concert Name": name,
        "Concert URL": concert_url,
        "Date": local_date,
        "Time": local_time,
        "Timezone": timezone,
        "Ticketmaster Cheapest Price": cheapest_price,
        "Seatgeek Cheapest Price": alternative_price,
        "Currency": currency,
        "Venue": venue_name,
    }

    # Some more information is optional: city, country, classifications (concert genre),
    # parking information, accessibility information. When it is not available we say so
    # but still keep the concert.
    concert["City"] = venue.get("city", {}).get("name", "Not Specified")
    concert["Country"] = venue.get("country", {}).get("name", "Not Specified")

    classifications = event.get("classifications") or [{}]
    concert["Main Genre"] = classifications[0].get("genre", {}).get("name", "Not Specified")

    concert["Parking Service"] = "Yes" if _offers_parking(event.get("products", [])) else "No"
    concert["Accessibility Services"] = "Yes" if "info" in event.get("accessibility", {}) else "No"

    # Extra information, used only for Part 2. The keys are added only when both
    # coordinates are present.
    location = venue.get("location", {})
    if "latitude" in location and "longitude" in location:
        concert["Latitude"] = location["latitude"]
        concert["Longitude"] = location["longitude"]

    return concert


# We will find concert information for all of the concerts of each artist (using their id), and store it in a dictionary
information = {}
for artist in artists:
    # We make a call to the Ticketmaster API.
    search = f"https://app.ticketmaster.com/discovery/v2/events.json?&attractionId={artists[artist]}&apikey={TICKETMASTER_API_KEY}"
    check_limit()
    with urllib.request.urlopen(search) as url:
        data = json.loads(url.read().decode())

    # We discard artists with no upcoming concerts
    if data["page"]["totalElements"] == 0:
        continue

    # Local names are used for the event fields so that the imported `time`
    # module is not overwritten by a concert's start time.
    concerts = []
    for event in data.get("_embedded", {}).get("events", []):
        concert = _parse_concert(event, artist)
        if concert is not None:
            concerts.append(concert)

    # An artist only gets an entry when at least one complete concert was found
    if concerts:
        information[artist] = concerts

# Attach the gathered concerts to the final json structure
for artist, entry in final_json.items():
    # Every artist ends up with a 'Concerts' key, even when nothing was found
    entry.setdefault('Concerts', {})
    if artist in information:
        entry['Concerts'] = information[artist]

### Spotify

<b>Spotify:</b> we use the <i>Spotipy</i> library to obtain the most-listened-to songs of each of the artists with upcoming concerts.

#### <i>Spotipy</i> general functions

In [7]:
# SPOTIPY AUTHORIZATION
SCOPE = 'user-top-read user-read-currently-playing user-modify-playback-state'
CACHE = '.spotipyoauthcache'

try:
    sp_oauth = oauth2.SpotifyOAuth(
        SPOTIPY_CLIENT_ID,
        SPOTIPY_CLIENT_SECRET,
        SPOTIPY_REDIRECT_URI,
        scope=SCOPE,
        cache_path=CACHE,
    )
except Exception as error:
    # Raising a plain string is itself a TypeError; raise a real exception and
    # keep the original failure as its cause.
    raise RuntimeError('not completed') from error

# Client used by the search and top-track helpers below
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials(SPOTIPY_CLIENT_ID, SPOTIPY_CLIENT_SECRET))

# Functions
# --- Search artists
def search_artist(name: str):
    """Return the Spotify URI of the first artist found for `name`.

    Returns None when the search yields no artists, or when the first result
    carries no 'uri' field.
    """
    results = spotify.search(q='artist:' + name, type='artist')
    items = results['artists']['items']
    if not items:
        return None
    return items[0].get('uri', None)

# --- Track info about each artist
def track_info(artist: str):
    """Return up to five of the artist's Spotify top tracks, keyed by track name.

    Args:
        artist: Artist name to search for.

    Returns:
        A dict mapping each track name to its name, popularity, uri ('url'),
        album type and album name. Empty when no Spotify URI is found for the artist.

    Raises:
        ValueError: if `artist` is not a string.
    """
    if not isinstance(artist, str):
        raise ValueError("El argumento 'artist' debe ser una cadena (string)")

    artist_uri = search_artist(artist)
    if artist_uri is None:
        # Without a URI there is nothing to query
        return {}

    results = spotify.artist_top_tracks(artist_uri)
    return {
        str(track['name']): {
            'name': track['name'],
            'popularity': track['popularity'],
            'url': track['uri'],
            'album_type': track['album']['album_type'],
            'album_name': track['album']['name'],
        }
        for track in results['tracks'][:5]
    }

# --- Json storage of the data
def final_json_store(total: dict):
    """Store each artist's Spotify top tracks under 'Spotify tracks' and return `total`.

    `total` is modified in place; the returned object is that same dictionary.
    """
    for artist_name, artist_entry in total.items():
        # Whatever was stored before is replaced by the freshly fetched tracks
        artist_entry['Spotify tracks'] = track_info(artist_name)
    return total


#### Storage of the spotify songs

In [8]:
# final_json_store() fills final_json in place and returns that same dictionary,
# so the update() call below leaves its contents unchanged.
spotify_added = final_json_store(final_json)
final_json.update(spotify_added)

### Setlist

<b>Setlist:</b> we use the <i>Setlist API</i> to obtain the most played songs in the past concerts of each artist.

#### <i>Setlist</i> general functions

In [9]:
# Function to obtain setlist information from a given artist
def get_setlist_data(arid: str, option: str, limit: int = 8):
    """Fetch the first page of an artist's setlists and keep the non-empty ones.

    Args:
        arid: MusicBrainz id of the artist.
        option: "latest" to get the most recent non-empty setlist, or
            "8_most_recent" to get a list of the most recent non-empty setlists.
        limit: Maximum number of setlists returned for "8_most_recent".

    Returns:
        For "latest": the setlist entry (including venue, songs played...), or an
        empty list when none is found. For "8_most_recent": a list of up to `limit`
        setlist entries, in the order they appear in the response.

    Raises:
        ValueError: if `option` is not one of the two supported values.
    """
    if option not in ("latest", "8_most_recent"):
        # Checked before the request so an unsupported option cannot loop forever
        raise ValueError(f"Unsupported option: {option!r}")

    check_limit()
    # Headers for the GET request
    headers = {
        'Accept': 'application/json',
        'x-api-key': SETLIST_API_KEY,
    }
    # Append the artist id for the search
    url_setlist = "https://api.setlist.fm/rest/1.0/search/setlists?artistMbid="+str(arid)+"&p=1"
    response = requests.get(url_setlist, headers=headers)

    recents = []
    try:
        # Transform the response into json
        data = response.json()
        for entry in data['setlist']:
            # Skip concerts for which no songs were registered
            if len(entry['sets']['set']) == 0:
                continue
            if option == "latest":
                return entry
            if len(recents) >= limit:
                break
            # Keep the setlist just checked (not the one following it)
            recents.append(entry)
    except (KeyError, ValueError):
        # Missing 'setlist'/'sets' fields or a body that is not JSON: keep what we have
        return recents
    return recents

# Function that returns a dict containing every song played in a given concert
def get_songs(setlist_data):
    """Number the songs of one concert in playing order.

    Args:
        setlist_data: A setlist entry whose songs are read from
            setlist_data['sets']['set'][i]['song'][j]['name'].

    Returns:
        A dict mapping "1", "2", ... to song names. Sets without a 'song' list
        and songs without a 'name' are skipped.
    """
    song_dict = {}
    position = 1

    # Loop through every set in the concert
    for concert_set in setlist_data.get('sets', {}).get('set', []):
        # Loop through every song and update the result dictionary
        for song in concert_set.get('song', []):
            if 'name' not in song:
                continue
            song_dict[str(position)] = song['name']
            position += 1
    return song_dict
    
# Function to return the musicbrainzId of a given artist for use in other functions
def get_mb_id(artist:str):
    """Return the MusicBrainz id of the first artist found for `artist`.

    Returns None when the response contains no artists.
    """
    check_limit()
    # The name goes through `params` so that characters such as '&', '+' or '/'
    # are percent-encoded instead of being read as part of the URL structure.
    response = requests.get(
        "https://musicbrainz.org/ws/2/artist/",
        params={"query": "artist:" + str(artist)},
        headers={"Accept": "application/json"},
    )
    # Make sure the response is in utf-8 to avoid formatting issues
    response.encoding = 'utf-8'
    # Transform the response into json
    data = response.json()
    # Locate the artist MBid and return it
    matches = data.get('artists') or []
    if not matches:
        return None
    return matches[0]['id']

# Given the name of an artist, return top 5 played songs in its last 8 concerts
def get_5_most_played(artist):
    """Count how often each song appears in the artist's recent setlists.

    Returns a dict of at most five songs mapped to their number of plays,
    ordered from most to least played (ties keep first-seen order).
    """
    # The MusicBrainz id is what the setlist lookup expects
    mbid = get_mb_id(artist)
    recent_setlists = get_setlist_data(mbid, "8_most_recent")

    # Tally every song over all gathered concerts
    play_counts = {}
    for title in get_8_songlist(recent_setlists):
        play_counts[title] = play_counts.get(title, 0) + 1

    # Highest counts first, then keep the first five
    ranked = sorted(play_counts.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked[:5])

# Given 8 setlists, return all songs played
def get_8_songlist(last_8_setlist):
    """Flatten the given setlists into one list of song names, in playing order."""
    return [
        title
        for setlist in last_8_setlist
        for title in get_songs(setlist).values()
    ]

def venue_set(venue: str, artist_name: str):
    """Placeholder with no implementation; always returns None."""
    return None

#### Top 5 most played songs

In [10]:
# Not used in this cell; kept so that anything still referring to this name keeps working
topSongs_dict = {}

# Get the top 5 most played songs of every artist
top_songs_dict = {}
for artist in TOP_ARTISTS:
    clear_output(wait=True)  # Clear output so messages don't pile up
    top_songs_dict[artist] = get_5_most_played(artist)
clear_output(wait=True)

# Store them in the final json structure. Only a missing artist is skipped;
# any other problem is allowed to surface instead of being silently swallowed.
for artist in final_json:
    if artist in top_songs_dict:
        final_json[artist]['Setlist tracks'] = top_songs_dict[artist]


### Setlist ft. Spotify

<b>Setlist ft. Spotify</b>: we combine these two data sets to find the songs each artist is most likely to play (those that are both among the most played live <b>and</b> among the most listened to).

In [11]:
# Find the coincidences between most listened songs and most played in concerts from each artist

# Functions
# --- Check if any of the dictionaries is empty
def is_empty(setlist_dict: dict, spotify_dict: dict):
    """Return True when at least one of the two dictionaries equals {}."""
    both_filled = setlist_dict != {} and spotify_dict != {}
    return not both_filled

# --- Compare the keys of two dictionaries in order to find common keys
def compare_keys(nested_dict1: dict, nested_dict2: dict):
    """Return the keys present in both dictionaries, in the order of `nested_dict1`.

    Uses a membership test on the second dictionary instead of comparing every
    pair of keys.
    """
    return [key for key in nested_dict1 if key in nested_dict2]

# --- Compare the songs of each artists and find the coincidences
def top5_compare(setlist_dict, spotify_dict):
    """Map every artist found in both inputs to the song titles they share.

    Returns {} straight away when either input is empty.
    """
    if is_empty(setlist_dict, spotify_dict):
        return {}
    return {
        artist: compare_keys(setlist_dict[artist], spotify_dict[artist])
        for artist in compare_keys(setlist_dict, spotify_dict)
    }

# --- Create a new dictionary with the data we want
def set_new_list(artist, songs: set, setlist_dict: dict, spotify_dict: dict):
    """Build the combined Spotify/setlist record for an artist's shared songs.

    Args:
        artist: Key of the artist in both dictionaries.
        songs: Iterable of song titles to include.
        setlist_dict: artist -> {song title: times played}.
        spotify_dict: artist -> {song title: track info with 'name' and 'album_name'}.

    Returns:
        A dict mapping each song found in both sources to its name, the number
        of times it has been played and its album name.
    """
    spotify_tracks = spotify_dict.get(artist, {})
    setlist_counts = setlist_dict.get(artist, {})

    song_dict = {}
    for song in songs:
        if song in spotify_tracks and song in setlist_counts:
            song_dict[song] = {
                'name': spotify_tracks[song]['name'],
                'times it has been played': setlist_counts[song],
                # The album comes from 'album_name', not from the track name
                'album': spotify_tracks[song]['album_name'],
            }
    return song_dict


# CODE: obtain the data and store it after comparison
setlist_dict = {}
spotify_dict = {}
for artist, entry in final_json.items():
    # Top 5 most played songs from each artist
    if 'Setlist tracks' in entry:
        setlist_dict[artist] = entry['Setlist tracks']
    # Top 5 most listened songs from each artist
    if 'Spotify tracks' in entry:
        spotify_dict[artist] = entry['Spotify tracks']

# Songs that show up in both sources, per artist
common_songs = top5_compare(setlist_dict, spotify_dict)
final_dict = {
    artist: set_new_list(artist, common_songs[artist], setlist_dict, spotify_dict)
    for artist in common_songs
}

# We store the information in the final json file
for artist, entry in final_json.items():
    # Every artist ends up with the key, even when there are no shared songs
    entry.setdefault('Spotify and Setlist', {})
    if artist in final_dict:
        entry['Spotify and Setlist'] = final_dict[artist]

### Lyric scraping

**BeautifulSoup:** Using BeautifulSoup we scrape the lyrics of the songs we have gathered. The lyrics come from Genius. We use SpaCy to process the lyrics and then store the word count of each song.

In [12]:
## Scrapes lyrics from Genius and counts the number of lemmatized words in each song.
import json
import re
import requests
from bs4 import BeautifulSoup
from collections import defaultdict

import spacy
from langdetect import detect
import unicodedata

# Download each spaCy model only when it is not installed yet
for _model_name in ('en_core_web_sm', 'es_core_news_sm'):
    if not spacy.util.is_package(_model_name):
        spacy.cli.download(_model_name)

# English pipeline and its stop words
nlp_en = spacy.load('en_core_web_sm')
# Spanish pipeline and its stop words
nlp_es = spacy.load('es_core_news_sm')

stopwords_en, stopwords_es = nlp_en.Defaults.stop_words, nlp_es.Defaults.stop_words

def get_link(artist: str, song: str) -> str:
    """
    Gets the link to the lyrics page on Genius.
    Both names are stripped of diacritics and punctuation, and each run of
    whitespace becomes a single hyphen.
    Args:
        artist (str): Artist name
        song (str): Song name
    """
    # split()/join collapses the double spaces left behind by removed punctuation
    artist_slug = '-'.join(remove_punctuation_contractions(artist).split())
    song_slug = '-'.join(remove_punctuation_contractions(song).split())

    return f'https://www.genius.com/{artist_slug}-{song_slug}-lyrics'

def get_lyrics(artist: str, song: str) -> str or None:
    """
    Gets the lyrics from Genius.
    Returns None if the page cannot be fetched or contains no lyrics container.
    Args:
        artist (str): Artist name
        song (str): Song name
    """
    link = get_link(artist, song)
    try:
        # The timeout keeps one unresponsive page from stalling the whole run
        page = requests.get(link, timeout=10)
    except requests.RequestException:
        return None

    soup = BeautifulSoup(page.text, 'html.parser')
    # Match the class by its stable prefix rather than by one build-specific hash,
    # and read every container instead of only the first one.
    containers = soup.find_all('div', class_=re.compile(r'^Lyrics__Container'))
    if not containers:
        return None

    lyrics = ' '.join(container.get_text(separator=" ") for container in containers)
    lyrics = re.sub(r'\[.*?\]', '', lyrics)  # remove tags between brackets
    lyrics = re.sub(r'\s+', ' ', lyrics)  # replace consecutive whitespace with a single space
    return lyrics.strip()

def remove_stopwords(lyrics: str) -> str:
    """
    Removes stopwords from the lyrics.
    Spanish stopwords are used when the detected language code is 'es',
    English ones otherwise. Matching ignores letter case.
    Args:
        lyrics (str): Lyrics
    """
    if not lyrics:
        return ''
    try:
        language = detect(lyrics)
    except Exception:
        # NOTE(review): langdetect is expected to raise when the text has no
        # usable features (e.g. only digits or symbols); fall back to English.
        language = 'en'
    # detect() reports language codes such as 'es', not language names
    stopwords = stopwords_es if language == 'es' else stopwords_en
    return ' '.join(word for word in lyrics.split() if word.lower() not in stopwords)

def remove_punctuation_contractions(lyrics: str) -> str:
    """
    Strips diacritics and punctuation from the lyrics and joins contractions.
    Args:
        lyrics (str): Lyrics
    """
    # Decompose accented letters, then drop the combining marks (category 'Mn')
    decomposed = unicodedata.normalize('NFD', lyrics)
    without_marks = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Keep only word characters and whitespace
    without_punctuation = re.sub(r'[^\w\s]', '', without_marks)
    # Join the two halves of any contraction
    return re.sub(r"(\b\w+)'(\w+\b)", r'\1\2', without_punctuation)

def preprocess(lyrics: str) -> str:
    """
    Cleans the lyrics: stopwords are removed first, punctuation afterwards.
    Args:
        lyrics (str): Lyrics
    """
    without_stopwords = remove_stopwords(lyrics)
    return remove_punctuation_contractions(without_stopwords)

def count_lemmatized_words(lyrics):
    """
    Counts the number of lemmatized words in the lyrics.
    The Spanish pipeline and stopwords are used when the detected language code
    is 'es', the English ones otherwise. Lemmas are lowercased; stopwords and
    lemmas of one or two characters are not counted.
    Args:
        lyrics (str): Lyrics
    Returns:
        A mapping of lemma to number of occurrences ({} for empty lyrics).
    """
    if not lyrics:
        return {}

    try:
        language = detect(lyrics)
    except Exception:
        # NOTE(review): langdetect is expected to raise when the text has no
        # usable features (e.g. only digits or symbols); fall back to English.
        language = 'en'

    # detect() reports language codes such as 'es', not language names
    if language == 'es':
        stopwords = stopwords_es
        doc = nlp_es(lyrics)
    else:
        stopwords = stopwords_en
        doc = nlp_en(lyrics)

    word_count = defaultdict(int)  # default value for the count of a word is 0

    for token in doc:
        lemma = token.lemma_.lower()  # Converting to lowercase for consistent results
        if lemma not in stopwords and len(lemma) > 2:
            word_count[lemma] += 1

    return word_count

def write_json(data: dict, filename: str):
    """
    Dumps a dictionary to a JSON file with an indentation of 4. (Unused utility function.)
    Args:
        data (dict): Dictionary to write
        filename (str): Name of the file
    """
    with open(filename, 'w') as output_file:
        json.dump(data, output_file, indent=4)

def get_spotify_songs() -> list:
    """
    Reads final_json and returns a list of tuples of the form (Artist, Title).
    Artists without a "Spotify tracks" entry contribute no songs.

    Named differently from get_songs(setlist_data), which the Setlist functions
    still call: reusing that name here would replace it and break them on a re-run.
    """
    songs = []
    for artist in final_json:
        for title in final_json[artist].get("Spotify tracks", {}):
            print(f"Analyzing the lyrics for: {artist} - {title}")
            songs.append((artist, title))
        clear_output(wait=True)
    return songs

songs = get_spotify_songs()
for artist, title in songs:
    print(f"Adding the lyric analysis for: {artist} - {title}")
    # Lyrics are fetched, cleaned and counted; songs without lyrics get an empty count
    words = count_lemmatized_words(preprocess(get_lyrics(artist, title)))
    final_json[artist]["Spotify tracks"][title]["words"] = words
    clear_output(wait=True)

Adding the lyric analysis for: Arctic Monkeys - R U Mine?


## Data storage

<b>Data storage:</b> we store the data back to our json file, once it has been updated

In [13]:
# Serialize first: opening the file in 'w' mode truncates it, so a value that
# cannot be serialized must be detected before data.json is touched.
try:
    serialized = json.dumps(final_json, indent=4)
except TypeError as e:
    raise Exception(f'Error: {e}') from e

with open('data.json', 'w') as file:
    file.write(serialized)