In [11]:
import requests
import base64

In [9]:
# 1. Configure your Spotify credentials.
# The values are read from the SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET
# environment variables so real secrets never have to be saved inside the
# notebook. The '*' placeholder is kept as the fallback and must be replaced
# (or the variables set) before any Spotify request can succeed.
import os

CLIENT_ID = os.environ.get('SPOTIFY_CLIENT_ID', '*')  # Spotify application Client ID
CLIENT_SECRET = os.environ.get('SPOTIFY_CLIENT_SECRET', '*')  # Spotify application Client Secret

In [12]:
# usual imports
import os
import sys
import numpy as np # get it at: http://numpy.scipy.org/
# path to the Million Song Dataset subset (uncompressed)
# CHANGE IT TO YOUR LOCAL CONFIGURATION: either edit the default below or set
# the MSD_SUBSET_PATH environment variable before starting the kernel
msd_subset_path=os.environ.get('MSD_SUBSET_PATH',
                               r'C:\Users\aleja\Desktop\TFM\MillionSongSubset')
msd_subset_data_path=os.path.join(msd_subset_path,'data')
msd_subset_addf_path=os.path.join(msd_subset_path,'AdditionalFiles')
assert os.path.isdir(msd_subset_path),'wrong path' # sanity check
# path to the Million Song Dataset code
# CHANGE IT TO YOUR LOCAL CONFIGURATION: either edit the default below or set
# the MSD_CODE_PATH environment variable before starting the kernel
msd_code_path=os.environ.get('MSD_CODE_PATH',
                             r'C:\Users\aleja\Desktop\TFM\MSongsDB-master')
assert os.path.isdir(msd_code_path),'wrong path' # sanity check
# we add some paths to python so we can import MSD code
# Ubuntu: you can change the environment variable PYTHONPATH
# in your .bashrc file so you do not have to type these lines
msd_python_src_path=os.path.join(msd_code_path,'PythonSrc')
# guard against duplicates so re-running this cell does not keep growing sys.path
if msd_python_src_path not in sys.path:
    sys.path.append(msd_python_src_path)

In [13]:
import sqlite3
import csv
import time
import os

# Open the track-metadata database shipped with the MSD subset.
conn = sqlite3.connect(os.path.join(msd_subset_addf_path,
                                    'subset_track_metadata.db'))

# Build the SQL query: every column of the 'songs' table.
q = "SELECT * FROM songs"

# Run the query. The connection is closed in `finally` so it is released even
# when the query fails, and the column names are captured while the cursor's
# connection is still open instead of after it has been closed.
try:
    t1 = time.time()
    res = conn.execute(q)
    song_list = res.fetchall()  # keep all rows in memory
    t2 = time.time()
    column_names = [description[0] for description in res.description]
finally:
    conn.close()

# Report how long the extraction took.
print('All song data extracted (SQLite) in:', str(t2 - t1), 'seconds')

