In [None]:
from spotdl import Spotdl
from spotdl.utils.config import DEFAULT_CONFIG
from spotdl.utils.search import get_search_results, get_simple_songs, parse_query
from spotdl.utils.search import get_simple_songs
from spotdl.download.downloader import Downloader
from yt_dlp import YoutubeDL

# Spotify-side client. The credential and cache-path options are copied
# unchanged from spotdl's DEFAULT_CONFIG; caching is disabled and auth is headless.
sp_client = Spotdl(
    **{
        option: DEFAULT_CONFIG[option]
        for option in ("client_id", "client_secret", "user_auth", "cache_path")
    },
    no_cache=True,
    headless=True,
)

# YouTube-side helpers: `yt_client.search(...)` maps a Spotify song to a YouTube
# link, and `dl_client.extract_info(...)` fetches stream metadata for a link.
yt_client = Downloader(settings=dict(simple_tui=True))
dl_client = YoutubeDL(dict(format="bestaudio/best"))

In [None]:
from dataclasses import dataclass
from io import BytesIO, TextIOWrapper
import numpy as np
import numpy.typing as npt
import requests
from pyparsing import Any
from os import path, makedirs
import hashlib
import logging
import json
from soundfile import read
from tqdm.notebook import tqdm



def mkmd5(string: str):
    """Return the hexadecimal MD5 digest of ``string`` encoded with the default UTF-8 codec."""
    digest = hashlib.md5()
    digest.update(string.encode())
    return digest.hexdigest()

@dataclass
class Metadata:
    """Descriptive tags for a track.

    All six fields are required constructor arguments (none has a default);
    every field except ``title`` may be passed as ``None``.

    NOTE(review): nothing in the visible code constructs this class, so the
    source and exact format of each value are unconfirmed.
    """
    title: str
    artist: str | None
    cover: str | None  # presumably a URL or path to cover art — TODO confirm
    album: str | None
    year: str | None  # kept as text, not int
    genre: str | None

@dataclass
class AnalyzedTrack:
    """Bundle of a track's ``Metadata``, its audio data and beat-related fields.

    NOTE(review): in the visible code this class is only mentioned in the
    commented-out ``Song.analyze`` stub, so nothing here shows how the
    fields are produced.
    """
    meta: Metadata
    sample_rate: int  # presumably in Hz — TODO confirm
    waveform: npt.NDArray  # shape/dtype not established by visible code
    downbeats: list[int]  # unit (sample index vs. time) not shown — TODO confirm
    best_downbeats: list[int]  # presumably a subset of `downbeats` — TODO confirm
    bpm: int  


class Cache:
    name: str
    
    def __init__(self, name: str):
        self.name = name
        self.check_cache_exists()
    
    @staticmethod
    def load(file: TextIOWrapper):
        return json.load(file)

    @staticmethod
    def dump(data, file: TextIOWrapper):
        return json.dump(data, file, ensure_ascii=False)


    def check_cache_exists(self):
        if not path.isdir(path.join("./.cache/", self.name)):
            logging.debug(f"Creating cache {path.join('./.cache/', self.name)}")
            makedirs(path.join("./.cache/", self.name))

    def get_cache_filename(self, cache_key: str) -> str:
        return path.join("./.cache/", self.name, f"{mkmd5(cache_key)}.cache")

    def get(self, key) -> dict[str, Any] | None:
        cache_filename = self.get_cache_filename(key)
        if not path.exists(cache_filename):
            return None
        logging.debug(f"{self.name.upper()}: loading {cache_filename}")
        with open(cache_filename, 'r') as file:
            return self.load(file)

    def set(self, key, data: dict[str, Any]):
        cache_filename = self.get_cache_filename(key)
        logging.debug(f"{self.name.upper()}: saving {cache_filename}")
        with open(cache_filename, 'w') as file:
            self.dump(data, file)
            
    
