In [2]:
import pandas as pd
import numpy as np
import ast

In [3]:
# 기존 데이터는 2020.01.17까지
df = pd.read_csv('../data/spotify_songs.csv')

In [20]:
df['track_album_release_date'].value_counts()

track_album_release_date
2020-01-10    270
2019-11-22    244
2019-12-06    235
2019-12-13    220
2013-01-01    219
             ... 
1973-08-28      1
2000-03-23      1
1967-04         1
1968-07-03      1
2014-04-18      1
Name: count, Length: 4530, dtype: int64

In [5]:
import time
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import requests
from requests.exceptions import HTTPError
from spotipy.exceptions import SpotifyException
import threading
# from dotenv import load_dotenv


# 스포티파이 api키 입력
client_id = '9138c9fe66ce41dfa7ab1b83ecf6f650'
client_secret = '9521407805044cb7a9573a36519197c1'

client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

In [6]:
# 새로운 노래 id들 불러오기
df = pd.read_csv('../data/new_songs.csv')

ids = df['id'].tolist()

# 한번에 호출할 수 있는 100개에 맞추어 자르기
def chunk_ids(ids, chunk_size=100):
    for i in range(0, len(ids), chunk_size):
        yield ids[i:i + chunk_size]

id_chunks = list(chunk_ids(ids))

