# Import

In [None]:
import requests
import base64
import time
import pandas as pd
import csv
import json
import re
import random
from requests import post, get

# Key

In [None]:
# NOTE(review): these credentials are committed in plain text, so anyone who can
# read this file can use them. Supply them through the SPOTIFY_CLIENT_ID and
# SPOTIFY_CLIENT_SECRET environment variables and rotate the values below; the
# literals are kept only as a backward-compatible fallback.
import os

client_id = os.environ.get("SPOTIFY_CLIENT_ID", "2422f03fd1d24dc499c55936b285bef7")
client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET", "538b7681f2bc4e828196034cb34989cb")

# Get token

In [None]:
def get_token(client_id, client_secret):
    """Request an access token using the client-credentials grant.

    Args:
        client_id: Spotify application client ID.
        client_secret: Spotify application client secret.

    Returns:
        The ``access_token`` value from the JSON response, or ``None`` when
        the request cannot be completed or the status code is not 200.
    """
    # Encode "client_id:client_secret" as base64 for the Basic auth header.
    credentials = f"{client_id}:{client_secret}".encode('utf-8')
    auth_base64 = base64.b64encode(credentials).decode('utf-8')

    auth_url = 'https://accounts.spotify.com/api/token'
    headers = {'Authorization': f'Basic {auth_base64}'}
    data = {'grant_type': 'client_credentials'}

    try:
        # A timeout keeps a stalled connection from blocking forever.
        response = requests.post(auth_url, headers=headers, data=data, timeout=30)
    except requests.RequestException as exc:
        # Report network failures as "no token" so callers can retry instead
        # of crashing on the exception.
        print(f"Error: request failed - {exc}")
        return None

    if response.status_code == 200:
        return response.json().get('access_token')

    print(f"Error: {response.status_code} - {response.text}")
    return None

def get_valid_token(client_id, client_secret):
    """Keep calling ``get_token`` until it yields a token.

    Each failed attempt (``get_token`` returned ``None``) prints a message and
    waits 60 seconds before trying again, so this call blocks until a token
    is obtained.

    Args:
        client_id: Spotify application client ID.
        client_secret: Spotify application client secret.

    Returns:
        The first non-``None`` value returned by ``get_token``.
    """
    while True:
        candidate = get_token(client_id, client_secret)
        if candidate is not None:
            return candidate
        print("Token expired or error. Retrying in 1 minute...")
        time.sleep(60)  # Wait one minute before the next attempt

token = get_valid_token(client_id, client_secret)
if token:
    # Show only a short prefix: notebook output is often saved or shared, and
    # a full bearer token would let any reader call the API with it.
    print(f"Access Token: {token[:8]}... ({len(token)} characters)")

# Get track ids

## Get keyword

In [None]:
KEYWORDS = 10000


def fetch_all_category_names(auth_token, locale=None, total_names=1000):
    """
    Fetch up to 'total_names' category names using Spotify API pagination.

    The function reads and updates the module-level ``start_time`` and reads
    the module-level ``client_id`` / ``client_secret``: once 2700 seconds have
    passed since ``start_time`` it requests a new token and uses it for the
    remaining pages.

    Args:
        auth_token (str): Spotify API authorization token.
        locale (str, optional): Locale in ISO format (e.g., "sv_SE").
        total_names (int): Total number of category names to fetch.

    Returns:
        list: A list of up to 'total_names' category names.

    Raises:
        requests.HTTPError: If the API request fails.
        ValueError: If the response format is invalid.
    """
    global start_time

    url = "https://api.spotify.com/v1/browse/categories"
    headers = {
        "Authorization": f"Bearer {auth_token}"
    }
    limit = 50  # Maximum items per request
    offset = 0
    category_names = []

    while len(category_names) < total_names:
        if time.time() - start_time >= 2700:
            # Store the new token in auth_token so the header really carries
            # the refreshed value (previously the stale token was re-sent).
            auth_token = get_valid_token(client_id, client_secret)
            headers["Authorization"] = f"Bearer {auth_token}"
            start_time = time.time()

        params = {
            "locale": locale,
            "limit": limit,
            "offset": offset
        }
        response = requests.get(url, headers=headers, params=params)

        # Handle rate limit (429): wait, then retry the same page instead of
        # falling through to raise_for_status().
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 1))  # Default to 1 second
            print(f"Rate limit hit. Retrying after {retry_after} seconds.")
            time.sleep(retry_after)
            continue

        if response.status_code != 200:
            response.raise_for_status()  # Handle HTTP errors

        data = response.json()

        try:
            categories = data['categories']['items']
            page_names = [category['name'] for category in categories]
        except KeyError as exc:
            raise ValueError(
                "Invalid response format: 'categories' or 'items' key is missing."
            ) from exc

        if not categories:  # No more data to fetch
            break

        category_names.extend(page_names)

        # Update offset for the next page
        offset += limit

    # Truncate any excess collected on the last page.
    return category_names[:total_names]