# Save the rows to a CSV file, with the column names as the header row.
with open('songs_data.csv', 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(column_names)
    writer.writerows(song_list)

print("Data saved to songs_data.csv.")

All song data extracted (SQLite) in: 0.04544568061828613 seconds
Data saved to songs_data.csv.


In [14]:
class SpotifyAPI:
    """Small Spotify Web API client based on the Client Credentials flow.

    An access token is requested as soon as the object is created and is
    renewed once automatically when a request comes back with HTTP 401.

    Args:
        client_id: Spotify application Client ID.
        client_secret: Spotify application Client Secret.
        timeout: Seconds to wait for each HTTP call before `requests` gives up.
            Without a timeout a stalled connection would block forever.
    """

    TOKEN_URL = "https://accounts.spotify.com/api/token"
    SEARCH_URL = "https://api.spotify.com/v1/search"
    DEFAULT_TIMEOUT = 10  # seconds

    def __init__(self, client_id, client_secret, timeout=DEFAULT_TIMEOUT):
        self.client_id = client_id
        self.client_secret = client_secret
        self.timeout = timeout
        self.token = None
        self.get_access_token()  # fetch the initial token

    def get_access_token(self):
        """Request a new access token and store it in `self.token`.

        Raises:
            Exception: if the token endpoint does not answer with HTTP 200.
        """
        credentials = f"{self.client_id}:{self.client_secret}".encode()
        headers = {
            "Authorization": "Basic " + base64.b64encode(credentials).decode(),
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {"grant_type": "client_credentials"}
        response = requests.post(self.TOKEN_URL, headers=headers, data=data,
                                 timeout=self.timeout)
        if response.status_code == 200:
            self.token = response.json()['access_token']
            print("Nuevo token obtenido.")
        else:
            raise Exception(f"Error al obtener el token: {response.status_code} - {response.text}")

    def make_request(self, url, params=None):
        """Send an authenticated GET request and return the decoded JSON body.

        Args:
            url: Endpoint to call.
            params: Optional query-string parameters.

        Returns:
            The parsed JSON response.

        Raises:
            Exception: if the final response status is not HTTP 200.
        """
        if not self.token:
            self.get_access_token()  # make sure there is a token to send

        headers = {"Authorization": f"Bearer {self.token}"}
        response = requests.get(url, headers=headers, params=params,
                                timeout=self.timeout)

        # A 401 is treated as an expired token: renew it and retry exactly once.
        if response.status_code == 401:
            print("Token expirado, obteniendo un nuevo token...")
            self.get_access_token()
            headers["Authorization"] = f"Bearer {self.token}"
            response = requests.get(url, headers=headers, params=params,
                                    timeout=self.timeout)

        if response.status_code == 200:
            return response.json()
        raise Exception(f"Error en la solicitud: {response.status_code} - {response.text}")

    def search_song(self, query):
        """Search Spotify for a track and return the first hit.

        Args:
            query: Free-text search string.

        Returns:
            A dict with the keys "name", "artist" (first listed artist),
            "track_id" and "preview_url" (None when the API omits it).

        Raises:
            Exception: if the search returns no tracks or the request fails.
        """
        params = {
            "q": query,
            "type": "track",
            "limit": 1  # only the best match is needed
        }
        data = self.make_request(self.SEARCH_URL, params)
        tracks = data.get("tracks", {}).get("items", [])
        if not tracks:
            raise Exception("No se encontró la canción.")

        best_match = tracks[0]
        return {
            "name": best_match["name"],
            "artist": best_match["artists"][0]["name"],
            "track_id": best_match["id"],
            "preview_url": best_match.get("preview_url")
        }

    def download_preview(self, preview_url, filename="preview.mp3"):
        """Download a track preview and write it to `filename`.

        Args:
            preview_url: URL of the audio preview.
            filename: Destination path for the downloaded bytes.

        Raises:
            Exception: if `preview_url` is empty or the download does not
                answer with HTTP 200.
        """
        if not preview_url:
            raise Exception("No hay vista previa disponible para esta canción.")
        response = requests.get(preview_url, timeout=self.timeout)
        if response.status_code == 200:
            with open(filename, "wb") as file:
                file.write(response.content)
            print(f"Vista previa descargada como '{filename}'")
        else:
            raise Exception(f"Error al descargar la vista previa: {response.status_code} - {response.text}")


In [15]:
import requests
from bs4 import BeautifulSoup
from jsonpath_ng import parse
import json

def fetch_preview_url(track_id: str, timeout: float = 10) -> "str | None":
    """Look up a track's audio preview URL from its Spotify embed page.

    The embed page is downloaded, and each `<script>` element whose text looks
    like a JSON document is searched for an `audioPreview.url` value.

    Args:
        track_id: Spotify track ID. Anything that is not a non-empty string
            (for example None or NaN left by a failed search) yields None
            without any network call.
        timeout: Seconds to wait for the embed page before giving up.

    Returns:
        The first preview URL found, or None when the ID is missing, the page
        cannot be fetched, or no script contains a preview URL.
    """
    if not isinstance(track_id, str) or not track_id:
        return None

    embed_url = f"https://open.spotify.com/embed/track/{track_id}"

    try:
        # Perform HTTP GET request
        response = requests.get(embed_url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to fetch embed page: {response.status_code}")
            return None

        # Parse the HTML content
        soup = BeautifulSoup(response.text, "html.parser")

        # Keep scanning until a script actually yields a preview URL instead
        # of giving up after the first script that has any content.
        for script in soup.find_all("script"):
            script_content = script.string
            if not script_content:
                continue
            # The JSONPath lookup can only match inside a JSON object or array,
            # so plain JavaScript is skipped rather than reported as a JSON error.
            if not script_content.lstrip().startswith(("{", "[")):
                continue
            preview_url = find_node_value_with_jsonpath(script_content, "audioPreview")
            if preview_url:
                return preview_url

    except Exception as e:
        print(f"Error fetching preview URL: {e}")
        return None

    return None

def find_node_value_with_jsonpath(json_string: str, target_node: str) -> str:
    """Return the first `<target_node>.url` value found anywhere in a JSON text.

    Args:
        json_string: JSON document as text.
        target_node: Key whose nested `url` member is looked up at any depth.

    Returns:
        The first matching value, or None when nothing matches or when parsing
        or querying fails (the error is printed, not raised).
    """
    try:
        document = json.loads(json_string)

        # Recursive-descent query: any `target_node` key, then its `url` child.
        expression = parse(f"$..{target_node}.url")

        found_values = [hit.value for hit in expression.find(document)]
        if not found_values:
            return None
        return found_values[0]

    except Exception as e:
        print(f"Error processing JSON: {e}")
        return None

# Example usage
# Runs only when this code is the entry-point module; the recorded cell output
# below shows that it did run in this notebook session.
# NOTE(review): `track_id` and `preview_url` become notebook-level names here.
if __name__ == "__main__":
    track_id = "7qiZfU4dY1lWllzX7mPBI3"  # Replace with your track ID
    # Performs a live HTTP request; fetch_preview_url returns None on failure.
    preview_url = fetch_preview_url(track_id)
    print(f"Preview URL: {preview_url}")

Preview URL: https://p.scdn.co/mp3-preview/7339548839a263fd721d01eb3364a848cad16fa7


In [16]:
import pandas as pd

# Work on the first 10 songs only; .copy() gives an independent frame so the
# columns added below are written to this frame and not to a slice.
df = pd.read_csv("songs_data.csv").head(10).copy()

# Both columns are needed to build the search query, so validate both up front.
for required_column in ('title', 'artist_name'):
    if required_column not in df.columns:
        raise ValueError(f"El archivo CSV debe contener una columna llamada '{required_column}'.")

# Initialise the Spotify API client
spotify = SpotifyAPI(CLIENT_ID, CLIENT_SECRET)

# Lists backing the new columns
search_titles = []
search_artists = []
search_track_ids = []
preview_urls = []

# Search Spotify for every (title, artist) pair in the CSV
for title, artist_name in zip(df['title'], df['artist_name']):
    try:
        song = spotify.search_song(f"{title} {artist_name}")
        search_titles.append(song["name"])
        search_artists.append(song["artist"])
        search_track_ids.append(song["track_id"])
    except Exception as e:
        # Best effort: keep the row and mark the lookup as missing
        print(f"Error procesando la canción '{title}': {e}")
        search_titles.append(None)
        search_artists.append(None)
        search_track_ids.append(None)

# Add the new columns to the DataFrame
df["search_title"] = search_titles
df["search_artist"] = search_artists
df["search_track_id"] = search_track_ids

# Only look up previews for songs that were actually found; a missing track ID
# would otherwise be sent to Spotify as the literal ID "None".
for track_id in search_track_ids:
    preview_urls.append(fetch_preview_url(track_id) if track_id else None)

df["preview_url"] = preview_urls

# Save the updated DataFrame. The path is defined once so the confirmation
# message always names the file that was really written.
output_path = "output_with_search_columns_improved_test.csv"
df.to_csv(output_path, index=False)
print(f"Archivo procesado y guardado como '{output_path}'.")

Nuevo token obtenido.
Archivo procesado y guardado como 'output_with_search_columns_improved.csv'.