In [7]:
# Rate limit에 맞추어 요청 수 제한
class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = threading.Lock()

    def acquire(self):
        with self.lock:
            current_time = time.time()
            # 기간 내의 호출만 남김
            self.calls = [call for call in self.calls if current_time - call < self.period]
            if len(self.calls) >= self.max_calls:
                # 기간이 끝날 때까지 대기
                sleep_time = self.period - (current_time - self.calls[0])
                print(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds.")
                time.sleep(sleep_time)
                # 대기 후 호출 리스트 갱신
                self.calls = [call for call in self.calls if time.time() - call < self.period]
            # 호출 기록 추가
            self.calls.append(time.time())

rate_limiter = RateLimiter(max_calls=5, period=30)

In [8]:
# API로 audio features 호출
def get_audio_features(sp, ids):
    audio_features = []
    id_chunks = list(chunk_ids(ids))
    total_chunks = len(id_chunks)

    for idx, chunk in enumerate(id_chunks):
        rate_limiter.acquire()  # 호출 전에 RateLimiter를 통해 제한 관리
        try:
            features = sp.audio_features(chunk)
            if features is not None:
                audio_features.extend(features)
            else:
                print(f"Chunk {idx+1}/{total_chunks}: Received None for features.")
            print(f"Chunk {idx+1}/{total_chunks} processed.")
        except Exception as e:
            print(f"Error fetching audio features for chunk {idx+1}/{total_chunks}: {e}")
            # 필요에 따라 재시도 로직 추가 가능
    return audio_features

In [None]:
# Audio features 호출
audio_features = get_audio_features(sp, ids)
df = pd.DataFrame(audio_features)

df.to_csv('../data/audio_featrues.csv', index=False)

In [23]:
df = pd.read_csv('../data/new_songs.csv')

# 'id' 컬럼만 추출
ids = df['id'].tolist()

# 트랙은 50개씩 호출가능
def chunk_ids(ids, chunk_size=50):
    for i in range(0, len(ids), chunk_size):
        yield ids[i:i + chunk_size]

In [24]:
# API로 audio features 호출
def get_track_info(sp, ids):
    track_info = []
    id_chunks = list(chunk_ids(ids))
    total_chunks = len(id_chunks)

    for idx, chunk in enumerate(id_chunks):
        rate_limiter.acquire()  # 호출 전에 RateLimiter를 통해 제한 관리
        try:
            response = sp.tracks(chunk)
            tracks = response['tracks']
            if tracks:
                track_info.extend(tracks)
            else:
                print(f"Chunk {idx+1}/{total_chunks}: Received empty tracks.")
            print(f"Chunk {idx+1}/{total_chunks} processed.")
        except Exception as e:
            print(f"Error fetching track info for chunk {idx+1}/{total_chunks}: {e}")
            # 필요에 따라 재시도 로직 추가 가능
    return track_info


In [None]:
# Audio features 호출
track_infos = get_track_info(sp, ids)
df1 = pd.DataFrame(track_infos)

df1.to_csv('../data/track_infos.csv', index=False)

In [39]:
df = pd.read_csv('../data/spotify_songs.csv')
df1 = pd.read_csv('../data/audio_featrues.csv')
df2 = pd.read_csv('../data/track_infos.csv')

In [40]:

# 필요한 컬럼 생성
df2['artists'] = df2['artists'].apply(ast.literal_eval)
df2['album'] = df2['album'].apply(ast.literal_eval)

df2['track_artist'] = df2['artists'].apply(lambda x: ', '.join([artist['name'] for artist in x]))
df2['track_album_id'] = df2['album'].apply(lambda x: x['id'])
df2['track_album_name'] = df2['album'].apply(lambda x: x['name'])
df2['track_album_release_date'] = df2['album'].apply(lambda x: x['release_date'])

df1.rename(columns={'id': 'track_id'}, inplace=True)
df2.rename(columns={'id': 'track_id', 'name': 'track_name', 'popularity': 'track_popularity'}, inplace=True)

# 필요한거 골라서 합치기
df1_selected = df1[['track_id', 'danceability', 'energy', 'key', 'loudness', 'mode',
                    'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                    'valence', 'tempo', 'duration_ms']]

df2_selected = df2[['track_id', 'track_name', 'track_artist', 'track_popularity',
                    'track_album_id', 'track_album_name', 'track_album_release_date']]

df_combined = pd.merge(df1_selected, df2_selected, on='track_id', how='inner')

# 원본데이터 수정
drop_col = ['playlist_id', 'playlist_name', 'playlist_genre', 'playlist_subgenre']
df.drop(columns=drop_col, inplace=True)

# 순서 맞추기
df_combined = df_combined[df.columns]

# 원본 뒤에 붙이고 저장하기
df_merged = pd.concat([df, df_combined], ignore_index=True)
df_merged.to_csv('../data/spotify_songs_new.csv')

# print(df.shape[0] + df_combined.shape[0])
# print(df_merged.shape[0])

In [34]:
df_combined.columns

Index(['track_id', 'danceability', 'energy', 'key', 'loudness', 'mode',
       'speechiness', 'acousticness', 'instrumentalness', 'liveness',
       'valence', 'tempo', 'duration_ms', 'track_name', 'track_artist',
       'track_popularity', 'track_album_id', 'track_album_name',
       'track_album_release_date'],
      dtype='object')

In [36]:
drop_col = ['playlist_id', 'playlist_name', 'playlist_genre', 'playlist_subgenre']
df.drop(columns=drop_col, inplace=True)

df.columns

Index(['track_id', 'track_name', 'track_artist', 'track_popularity',
       'track_album_id', 'track_album_name', 'track_album_release_date',
       'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
       'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
       'duration_ms'],
      dtype='object')

In [None]:
# client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
# sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)


# release_date = '2021-01-01'
# query = f'release:{release_date}'
# # song = 'dynamite'
# track_info = sp.search(q=query, type='track')
# track_info

In [None]:
# import pprint
# pprint.pprint(track_info)

In [None]:
# class Spotify_audio_features:
#     def __init__(self):

#         client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
#         self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

#     def get_features(self, song):
#         # get track id information
#         track_info = self.sp.search(q=song, type='track')
#         track_id = track_info["tracks"]["items"][0]["id"]

#         # get audio_feature
#         features = self.sp.audio_features(tracks=[track_id])
#         acousticness = features[0]["acousticness"]
#         danceability = features[0]["danceability"]
#         energy = features[0]["energy"]
#         liveness = features[0]["liveness"]
#         loudness = features[0]["loudness"]
#         valence = features[0]["valence"]
#         mode = features[0]["mode"]

#         result = {"acousticness" : acousticness,
#                     "danceability" : danceability,
#                     "energy" : energy,
#                     "liveness" : liveness,
#                     "loudness" : loudness,
#                     "valence" : valence,
#                     "mode" : mode}

#         return result

# spotify_audio = Spotify_audio_features()

# # get_features 메서드 호출
# song1 = spotify_audio.get_features('dynamite')

# song1

In [None]:


# auth_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
# sp = spotipy.Spotify(auth_manager=auth_manager)

# # API 호출 시 Rate Limiting을 체크하고 처리하는 함수
# def safe_api_call(api_call, *args, **kwargs):
#     while True:
#         try:
#             # Spotify API 호출
#             return api_call(*args, **kwargs)
#         except SpotifyException as e:
#             if e.http_status == 429:
#                 # Retry-After 헤더 값 확인
#                 retry_after = int(e.headers.get("Retry-After", 10))
#                 print(f"Rate limit hit. Retrying after {retry_after} seconds.")
#                 time.sleep(retry_after)  # 지정된 시간만큼 대기 후 다시 시도
#             else:
#                 raise e  # 다른 예외 발생 시 에러 다시 발생
#         except HTTPError as e:
#             print(f"HTTPError occurred: {e}")
#             raise e

# # 특정 날짜에 발매된 트랙을 검색하여 필요한 정보를 DataFrame으로 정리하는 함수
# def search_tracks_by_release_date(sp, release_date):
#     tracks = []
#     limit = 50
#     offset = 0
    
#     while True:
#         # 특정 날짜에 발매된 앨범을 쿼리
#         query = f'release:{release_date}'
#         results = safe_api_call(sp.search, q=query, type='album', limit=limit, offset=offset)
#         albums = results['albums']['items']
        
#         if not albums:
#             break  # 더 이상 결과가 없으면 중단

#         # 각 앨범에서 트랙 정보를 가져오기
#         for album in albums:
#             album_id = album['id']
#             album_name = album['name']
#             album_release_date = album['release_date']

#             # 앨범의 트랙 정보 가져오기
#             album_tracks = safe_api_call(sp.album_tracks, album_id)['items']
            
#             for track in album_tracks:
#                 track_id = track['id']
#                 track_name = track['name']
#                 track_artists = ', '.join([artist['name'] for artist in track['artists']])
#                 track_popularity = safe_api_call(sp.track, track_id)['popularity']

#                 # 트랙의 오디오 피처 (음악 분석 정보)
#                 audio_features = safe_api_call(sp.audio_features, track_id)[0]
                
#                 if audio_features:
#                     # 필요한 정보를 딕셔너리로 저장
#                     track_info = {
#                         'track_id': track_id,
#                         'track_name': track_name,
#                         'track_artist': track_artists,
#                         'track_popularity': track_popularity,
#                         'track_album_id': album_id,
#                         'track_album_name': album_name,
#                         'track_album_release_date': album_release_date,
#                         'playlist_name': None,  # 필요하다면 추가적으로 연관된 Playlist API를 호출해 가져올 수 있음
#                         'playlist_id': None,    # 필요하다면 연관된 Playlist 정보 추가
#                         'playlist_genre': None, # 필요하다면 장르를 가져오도록 수정 가능
#                         'playlist_subgenre': None,
#                         'danceability': audio_features['danceability'],
#                         'energy': audio_features['energy'],
#                         'key': audio_features['key'],
#                         'loudness': audio_features['loudness'],
#                         'mode': audio_features['mode'],
#                         'speechiness': audio_features['speechiness'],
#                         'acousticness': audio_features['acousticness'],
#                         'instrumentalness': audio_features['instrumentalness'],
#                         'liveness': audio_features['liveness'],
#                         'valence': audio_features['valence'],
#                         'tempo': audio_features['tempo'],
#                         'duration_ms': audio_features['duration_ms']
#                     }
#                     tracks.append(track_info)
        
#         offset += limit  # 다음 배치를 가져오기 위해 offset 증가

#     # DataFrame으로 변환
#     return pd.DataFrame(tracks)

# # 발매일 설정 (예: 2020년 2월 2일)
# release_date = '2020-02-02'
# df_tracks = search_tracks_by_release_date(sp, release_date)

# # DataFrame 출력
# print(df_tracks)

In [17]:
# def get_access_token(client_id, client_secret):
#     """스포티파이에서 액세스 토큰을 가져옵니다."""
#     url = 'https://accounts.spotify.com/api/token'
#     headers = {'Content-Type': 'application/x-www-form-urlencoded'}
#     data = {'grant_type': 'client_credentials'}

#     while True:
#         try:
#             response = requests.post(url, headers=headers, data=data, auth=(client_id, client_secret))
#             if response.status_code == 429:
#                 retry_after = int(response.headers.get('Retry-After', 10))
#                 print(f"액세스 토큰을 가져오는 중에 레이트 리미트에 도달했습니다. {retry_after}초 후에 다시 시도합니다...")
#                 time.sleep(retry_after)
#                 continue
#             response.raise_for_status()
#             break
#         except requests.exceptions.HTTPError as e:
#             print(f"액세스 토큰 요청 중 에러 발생: {e}")
#             if response.status_code == 429:
#                 retry_after = int(response.headers.get('Retry-After', 10))
#                 print(f"레이트 리미트에 도달했습니다. {retry_after}초 후에 다시 시도합니다...")
#                 time.sleep(retry_after)
#                 continue
#             else:
#                 raise  # 다른 에러는 다시 발생시킵니다.

#     return response.json()['access_token']

# def search_tracks_by_date(access_token, target_date):
#     """특정 날짜에 발매된 트랙을 검색합니다."""
#     url = 'https://api.spotify.com/v1/search'
#     headers = {'Authorization': f'Bearer {access_token}'}
#     query = 'year:2020-02'  # 2020년에 발매된 트랙 검색
#     tracks_on_date = []
#     limit = 50  # 한 번에 가져올 수 있는 최대 아이템 수
#     offset = 0  # 검색 결과의 시작 지점

#     while offset < 1000:  # 최대 1000개의 결과까지 가져올 수 있습니다
#         params = {
#             'q': query,
#             'type': 'track',
#             'limit': limit,
#             'offset': offset
#         }

#         while True:
#             try:
#                 response = requests.get(url, headers=headers, params=params)
#                 if response.status_code == 429:
#                     retry_after = int(response.headers.get('Retry-After', 10))
#                     print(f"레이트 리미트에 도달했습니다. {retry_after}초 후에 다시 시도합니다...")
#                     time.sleep(retry_after)
#                     continue
#                 response.raise_for_status()
#                 break
#             except requests.exceptions.HTTPError as e:
#                 print(f"트랙 검색 중 에러 발생: {e}")
#                 if response.status_code == 429:
#                     retry_after = int(response.headers.get('Retry-After', 10))
#                     print(f"레이트 리미트에 도달했습니다. {retry_after}초 후에 다시 시도합니다...")
#                     time.sleep(retry_after)
#                     continue
#                 else:
#                     raise  # 다른 에러는 다시 발생시킵니다.

#         data = response.json()
#         tracks = data.get('tracks', {}).get('items', [])

#         if not tracks:
#             break  # 더 이상의 결과가 없으면 종료

#         for track in tracks:
#             release_date = track['album']['release_date']
#             release_precision = track['album']['release_date_precision']
#             if release_precision == 'day' and release_date == target_date:
#                 track_info = {
#                     'name': track['name'],
#                     'artists': ', '.join(artist['name'] for artist in track['artists']),
#                     'release_date': release_date
#                 }
#                 tracks_on_date.append(track_info)

#         offset += limit  # 다음 페이지로 이동

#     return tracks_on_date

# def main():
#     access_token = get_access_token(client_id, client_secret)
#     target_date = '2020-02-04'  # 검색할 발매일

#     # 결과를 특정 변수에 저장합니다.
#     search_results = search_tracks_by_date(access_token, target_date)

#     # 필요에 따라 결과를 활용합니다.
#     # 예를 들어, 결과를 출력하려면 아래 코드를 사용하세요.
#     for idx, track in enumerate(search_results, 1):
#         print(f"{idx}. {track['name']} by {track['artists']} (Released on {track['release_date']})")

# if __name__ == "__main__":
#     main()

In [None]:
# import pprint

# pprint.pprint(search_results)