In [None]:
import os
import glob
import shutil
from pathlib import Path
from tqdm import tqdm

import pandas as pd
from PIL import Image

import pytesseract
from fuzzysearch import find_near_matches


In [None]:
''' ==================================================================================================
    Move only slide/long screenshots
    ==================================================================================================
'''

# Move only tall ("slide"/long) screenshots from the candidates folder into the
# working image folder; everything else stays where it is.
potential_img = 'potential'
img_dir = 'imgs'
# Height / width ratio above which a picture is treated as a long screenshot.
max_aspect_ratio = 21 / 9

# shutil.move raises when the destination folder is missing, so create it first.
os.makedirs(img_dir, exist_ok=True)

for img_path in glob.glob(os.path.join(potential_img, "*.jpg")):
    # The context manager releases the file handle even when reading the size
    # fails, and guarantees the file is closed before it is moved.
    with Image.open(img_path) as img:
        w, h = img.size
    aspect_ratio = h / w
    if aspect_ratio > max_aspect_ratio:
        print(f'screenshot: {img_path}')
        shutil.move(img_path, os.path.join(img_dir, os.path.basename(img_path)))

In [None]:
''' ==================================================================================================
    OCR all screenshots in imgs directory and save them as txt files in strs directory
    ==================================================================================================
'''

# Path of the Tesseract executable used by pytesseract. The TESSERACT_CMD
# environment variable overrides it, so the notebook also runs on machines where
# Tesseract lives elsewhere; without the variable the original path is used.
pytesseract.pytesseract.tesseract_cmd = os.environ.get(
    'TESSERACT_CMD',
    r'C:\Program Files\Tesseract-OCR\tesseract.exe',
)

def load_screenshot(img_path: os.PathLike) -> str:
    """OCR one screenshot (English + Polish) and return the recognised text.

    The whole image is tried first. If Tesseract raises ``TesseractError``
    the image is cut horizontally into a top and a bottom half, each half is
    OCR'd on its own and the two texts are concatenated (top first).
    NOTE(review): presumably the error comes from very tall images exceeding a
    Tesseract size limit - confirm. A text line lying on the cut may be garbled.

    Args:
        img_path: Path of the image file to read.

    Returns:
        The OCR text of the image.

    Raises:
        pytesseract.TesseractError: if OCR of one of the halves fails as well.
    """
    ocr_lang = 'eng+pol'
    # `with` closes the underlying file on every path; the original left it open.
    with Image.open(img_path) as img:
        try:
            return pytesseract.image_to_string(img, lang=ocr_lang)
        except pytesseract.TesseractError:
            width, height = img.size
            middle = height // 2
            halves = (
                img.crop((0, 0, width, middle)),
                img.crop((0, middle, width, height)),
            )
            return "".join(
                pytesseract.image_to_string(half, lang=ocr_lang) for half in halves
            )

# OCR every screenshot in imgs/ and store the text as strs/<image stem>.txt.
# Images that already have a text file are skipped, so the cell can be re-run
# cheaply after adding new screenshots.
img_dir = 'imgs'
str_dir = 'strs'

# open(..., "w") does not create missing folders.
os.makedirs(str_dir, exist_ok=True)

for img_path in tqdm(glob.glob(os.path.join(img_dir, "*.jpg"))):
    str_path = os.path.join(str_dir, Path(img_path).stem + ".txt")
    if os.path.exists(str_path):
        continue

    ocr_str = load_screenshot(img_path)
    with open(str_path, "w", encoding='UTF-8') as file:
        file.write(ocr_str)

In [None]:
# Substrings looked up inside a row's `backup_status` text to recognise videos
# that can no longer be accessed.
UNACCESSIBLE_STATUS = [
    " is a private video",
    " is unavailable",
]

# NOTE(review): presumably `backup_status` fragments for failed-but-retryable
# backups; not referenced in the visible cells. The timeout entry is listed twice.
CANT_BACKUP_STATUS = [
    "Remote end closed connection without response",
    "IncompleteRead",
    'streamingData',
    " is age restricted, and can't be accessed without logging in.",
    " The read operation timed out",
    " HTTP Error 403: Forbidden",
    " The read operation timed out",
]

# NOTE(review): presumably the `backup_status` of a successful backup - confirm.
DOWNLOADED_STATUS = [
    "Downloaded",
]

# Placeholder titles found in `snippet.title` when the real title is gone.
TITLE_DELETED_STATUS = [
    "Deleted video",
    "Private video",
    "This video is unavailable.",
]