token = get_valid_token(client_id, client_secret)
start_time = time.time()  # Read by fetch_all_category_names to decide when to refresh the token

locales = [
    "en_US", "en_GB", "sv_SE", "fr_FR", "de_DE", 
    "es_ES", "es_MX", "it_IT", "zh_CN", "zh_TW",
    "ja_JP", "ko_KR", "pt_PT", "pt_BR", "ru_RU",
    "ar_SA", "nl_NL", "no_NO", "da_DK"
]
keywords = []
for locale in locales:
    try:
        print(f"Fetching categories for locale: {locale}")
        categories = fetch_all_category_names(
            auth_token=token, 
            locale=locale, 
            total_names=KEYWORDS
        )
        keywords.extend(categories)
        print(f"{len(categories)} categories fetched for locale")
    except Exception as e:
        # Best effort: a failure for one locale must not stop the others.
        print(f"Error fetching categories for locale {locale}: {e}")

# The same category name can be returned for several locales. Searching a
# keyword twice only repeats the same request, so keep the first occurrence of
# each name (dict.fromkeys preserves insertion order).
keywords = list(dict.fromkeys(keywords))

print(len(keywords))

In [None]:
def get_auth_header(token):
    """Return the request headers that carry ``token`` as a Bearer credential.

    Args:
        token: Access token string.

    Returns:
        A dict with a single ``Authorization`` entry.
    """
    bearer_value = "".join(("Bearer ", token))
    return dict(Authorization=bearer_value)

def search_playlists_by_keyword(token, keyword, limit=50, max_retries=5):
    """Search for playlists matching ``keyword``.

    Args:
        token: Access token used for the Authorization header.
        keyword: Search text; it is sent as a query parameter so characters
            such as ``&``, ``#`` or ``+`` are escaped instead of breaking the
            query string.
        limit: Maximum number of playlists requested.
        max_retries: How many times a rate-limited (429) request is retried
            after waiting for the ``Retry-After`` delay.

    Returns:
        The list under ``playlists.items`` in the response, or an empty list
        when the final status code is not 200.
    """
    url = "https://api.spotify.com/v1/search"
    params = {"q": keyword, "type": "playlist", "limit": limit}
    headers = get_auth_header(token)

    attempts = 0
    while True:
        result = get(url, headers=headers, params=params)

        # Handle rate limit (429): wait and send the same request again.
        if result.status_code != 429 or attempts >= max_retries:
            break
        attempts += 1
        retry_after = int(result.headers.get("Retry-After", 1))  # Default to 1 second
        print(f"Rate limit hit. Retrying after {retry_after} seconds.")
        time.sleep(retry_after)

    if result.status_code != 200:
        print(f"Failed to fetch playlists for keyword '{keyword}'. Status Code: {result.status_code}")
        return []

    data = json.loads(result.content)
    return data["playlists"]["items"]

def get_all_tracks_from_playlist(token, playlist_id, max_retries=5):
    """Collect every item of a playlist by following the ``next`` page links.

    Args:
        token: Access token used for the Authorization header.
        playlist_id: ID of the playlist to read.
        max_retries: How many consecutive times a rate-limited (429) page
            request is retried after waiting for the ``Retry-After`` delay.

    Returns:
        A list of the ``items`` entries from all pages fetched. If a page
        request ends with a non-200 status, the items gathered so far are
        returned.
    """
    url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks"
    headers = get_auth_header(token)
    all_tracks = []
    retries = 0

    while url:
        result = get(url, headers=headers)

        # Handle rate limit (429): wait, then request the same page again
        # rather than giving up with a partial result.
        if result.status_code == 429 and retries < max_retries:
            retries += 1
            retry_after = int(result.headers.get("Retry-After", 1))  # Default to 1 second
            print(f"Rate limit hit. Retrying after {retry_after} seconds.")
            time.sleep(retry_after)
            continue

        if result.status_code != 200:
            print(f"Failed to fetch tracks for playlist {playlist_id}. Status Code: {result.status_code}")
            return all_tracks  # Return whatever was fetched before the failure

        retries = 0  # The retry budget applies per page
        data = json.loads(result.content)
        all_tracks.extend(data["items"])

        # URL of the next page, or None when this was the last one
        url = data.get("next")

    return all_tracks

