In [None]:
import json
import os
import random
import time
from urllib.parse import quote_plus

import pandas as pd
import requests
from pytube import YouTube
from tqdm.notebook import tqdm

# Inputs that must already be present on disk before anything else runs.
PATH_SPOTIFY_DATASET = "../dataset/Spotify/spotify_dataset_clean.csv"
PATH_YOUTUBE_DOWNLOAD = "../dataset/Dataset40k/youtube_data"
PATH_CHROME_DRIVER = 'chromedriver.exe'
for p in (PATH_SPOTIFY_DATASET, PATH_YOUTUBE_DOWNLOAD, PATH_CHROME_DRIVER):
    if not os.path.exists(p):
        raise AssertionError(f"Received bad path ´{p}´")

# Output location: refuse to start if a previous result would be overwritten.
PATH_SAVE_FINAL_CSV = "../dataset/Dataset40k/spotify_with_youtube.csv"
if os.path.exists(PATH_SAVE_FINAL_CSV):
    raise AssertionError(f"`{PATH_SAVE_FINAL_CSV}` already exists.")

In [5]:
# Load the Spotify table; both YouTube columns are explicitly read as strings.
df = pd.read_csv(
    PATH_SPOTIFY_DATASET,
    dtype=dict.fromkeys(["youtube_url", "youtube_title"], str),
)
df.head()

Unnamed: 0,track_id,artists,album_name,track_name,popularity,duration_ms,explicit,danceability,energy,key,...,instrumentalness,liveness,valence,tempo,time_signature,track_genre,youtube_search_query,original_index,youtube_url,youtube_title
0,3nqQXoyQOWXiESFLlDF1hG,"['Sam Smith', 'Kim Petras']",Unholy (feat. Kim Petras),Unholy (feat. Kim Petras),100,156943,False,0.714,0.472,2,...,5e-06,0.266,0.238,131.121,4,['dance'],"Unholy (feat. Kim Petras) - Sam Smith, Kim Pet...",20001,,
1,2tTmW7RDtMQtBk7m2rYeSw,"['Bizarrap', 'Quevedo']","Quevedo: Bzrp Music Sessions, Vol. 52","Quevedo: Bzrp Music Sessions, Vol. 52",99,198937,False,0.621,0.782,2,...,0.033,0.23,0.55,128.033,4,['hip-hop'],"Quevedo: Bzrp Music Sessions, Vol. 52 - Bizarr...",51664,,
2,4h9wh7iOZ0GGn8QVp4RAOB,['OneRepublic'],I Ain’t Worried (Music From The Motion Picture...,I Ain't Worried,96,148485,False,0.704,0.797,0,...,0.000745,0.0546,0.825,139.994,4,['piano'],"I Ain't Worried - OneRepublic, Official, music...",79000,,
3,4LRPiXqCikLlN15c3yImP7,['Harry Styles'],As It Was,As It Was,95,167303,False,0.52,0.731,6,...,0.00101,0.311,0.662,173.93,4,['pop'],"As It Was - Harry Styles, Official, music video",81052,,
4,6xGruZOHLs39ZbVccQTuPZ,['Joji'],Glimpse of Us,Glimpse of Us,94,233456,False,0.44,0.317,8,...,5e-06,0.141,0.268,169.914,3,['pop'],"Glimpse of Us - Joji, Official, music video",81102,,


# Set up webdriver for download

In [4]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

# Set up Chrome options to make the automated browser less detectable
chrome_options = Options()
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)

# Initialize Chrome WebDriver.
# Use the PATH_CHROME_DRIVER constant (whose existence is asserted in the configuration cell)
# instead of repeating the literal, so the validated path and the path actually used cannot drift apart.
webdriver_service = Service(PATH_CHROME_DRIVER)
driver = webdriver.Chrome(service=webdriver_service, options=chrome_options)

# Scrape YouTube URLs and titles matching Spotify data via pre-made search queries

In [None]:
def human_like_delay(min_delay, max_delay):
    """Block for a random number of seconds drawn uniformly between `min_delay` and `max_delay`."""
    pause_seconds = random.uniform(min_delay, max_delay)
    time.sleep(pause_seconds)
# Maps dataframe index -> exception message for every row whose search failed.
failed = {}

# Search loop with error handling and delays.
# For every row without a result yet, open the YouTube search page for its pre-made query and
# store the first hit's URL and title in `df`.
for i, row in tqdm(df.iterrows(), total=len(df)):
    url_is_defined = not pd.isna(row["youtube_url"])
    title_is_defined = not pd.isna(row["youtube_title"])

    # Rows that already hold a result (or the "**FAILED**" marker) are skipped, which makes the cell resumable.
    if url_is_defined or title_is_defined:
        # Both columns are always written together, so both must be present.
        assert url_is_defined and title_is_defined, "If one is defined, so should the other!"
        continue

    try:
        query = row['youtube_search_query']
        # quote_plus turns spaces into '+' and percent-encodes reserved characters such as
        # '&', '#', '+' and '?', which would otherwise cut off or alter the search query.
        url = f"https://www.youtube.com/results?search_query={quote_plus(query)}"

        # Add a human-like delay before each request to avoid being rate-limited
        human_like_delay(0.2, 0.4)
        driver.get(url)

        # Add a random delay to simulate human scrolling and viewing behavior
        human_like_delay(0.5, 1.0)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

        # First element with id "video-title" is taken as the best match.
        video = driver.find_element(By.XPATH, '//a[@id="video-title"]')
        youtube_title = video.get_attribute("title")
        youtube_url = video.get_attribute("href")

        # Store the results in the dataframe
        df.loc[i, "youtube_url"] = youtube_url
        df.loc[i, "youtube_title"] = youtube_title

    # Log failed attempts with the exception message
    except Exception as e:
        print(i, end=", ")
        df.loc[i, "youtube_url"] =   "**FAILED**"
        df.loc[i, "youtube_title"] = "**FAILED**"
        failed[i] = str(e)

    # Save progress periodically
    if (i % 500 == 0) and (i != 0):
        df.to_csv(f"BACKUP_{i}.csv", index=False)

    # Delay between actions to avoid being rate-limited or flagged as a bot
    human_like_delay(0.2, 1.0)