In [None]:
# Build the working copy of the manifest: same rows plus an empty `second_id`
# column, written next to the original as recreated.h5.
# NOTE: re-running this cell overwrites recreated.h5, including any ids that
# were already entered into it.
MANIFEST_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\data\manifest.h5'
recreated_target = os.path.join(os.path.dirname(MANIFEST_PATH), 'recreated.h5')

df_output = pd.read_hdf(MANIFEST_PATH, key="df").assign(second_id='')
df_output.to_hdf(recreated_target, key="df", mode='w')

# From here on MANIFEST_PATH refers to the working copy.
MANIFEST_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\data\recreated.h5'

In [None]:
import json

# Restore titles of deleted/private videos from the watch-history export:
# every manifest row whose `snippet.title` is only a placeholder gets the title
# recorded in history.json for the same video id (NaN when history has none).
HISTORY_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\data\history.json'
TITLE_DELETED_STATUS =  ["Deleted video", "Private video", "This video is unavailable."]

with open(HISTORY_PATH, 'r', encoding='UTF-8') as file:
    history = json.load(file)

# Entries without a URL or without a title cannot be used for the lookup.
history_df = pd.json_normalize(history).dropna(subset=['titleUrl', 'title'])
history_df = history_df.assign(
    # Take the value of the `v` query parameter. Unlike split('?v=')[1] this does
    # not raise on URLs without a video id and drops trailing `&...` parameters.
    video_id=history_df['titleUrl'].str.extract(r'[?&]v=([^&#]+)', expand=False),
    # Strip the "Obejrzano: " prefix the export puts in front of every title.
    title=history_df['title'].str.replace('Obejrzano: ', '', regex=False),
)
history_df = history_df.dropna(subset=['video_id'])
# Entries whose "title" is just the watch URL carry no usable title.
history_df = history_df[
    ~history_df['title'].str.contains('https://www.youtube.com/watch', regex=False)
]
history_df = (
    history_df[['video_id', 'title']]
    .drop_duplicates(subset=['video_id'])
    .set_index('video_id')
)


def _restore_title(row):
    """Return the history title when the manifest title is a placeholder."""
    current = row['snippet.title']
    if isinstance(current, str) and any(keyword in current for keyword in TITLE_DELETED_STATUS):
        return row['title']
    return current


df_output = pd.read_hdf(MANIFEST_PATH, key="df")
df_output = df_output.set_index('contentDetails.videoId')
df_output = df_output.join(history_df, how='left', rsuffix='_history')
df_output['snippet.title'] = df_output[['snippet.title', 'title']].apply(_restore_title, axis=1)
# The helper column from the history join is no longer needed.
df_output = df_output.drop(columns=['title'])
df_output = df_output.reset_index()
df_output.to_hdf(MANIFEST_PATH, key="df", mode='w')
df_output

In [None]:
# Replace missing titles with an empty string so later cells can treat
# `snippet.title` as text, then persist the manifest.
df_output = pd.read_hdf(MANIFEST_PATH, key="df")
df_output = df_output.assign(
    **{'snippet.title': df_output['snippet.title'].fillna('')}
)
df_output.to_hdf(MANIFEST_PATH, key="df", mode='w')

In [None]:
import os
import glob
import pandas as pd
import numpy as np
import warnings

# Hide pandas' PerformanceWarning for the rest of the session.
warnings.simplefilter(category=pd.errors.PerformanceWarning, action='ignore')

# Placeholder titles that mean the real title is not available.
TITLE_DELETED_STATUS = [
    "Deleted video",
    "Private video",
    "This video is unavailable.",
]
# Glob pattern of the OCR text files to search.
OCR_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\scripts\strs\*.txt'
# Working copy of the manifest that is read and updated by this cell.
MANIFEST_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\data\recreated.h5'


def clip(num, _min, _max):
    """Clamp ``num`` to the closed interval ``[_min, _max]``.

    Args:
        num: Value to clamp.
        _min: Lower bound, returned when ``num`` is below it.
        _max: Upper bound, returned when ``num`` is above it.

    Returns:
        ``_max`` if ``num > _max``, ``_min`` if ``num < _min``, else ``num``.
    """
    if num > _max:
        return _max
    if num < _min:
        # The original returned the builtin `min` function here.
        return _min
    return num

