In [None]:
# Load the `sql` IPython extension and point it at the SQLite test database,
# so that the %%sql cells further down can query the tables written by pandas.
%load_ext sql
%sql sqlite:///testdata/test.db

In [None]:

import sqlite3
import pandas as pd
import logging
from bs4 import BeautifulSoup
# Root logger, configured to emit INFO and above with a timestamp and level prefix.
logger = logging.getLogger()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# sqlite3 connection to the same database file the %sql magic is pointed at;
# the pandas to_sql / read_sql_query calls below go through this connection.
connection = sqlite3.connect("testdata/test.db")
# NOTE(review): `cursor` is not used by any cell visible here.
cursor = connection.cursor()

In [None]:
from transformations.utils.io import extract_specific_files_flat, get_latest_valid_zip
# Pick the Google Takeout archive to read from. Judging by the helper's name it
# returns the most recent zip that contains the expected history folder — TODO
# confirm against transformations.utils.io.
latest_valid_youtube_zip = get_latest_valid_zip(
    "../data/bronze/landing/google",
    "*.zip",
    expected_file_path="Takeout/YouTube and YouTube Music/history",
)

# Fail fast: nothing below can run without an archive.
if latest_valid_youtube_zip is None:
    raise ValueError("No valid zip found in folder which contains youtube data!")
output_folder = "testdata/youtube"
# Extract only the watch-history page. Presumably the file is written directly
# into `output_folder` without its archive sub-directories ("flat") — the path
# built on the last line relies on that; verify against the helper.
extract_specific_files_flat(
    latest_valid_youtube_zip,
    prefix="Takeout/YouTube and YouTube Music/history/watch-history.html",
    output_path=output_folder,
)
# Location of the extracted page, consumed by the parsing cell.
html_file_path = output_folder + "/watch-history.html"

In [None]:
def _parse_watch_history_cell(content_cell) -> dict:
    """Pull the video, channel and date fields out of one content cell.

    Args:
        content_cell: BeautifulSoup tag for a single watch-history entry.

    Returns:
        Dict with the keys ``video_name``, ``video_url``, ``channel_name``,
        ``channel_url`` and ``date`` (all stripped strings).

    Raises:
        ValueError: if the cell has no video link, no channel link, or no text
            node where the date is expected.
    """
    # Look for links only inside this cell. ``find_next`` keeps walking through
    # the rest of the document, so an entry without a channel link would
    # silently pick up a link that belongs to a later element.
    links = content_cell.find_all('a', href=True, limit=2)
    if not links:
        raise ValueError("Video tag not found")
    if len(links) < 2:
        raise ValueError("Channel tag not found")
    video_tag, channel_tag = links

    # The date is read from the second sibling after the channel link
    # (presumably: <a>channel</a><br>date text — TODO confirm against an export).
    separator = channel_tag.next_sibling
    date_node = separator.next_sibling if separator is not None else None
    # Text nodes are str subclasses; anything else (None, another tag) is not a date.
    if not isinstance(date_node, str):
        raise ValueError("Date tag not found")

    return {
        "video_name": video_tag.text.strip(),
        "video_url": video_tag['href'].strip(),
        "channel_name": channel_tag.text.strip(),
        "channel_url": channel_tag['href'].strip(),
        "date": date_node.strip(),
    }