def remove_duplicate_playlists(playlists):
    """Drop repeated playlists, keyed on the playlist ``id``.

    Falsy entries (for example ``None``) are skipped. For each distinct ``id``
    the first playlist seen is kept, and the result preserves first-seen order.

    Args:
        playlists: Iterable of playlist dicts, possibly containing ``None``.

    Returns:
        A list with one playlist per distinct ``id``.
    """
    first_seen = {}
    for entry in filter(None, playlists):
        # setdefault stores the entry only when its id has not been seen yet.
        first_seen.setdefault(entry.get("id"), entry)
    return [*first_seen.values()]

if __name__ == "__main__":
    # Script entry: search playlists for every keyword, de-duplicate them, and
    # write one CSV row per playlist track to track_ids.csv.
    token = get_valid_token(client_id, client_secret)
    start_time = time.time()

    # Collect playlists for every keyword
    playlists = []
    for keyword in keywords:
        # Request a new token once 2700 s (45 minutes) have passed since the last one
        if time.time() - start_time >= 2700:
            token = get_valid_token(client_id, client_secret)
            start_time = time.time()
        playlists.extend(search_playlists_by_keyword(token, keyword, limit=50))
        time.sleep(0.2)  # Pause to avoid exceeding the API quota

    # Remove duplicate playlists
    unique_playlists = remove_duplicate_playlists(playlists)

    # Output CSV file name
    csv_file = "track_ids.csv"

    # Write the data to the CSV file
    with open(csv_file, mode="w", newline="", encoding="utf-8-sig") as file:
        writer = csv.writer(file)

        # Header row
        writer.writerow([
            "Playlist Name", "Playlist Owner", "Track Index", "Track Name",
            "Track ID", "Artist Name", "Album Name", "Track URI", "Artist URI",
            "Album URI", "Duration (ms)"
        ])

        # Fetch the tracks of each playlist
        for idx, playlist in enumerate(unique_playlists):
            playlist_name = playlist.get("name", "Unknown Playlist")
            # "owner" may be present but null; treat that like a missing owner
            # instead of failing on None.get(...)
            playlist_owner = (playlist.get("owner") or {}).get("display_name", "Unknown Owner")
            playlist_id = playlist.get("id", None)

            if not playlist_id:
                print(f"{idx + 1}. Skipped playlist with missing ID")
                continue

            print(f"\n{idx + 1}. Playlist: {playlist_name} (Owner: {playlist_owner})")

            # Refresh the token if needed, then fetch all tracks of this playlist
            if time.time() - start_time >= 2700:
                token = get_valid_token(client_id, client_secret)
                start_time = time.time()

            tracks = get_all_tracks_from_playlist(token, playlist_id)
            print(f"   Total Tracks Retrieved: {len(tracks)}")

            # Write one CSV row per track
            for track_idx, item in enumerate(tracks):
                track = item.get("track")
                if not track:
                    print(f"      {track_idx + 1}. Invalid track (NoneType)")
                    continue

                # An empty or null "artists" list, or a null "album", would
                # otherwise raise and abort the export with a half-written
                # file; fall back to empty mappings so defaults are written.
                artists = track.get("artists") or [{}]
                first_artist = artists[0] or {}
                album = track.get("album") or {}

                track_name = track.get("name", "Unknown Track")
                track_id = track.get("id", "Unknown ID")
                artist_name = first_artist.get("name", "Unknown Artist")
                track_uri = track.get("uri", "Unknown URI")
                artist_uri = first_artist.get("uri", "Unknown URI")
                album_uri = album.get("uri", "Unknown URI")
                duration_ms = track.get("duration_ms", "Unknown Duration")
                album_name = album.get("name", "Unknown Album")

                # Write the track row to the CSV file
                writer.writerow([
                    playlist_name, playlist_owner, track_idx + 1, track_name,
                    track_id, artist_name, album_name, track_uri, artist_uri,
                    album_uri, duration_ms
                ])

    print(f"\nData saved to {csv_file}")

# Get features

## Save