def get_context(idx, df, pos):
    """Return the nearest usable neighbour row before or after ``idx``.

    Starting next to ``idx`` the frame is walked in the requested direction until
    a row is found whose ``backup_status`` contains none of the
    ``UNACCESSIBLE_STATUS`` fragments and whose ``snippet.videoOwnerChannelId``
    differs from the ``USER_ID`` environment variable. When the walk runs past
    either end, the first or last row of the frame is returned instead, whatever
    its status.

    Args:
        idx: Label of the row to start from; rows are looked up with ``df.loc``,
            so ``df`` is expected to carry a 0..len-1 index.
        df: Playlist rows in playlist order.
        pos: ``'before'`` to walk towards index 0, ``'after'`` to walk towards the end.

    Returns:
        The selected row as a ``pandas.Series``.

    Raises:
        ValueError: if ``pos`` is neither ``'before'`` nor ``'after'`` or ``df`` is empty.
    """
    if pos == 'after':
        step = 1
    elif pos == 'before':
        step = -1
    else:
        # The original never moved `idx` for an unknown direction and could spin forever.
        raise ValueError(f"pos must be 'before' or 'after', got {pos!r}")
    if len(df) == 0:
        raise ValueError("cannot look up context in an empty frame")

    last_idx = len(df) - 1
    user_id = os.getenv("USER_ID")

    while True:
        idx += step
        if idx < 0 or idx > last_idx:
            # Walked off the playlist: clamp to the last valid label. The original
            # clamped to len(df), which is not a valid label for df.loc.
            return df.loc[max(0, min(idx, last_idx))]

        row = df.loc[idx]
        unusable = (
            any(key_word in row["backup_status"] for key_word in UNACCESSIBLE_STATUS)
            or row["snippet.videoOwnerChannelId"] == user_id
        )
        if not unusable:
            return row

# `backup_status` fragments that mark a video as no longer accessible.
UNACCESSIBLE_STATUS = [
    " is a private video",
    " is unavailable",
]

loaded_h5 = pd.read_hdf(MANIFEST_PATH, key="df")

# Every playlist present in the manifest ...
playlists_ids = loaded_h5['playlist_id'].unique()
# ... replaced straight away by the single playlist currently being worked on.
playlists_ids = ['PLomXEcQ9kTsF_XwsVGjICmnpm7ctX3hXH']

# Only the first playlist of the list is processed.
playlists_id = playlists_ids[0]

def deleted_song_context_gen(loaded_h5, playlists_id):
    """Yield the unavailable entries of one playlist together with their context.

    The rows of ``playlists_id`` are sorted by ``snippet.position`` and re-indexed
    from 0. A row is reported when its ``backup_status`` contains one of the
    ``UNACCESSIBLE_STATUS`` fragments or its ``snippet.videoOwnerChannelId`` equals
    the ``USER_ID`` environment variable. For every reported row a description is
    printed and a triple is yielded:

    * ``(None, row, None)`` when the row still has a real title;
    * ``(context_before, row, context_after)`` when the title is empty, missing
      or one of the ``TITLE_DELETED_STATUS`` placeholders - the context rows come
      from ``get_context``.

    Side effect: the sorted playlist is written to ``output.csv`` in the current
    working directory every time the generator is started.

    Args:
        loaded_h5: Manifest DataFrame covering all playlists.
        playlists_id: Id of the playlist to walk.

    Yields:
        Tuples ``(context_before, playlist_row, context_after)`` as described above.
    """
    subset_df = (
        loaded_h5[loaded_h5['playlist_id'] == playlists_id]
        .sort_values(by='snippet.position')
        .reset_index(drop=True)
    )
    subset_df.to_csv('output.csv')

    user_id = os.getenv("USER_ID")

    def describe(flag, row_idx, row):
        """Format one manifest row for the console."""
        return (
            f"{flag} - {row_idx}: ({row['snippet.position']}) "
            f"{row['snippet.resourceId.videoId']} | {row['snippet.title']} | "
            f"{row['snippet.videoOwnerChannelTitle']} ({row['backup_status']})"
        )

    for idx, playlist_row in subset_df.iterrows():
        needs_attention = (
            any(key_word in playlist_row["backup_status"] for key_word in UNACCESSIBLE_STATUS)
            or playlist_row["snippet.videoOwnerChannelId"] == user_id
        )
        if not needs_attention:
            continue

        title = playlist_row['snippet.title']
        print('=================================')
        print(title)
        # A non-string title (e.g. NaN) counts as missing instead of raising TypeError.
        title_exist = (
            isinstance(title, str)
            and title != ""
            and not any(keyword in title for keyword in TITLE_DELETED_STATUS)
        )

        if title_exist:
            print(describe(title_exist, idx, playlist_row))
            yield None, playlist_row, None
        else:
            context_before = get_context(idx, subset_df, pos='before')
            context_after = get_context(idx, subset_df, pos='after')

            # Print the real index of each context row (Series.name); it is not
            # necessarily idx-1 / idx+1 because unusable neighbours are skipped.
            print("\n    ".join([
                describe(title_exist, context_before.name, context_before),
                describe(title_exist, idx, playlist_row),
                describe(title_exist, context_after.name, context_after),
            ]))
            yield context_before, playlist_row, context_after

