In [27]:
!pip install pytube
!pip install spotipy
!pip install ytmusicapi

Reading package lists... Done
Building dependency tree       
Reading state information... Done
ffmpeg is already the newest version (7:4.2.7-0ubuntu0.1).
0 upgraded, 0 newly installed, 0 to remove and 64 not upgraded.


In [19]:
from ytmusicapi import YTMusic
import ytmusicapi
from pprint import pprint
import os
from pytube import YouTube
import urllib.request
import requests
import string
from spotipy.client import Spotify
from spotipy import SpotifyOAuth
from spotipy.oauth2 import SpotifyOauthError, SpotifyClientCredentials
import pandas as pd
import numpy as np
import json
import concurrent.futures
import time
import threading
from pydub import AudioSegment
from IPython.display import Audio
from dotenv import load_dotenv
import multiprocessing

In [31]:
audio = AudioSegment.from_file('/kaggle/working/songs/9rj1NPEWOH4.mp4')
wav_path = 'test' + ".wav"
audio.export(wav_path, format="wav")

<_io.BufferedRandom name='test.wav'>

# Scrapper Class Definition

In [52]:
class YTScraper:
  def __init__(
    self,
    token_file_path = 'oauth.json',
  ):
    self.yt_music = YTMusic(token_file_path)
    
  def get_search_results(self,search_str):
    return self.yt_music.search(search_str,filter = 'songs')
  
  def get_lyrics(self, video_id):
    video=self.yt_music.get_watch_playlist(videoId=video_id)
    lyrics_id = video['lyrics']
    lyrics = None
    if lyrics_id:
      lyrics=self.yt_music.get_lyrics(lyrics_id)
    return lyrics
  
  def download_song_by_video_id(
    self,
    video_id,
    output_path=None,
  ):
    if output_path is not None:
      os.makedirs(output_path,exist_ok=True)
    else:
      output_path = ""
    yt = YouTube(f"https://youtube.com/watch?v={video_id}", use_oauth = True, allow_oauth_cache=True)
    temp_file = os.path.join(output_path,f"{video_id}.mp4")
    yt.streams.filter(only_audio=True).first().download(output_path=output_path, filename=f"{video_id}.mp4")
    if os.path.exists(temp_file):
      print(f"Downloaded successfully: {temp_file}")
    return os.path.join(output_path,f"{video_id}.mp4")
  
  def scrap_by_search(self, search_str, download_output_file_path = None):
    search_result = self.get_search_results(search_str=search_str)
    video_id = None
    for item in search_result:
      if item['resultType'] in ['song'] and item['category'] == 'Songs':
        video_id = item['videoId']
        break 
    if (video_id):
      lyrics = self.get_lyrics(video_id=video_id)
      if not lyrics:
        lyrics = {'lyrics':None,'source':None}
      download_path = self.download_song_by_video_id(video_id=video_id, output_path=download_output_file_path)
    else:
      raise Exception("Null Video Id")
    return {
      "lyrics" : lyrics['lyrics'],
      "audio_file_path" : download_path
    }
  