def extract_youtube_watch_history_raw(html_path: str) -> pd.DataFrame:
    """Parse a Takeout ``watch-history.html`` file into a raw DataFrame.

    Every ``div`` carrying the watch-history content-cell class is parsed
    independently. Cells that cannot be parsed are skipped (best effort) and
    reported in the log rather than aborting the whole extraction.

    Args:
        html_path: Path to the extracted ``watch-history.html`` file.

    Returns:
        DataFrame with the string columns ``video_name``, ``video_url``,
        ``channel_name``, ``channel_url`` and ``date``; one row per parsed
        cell. The columns are present even when no cell could be parsed.
    """
    columns = ["video_name", "video_url", "channel_name", "channel_url", "date"]

    logger.info(
        "Finding all elements in watch history html file which contain watch history"
        " info."
    )
    with open(html_path, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "lxml")

    content_cell_list = soup.find_all(
        name='div',
        attrs={
            "class": 'content-cell mdl-cell mdl-cell--6-col mdl-typography--body-1'
        }
    )
    logger.info(f"Found {len(content_cell_list)} elements to extract history from.")

    errored_cells = []
    watch_history_list: list[dict] = []
    for index, content_cell in enumerate(content_cell_list):
        try:
            watch_history_list.append(_parse_watch_history_cell(content_cell))
        except Exception as error:
            # Deliberately best effort: one malformed cell must not abort the
            # run, but the reason is kept available for debugging.
            logger.debug(f"Skipping element {index}: {error}")
            errored_cells.append(content_cell)

    logger.info("finished extracting all watch history from html.")
    total_elements = len(content_cell_list)
    successful_elements = total_elements - len(errored_cells)
    logger.info(f"Extracted {successful_elements}/{total_elements}")
    if errored_cells:
        logger.warning(f"Skipped {len(errored_cells)} elements that could not be parsed.")
    return pd.DataFrame(watch_history_list, columns=columns)

    
# Parse the extracted HTML page into the raw watch-history frame and display it
# as the cell output.
df_raw = extract_youtube_watch_history_raw(html_file_path)
df_raw


In [None]:
# Persist the raw frame, replacing any existing table of the same name.
# `index` is left at its pandas default, so the frame's index is written as an
# extra column next to the data columns.
df_raw.to_sql("youtube_watch_history_raw", con=connection, if_exists="replace")

In [None]:
%%sql
SELECT *
FROM youtube_watch_history_raw
LIMIT 10;