class AudioCache(Cache):
    """Cache variant that stores audio as a NumPy ``.npz`` archive.

    An entry is a ``(sample_rate, waveform)`` pair, matching the
    ``tuple[int, NDArray]`` shape used for ``Song._audio_cache``. The two
    members are saved under separate names because packing an int and an
    array into a single ``np.array`` produces a ragged/object array that
    ``np.save``/``np.load`` cannot round-trip safely.
    """

    @staticmethod
    def load(file):
        """Read a ``(sample_rate, waveform)`` pair from an open binary file."""
        with np.load(file) as archive:
            # Both members are materialized before the archive is closed.
            return int(archive["sample_rate"]), archive["waveform"]

    @staticmethod
    def dump(data, file):
        """Write a ``(sample_rate, waveform)`` pair to an open binary file."""
        sample_rate, waveform = data
        np.savez(file, sample_rate=np.asarray(sample_rate), waveform=np.asarray(waveform))

    def get(self, key):
        """Return the cached pair for ``key`` or ``None`` on a miss.

        Overridden because the base class opens entries in text mode, which
        NumPy's binary format cannot be read from.
        """
        cache_filename = self.get_cache_filename(key)
        if not path.exists(cache_filename):
            return None
        logging.debug(f"{self.name.upper()}: loading {cache_filename}")
        with open(cache_filename, 'rb') as file:
            return self.load(file)

    def set(self, key, data):
        """Store a ``(sample_rate, waveform)`` pair under ``key`` (binary mode)."""
        cache_filename = self.get_cache_filename(key)
        logging.debug(f"{self.name.upper()}: saving {cache_filename}")
        with open(cache_filename, 'wb') as file:
            self.dump(data, file)
    
