# YouTube Playlist Extractor

A lot of threads on mafiascum.net contain links to YouTube videos distributed across multiple posts. Particularly when they share the same purpose or theme, it can be useful to extract these links and compile them into a single playlist for easy viewing or listening.

This example demonstrates how to use functions from the [donbot](https://github.com/Computational-Mafia/donbot) module to extract YouTube video links across a thread and compile them into a playlist. By changing the thread URL parameter and maybe adding your account credentials, you can use this example to extract a playlist from any thread on the forum.

**The implementation here has so far been validated to work only for threads with fewer than 50 songs.** It won't work for threads that require user privileges to access.

In [3]:
# @markdown ## Provide a thread url and playlist type. Press the play button on the left to generate!
# URL of the forum thread to scan; the script below fetches it and derives its page URLs from it.
thread = "https://forum.mafiascum.net/viewtopic.php?t=92087"  # @param {type:"string"}
# Selects which link is printed at the end: "music" swaps "www" for "music" in the final URL.
playlist_type = "music"  # @param ["music", "regular videos"]

from math import floor
from urllib.parse import parse_qs, urlparse

import requests
from lxml import html
from lxml.html import HtmlElement


def count_posts(thread_html: HtmlElement) -> int:
    """
    Counts the number of posts in the specified thread.

    The count is read from the first non-blank text node that sits directly
    inside a ``<div class="pagination">`` element; all digits found in that
    text node are concatenated to form the number.

    Parameters
    ----------
    thread_html : HtmlElement
        The HTML of a page from the thread to count posts in.

    Returns
    -------
    int
        The number of posts in the specified thread.

    Raises
    ------
    ValueError
        If the page has no non-blank pagination text, or if that text
        contains no digits.
    """
    post_count_path = "//div[@class='pagination']/text()"
    post_count_text = next(
        (text for text in thread_html.xpath(post_count_path) if text.strip()),
        None,
    )
    if post_count_text is None:
        # A bare next() would leak StopIteration to the caller; report the
        # problem with the same exception type that int() uses for bad text.
        raise ValueError("no post count found in the page's pagination text")
    return int("".join(c for c in post_count_text if c.isdigit()))


def get_post(post_html: HtmlElement) -> dict:  # sourcery skip: merge-dict-assign
    """
    Build a dictionary describing a single post from its HTML.

    Parameters
    ----------
    post_html : HtmlElement
        The HTML of a post.

    Returns
    -------
    dict
        The post's data, with the keys `number`, `id`, `user`, `user_id`,
        `content`, and `time`.
    """
    username_anchor = ".//a[@class='username' or @class='username-coloured']"

    # The post-number text has a one-character prefix in front of the integer.
    number_text = post_html.xpath(".//span[@class='post-number-bolded']//text()")[0]
    number = int(number_text[1:])

    # Keep what follows the last "#" of the first link, minus one more character.
    first_href = post_html.xpath(".//a/@href")[0]
    post_id = first_href.rpartition("#")[2][1:]

    user_name = post_html.xpath(f"{username_anchor}/text()")[0]

    # The user id is whatever follows the last "=" of the username link.
    profile_href = post_html.xpath(f"{username_anchor}/@href")[0]
    user_id = profile_href.rpartition("=")[2]

    # Serialize the content element, then cut 21 leading and 6 trailing
    # characters -- the lengths of '<div class="content">' and '</div>'.
    content_div = post_html.xpath(".//div[@class='content']")[0]
    serialized = html.tostring(content_div).decode("UTF-8").strip()
    content = serialized[21:-6]

    # The timestamp is the part of the last author-line text node after "» ".
    time_text = post_html.xpath(".//p[@class='author modified']/text()")[-1]
    time_start = time_text.find("» ") + 2
    posted_at = time_text[time_start:].strip()

    return {
        "number": number,
        "id": post_id,
        "user": user_name,
        "user_id": user_id,
        "content": content,
        "time": posted_at,
    }


def get_posts(
    thread_page_html: HtmlElement, start: int = 0, end: int | float = -1
) -> list[dict]:
    """
    Retrieve posts from a thread.

    Parameters
    ----------
    thread_page_html : HtmlElement
        The HTML of a page from the thread to retrieve posts from.
    start : int
        Lowest post number to retrieve.
    end : int, optional
        Highest post number to retrieve.

    Returns
    -------
    list[dict]
        Each post's data, including post `id`, `number`, `user, `time`, and `content`.
    """
    posts = []
    end = end if end != -1 else float("inf")
    for raw_post in thread_page_html.xpath("//div[@class='postbody']"):
        post = get_post(raw_post)
        if post["number"] >= start and post["number"] <= end:
            posts.append(post)
    return posts


def get_thread_page_urls(
    thread: str, thread_page_html: HtmlElement, start: int = 0, end: int = -1
) -> list[str]:
    """
    Get the URLs of the pages of a thread.

    Pages are addressed by appending ``&start=<offset>`` to the thread URL,
    where the offset is a multiple of the 25 posts shown per page.

    Parameters
    ----------
    thread : str
        The URL of the thread. It must already contain a query string,
        because the page offset is appended with ``&``.
    thread_page_html : HtmlElement
        The HTML of a page from the thread. Only used to count the posts
        when `end` is left at its default.
    start : int
        Lowest post number whose page should be included.
    end : int
        Highest post number whose page should be included. The default of
        -1 means "up to the last post of the thread".

    Returns
    -------
    list[str]
        The URLs of the pages of the thread, in ascending page order.
    """
    if end == -1:
        # Offsets are zero-based, so a thread of N posts ends at offset N - 1.
        # Using N itself requested one page too many whenever N was an exact
        # multiple of the page size.
        end = max(count_posts(thread_page_html) - 1, 0)

    posts_per_page = 25
    # Integer floor division snaps each post number to its page's offset.
    first_offset = start // posts_per_page * posts_per_page
    last_offset = end // posts_per_page * posts_per_page

    return [
        f"{thread}&start={offset}"
        for offset in range(first_offset, last_offset + 1, posts_per_page)
    ]


def _youtube_video_id(link: str) -> str:
    """Return the video id found in a YouTube URL, or "" if there is none."""
    parsed = urlparse(link)
    query = parse_qs(parsed.query, keep_blank_values=True)
    if "v" in query:
        # Regular watch links carry the id in the `v` query parameter.
        return query["v"][0]
    # Otherwise take the last path segment; parsing the URL first keeps any
    # query string or fragment out of the id.
    return parsed.path.rstrip("/").rsplit("/", 1)[-1]


def extract_youtube_links(post_content: str):
    """
    Return all links to youtube videos in the post content.

    Parameters
    ----------
    post_content : str
        The HTML content of a post.

    Returns
    -------
    list[str]
        One ``https://www.youtube.com/watch?v=<id>`` link per distinct video,
        in order of first appearance: ``iframe`` sources first, then anchor
        targets. A video that appears more than once in the post is listed
        only once.
    """
    # lxml refuses to parse an empty document, so bail out early.
    if not post_content.strip():
        return []

    post_tree = html.fromstring(post_content)  # parse once for both queries
    video_ids = {}  # dict used as an insertion-ordered set
    for link_path in ["//iframe/@src", "//a/@href"]:
        for link in post_tree.xpath(link_path):
            if "youtube" not in link and "youtu.be" not in link:
                continue
            video_id = _youtube_video_id(link)
            if video_id:
                video_ids.setdefault(video_id, None)
    return [f"https://www.youtube.com/watch?v={video_id}" for video_id in video_ids]


def create_playlist_url(video_links: list[str]):
    """
    Return a youtube playlist url from a list of video links.

    Parameters
    ----------
    video_links : list[str]
        Links to YouTube videos. The id is taken from the `v` query
        parameter when present, otherwise from the last path segment.

    Returns
    -------
    str
        A ``watch_videos`` URL listing the ids, comma-separated, in the order
        given. Links that yield no id are skipped.
    """
    video_ids = []
    for link in video_links:
        parsed = urlparse(link)
        query = parse_qs(parsed.query, keep_blank_values=True)
        if "v" in query:
            # Reading the parameter drops extras such as `&t=30`.
            video_id = query["v"][0]
        else:
            # Links without a `v` parameter: use the last path segment.
            video_id = parsed.path.rstrip("/").rsplit("/", 1)[-1]
        if video_id:
            video_ids.append(video_id)
    return f"http://www.youtube.com/watch_videos?video_ids={','.join(video_ids)}"


if __name__ == "__main__":
    # Fetch the first page to work out the page URLs, then fetch every page.
    session = requests.Session()
    thread_html = html.fromstring(session.get(thread).content)
    thread_urls = get_thread_page_urls(thread, thread_html)

    posts = []
    for thread_url in thread_urls:
        thread_page_html = html.fromstring(session.get(thread_url).content)
        posts.extend(get_posts(thread_page_html))

    # Fail with a readable message instead of an IndexError on posts[0].
    if not posts:
        raise SystemExit(f"No posts could be read from {thread}")

    # Only the posts written by the author of the first post are scanned.
    first_user = posts[0]["user"]
    youtube_links = [
        link
        for post in posts
        if post["user"] == first_user
        for link in extract_youtube_links(post["content"])
    ]
    if not youtube_links:
        raise SystemExit(f"No YouTube links found in {first_user}'s posts.")

    # Requesting the watch_videos URL and reading the response's final URL
    # gives the address that is printed below.
    playlist_url = create_playlist_url(youtube_links)
    final_playlist_url = session.get(playlist_url).url

    print('Playlist URL:')
    if playlist_type == 'music':
        # Swap only the host prefix; a plain replace('www', ...) would also
        # rewrite any "www" occurring later in the URL, e.g. inside an id.
        print(final_playlist_url.replace('://www.', '://music.', 1))
    else:
        print(final_playlist_url)

Playlist URL:
http://www.youtube.com/watch_videos?video_ids=E0jrUBRlAvA,z87nhzCdYEU,0QaefDV7cb8,QApcyPKEwXI,D9o_XTzYoa8,Epj5A84mDp0,LILIDv0JzEM,9b25Balcsj4,seok6lO1n-8,bLaSOG3Vjbo,qlcfgoTRtUQ,RZxK1kDO7K8,b_CpWmkhwq0,3WpdCZC9q6w,E91pJYO_s7I,PEeRO4k40pM,zdU0qwZKLfU,3zb9X_unuqo,Qss4rfv7n88,n87C0iUyrCU,0ckoUgA7UZQ,iKBCVZqqooY,PVVK7HkdW9k,ekU1dQjMsOQ,NRiOvSqn8aw,xFlULnKuxtg,FnjjbqMjVe4,F9oCB6Rsnqg,fj1RPTj-188,tIxLU8WUK1Y,UtfkrGRK8wA,VYW4F5q7XBE,ds18Ozzp8h0,VC_jqzx4QXc,tGVRsIDNuKU,h2kUX_Fmj7k,3uZ9i4QYRLo,OMFAhvcPLrU,rTcF-tZwlXI,ji6wHH0Gf9I,VS6ixn2berk,E0jrUBRlAvA,z87nhzCdYEU,0QaefDV7cb8,QApcyPKEwXI,D9o_XTzYoa8,Epj5A84mDp0,LILIDv0JzEM,9b25Balcsj4,seok6lO1n-8,bLaSOG3Vjbo,qlcfgoTRtUQ,RZxK1kDO7K8,b_CpWmkhwq0,3WpdCZC9q6w,E91pJYO_s7I,PEeRO4k40pM,zdU0qwZKLfU,3zb9X_unuqo,Qss4rfv7n88,n87C0iUyrCU,0ckoUgA7UZQ,iKBCVZqqooY,PVVK7HkdW9k,ekU1dQjMsOQ,NRiOvSqn8aw,xFlULnKuxtg,FnjjbqMjVe4,F9oCB6Rsnqg,fj1RPTj-188,tIxLU8WUK1Y,UtfkrGRK8wA,VYW4F5q7XBE,ds18Ozzp8h0,VC_jqzx4QXc,tGVRsIDNuKU,h2kUX_Fmj7k,3uZ9i4QYRLo,OMFA