class SpotifyScraper:
    def __init__(
        self,
        client_id,
        client_secret,
        redirect_uri,
        scope = "user-library-read playlist-read-private playlist-read-collaborative"
        
    ):
        self.yt_scraper = YTScraper()
        self._credentials = SpotifyClientCredentials(client_id = client_id, client_secret = client_secret)
        self.sp = Spotify(auth = self.get_access_token())
            
    def get_access_token(self):
        # Get access token
        access_token = self._credentials.get_access_token(as_dict = False)
        if not access_token:
            raise Exception("Access Token Not Found")
        return access_token

    def get_auth_header(self):
        return {"Authorization": "Bearer " + self.get_access_token()}
    # Function to fetch user playlists
    
    def get_user_playlists(self):
        print("Retrieving user playlists...")
        headers = self.get_auth_header()
        response = requests.get("https://api.spotify.com/v1/me/playlists", headers=headers)
        response_json = response.json()
        # for item in response_json["items"]:
        #     playlists[item["name"]] = item["id"]
        print("Playlists retrieved successfully.")
        return response_json
    
    @staticmethod
    def sanitize_filename(filename):
        valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
        return ''.join(c for c in filename if c in valid_chars)

    def get_track_info_by_id(self, track_id):
        headers = self.get_auth_header()
        response = requests.get(f"https://api.spotify.com/v1/tracks/{track_id}", headers=headers)
        if not response.ok:
            raise Exception(f"{response.status_code} : {response.text}")
        response_json = response.json()
        return response_json
    
    def get_playlist_info_by_id(self, playlist_id):
        headers = self.get_auth_header()
        response = requests.get(f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers)
        if not response.ok:
            raise Exception(f"{response.status_code} : {response.text}")
        response_json = response.json()
        return response_json
    
    def get_several_track_info_by_id(self, track_ids_str):
        headers = self.get_auth_header()
        response = requests.get(f"https://api.spotify.com/v1/tracks?ids={track_ids_str}", headers=headers)
        if not response.ok:
            raise Exception(f"{response.status_code} : {response.text}")
        response_json = response.json()
        return response_json
    
    def get_several_artist_info_by_id(self, artist_ids_str):
        headers = self.get_auth_header()
        response = requests.get(f"https://api.spotify.com/v1/artists?ids={artist_ids_str}", headers=headers)
        if not response.ok:
            raise Exception(f"{response.status_code} : {response.text}")
        response_json = response.json()
        return response_json
    
    def get_several_audio_feature_by_id(self, track_ids_str):
        headers = self.get_auth_header()
        response = requests.get(f"https://api.spotify.com/v1/audio-features?ids={track_ids_str}", headers=headers)
        if not response.ok:
            raise Exception(f"{response.status_code} : {response.text}")
        response_json = response.json()
        return response_json
    
    def destructure_artist_data(self,artists_list):
        artist_infos = []
        artist_genres = []
        artist_popularities = []
        artist_followers = []
        for artist_obj in artists_list:
            artist_infos.append({
                "name": artist_obj["name"],
                "id": artist_obj["id"],
                "genres": artist_obj.get("genres", []),
                "popularity": artist_obj["popularity"],
                "followers": artist_obj["followers"]["total"]
            })
            artist_genres.extend(artist_obj.get("genres", []))
            artist_popularities.append(artist_obj["popularity"])
            artist_followers.append(artist_obj["followers"]["total"])
        return {
            'artist_infos' : artist_infos,
            'artist_genres' : artist_genres,
            'artist_popularities' : artist_popularities,
            'artist_followers' : artist_followers,
        }
        
    def get_playlist_by_id(self,playlist_id : str):
        return self.sp.playlist_tracks(playlist_id = playlist_id)    
    
    def construct_track_info_dict(self,track_json = None, audio_features_json = None, artist_json = None): 
        
        most_popular_artist = max(artist_json['artist_infos'], key=lambda x: x["popularity"])
        
        search_str = f"{track_json['name']} {most_popular_artist['name']}"
        
        lyrics_audio_data = self.yt_scraper.scrap_by_search(search_str=search_str, download_output_file_path = "songs")
        
        track_info = {
        "track_name": track_json["name"],
        "track_id": track_json["id"],
        "track_number": track_json["track_number"],
        "disc_number": track_json["disc_number"],
        "duration_ms": track_json["duration_ms"],
        "explicit": track_json["explicit"],
        "popularity": track_json["popularity"],
        "preview_url": track_json["preview_url"],
        "isrc": track_json["external_ids"].get("isrc"),
        "album_name": track_json["album"]["name"],
        "album_id": track_json["album"]["id"],
        "album_type": track_json["album"]["album_type"],
        "album_total_tracks": track_json["album"]["total_tracks"],
        "album_release_date": track_json["album"]["release_date"],
        "album_release_date_precision": track_json["album"]["release_date_precision"],
        "album_images": track_json["album"]["images"],
        "popular_artist": most_popular_artist["name"],
        "popular_artist_id": most_popular_artist["id"],
        "artist_names": [artist["name"] for artist in track_json["artists"]],
        "artist_ids": [artist["id"] for artist in track_json["artists"]],
        "combined_genres": list(set(artist_json['artist_genres'])),
        "artist_popularity": most_popular_artist["popularity"],
        "artist_followers": most_popular_artist["followers"],
        "external_url": track_json["external_urls"]["spotify"],
        "acousticness": audio_features_json["acousticness"],
        "danceability": audio_features_json["danceability"],
        "energy": audio_features_json["energy"],
        "instrumentalness": audio_features_json["instrumentalness"],
        "key": audio_features_json["key"],
        "liveness": audio_features_json["liveness"],
        "loudness": audio_features_json["loudness"],
        "mode": audio_features_json["mode"],
        "speechiness": audio_features_json["speechiness"],
        "tempo": audio_features_json["tempo"],
        "time_signature": audio_features_json["time_signature"],
        "valence": audio_features_json["valence"],
    }
        track_info.update(lyrics_audio_data)
        return track_info
        
    def _get_single_threaded_data(self,track_obj, audio_feat_obj):
        artist_ids_list = [artist['id'] for artist in track_obj["artists"]]
        artists_id_str = ','.join(artist_ids_list[:50])
        artists_data = self.get_several_artist_info_by_id(artists_id_str)["artists"]
        destructured_artist_data = self.destructure_artist_data(artists_list=artists_data)  
        return self.construct_track_info_dict(track_obj, audio_feat_obj, destructured_artist_data)
    
    def _get_multi_threaded_data(self,tracks_json,audio_features_json, max_workers):
        data = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # List of tasks to be executed by the thread pool
            tasks = [
                executor.submit(self._get_single_threaded_data , *args) for args in zip(tracks_json, audio_features_json)
            ]
            
            for future in concurrent.futures.as_completed(tasks):
                try:
                    data.append(future.result())
                except Exception as exc:
                    print(f'Task generated an exception: {exc}') 
        return data
    
    def _get_multiprocessed_data(self,tracks_json,audio_features_json, max_workers):
        data = []
        with multiprocessing.Pool(processes=max_workers) as pool:
            # List of tasks to be executed by the thread pool
            tasks = [
                pool.apply_async(self._get_single_threaded_data , *args) for args in zip(tracks_json, audio_features_json)
            ]
            
            for task in tasks:
                # try:
                    result = task.get()
                    print(result)
                    data.append(task.get())
                # except Exception as exc:
                    # print(f'Task generated an exception: {exc}') 
        return data
    
    def get_several_track_data(self, track_ids_list : list[str], parallelization = 0, max_workers = 50):
        data = []
        for idx in range(0,len(track_ids_list),50):
            track_id_str = ",".join(track_ids_list[idx:idx+50])
            tracks_json = self.get_several_track_info_by_id(track_id_str)["tracks"]
            audio_features_json = self.get_several_audio_feature_by_id(track_ids_str=track_id_str)["audio_features"]
            if parallelization == 1:
                data.extend(self._get_multi_threaded_data(tracks_json,audio_features_json,max_workers))
            elif False and parallelization == 2:
                data.extend(self._get_multiprocessed_data(tracks_json,audio_features_json,max_workers))
            else:
                for track_obj, audio_feat_obj in zip(tracks_json, audio_features_json):
                    data.append(self._get_single_threaded_data(track_obj, audio_feat_obj))
        return data
     
      