# Padding, in characters, that parse_ocr applies around the matched title
# positions when it cuts an excerpt out of an OCR text.
CONTEXT_LEN = 20
from fuzzysearch import find_near_matches

def parse_ocr(ocr: str, start_str: str, end_str: str):
    """Cut the part of an OCR text that lies between two known titles.

    Both titles are located with ``find_near_matches`` allowing a Levenshtein
    distance of 1. The excerpt starts ``CONTEXT_LEN`` characters before the first
    match of ``start_str`` and ends ``CONTEXT_LEN`` characters after the start of
    the earliest ``end_str`` match that comes after the start title. When no such
    end match exists the excerpt runs to the end of the text.

    Args:
        ocr: Full OCR text of one screenshot.
        start_str: Title expected before the wanted passage.
        end_str: Title expected after the wanted passage.

    Returns:
        The excerpt, or ``None`` when ``start_str`` is empty, not a string, or
        not found in ``ocr``.
    """
    # No usable anchor means no excerpt; this guard also keeps an empty
    # pattern from ever reaching find_near_matches.
    if not isinstance(start_str, str) or not start_str:
        return None

    start_matches = find_near_matches(start_str, ocr, max_l_dist=1)
    if not start_matches:
        return None
    first_start = start_matches[0]
    start_idx = max(first_start.start - CONTEXT_LEN, 0)

    end_idx = len(ocr)
    if isinstance(end_str, str) and end_str:
        # Only end matches behind the start title are meaningful; an earlier one
        # used to produce an empty or truncated slice.
        later_starts = [
            match.start
            for match in find_near_matches(end_str, ocr, max_l_dist=1)
            if match.start >= first_start.end
        ]
        if later_starts:
            end_idx = min(min(later_starts) + CONTEXT_LEN, len(ocr))

    return ocr[start_idx:end_idx]

def get_id_input(df, _id):
    """Ask the user for a replacement id for one video and store it in ``df``.

    Nothing is asked when no row has ``snippet.resourceId.videoId == _id`` or when
    the first such row already has a non-empty ``second_id``. Otherwise the user
    is prompted with ``ID: ``:

    * ``exit`` (any case) raises ``KeyboardInterrupt`` so the caller can stop;
    * an empty answer leaves the manifest untouched;
    * anything else is written to ``second_id`` of every matching row and the
      whole frame is saved to ``MANIFEST_PATH``.

    Args:
        df: Manifest DataFrame; updated in place.
        _id: Video id of the entry to annotate.

    Returns:
        The same ``df`` object.

    Raises:
        KeyboardInterrupt: when the user types ``exit``.
    """
    row_mask = df['snippet.resourceId.videoId'] == _id
    current_ids = df.loc[row_mask, 'second_id']

    # An unknown id used to raise IndexError on iloc[0]; treat it as nothing to do.
    if current_ids.empty or current_ids.iloc[0] != "":
        return df

    input_id = input('ID: ').strip()
    if input_id.lower() == "exit":
        raise KeyboardInterrupt
    if input_id:
        df.loc[row_mask, 'second_id'] = input_id
        # mode='w' matches every other manifest write in this notebook; the
        # default append mode rewrites the key inside the existing file instead.
        df.to_hdf(MANIFEST_PATH, key="df", mode='w')
    return df

def _ask_for_replacement(df, video_id):
    """Prompt through get_id_input; save the manifest before an 'exit' propagates."""
    try:
        return get_id_input(df, video_id)
    except KeyboardInterrupt:
        df.to_hdf(MANIFEST_PATH, key="df", mode='w')
        raise


def _already_resolved(df, video_id):
    """True when the row(s) of `video_id` already carry a non-empty second_id."""
    ids = df.loc[df['snippet.resourceId.videoId'] == video_id, 'second_id']
    return (not ids.empty) and ids.iloc[0] != ""


# Read every OCR text once instead of re-opening all files for each deleted video.
ocr_texts = []
for txt_path in glob.glob(OCR_PATH):
    with open(txt_path, 'r', encoding='UTF-8') as file:
        ocr_texts.append(file.read())

