In [1]:
import argparse
import json
import logging
import math
import os
import re
import time
import urllib.parse
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import List, Dict, Optional
from urllib.parse import urlparse

import langchain
import pandas as pd
import requests
from bs4 import BeautifulSoup
from langchain.schema import OutputParserException
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from tqdm import tqdm

In [3]:
# Root logger: timestamped INFO-level messages for the notebook's own logs.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Raise the threshold for these two library loggers so only warnings and
# above from them are emitted.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("langchain").setLevel(logging.WARNING)

In [56]:
def get_file_from_gist(gist_id: str, gist_token: str, file_name: str, timeout: float = 30) -> Optional[Dict]:
    """Download one file from a GitHub gist and decode it as JSON.

    Args:
        gist_id: Identifier of the gist to read.
        gist_token: GitHub token sent in the ``Authorization`` header.
        file_name: Name of the file inside the gist.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The decoded JSON document (whatever its root type is), ``{}`` when the
        gist has no file called ``file_name``, or ``None`` when the request
        fails, the content is not valid JSON, or the response lacks an
        expected key. Failures are logged, not raised.
    """
    headers = {'Authorization': f'token {gist_token}'}
    url = f'https://api.github.com/gists/{gist_id}'

    try:
        # Without a timeout a stalled connection would block the notebook forever.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()

        files = response.json().get('files', {})
        if file_name not in files:
            return {}

        file_info = files[file_name]
        content = file_info['content']

        # The gist API only inlines part of a large file and flags it as
        # truncated; the complete text has to be fetched from raw_url.
        if file_info.get('truncated'):
            raw_response = requests.get(file_info['raw_url'], timeout=timeout)
            raw_response.raise_for_status()
            content = raw_response.text

        return json.loads(content)

    except requests.exceptions.RequestException as e:
        logging.error(f"Erro ao fazer a requisição: {e}")
    except json.JSONDecodeError as e:
        logging.error(f"Erro ao decodificar o JSON: {e}")
    except KeyError as e:
        logging.error(f"Erro: Chave ausente na resposta: {e}")

    return None

