In [2]:
from dotenv import load_dotenv
import os
import json
import pandas as pd
import base64
from requests import post, get
import csv

load_dotenv()

client_id = os.getenv("CID")
client_secret = os.getenv("SECRET")



In [None]:
def get_token():
    auth_string = client_id + ":" + client_secret
    auth_bytes = auth_string.encode("utf-8")
    auth_base64 = str(base64.b64encode(auth_bytes), "utf-8")

    url = "https://accounts.spotify.com/api/token"
    headers = {
        "Authorization": "Basic " + auth_base64,
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {"grant_type": "client_credentials"}

    result = post(url, headers=headers, data=data)
    json_result = json.loads(result.content)
    token = json_result["access_token"]

    return token

def get_auth_header(token):
    return {"Authorization": "Bearer " + token}

In [None]:
import random

def getRandomSearch():
    # Una lista de todos los caracteres que se pueden elegir.
    characters = 'abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZñÑ ¿?:;-,><&/()=!¡|@#$"'
    
    # Obtiene un carácter aleatorio de la cadena de caracteres.
    randomCharacter = random.choice(characters)
    randomSearch = ''
    
    # Coloca el carácter comodín al principio, o al principio y al final, de forma aleatoria.
    if random.randint(0, 1) == 0:
        randomSearch = randomCharacter + '%'
    else:
        randomSearch = '%' + randomCharacter + '%'
    
    return randomSearch

def search_banch_of_tracks(randomSearch, randomOffset, limit=50):
    token = get_token()
    auth_header = get_auth_header(token)
    url = "https://api.spotify.com/v1/search"
    params = {
        "q": randomSearch,
        "type": "track",
        "limit": limit,
        "offset": randomOffset,
    }
    response = get(url, headers=auth_header, params=params)
    json_response = json.loads(response.content)
    return json_response

def write_csv(json_response, file):
    try:
        new_tracks_df = pd.DataFrame(json_response["tracks"]["items"])
        with open(file, "a") as f:
            new_tracks_df.to_csv(f)
    except:
        pass
        
def depurar_csv(file):
    df = pd.read_csv(file)
    df.drop_duplicates(subset=["id"], inplace=True)
    df.to_csv(file, index=False)
    
def get_several_tracks(file, iterations=1000):
    for i in range(0, iterations):
        randomSearch = getRandomSearch()
        randomOffset = random.randint(0, 1000)
        print(f'Iteration {i} - Search: {randomSearch} - Offset: {randomOffset}')
        json_response = search_banch_of_tracks(randomSearch, randomOffset)
        write_csv(json_response, file)
    depurar_csv(file)
    
    
def get_track_features(df):
    token = get_token()
    auth_header = get_auth_header(token)
    url = "https://api.spotify.com/v1/audio-features"
    track_ids = df["id"].tolist()
    track_features = []
    for i in range(0, len(track_ids), 100):
        try:
            params = {"ids": ",".join(track_ids[i:i+100])}
            response = get(url, headers=auth_header, params=params)
            json_response = json.loads(response.content)
            if "audio_features" in json_response:
                track_features += json_response["audio_features"]
                #add the json the id of each track
                for j in range(len(json_response["audio_features"])):
                    try:
                        track_features[-1-j]["id"] = track_ids[i+99-j]
                    except:
                        pass
            else:
                print(f"No audio features found in response: {json_response}")
        except Exception as e:
            print(f"Error in batch {i} to {i+100}: {str(e)}")
            continue
    return track_features


In [None]:
#get_several_tracks(file="db_extract.csv", iterations=10000)

In [None]:
depurar_csv(file="db_extract.csv")
df = pd.read_csv("db_extract.csv")
track_features = get_track_features(df)


In [None]:
#check all track features have id, if not, erase it
track_features_aux = {"track": []}
for track in track_features:
    try:
        if "id" not in track:
            track_features.remove(track)
        else:
            track_features_aux["track"].append(track)
    except:
        pass

df_features = pd.DataFrame(track_features_aux["track"])
df_features.head()

In [None]:
#merge two dataframes with the same id
df_merged = pd.merge(df, df_features, on="id")
#if acousticness is in blank, erase that row
df_merged = df_merged[df_merged["acousticness"].notna()]
df_merged.to_csv("db_extract_features_merged.csv")


In [None]:
#drop columns that are not needed [Unnamed: 0, disc_number, external_ids, external_urls, href, is_local, prewiew_url, track_number, type_x, uri_x, type_y, uri_y, track_href, analysis_url, duration_ms_y, time_signature]
df_merged.drop(columns=["Unnamed: 0", "disc_number", "external_ids", "external_urls", "href", "is_local", "preview_url", "track_number", "type_x", "uri_x", "type_y", "uri_y", "track_href", "analysis_url", "duration_ms_y", "time_signature"], inplace=True)


In [None]:
#change column duration_ms_x to duration_ms
df_merged.rename(columns={"duration_ms_x": "duration_ms"}, inplace=True)
df_merged.to_csv("db_extract_features_merged.csv")


In [7]:
#ckeck if 'ES' is in available markets, if not, erase that row
df_merged = pd.read_csv("db_extract_features_merged.csv")
df_markets = df_merged[df_merged["available_markets"].str.contains("ES")]
df_markets.set_index("id", inplace=True)
df_markets.drop(columns=["Unnamed: 0"], inplace=True)
df_markets.to_csv("db_extract_features_merged_markets.csv")


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_markets.drop(columns=["Unnamed: 0"], inplace=True)


In [11]:
df = pd.read_csv("db_extract_features_merged_markets.csv")
df.drop(columns=["available_markets"], inplace=True)
df.to_csv("db_extract_final.csv")

In [33]:
df = pd.read_csv("db_extract_final.csv")
df.drop(columns=["Unnamed: 0"], inplace=True)

import ast

def get_artists(json_str):
    l = []
    json_str = ast.literal_eval(json_str)
    for artist in json_str:
        l.append(artist['name'])
    return l

df['artists'] = [get_artists(json_str) for json_str in df['artists']]
df['album'] = [ast.literal_eval(json_str)['name'] for json_str in df['album']]

df.to_csv("db_extract_final.csv")