In [None]:
def save_to_csv(data, filename='audio_features.csv'):
    """Append a list of dict records to a CSV file.

    ``None`` entries inside the list are skipped, so one missing record no
    longer discards the whole batch. The header row is written only when the
    file does not exist yet or is empty.

    Args:
        data: List of dicts (``None`` entries allowed), or ``None``.
        filename: Path of the CSV file to append to.

    Returns:
        None. Progress and problems are reported with ``print``; errors while
        writing are caught and printed rather than raised.
    """
    import os  # Local import so the function does not depend on the file's import cell

    if data is None:
        print("No data to save.")
        return

    # Reject anything that is not a list of dicts (ignoring None placeholders).
    if not isinstance(data, list) or any(
        item is not None and not isinstance(item, dict) for item in data
    ):
        print("Data format is incorrect.")
        return

    rows = [item for item in data if item is not None]
    if not rows:
        print("No data to save.")
        return

    try:
        write_header = not os.path.exists(filename) or os.path.getsize(filename) == 0
        pd.DataFrame(rows).to_csv(filename, mode='a', header=write_header, index=False)
        print(f"Data saved to {filename}")
    except Exception as e:
        # Deliberately best-effort: report the failure and let the caller go on.
        print(f"Error saving to CSV: {e}")

## Request features from track id

In [None]:
def get_audio_features(track_ids, token, max_retries=5):
    """Request audio features for a batch of track IDs.

    Args:
        track_ids: Iterable of track IDs; each one is converted with ``str``.
        token: Access token used for the Authorization header.
        max_retries: How many times a rate-limited (429) request is retried
            after waiting for the ``Retry-After`` delay.

    Returns:
        The parsed JSON response on status 200, otherwise ``None``. ``None``
        is also returned, without sending a request, when ``track_ids`` is
        empty.
    """
    url = 'https://api.spotify.com/v1/audio-features'

    # Join all track IDs into one comma-separated string
    ids = ','.join(map(str, track_ids))
    if not ids:
        return None  # Nothing to ask for

    headers = {'Authorization': f'Bearer {token}'}

    attempts = 0
    while True:
        response = requests.get(f'{url}?ids={ids}', headers=headers)

        # Handle rate limit (429): wait and send the same request again so
        # the batch is not lost.
        if response.status_code != 429 or attempts >= max_retries:
            break
        attempts += 1
        retry_after = int(response.headers.get("Retry-After", 1))  # Default to 1 second
        print(f"Rate limit hit. Retrying after {retry_after} seconds.")
        time.sleep(retry_after)

    if response.status_code == 200:
        return response.json()

    print(f"Error fetching data: {response.status_code}")
    return None

def process_tracks_from_csv(file_path, client_id, client_secret):
    """Read track IDs from a CSV file and save their audio features.

    The ``Track ID`` column is read, missing values, the ``"Unknown ID"``
    placeholder and repeated IDs are dropped, and the remaining IDs are sent
    to ``get_audio_features`` in batches of 100. Each successful response is
    passed to ``save_to_csv``. A new token is requested once 2700 seconds
    (45 minutes) have passed since the previous one.

    Args:
        file_path: Path of the CSV file containing a ``Track ID`` column.
        client_id: Spotify application client ID.
        client_secret: Spotify application client secret.
    """
    token = get_valid_token(client_id, client_secret)
    start_time = time.time()

    if token is None:
        print("Failed to get token.")
        return

    chunk_size = 100  # Number of track IDs per API call

    df = pd.read_csv(file_path)
    # Empty cells are read back as NaN and would be sent as the literal "nan";
    # repeated IDs only repeat the same lookup. Keep each real ID once, in
    # first-seen order.
    unique_ids = df['Track ID'].dropna().drop_duplicates().tolist()
    track_ids = [track_id for track_id in unique_ids if track_id != "Unknown ID"]

    for i in range(0, len(track_ids), chunk_size):
        if time.time() - start_time >= 2700:  # 45 minutes
            token = get_valid_token(client_id, client_secret)
            start_time = time.time()

        track_ids_chunk = track_ids[i:i+chunk_size]

        audio_features = get_audio_features(track_ids_chunk, token)

        if audio_features:
            # .get avoids a KeyError if the response lacks the expected key;
            # save_to_csv reports None as "No data to save."
            save_to_csv(audio_features.get('audio_features'))
        else:
            print("No data returned for the chunk.")


In [None]:
file_path = '/kaggle/working/track_ids.csv'  # Path to your CSV file
process_tracks_from_csv(file_path, client_id, client_secret)