In [57]:
class App():
    """Scrape video titles from a YouTube channel and add the songs to a Spotify playlist.

    Constructing an instance already performs the heavy setup, in this order:
    it loads the channel page in headless Firefox and collects the titles that
    are not yet in ``database``, builds the Groq extraction chain, runs the
    Spotify authorization-code flow (opens a browser and waits for the
    redirect on a local HTTP server), and looks up or creates the target
    playlist. ``run`` then extracts artist/track from each title, adds the
    matches to the playlist, writes the misses to ``output`` and uploads the
    updated database to a gist.

    Environment variables read: ``SPOTIFY_USER_ID`` (required),
    ``SPOTIFY_REDIRECT_URI``, ``SPOTIFY_CLIENT_ID``, ``SPOTIFY_CLIENT_SECRET``,
    ``GIST_ACCESS_TOKEN`` and ``GIST_ID_DATA``.
    """

    PLAYLIST_NAME = 'Youtube Scrapping'
    REQUEST_TIMEOUT = 30            # seconds, applied to every HTTP call
    DEFAULT_WAIT_SECONDS = 120      # fallback when the retry delay cannot be parsed
    REQUIRED_FIELDS = ('artist', 'track', 'title')
    UNKNOWN = 'Unknown'

    def __init__(self, channel_url: str, full: bool, output: str, database: List[Dict[str, str]], temperature: float):
        """Store the settings and run the setup steps described on the class.

        Args:
            channel_url: URL of the channel's videos page.
            full: When true, keep scrolling the page until it stops growing.
            output: Path of the Excel report listing tracks that were not added.
            database: List of previously processed track records; each record
                is matched by its ``original_title`` key. An empty dict is
                accepted and treated as an empty list.
            temperature: Sampling temperature passed to the Groq model.

        Raises:
            TypeError: If ``database`` is neither a list nor an empty dict.
            ValueError: If ``channel_url`` does not start with ``http`` or the
                Spotify redirect URI has no port.
            RuntimeError: If Spotify authorization or token exchange fails.
        """
        self.channel_url = channel_url
        self.channel_name = self._extract_channel_name(channel_url)
        self.full = full
        self.output = output
        self.database = self._normalize_database(database)
        self.titles = self._get_content()
        self.model_name = "llama-3.3-70b-versatile"
        self.temperature = temperature
        self.chat = self._instance_model()
        self.user_id = os.environ['SPOTIFY_USER_ID']
        self.redirect_uri = os.environ.get('SPOTIFY_REDIRECT_URI')
        self.client_id = os.environ.get('SPOTIFY_CLIENT_ID')
        self.client_secret = os.environ.get('SPOTIFY_CLIENT_SECRET')
        self.spotify_api = 'https://api.spotify.com/v1'
        self.spotify_auth_code = self._get_spotify_auth_code()
        self.spotify_access_token = self._get_spotify_access_token()
        self.headers = {'Authorization': f'Bearer {self.spotify_access_token}'}
        self.playlist_url = f'{self.spotify_api}/users/{self.user_id}/playlists'
        self.playlist_id = self._get_playlist_id()
        self.gist_token = os.environ.get('GIST_ACCESS_TOKEN')
        self.now = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime())

    @staticmethod
    def _extract_channel_name(channel_url: str) -> str:
        """Return the channel handle found in ``channel_url``.

        Prefers the path segment that starts with ``@``; otherwise falls back
        to the second-to-last ``/``-separated piece with ``@`` removed. Never
        raises, so URL validation can report malformed input later.
        """
        for segment in urlparse(channel_url).path.split('/'):
            if segment.startswith('@') and len(segment) > 1:
                return segment.replace("@", "")

        pieces = channel_url.rsplit("/")
        if len(pieces) >= 2:
            return pieces[-2].replace("@", "")
        return channel_url.replace("@", "")

    @staticmethod
    def _normalize_database(database) -> List[Dict[str, str]]:
        """Return ``database`` as the list of records the rest of the class expects."""
        if isinstance(database, list):
            return database
        # An empty dict is what a missing gist file decodes to: start fresh.
        if isinstance(database, dict) and not database:
            return []
        # Anything else (including None) is refused: the database is written
        # back to the gist at the end of run(), so guessing could erase data.
        raise TypeError(
            f"database must be a list of track records, got {type(database).__name__}"
        )

    def _instance_model(self) -> callable:
        """Build the ``prompt | llm | parser`` chain that extracts song details as JSON."""
        class MusicDetails(BaseModel):
            artist: str
            track: str
            title: str

        llm = ChatGroq(model_name=self.model_name, temperature=self.temperature)
        parser = JsonOutputParser(pydantic_object=MusicDetails)
        # The doubled braces are literal braces in the prompt template.
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a JSON extraction assistant. Always respond with a valid JSON using the structure below.\n
            If there are no explicit mentions of a song (artist name and/or track title), return unknown for the fields.

                {{
                    "artist": "artist name here",
                    "track": "track name here",
                    "title": "full title here, artist + track"
                }}"""),
            ("user", "{input}")
        ])

        return prompt | llm | parser

    def _identify(self, title: str) -> bool:
        """Return True when ``title`` is already recorded in the database."""
        return any(
            isinstance(record, dict) and record.get('original_title') == title
            for record in self.database
        )

    def _parse(self, html: str) -> List[str]:
        """Extract the video titles from ``html`` that are not yet in the database."""
        soup = BeautifulSoup(html, "html.parser")
        titles_elements = soup.find_all(id="video-title")

        titles = [title.get_text(strip=True) for title in titles_elements]

        return [title for title in titles if not self._identify(title)]

    def _get_content(self) -> List[str]:
        """Load the channel page in headless Firefox and return its new titles.

        Raises:
            ValueError: If ``channel_url`` does not start with ``http``.
        """
        if not self.channel_url.startswith('http'):
            raise ValueError(f"O endereço informado é invalido. Endereço fornecido: {self.channel_url}")

        options = Options()
        options.add_argument("--headless")

        with webdriver.Firefox(options=options) as driver:
            driver.get(self.channel_url)
            time.sleep(3)

            if self.full:
                # Scroll to the bottom repeatedly until the page height stops
                # changing between two consecutive scrolls.
                last_height = driver.execute_script("return document.documentElement.scrollHeight")

                while True:
                    driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
                    time.sleep(2)
                    new_height = driver.execute_script("return document.documentElement.scrollHeight")

                    if new_height == last_height:
                        break
                    last_height = new_height

            html = driver.page_source

        return self._parse(html)

    def _get_wait_time(self, error: str) -> int:
        """Parse the retry delay from an error message, in whole seconds.

        Understands durations written after ``Please try again in`` such as
        ``7.66s``, ``1m26.4s`` or ``916ms``. The result is rounded up and is
        at least 1. Returns ``DEFAULT_WAIT_SECONDS`` when nothing can be parsed.
        """
        match = re.search(
            r'Please try again in\s+((?:\d+(?:\.\d+)?(?:ms|h|m|s))+)',
            str(error),
        )
        if match is None:
            logging.warning("Failed to extract wait time from error message.")
            return self.DEFAULT_WAIT_SECONDS

        seconds_per_unit = {'h': 3600.0, 'm': 60.0, 's': 1.0, 'ms': 0.001}
        # "ms" is listed before "m" so milliseconds are not read as minutes.
        parts = re.findall(r'(\d+(?:\.\d+)?)(ms|h|m|s)', match.group(1))
        total = sum(float(amount) * seconds_per_unit[unit] for amount, unit in parts)
        return max(1, math.ceil(total))

    def _ask(self, description: str) -> Dict[str, str]:
        """Ask the model for the song details in ``description``.

        A parsing failure yields the "Unknown" record immediately. Any other
        error triggers one retry after the delay given by ``_get_wait_time``;
        if the retry fails too, the "Unknown" record is returned.
        """
        default_response = {
            "artist": self.UNKNOWN,
            "track": self.UNKNOWN,
            "title": self.UNKNOWN,
        }

        try:
            return self.chat.invoke(input=description)
        except OutputParserException:
            logging.warning(f"Failed to parse response for: {description}")
            return default_response
        except Exception as error:
            wait_time = self._get_wait_time(error)
            logging.error(f"Error occurred. Retrying in {wait_time} seconds.")
            time.sleep(wait_time)
            try:
                return self.chat.invoke(input=description)
            except Exception:
                logging.error(f"Final failure for input: {description}")
                return default_response

    def _get_new_tracks(self) -> List[Dict[str, str]]:
        """Turn every pending title into a track record.

        String values returned by the model are title-cased. The keys in
        ``REQUIRED_FIELDS`` are always present: a missing or non-string value
        becomes "Unknown". ``original_title`` and ``channel`` are added.
        """
        new_tracks = []

        for title in tqdm(self.titles, desc="Extracting", ncols=80):
            answer = self._ask(title)
            if not isinstance(answer, dict):
                answer = {}

            music_obj = {
                key: (value.title() if isinstance(value, str) else value)
                for key, value in answer.items()
            }
            for key in self.REQUIRED_FIELDS:
                if not isinstance(music_obj.get(key), str):
                    music_obj[key] = self.UNKNOWN

            music_obj['original_title'] = title
            music_obj['channel'] = self.channel_name

            new_tracks.append(music_obj)

        return new_tracks

    def _get_spotify_auth_code(self) -> str:
        """Run the browser step of the authorization-code flow and return the code.

        Raises:
            ValueError: If the redirect URI does not include a port.
            RuntimeError: If the redirect arrives without a ``code`` parameter.
        """
        url = 'https://accounts.spotify.com/authorize'
        port = urlparse(self.redirect_uri).port
        if port is None:
            raise ValueError(
                f"SPOTIFY_REDIRECT_URI must include an explicit port, got: {self.redirect_uri!r}"
            )

        params = {
            'client_id': self.client_id,
            'response_type': 'code',
            'redirect_uri': self.redirect_uri,
            'scope': "user-read-private user-read-email playlist-modify-public playlist-modify-private"
        }

        authorization_url = f"{url}?{urllib.parse.urlencode(params)}"

        class RequestHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                query = urllib.parse.urlparse(self.path).query
                params = urllib.parse.parse_qs(query)
                if 'code' in params:
                    self.send_response(200)
                    self.end_headers()
                    self.wfile.write(b"Autorizacao concluida! Pode fechar esta janela.")
                    self.server.auth_code = params['code'][0]
                else:
                    self.send_response(400)
                    self.end_headers()
                    self.wfile.write(b"Codigo de autorizacao nao encontrado.")

        # Bind the port before opening the browser so the redirect cannot
        # arrive while nothing is listening; the with-block closes the socket.
        with HTTPServer(('localhost', port), RequestHandler) as server:
            server.auth_code = None
            webbrowser.open(authorization_url)
            print("Aguardando autorizacao...")
            server.handle_request()  # serves exactly one request
            auth_code = server.auth_code

        if not auth_code:
            raise RuntimeError("Spotify authorization failed: the redirect carried no 'code' parameter.")
        return auth_code

    def _get_spotify_access_token(self) -> str:
        """Exchange the authorization code for an access token.

        Raises:
            RuntimeError: If the response carries no ``access_token``.
        """
        url = 'https://accounts.spotify.com/api/token'

        data = {
            'grant_type': 'authorization_code',
            'code': self.spotify_auth_code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }

        response = requests.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
        try:
            payload = response.json()
        except ValueError:
            payload = {}

        token = payload.get('access_token') if isinstance(payload, dict) else None
        if not token:
            reason = ''
            if isinstance(payload, dict):
                reason = payload.get('error_description') or payload.get('error') or ''
            raise RuntimeError(
                f"Could not obtain a Spotify access token (HTTP {response.status_code}). {reason}".strip()
            )
        return token

    def _iter_paged(self, url: str):
        """Yield every item of a paged Spotify listing, following ``next`` links."""
        while url:
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            page = response.json()
            yield from page.get('items') or []
            url = page.get('next')

    def _get_playlist_id(self) -> str:
        """Return the id of the target playlist, creating it when none matches.

        Every page of the user's playlists is checked before a new playlist
        is created, so an existing one is never duplicated.
        """
        for playlist in self._iter_paged(self.playlist_url):
            if playlist and self.PLAYLIST_NAME in (playlist.get('name') or ''):
                return playlist['id']

        created = requests.post(self.playlist_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT, json={
            'name': self.PLAYLIST_NAME,
            'description': 'Musicas que foram retiradas do Youtube',
            'public': True
        })
        created.raise_for_status()
        return created.json()['id']

    def _get_existing_tracks(self, url: str) -> set:
        """Return the ``(track name, first artist name)`` pairs already in the playlist."""
        existing = set()
        for item in self._iter_paged(url):
            track = (item or {}).get('track')
            # Entries without track data or without artists are skipped.
            if track and track.get('artists'):
                existing.add((track['name'], track['artists'][0]['name']))
        return existing

    def _update_database(self) -> None:
        """Upload the database as ``report.json`` to the gist named by ``GIST_ID_DATA``."""
        content = json.dumps(self.database, indent=4)
        url = f'https://api.github.com/gists/{os.environ.get("GIST_ID_DATA")}'

        payload = {
            "description": f"Scrapping realizado em {self.now}",
            "files": {
                "report.json": {
                    "content": content
                }
            }
        }

        headers = {
            "Authorization": f"token {self.gist_token}",
            "Accept": "application/vnd.github.v3+json"
        }

        response = requests.patch(url, json=payload, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if not response.ok:
            # A failed upload means the next run will reprocess these titles.
            logging.error(f"Failed to update the database gist (HTTP {response.status_code}).")

    def run(self):
        """Add every newly extracted track to the playlist and persist the results.

        A track is recorded in the database only once it is in the playlist
        (already present or added now). Tracks that were not identified, not
        found on Spotify or not accepted by the playlist go to the Excel
        report at ``self.output`` and stay out of the database so a later run
        retries them.
        """
        new_tracks = self._get_new_tracks()

        url = f'{self.spotify_api}/playlists/{self.playlist_id}/tracks'
        not_added = []

        # Read the playlist once and keep the set current locally instead of
        # downloading it again for every track.
        existing_tracks = self._get_existing_tracks(url)

        for track in new_tracks:
            artist = track['artist']
            music = track['track']

            # Searching for the placeholder values could match an unrelated song.
            if artist == self.UNKNOWN and music == self.UNKNOWN:
                not_added.append(track)
                continue

            search = requests.get(f'{self.spotify_api}/search', headers=self.headers, timeout=self.REQUEST_TIMEOUT, params={
                "q": f"artist:{artist} track:{music}",
                "type": "track"
            }).json()
            track_metadata = (search.get('tracks') or {}).get('items') or []

            if not track_metadata:
                not_added.append(track)
                continue

            best_match = track_metadata[0]
            track_uri = best_match['uri']
            track_key = (best_match['name'], best_match['artists'][0]['name'])

            if track_key not in existing_tracks:
                response = requests.post(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT, json={'uris': [track_uri]})

                if response.status_code not in (200, 201):
                    not_added.append(track)
                    continue
                existing_tracks.add(track_key)

            self.database.append(track)

        df = pd.DataFrame(not_added)
        df.to_excel(self.output, index=False)
        self._update_database()


In [59]:
# Gist identifiers and the GitHub token come from the environment; a missing
# variable raises KeyError here.
GIST_ID_CREDENTIAL, GIST_ID_DATA, GIST_TOKEN = (
    os.environ[variable]
    for variable in ('GIST_ID_ACCESS', 'GIST_ID_DATA', 'GIST_ACCESS_TOKEN')
)

In [60]:
# Load the track database and the service credentials from their gists.
database = get_file_from_gist(GIST_ID_DATA, GIST_TOKEN, 'report.json')
credentials = get_file_from_gist(GIST_ID_CREDENTIAL, GIST_TOKEN, "youtube-music-scrapping.json")

# get_file_from_gist returns None when the download or JSON decoding failed.
# Stop here: the database is written back to the gist at the end of the run,
# so continuing without it could overwrite the stored history.
if database is None:
    raise RuntimeError("Could not load 'report.json' from the data gist; aborting to protect the stored database.")

# An empty result means the file does not exist yet: start with an empty list,
# which is the container the App appends track records to.
if not database:
    database = []

if not credentials:
    raise RuntimeError("Could not load 'youtube-music-scrapping.json' from the credentials gist.")

# Export the credentials through the environment variables the rest of the
# notebook reads; a missing key raises KeyError naming that key.
os.environ['SPOTIFY_USER_ID'] = credentials['user_id']
os.environ['SPOTIFY_CLIENT_ID'] = credentials['client_id']
os.environ['SPOTIFY_CLIENT_SECRET'] = credentials['client_secret']
os.environ['SPOTIFY_REDIRECT_URI'] = credentials['redirect_uri']
os.environ['GROQ_API_KEY'] = credentials['groq_api_key']


In [61]:
# Run parameters.
temperature = 0.7        # sampling temperature handed to the model
output = 'report.xlsx'   # Excel report listing the tracks that were not added
full = False             # True scrolls the channel page until it stops growing
channel_url = "https://www.youtube.com/@GreatStonedDragon/videos"

# Building the App already scrapes the channel and authorizes with Spotify.
app = App(channel_url=channel_url, full=full, output=output, database=database, temperature=temperature)

# run() has no return statement, so `data` ends up as None.
data = app.run()

Aguardando autorizacao...


127.0.0.1 - - [28/Jan/2025 22:30:42] "GET /callback?code=<REDACTED> HTTP/1.1" 200 -
Extracting: 100%|█████████████████████████████████| 1/1 [00:00<00:00,  1.93it/s]