deleted_context = deleted_song_context_gen(loaded_h5=loaded_h5, playlists_id=playlists_id)
for context_before, playlist_row, context_after in deleted_context:
    print('', flush=True)
    video_id = playlist_row['snippet.resourceId.videoId']

    if context_after is None and context_before is None:
        # The title is still known, so no OCR lookup is needed.
        loaded_h5 = _ask_for_replacement(loaded_h5, video_id)
        continue

    for txt_ocr in ocr_texts:
        # Once an id has been recorded, further excerpts would only be noise.
        if _already_resolved(loaded_h5, video_id):
            break
        text = parse_ocr(txt_ocr, context_before['snippet.title'], context_after['snippet.title'])
        if text is not None:
            print(text, flush=True)
            print('--------------------------------------------', flush=True)
            loaded_h5 = _ask_for_replacement(loaded_h5, video_id)
        


In [None]:
# MANIFEST_PATH = r'C:\Users\lukas\Desktop\youtube-autobackup\data\recreated_sinus.h5'
# Show the manifest rows that already have a replacement id.
df = pd.read_hdf(MANIFEST_PATH, key="df").loc[
    lambda manifest: manifest['second_id'] != ''
]
df

In [None]:
'''
========================================
    Re-enter deleted playlists
========================================
'''

import os
import glob
import pandas as pd
from youtube_manager import YoutubeManager

# Collect every per-playlist working copy and keep only the rows for which a
# replacement id was entered ("" = not answered, "NULL" = explicitly none).
RECREATED_PATH = r"C:\Users\lukas\Desktop\youtube-autobackup\data\recreated_*.h5"
recreated_files = glob.glob(RECREATED_PATH)

recreated_frames = [pd.read_hdf(file, key="df") for file in recreated_files]
recreated_data = pd.concat(recreated_frames)

has_replacement = ~recreated_data["second_id"].isin(["", "NULL"])
recreated_data = recreated_data[has_replacement]

# Sanity check: both numbers are equal when no (replacement, original) pair repeats.
unique_pairs = recreated_data.drop_duplicates(subset=["second_id", "contentDetails.videoId"])
print(len(recreated_data), "==", len(unique_pairs))


# Fetch the current content of every playlist of the user and flatten it into
# one DataFrame (`df`) with an extra `playlist_id` column.
yt_manager = YoutubeManager(token_path=r'C:\Users\lukas\Desktop\youtube-autobackup\data\token.json')
all_videos = {}
all_playlists = yt_manager.playlist_list(os.getenv("USER_ID"))

for playlist in all_playlists:
    playlist_id = playlist["id"]
    all_videos[playlist_id] = yt_manager.playlist_elements(playlist_id)

# Collect the per-playlist frames and concatenate once; concatenating inside the
# loop copies all previously collected rows on every iteration.
batches = []
for playlist_id, playlist_vids in all_videos.items():
    batch = pd.json_normalize(playlist_vids)
    batch["playlist_id"] = playlist_id
    batches.append(batch)

# pd.concat rejects an empty list; fall back to the empty frame the loop used to leave.
df = pd.concat(batches) if batches else pd.DataFrame()

# Match every annotated manifest row with the item currently sitting in the
# playlist, then put the replacement video at that position and remove the old item.
join_df = recreated_data.join(df.set_index("contentDetails.videoId"), on="contentDetails.videoId", rsuffix="_fetched")
join_df = join_df[["contentDetails.videoId", "second_id", "id_fetched", "snippet.position_fetched", "playlist_id_fetched"]]
# Rows without a fetched position have no matching item in any current playlist.
join_df = join_df.dropna(subset=["snippet.position_fetched"])

# Replacement ids that were already inserted (seeded by hand, extended below);
# each replacement id is inserted at most once.
added_id = ['vb9hgfngIbk']

for index, row in join_df.iterrows():
    if row['second_id'] in added_id:
        continue

    # The joined column holds floats (or NumPy integers) rather than plain ints;
    # hand a built-in int to the API wrapper.
    # NOTE(review): assumes YoutubeManager forwards `position` unchanged - confirm.
    position = int(row["snippet.position_fetched"])

    print(f"Adding {row['second_id']} for {row['contentDetails.videoId']} at position {position} for playlist {row['playlist_id_fetched']}")
    yt_manager.playlistItems_insert(
        row["playlist_id_fetched"],
        row["second_id"],
        position=position,
    )
    yt_manager.playlistItems_delete(row["id_fetched"])
    added_id.append(row["second_id"])