class Song():
    """A track identified by a YouTube link and a Spotify link, with cached metadata.

    Metadata from both services is kept per instance (``_yt_cache`` and
    ``_spotify_cache``) and mirrored on disk through the class-level ``Cache``
    objects. Entries are keyed by the link of the service they describe:
    YouTube metadata by the YouTube link, Spotify metadata by the Spotify URL.
    Each entry also records its counterpart link (``CUSTOM__spotify_url`` in
    the YouTube entry, ``download_url`` in the Spotify entry) so a song can be
    rebuilt from either side without a new lookup.
    """
    yt_link: str
    spotify_link: str
    # Class-level defaults only. Instances always assign their own values in
    # __init__; metadata is never stored on the class, otherwise one song's
    # data would be picked up by every Song created afterwards.
    _audio_cache: tuple[int, npt.NDArray] | None = None
    _spotify_cache: dict[str, Any] | None = None
    _yt_cache: dict[str, Any] | None = None

    __yt_cache_instance = Cache("yt")
    __spotify_cache_instance = Cache("spotify")
    __audio_cache_instance = AudioCache("audio")

    # Seconds to wait for a connection / for the next bytes of a response.
    _REQUEST_TIMEOUT = 10

    def __init__(self, yt_url, spotify_url, *, yt_meta=None, spotify_meta=None):
        """Bind the two links and load metadata for both services.

        Args:
            yt_url: YouTube page link of the track.
            spotify_url: Spotify link of the track.
            yt_meta: Already-fetched YouTube metadata; skips cache and network.
            spotify_meta: Already-fetched Spotify metadata; skips cache and network.

        Metadata that is not supplied is read from the on-disk cache and, on
        a miss, downloaded and written back to the cache.
        """
        self.yt_link = yt_url
        self.spotify_link = spotify_url
        self._audio_cache = None

        self._yt_cache = yt_meta or self.__yt_cache_instance.get(self.yt_link)
        if not self._yt_cache:
            self._yt_cache = self._download_yt_meta()
            self.__yt_cache_instance.set(self.yt_link, self._yt_cache)

        # Keyed by the Spotify link, the same key from_spotify_link() looks up.
        self._spotify_cache = spotify_meta or self.__spotify_cache_instance.get(self.spotify_link)
        if not self._spotify_cache:
            self._spotify_cache = self._download_spotify_meta()
            self.__spotify_cache_instance.set(self.spotify_link, self._spotify_cache)

        logging.info(f"loaded {self._spotify_cache['name']}")

    @staticmethod
    def _check_link_valid(url: str, timeout: float = _REQUEST_TIMEOUT):
        """Return True when a HEAD request to ``url`` answers with a status below 400.

        Connection problems and timeouts count as "not valid" so the caller
        can refresh the link instead of crashing.
        """
        try:
            req = requests.head(url, timeout=timeout)
        except requests.RequestException:
            return False
        return req.status_code < 400

    @classmethod
    def find(cls, spotify_link = None, youtube_link = None, query = None):
        """Build a Song from whichever identifiers are given.

        Precedence: both links (explicit pairing), Spotify link only, YouTube
        link only, free-text query. Returns ``None`` when nothing is given.
        """
        if spotify_link and youtube_link:
            logging.info("Loading custom song config")
            # The constructor takes the YouTube link first.
            return cls(youtube_link, spotify_link)
        if spotify_link:
            logging.info("Loading from spotify")
            return cls.from_spotify_link(spotify_link)
        if youtube_link:
            logging.info("Loading from youtube")
            return cls.from_yt_link(youtube_link)
        if query:
            logging.info(f"Searching {query}")
            return cls.from_search(query)
        return None

    @classmethod
    def from_spotify_link(cls, spotify_link: str):
        """Build a Song from a Spotify link, finding the matching YouTube link."""
        cached = cls.__spotify_cache_instance.get(spotify_link)
        # An entry is only usable when it records both links.
        if cached and cached.get("download_url") and cached.get("url"):
            return cls(cached["download_url"], cached["url"], spotify_meta=cached)

        spotify_song = sp_client.search([spotify_link])[0]
        yt_url = yt_client.search(spotify_song)
        # Set before serializing so the cached entry records the YouTube link.
        spotify_song.download_url = yt_url
        spotify_meta = spotify_song.json
        cls.__spotify_cache_instance.set(spotify_song.url, spotify_meta)
        return cls(yt_url, spotify_song.url, spotify_meta=spotify_meta)

    @classmethod
    def from_yt_link(cls, youtube_link: str):
        """Build a Song from a YouTube link, finding the matching Spotify track.

        Raises:
            Exception: when no metadata can be extracted for ``youtube_link``.
        """
        cached = cls.__yt_cache_instance.get(youtube_link)
        if cached and cached.get("CUSTOM__spotify_url"):
            # cached["url"] is the media stream address used by
            # _download_audio(), not the page link, so the caller's link is
            # passed on instead.
            return cls(youtube_link, cached["CUSTOM__spotify_url"], yt_meta=cached)

        # A cached entry without the Spotify pairing is still valid metadata.
        yt_song = cached or dl_client.extract_info(youtube_link, download=False)
        if not yt_song:
            raise Exception("this url is not valid")

        # `or` also covers keys that are present but set to None.
        artist = yt_song.get('artist') or ''
        track = yt_song.get('track') or yt_song['fulltitle']
        title = f"{artist} {track}".strip()

        spotify_song = sp_client.search([title])[0]
        spotify_song.download_url = youtube_link
        spotify_meta = spotify_song.json
        yt_song["CUSTOM__spotify_url"] = spotify_song.url
        cls.__spotify_cache_instance.set(spotify_song.url, spotify_meta)
        cls.__yt_cache_instance.set(youtube_link, yt_song)
        return cls(youtube_link, spotify_song.url, yt_meta=yt_song, spotify_meta=spotify_meta)

    @classmethod
    def from_search(cls, query: str):
        """Build a Song from a free-text query."""
        return cls.from_spotify_link(query) # dirty way to search for song

    def _download_audio(self) -> tuple[int, npt.NDArray]:
        """Download and decode the track's audio.

        Returns:
            ``(sample_rate, waveform)`` as decoded by ``soundfile.read``. The
            result is kept on the instance, so later calls do not download again.

        Raises:
            Exception: when YouTube metadata has not been loaded.
            requests.HTTPError: when the media server answers with an error status.
        """
        if self._audio_cache is not None:
            return self._audio_cache
        if not self._yt_cache:
            raise Exception("youtube cache not initialized")

        audio_url = self._yt_cache["url"]
        if not self._check_link_valid(audio_url):
            # The cached stream address no longer answers; fetch fresh metadata.
            self._yt_cache = self._download_yt_meta()
            self.__yt_cache_instance.set(self.yt_link, self._yt_cache)
            audio_url = self._yt_cache["url"]

        logging.info(f"Downloading {self._yt_cache['fulltitle']}")
        buffer = BytesIO()
        with requests.get(audio_url, stream=True, timeout=self._REQUEST_TIMEOUT) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            with tqdm(total=total_size, unit='B', unit_scale=True) as progress_bar:
                for data_chunk in response.iter_content(chunk_size=1024):
                    buffer.write(data_chunk)
                    progress_bar.update(len(data_chunk))
        logging.info(f"Downloaded {self._yt_cache['fulltitle']}")

        # Rewind: after writing, the position is at the end of the buffer and
        # a reader would see no data.
        buffer.seek(0)
        # NOTE(review): soundfile can only decode formats its libsndfile
        # backend supports — confirm the 'bestaudio/best' stream is one of them.
        waveform, sample_rate = read(buffer)
        self._audio_cache = (sample_rate, waveform)
        return self._audio_cache

    def _download_spotify_meta(self) -> dict[str, Any]:
        """Fetch Spotify metadata for ``spotify_link`` and record the paired YouTube link."""
        spotify_song = sp_client.search([self.spotify_link])[0]
        # from_spotify_link() rebuilds a Song from this field on a cache hit.
        spotify_song.download_url = self.yt_link
        return spotify_song.json

    def _download_yt_meta(self) -> dict[str, Any]:
        """Fetch YouTube metadata for ``yt_link`` and record the paired Spotify link.

        Raises:
            Exception: when no metadata can be extracted.
        """
        yt_song = dl_client.extract_info(self.yt_link, download=False)
        if yt_song:
            # from_yt_link() rebuilds a Song from this field on a cache hit.
            yt_song["CUSTOM__spotify_url"] = self.spotify_link
            return yt_song
        raise Exception("this url is not valid")

    # TODO: analyze(self) -> AnalyzedTrack is not implemented yet.
    