In [None]:
def prepare_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the raw ``date`` text into a ``datetime_str`` column.

    The no-break space characters U+202F (narrow no-break space) and U+00A0
    (no-break space) are replaced by plain spaces so the text can later be
    matched against a fixed datetime format.

    Args:
        df: Frame with a string column ``date``. It is not modified.

    Returns:
        A new frame without ``date`` and with ``datetime_str`` appended as the
        last column.
    """
    # regex=False: both patterns are literal characters, independent of the
    # pandas version's default for `regex`.
    normalised = (
        df["date"]
        .str.replace("\u202F", " ", regex=False)
        .str.replace("\u00A0", " ", regex=False)
    )
    # assign/drop return new frames, so the caller's frame is left untouched
    # (the previous implementation added `datetime_str` to it in place).
    return df.assign(datetime_str=normalised).drop(columns=["date"])

def parse_youtube_date_str(
    df: pd.DataFrame,
    timezone_suffixes: tuple[str, ...] = (" BST",),
) -> pd.DataFrame:
    """Parse the ``datetime_str`` column into a ``datetime`` column.

    Each entry of ``timezone_suffixes`` is stripped from the end of the text
    before parsing, so the resulting timestamps are timezone-naive.

    TODO: convert to UTC instead of discarding the timezone.

    Args:
        df: Frame with a string column ``datetime_str`` formatted like
            ``"Jan 15, 2024, 3:04:05 PM"`` plus an optional suffix. It is not
            modified.
        timezone_suffixes: Trailing markers (including their leading space)
            to remove before parsing. Defaults to ``(" BST",)``.

    Returns:
        A new frame without ``datetime_str`` and with a ``datetime`` column.

    Raises:
        ValueError: if a value does not match the expected format after the
            suffixes have been removed (raised by ``pd.to_datetime``).
    """
    cleaned = df["datetime_str"]
    for suffix in timezone_suffixes:
        cleaned = cleaned.str.removesuffix(suffix)
    parsed = pd.to_datetime(cleaned, format="%b %d, %Y, %I:%M:%S %p")
    # drop/assign return new frames, so the caller's frame is left untouched
    # (the previous implementation overwrote `datetime_str` in place).
    return df.drop(columns=["datetime_str"]).assign(datetime=parsed)

def determine_video_type(
    df: pd.DataFrame,
    min_watch_seconds: float = 5,
    short_max_seconds: float = 90,
    max_gap_seconds: float = 3 * 60 * 60,
) -> pd.DataFrame:
    """Label each watched video as a likely short, a likely video, or unknown.

    The label is a heuristic based on the time between a video and the next
    one in the history. Rules are applied in this order, later ones winning:

    1. gap > ``short_max_seconds``  -> ``"likely video"``
    2. gap <= ``short_max_seconds`` -> ``"likely short"``
    3. no next video, or gap > ``max_gap_seconds`` -> ``"unknown"``
    4. title contains ``#`` -> ``"likely short"``

    Rows whose URL does not contain ``"watch"`` are dropped before the gaps are
    computed; rows with a gap of at most ``min_watch_seconds`` are dropped
    afterwards as not actually watched.

    Args:
        df: Frame with the columns ``datetime`` (datetime dtype),
            ``video_url`` and ``video_name``. It is not modified.
        min_watch_seconds: Gaps up to this length mark a video as skipped.
        short_max_seconds: Longest gap still labelled ``"likely short"``.
        max_gap_seconds: Gaps above this are labelled ``"unknown"``.

    Returns:
        A new frame sorted by ``datetime`` (original index labels kept) with
        the added columns ``next_vid_datetime``, ``time_to_next_video``
        (seconds, ``-1`` for the last row) and ``video_type``.
    """
    no_next_video = -1  # sentinel gap for the last row, which has no successor

    # The former `.reindex()` call had no arguments and therefore did nothing.
    ordered = df.sort_values(by=["datetime"])

    # na=False: a missing URL counts as "not watchable" instead of yielding a
    # NaN mask, which boolean indexing rejects.
    is_watch_url = ordered["video_url"].str.contains("watch", na=False)
    # Explicit copy so the column assignments below act on an independent
    # frame rather than on a slice (avoids SettingWithCopyWarning).
    watched = ordered[is_watch_url].copy()

    watched["next_vid_datetime"] = watched["datetime"].shift(-1)
    watched["time_to_next_video"] = (
        (watched["next_vid_datetime"] - watched["datetime"])
        .dt.total_seconds()
        .fillna(no_next_video)
    )

    # Drop videos that were skipped almost immediately.
    gap = watched["time_to_next_video"]
    watched = watched[(gap > min_watch_seconds) | (gap == no_next_video)].copy()
    gap = watched["time_to_next_video"]

    video_type = pd.Series("likely video", index=watched.index, dtype="object")
    video_type[gap <= short_max_seconds] = "likely short"
    video_type[(gap == no_next_video) | (gap > max_gap_seconds)] = "unknown"
    # A hashtag in the title overrides the timing heuristics; na=False keeps
    # rows with a missing title on their timing-based label.
    video_type[watched["video_name"].str.contains("#", na=False)] = "likely short"
    watched["video_type"] = video_type
    return watched

# I JUST REALISED HOW MUCH I HATE PANDAS 


# Raw table -> cleaned date text -> parsed datetime -> video-type labels.
df = (
    pd.read_sql_query("SELECT * from youtube_watch_history_raw", connection)
    .pipe(prepare_df)
    .pipe(parse_youtube_date_str)
    .pipe(determine_video_type)
)

# Store the labelled history, replacing any earlier version of the table;
# the frame's index is not written.
df.to_sql(name="youtube_watch_history", con=connection, if_exists="replace", index=False)
df.tail()

In [None]:
%%sql

select channel_name, count(channel_name) from youtube_watch_history
-- string literal in single quotes, double quotes denote identifiers in standard SQL
where video_type = 'likely short'
group by channel_name
order by count(channel_name) desc
limit 30;

In [None]:
%%sql

SELECT
    channel_name,
    video_name,
    video_url,
    video_type,
    time_to_next_video
FROM youtube_watch_history
WHERE time_to_next_video > 1000
ORDER BY time_to_next_video DESC;