driver.quit()

# Remove failed
# The track_id becomes the file name of every download, so duplicates would overwrite each other.
assert df["track_id"].is_unique, "The track_id must be unique for saving the files"
# na=False: rows without any URL count as "not a YouTube link" instead of yielding NaN in the mask.
has_youtube_url = df["youtube_url"].str.contains("youtube", na=False)
# .copy() detaches the result from the unfiltered frame so later `df.loc[...] = ...` writes
# operate on an independent dataframe rather than on a slice.
df = df[has_youtube_url & (df["youtube_url"] != "**FAILED**")].copy()

# Download YouTube audio, metadata, and thumbnail for matched Spotify tracks

In [None]:
# Maps dataframe index -> exception message for every track whose download failed.
failed = {}

# Make sure the skip flag exists and is a clean boolean before it is read in the loop:
# without the column `row["youtube_video_skipped"]` raises a KeyError, and a NaN flag would be
# truthy and silently skip the row. `assign` returns a new frame, so the writes below are safe.
if "youtube_video_skipped" in df.columns:
    skipped_flags = df["youtube_video_skipped"].eq(True)
else:
    skipped_flags = pd.Series(False, index=df.index)
df = df.assign(youtube_video_skipped=skipped_flags)

for i, row in tqdm(df.iterrows(), total=len(df)):
    # Setup
    url = row["youtube_url"]
    track_id = row["track_id"]
    audio_path = f"{PATH_YOUTUBE_DOWNLOAD}/{track_id}.mp3"
    meta_data_path = f"{PATH_YOUTUBE_DOWNLOAD}/{track_id}.json"

    if row["youtube_video_skipped"]:
        continue
    # A track only counts as done when BOTH files exist; with `or`, a track whose audio was
    # saved but whose metadata step failed would never be completed on a re-run.
    if os.path.exists(audio_path) and os.path.exists(meta_data_path):
        continue

    # Throttle: a long pause whenever the index is a non-zero multiple of 100, a short one otherwise.
    if (i % 100 == 0) and (i != 0):
        human_like_delay(30, 60)
    else:
        human_like_delay(1, 2)

    try:
        try:
            yt = YouTube(url)
            video_length_minutes = yt.length / 60
            is_length_outside_range = (video_length_minutes < 1) or (video_length_minutes > 10)
            has_low_views = yt.views < 100_000
            is_age_restricted = yt.age_restricted
            if is_length_outside_range or has_low_views or is_age_restricted:
                print(f"{i}c", end=", ")
                df.loc[i, 'youtube_video_skipped'] = True
                continue
        except TypeError as e:
            # Targets the "int() argument must be ..., not 'NoneType'" error. str(e) never
            # contains the "TypeError: " prefix, so match on the exception type and on the
            # 'NoneType' part of the message (the rest of the wording varies across Python versions).
            if "NoneType" not in str(e):
                raise
            print(f"{i}t", end=", ")
            human_like_delay(10, 20)
            df.loc[i, 'youtube_video_skipped'] = True
            continue
        # Any other exception propagates to the outer handler and is recorded in `failed`.
        # Swallowing it here would continue with `yt` unbound or still pointing at the
        # previous iteration's video.

        # Download audio
        audio_stream = yt.streams.filter(only_audio=True).first()
        if audio_stream is None:
            raise RuntimeError("No audio-only stream available")
        audio_stream.download(output_path=PATH_YOUTUBE_DOWNLOAD, filename=f"{track_id}.mp3")
        assert os.path.exists(audio_path), "Audio download failed"

        # Metadata
        metadata = {
            "title": yt.title,
            "author": yt.author,
            "publish_date": yt.publish_date.strftime('%Y-%m-%d') if (yt.publish_date is not None) else "NaN",
            "description": yt.description,
            "length_ms": yt.length*1000,
            "views": yt.views,
            "thumbnail_url": yt.thumbnail_url,
            "keywords": yt.keywords,
            "rating": yt.rating,
            "video_id": yt.video_id,
            "spotify_id": track_id,
            "channel_id": yt.channel_id,
            "channel_url": yt.channel_url,
            "is_age_restricted": yt.age_restricted,
            "captions_available": bool(yt.captions),
            "captions_english": []  # Default to empty list
        }

        # Check for English captions (only codes starting with "en." are considered)
        english_captions = [caption for caption in yt.captions if caption.code.lower().startswith("en.")]
        if english_captions:
            caption = english_captions[0]
            metadata["captions_english"] = caption.json_captions["events"]

        # Save metadata
        with open(meta_data_path, 'w') as f:
            json.dump(metadata, f)
        assert os.path.exists(meta_data_path), "Metadata saving failed!"

        # Download thumbnail; the timeout keeps a stalled connection from blocking the whole loop.
        response = requests.get(yt.thumbnail_url, timeout=30)
        thumbnail_path = f"{PATH_YOUTUBE_DOWNLOAD}/{track_id}_thumbnail.jpg"
        if response.status_code == 200:
            with open(thumbnail_path, 'wb') as f:
                f.write(response.content)
            assert os.path.exists(thumbnail_path), "Thumbnail download failed!"

    except Exception as e:
        print(f"{i}f", end=", ")
        failed[i] = str(e)

df.to_csv(PATH_SAVE_FINAL_CSV, index=False)