# Configure the root logger at INFO so the logging.info(...) progress messages
# are emitted while the logging.debug(...) cache traces stay hidden.
# NOTE(review): basicConfig does nothing when the root logger already has
# handlers — confirm that is not the case in this kernel.
logging.basicConfig(level=logging.INFO)


In [None]:
# Manual probe of the link checker against a hard-coded stream address.
# NOTE(review): the URL embeds fixed `expire=` and `ip=` query parameters;
# presumably it is a signed, time-limited link tied to one client address, so
# this cell is not reproducible and exposes that address — confirm and replace
# with a freshly extracted link before sharing.
Song._check_link_valid("https://rr1---sn-2va3vhuxa-f5f6.googlevideo.com/videoplayback?expire=1706415413&ei=1YC1ZanSIuOni9oPqoe3uA0&ip=31.0.144.130&id=o-AN63kiccH-fWO3YZ223WCdaWpjPpnbddB4NHGLC1nlsT&itag=251&source=youtube&requiressl=yes&xpc=EgVo2aDSNQ%3D%3D&mh=PJ&mm=31%2C29&mn=sn-2va3vhuxa-f5f6%2Csn-f5f7lnl7&ms=au%2Crdu&mv=m&mvi=1&pl=19&gcr=pl&initcwndbps=446250&vprv=1&svpuc=1&mime=audio%2Fwebm&gir=yes&clen=7251122&dur=423.621&lmt=1626989482374168&mt=1706393598&fvip=3&keepalive=yes&fexp=24007246&c=ANDROID&txp=2311224&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cxpc%2Cgcr%2Cvprv%2Csvpuc%2Cmime%2Cgir%2Cclen%2Cdur%2Clmt&sig=AJfQdSswRQIhAMowiZQrDWpOOdnpQfYd557tIiT-4s6_V5Vw_ZUosqy2AiAgfPiqvsRhAikKmeQVlj-_URiS12dahIC_i5hMG4RjmA%3D%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpl%2Cinitcwndbps&lsig=AAO5W4owRgIhAKqcJvUpR6IqGEbZFz1tyHMLN_TxPfCxOYpR3k0hfhEOAiEA95DJbQjYRq_2bgRBqltt6aqWvKT_fj_R4KSBiuZJbfU%3D")