## Loading the Environment and Authentication

### Loading in Kaggle Environment

In [13]:
import os
import json

from kaggle_secrets import UserSecretsClient
secrets = UserSecretsClient()

os.environ['KAGGLE_USERNAME'] = secrets.get_secret("KAGGLE_USERNAME")
os.environ['KAGGLE_KEY'] = secrets.get_secret("KAGGLE_KEY")
client_id = secrets.get_secret("SPOTIFY_CLIENT_ID")
client_secret = secrets.get_secret("SPOTIFY_CLIENT_SECRET")
redirect_uri = secrets.get_secret("REDIRECT_URI")
yt_music_oauth_metadata = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.5",
    "Content-Type": "application/json",
    "X-Goog-AuthUser": "0",
    "x-origin": "https://music.youtube.com",
    "Cookie" : secrets.get_secret("YT_MUSIC_COOKIE")
}

with open(os.path.join('/kaggle/working/','oauth.json'), 'w') as f:
    json.dump(yt_music_oauth_metadata, f)

### Loading in Local Environement

In [17]:
# Load environment variables
load_dotenv(dotenv_path='.env')
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
redirect_uri = os.getenv("REDIRECT_URL")
yt_music_oauth_metadata = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0",
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.5",
    "Content-Type": "application/json",
    "X-Goog-AuthUser": "0",
    "x-origin": "https://music.youtube.com",
    "Cookie" : os.getenv("YT_MUSIC_COOKIE")
}

with open('oauth.json', 'w') as f:
    json.dump(yt_music_oauth_metadata, f)


## Scrapper Instance

In [53]:
scrapper = SpotifyScraper(
  client_id=client_id,
  client_secret=client_secret,
  redirect_uri=redirect_uri,
)

In [46]:
playlist_items = scrapper.sp.playlist_items('2zgeKW2m3Le8hRmF4PzIsm',additional_types=['tracks'])

dict_keys(['added_at', 'added_by', 'is_local', 'primary_color', 'track', 'video_thumbnail'])