In [None]:
# Resolve the track from its Spotify link and fetch its audio. Note that
# `song` receives the return value of `_download_audio()`, not the Song object.
song = Song.find(
    spotify_link="https://open.spotify.com/track/0ddC48e0XVsjoczyIYiGCr"
)._download_audio()

In [10]:
import requests
from tqdm import tqdm
from typing import Any
from io import BytesIO
class Downloader:
    """Streams a remote file into memory while showing a progress bar.

    NOTE(review): this class reuses the name of the ``Downloader`` imported
    from ``spotdl.download.downloader`` in the first cell; once this cell has
    run, the name refers to this class until that import is executed again.
    """
    # Headers sent with every download request.
    request_headers: dict[str, Any] = {
        'Accept-Encoding': 'identity',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-us,en;q=0.5',
        'Sec-Fetch-Mode': 'navigate',
    }

    @classmethod
    def download(cls, youtube_url:str, chunk_size: int = 1024, timeout: float = 10):
        """Download ``youtube_url`` and return its content as an in-memory buffer.

        Args:
            youtube_url: Address to download from.
            chunk_size: Number of bytes requested per streamed chunk.
            timeout: Seconds to wait for the connection / the next bytes.

        Returns:
            A ``BytesIO`` holding the whole response body, rewound to position 0.

        Raises:
            requests.HTTPError: when the server answers with an error status.
        """
        buffer = BytesIO()
        # Context managers close the connection and the progress bar even
        # when the transfer is interrupted.
        with requests.get(youtube_url, stream=True, headers=cls.request_headers, timeout=timeout) as res:
            res.raise_for_status()
            total_size = int(res.headers.get('content-length', 0))
            # total=None makes tqdm show an open-ended counter when the server
            # does not announce a length.
            with tqdm(total=total_size or None, unit='B', unit_scale=True) as progress_bar:
                for data_chunk in res.iter_content(chunk_size=chunk_size):
                    buffer.write(data_chunk)
                    progress_bar.update(len(data_chunk))
        buffer.seek(0)
        return buffer

# Manual run of the downloader against a hard-coded stream address.
# NOTE(review): the URL embeds fixed `expire=` and `ip=` query parameters;
# presumably it is a signed, time-limited link tied to one client address, so
# this call is not reproducible and exposes that address — confirm and replace
# with a freshly extracted link before sharing.
Downloader.download("https://rr7---sn-2va3vhuxa-f5f6.googlevideo.com/videoplayback?expire=1706505928&ei=aOK2ZayFIPLEi9oPov-GsA4&ip=31.0.144.130&id=o-AJMmHGChk83Vfj_G23FvPG5le_D2lvHTSmV6A1HTYXiC&itag=251&source=youtube&requiressl=yes&xpc=EgVo2aDSNQ%3D%3D&mh=iQ&mm=31%2C29&mn=sn-2va3vhuxa-f5f6%2Csn-f5f7kn7e&ms=au%2Crdu&mv=m&mvi=7&pl=19&initcwndbps=638750&vprv=1&svpuc=1&mime=audio%2Fwebm&gir=yes&clen=3051134&dur=179.421&lmt=1701262428803223&mt=1706483837&fvip=3&keepalive=yes&fexp=24007246&c=ANDROID&txp=6218224&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cxpc%2Cvprv%2Csvpuc%2Cmime%2Cgir%2Cclen%2Cdur%2Clmt&sig=AJfQdSswRgIhALgwsuJ-GAfud9ejgSp44bmEREX2x1Kdm1LhRoUTDwD9AiEA4wwcsOGi9jurpCEsfIB51DGKxnIdqC6GP5CHTuUgQaM%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpl%2Cinitcwndbps&lsig=AAO5W4owRAIgDyDRe0ceSEoRRXAUcJVHEiLKGF90kC8YMkiue9NjOm0CIGHIwzsI9Mqex53bVBI18D9U6VVN-J1z0CKzJAzwDfnC")

  0%|          | 0.00/3.05M [00:00<?, ?B/s]

NameError: name 'BytesIO' is not defined

KeyboardInterrupt: 