In [48]:
playlist_track_ids = []

In [49]:
for item in playlist_items['items']:
  playlist_track_ids.append(item['track']['id'])

In [40]:
playlist_items.keys()

dict_keys(['href', 'items', 'limit', 'next', 'offset', 'previous', 'total'])

In [22]:
track_ids_list = ['7MJQ9Nfxzh8LPZ9e9u68Fq',
 '1GEBsLDvJGw7kviySRI6GX',
 '0gplL1WMoJ6iYaPgMCL0gX',
 '698ItKASDavgwZ3WjaWjtz',
 '3LlmKSHR3Rs0Y3KHQLAYDk',
 '20R4HfKloPKgXDqU7UKk3x',
 '4RVwu0g32PAqgUiJoXsdF8',
 '6habFhsOp2NvshLv26DqMb',
 '18A7ha5BitZjmdHTCwXFbU',
 '75MNhvTCCKsST3YqqUiU9r',
 '1rfofaqEpACxVEHIZBJe6W',
 '5eGEc27nnhtmcOh6RC890a',
 '5PjdY0CKGZdEuoNab3yDmX',
 '6ocbgoVGwYJhOv1GgI9NsF',
 '1mavQ4WCzXSeL2Dm5DS4GQ',
 '3ebXMykcMXOcLeJ9xZ17XH',
 '36jnG0GLshZiH7oWkOq7gV',
 '3z8h0TU7ReDPLIbEnYhWZb',
 '5SDzo5YMWly9n6hVHvxPwp',
 '0ct6r3EGTcMLPtrXHDvVjc',
 '7BMO7O7ImjV8HNTH74Tshv',
 '561jH07mF1jHuk7KlaeF0s',
 '6or1bKJiZ06IlK0vFvY75k',
 '6M47gaKejso9772SKTa3yH',
 '630sXRhIcfwr2e4RdNtjKN',
 '5QDLhrAOJJdNAmCTJ8xMyW',
 '0lYBSQXN6rCTvUZvg9S0lU',
 '5wANPM4fQCJwkGd4rN57mH',
 '4ZtFanR9U6ndgddUvNcjcG',
 '6RUKPb4LETWmmr3iAEQktW',
 '1P17dC1amhFzptugyAO7Il',
 '2DB4DdfCFMw1iaR6JaR03a',
 '3o9kpgkIcffx0iSwxhuNI2',
 '0QLb1y64s617SAnnDoUZLN',
 '5amAJIEdIVtWYEi4wGp7Fn',
 '5flerg6aEao2VayZezVlgu',
 '39LLxExYz6ewLAcYrzQQyP']

In [54]:
track_data = scrapper.get_several_track_data(playlist_track_ids,parallelization=1)

Downloaded successfully: songs\2xrUg_1X_XY.mp4
Downloaded successfully: songs\gpdI00q4D9Q.mp4
Downloaded successfully: songs\SGUYRt6A3ck.mp4
Downloaded successfully: songs\DcDbKDAb7go.mp4
Downloaded successfully: songs\tPIeEy1pjGk.mp4
Downloaded successfully: songs\RDM7PGD__AM.mp4
Downloaded successfully: songs\p6w5IzenMlA.mp4
Downloaded successfully: songs\mVuSfxqpZa4.mp4
Downloaded successfully: songs\uUDWCiJZxQc.mp4
Downloaded successfully: songs\he9mKLl02to.mp4
Downloaded successfully: songs\UvynvnxZJ3Q.mp4Downloaded successfully: songs\fdz_cabS9BU.mp4

Downloaded successfully: songs\qXIIoN5X3YU.mp4
Downloaded successfully: songs\oo2qFi-Db-E.mp4
Downloaded successfully: songs\DooiOx9Osks.mp4
Downloaded successfully: songs\iIBo9mjIPr8.mp4
Downloaded successfully: songs\KEGZVv6KXL0.mp4
Downloaded successfully: songs\5NjUWx0e6gA.mp4
Downloaded successfully: songs\l9d6Ih2rzog.mp4
Downloaded successfully: songs\UFWDI_5bByk.mp4
Downloaded successfully: songs\0yaxbciy_4Y.mp4
Downloaded su

In [55]:
df = pd.DataFrame(track_data)

5

In [61]:
df.to_csv("Hundred_songs_df.csv")

In [12]:
temp = pd.read_pickle("track_data.pkl")

FileNotFoundError: [Errno 2] No such file or directory: 'track_data.pkl'