# Import packages

In [1]:
import os
import pytubefix
import json
import pickle
import cv2
import time
import datetime
import shutil   # ONLY USED FOR CLEARING EXISTING FRAMES
from common_functionality import clear_folder
from collections import OrderedDict
import re
from translate import Translator
from googletrans import Translator as GoogleTranslator
from langdetect import detect
import spacy



# Interview Video Extraction

## Video website functions

In [2]:
def create_dir(dir_name):
    """Create ``dir_name`` (including missing parents) unless the path already exists."""
    if os.path.exists(dir_name):
        # Nothing to do: the path is already present on disk.
        return
    os.makedirs(dir_name)

In [3]:
def create_json_file(file_path):
    """Create (or overwrite) ``file_path`` so that it holds an empty JSON object."""
    with open(file_path, 'w') as handle:
        json.dump({}, handle)

In [4]:
def init_processed_file(file_path):
    """Overwrite ``file_path`` with a JSON object holding an empty 'processed_urls' list."""
    initial_state = {'processed_urls': []}
    with open(file_path, 'w') as handle:
        handle.write(json.dumps(initial_state))

In [5]:
def save_contents_pkl(file_path, data):
    """Pickle ``data`` into ``file_path``, replacing any existing file."""
    with open(file_path, 'wb') as handle:
        pickle.Pickler(handle).dump(data)

## Download functionality

In [6]:
def get_processed_videos(processed_file):
    """Return the list of processed video URLs stored in ``processed_file``.

    Args:
        processed_file: Path to a JSON file expected to contain a
            'processed_urls' list.

    Returns:
        The stored list of URLs. When the 'processed_urls' key is missing
        (e.g. the file only holds ``{}``) the file is re-initialised via
        ``init_processed_file`` and an empty list is returned.
    """
    with open(processed_file) as f:
        processed_data = json.load(f)

    if 'processed_urls' not in processed_data:
        # Re-initialise only after the read handle has been closed, and return
        # the freshly initialised (empty) list instead of indexing the stale
        # dict, which previously raised KeyError.
        init_processed_file(processed_file)
        return []

    return processed_data['processed_urls']

In [7]:
def set_processed_videos(url, processed_file, processed_file_pkl):
    """Record ``url`` as processed in the JSON file and mirror the result to a pickle.

    Args:
        url: Video URL to record.
        processed_file: Path to the existing JSON bookkeeping file.
        processed_file_pkl: Path of the pickle file that receives a copy of
            the updated data (written via ``save_contents_pkl``).
    """
    with open(processed_file, 'r+') as f:
        processed_data = json.load(f)
        processed_urls = processed_data.setdefault('processed_urls', [])

        # Keep the list free of duplicates.
        if url not in processed_urls:
            processed_urls.append(url)

        f.seek(0)
        json.dump(processed_data, f, indent=4)
        # Drop any leftover bytes if the new content is shorter than the old one.
        f.truncate()

    save_contents_pkl(processed_file_pkl, processed_data)

In [8]:
def get_url_from_file(file_name):
    """Read the JSON file ``file_name`` and return the value stored under 'episode_urls'."""
    with open(file_name) as handle:
        return json.load(handle)['episode_urls']

def get_processed_url_from_file(file_name):
    """Read the JSON file ``file_name`` and return the value stored under 'processed_urls'."""
    with open(file_name) as handle:
        return json.load(handle)['processed_urls']

In [9]:
def get_url_from_file_pkl(file_name):
    """Load the pickle file ``file_name`` and return the value stored under 'episode_urls'.

    The file is opened in binary mode and the open handle (not the path
    string) is handed to ``pickle.load``; the previous version raised
    TypeError on every call.

    NOTE(review): ``pickle.load`` can execute arbitrary code; only use this
    on pickle files produced by this project.
    """
    with open(file_name, 'rb') as f:
        data = pickle.load(f)
    return data['episode_urls']

def get_processed_url_from_file_pkl(file_name):
    """Load the pickle file ``file_name`` and return the value stored under 'processed_urls'.

    The file is opened in binary mode and the open handle (not the path
    string) is handed to ``pickle.load``; the previous version raised
    TypeError on every call.

    NOTE(review): ``pickle.load`` can execute arbitrary code; only use this
    on pickle files produced by this project.
    """
    with open(file_name, 'rb') as f:
        data = pickle.load(f)
    return data['processed_urls']

In [10]:
def download_video_from_yt(videos_dir, url, processed_file):
    """Download the YouTube video at ``url`` into ``videos_dir`` unless already processed.

    The file is stored as ``video_<k>.mp4`` where ``k`` is the number of
    regular files currently in ``videos_dir`` plus one.

    Args:
        videos_dir: Existing directory that receives the downloaded file.
        url: YouTube video URL.
        processed_file: JSON bookkeeping file read via ``get_processed_videos``.

    Returns:
        True when the video was downloaded or was already listed as
        processed; False when creating the YouTube object or the download
        failed. (Earlier versions returned None in every case, so callers
        that ignore the result are unaffected.)
    """
    if url in get_processed_videos(processed_file):
        print(f"Video with following url is already in local storage in the directory \'{videos_dir}\': {url}")
        return True

    try:
        yt = pytubefix.YouTube(url)
    except Exception as error:
        # Stop here: without a YouTube object there is nothing to download.
        # (Previously execution fell through and failed on the unbound `yt`.)
        print("Connection error!", error)
        return False

    video = yt.streams.get_highest_resolution()

    # Count existing files to derive the next sequential file name.
    num_videos = len([f for f in os.listdir(videos_dir) if os.path.isfile(os.path.join(videos_dir, f))])

    try:
        print('Downloading video', video.title)
        video.download(videos_dir, f'video_{num_videos+1}.mp4')
        print(f"Youtube video \'{video.title}\' has been successfully downloaded!")
    except Exception as error:
        # `Exception` instead of a bare except so Ctrl-C still interrupts the run.
        print("Download error!", error)
        return False

    return True

## Downloading videos

In [11]:
def main(videos_dir, urls_file, processed_file, processed_file_pkl, data_dir):
    """Download every video listed in the URL file and keep the bookkeeping files updated.

    Args:
        videos_dir: Directory that receives the downloaded videos (created if missing).
        urls_file: Name of the JSON file inside ``data_dir`` holding 'episode_urls'.
        processed_file: Name of the JSON bookkeeping file inside ``data_dir``.
        processed_file_pkl: Name of the pickle copy of the bookkeeping data
            inside ``data_dir``.
        data_dir: Directory holding the files above (created if missing).
    """
    create_dir(videos_dir)
    create_dir(data_dir)

    url_path = os.path.join(data_dir, urls_file)
    processed_path = os.path.join(data_dir, processed_file)
    processed_path_pkl = os.path.join(data_dir, processed_file_pkl)

    if not os.path.isfile(processed_path):
        # Seed the file with an empty 'processed_urls' list rather than a bare
        # `{}`, so the first lookup of that key succeeds.
        init_processed_file(processed_path)

    urls = get_url_from_file(url_path)

    print(urls)

    for url in urls:
        outcome = download_video_from_yt(videos_dir, url, processed_path)
        if outcome is False:
            # An explicit False from the downloader means failure: leave the
            # URL unrecorded so a later run retries it. Any other result
            # (including None) is treated as done.
            continue
        set_processed_videos(url, processed_path, processed_path_pkl)

    print(f"Downloaded videos can be found in the diretory \"{videos_dir}\"")

In [12]:
# Configuration for the main video set. The three file names are resolved
# inside `data_dir` by main(); `processed_urls_list` and `data_dir` are also
# read by later cells in this notebook.
urls_list = 'urls_list.json'
processed_urls_list = 'processed_urls_list.json'
processed_urls_list_pkl = 'processed_urls_list.pkl'
videos_dir = 'videos_db'
data_dir = 'data'

# Download every listed video and update the processed-URL bookkeeping files.
main(videos_dir, urls_list, processed_urls_list, processed_urls_list_pkl, data_dir)

['https://www.youtube.com/watch?v=qyxkN-xr7tw', 'https://www.youtube.com/watch?v=yKPHOhfMBpE', 'https://www.youtube.com/watch?v=g6_4k0_Dpho', 'https://www.youtube.com/watch?v=WvC4srvFIm4', 'https://www.youtube.com/watch?v=bpwP-m9x-U0', 'https://www.youtube.com/watch?v=0Agt9eA6KK8', 'https://www.youtube.com/watch?v=hGvouXV27MA', 'https://www.youtube.com/watch?v=WbJoyLEveIA', 'https://www.youtube.com/watch?v=VxGUnKG78Eg', 'https://www.youtube.com/watch?v=u3e2Ubtua98', 'https://www.youtube.com/watch?v=TqMcSBy6gaw', 'https://www.youtube.com/watch?v=KF60nsc9Stg', 'https://www.youtube.com/watch?v=GeTsMrMqmCY', 'https://www.youtube.com/watch?v=sd7igZiKJQM', 'https://www.youtube.com/watch?v=wZcCBdAWXgE', 'https://www.youtube.com/watch?v=z8e8StV8cNk', 'https://www.youtube.com/watch?v=_RKje9yVjy8', 'https://www.youtube.com/watch?v=Vg346770llI', 'https://www.youtube.com/watch?v=RQOx3OXTZVc', 'https://www.youtube.com/watch?v=l5arBrIVqYA', 'https://www.youtube.com/watch?v=DAvTAM1JIaA', 'https://www

In [13]:
# NOTE: only to be used to process the videos done by Mark Laurence Zammit
# (the "_ml" set). Uses its own URL list, bookkeeping files and video folder
# inside the same `data` directory as the main set.
urls_list_ml = 'urls_list_ml.json'
processed_urls_list_ml = 'processed_urls_list_ml.json'
processed_urls_list_pkl_ml = 'processed_urls_list_ml.pkl'
ml_videos_dir = 'videos_db_ml'
data_dir = 'data'

# Same download pipeline as for the main set, pointed at the "_ml" files.
main(ml_videos_dir, urls_list_ml, processed_urls_list_ml, processed_urls_list_pkl_ml, data_dir)

['https://www.youtube.com/watch?v=EJ1dlym0KI8', 'https://www.youtube.com/watch?v=NjwqPOgz3pc']
Video with following url is already in local storage in the directory 'videos_db_ml': https://www.youtube.com/watch?v=EJ1dlym0KI8
Video with following url is already in local storage in the directory 'videos_db_ml': https://www.youtube.com/watch?v=NjwqPOgz3pc
Downloaded videos can be found in the diretory "videos_db_ml"


In [14]:
def create_json_file(file_path):
    """Write an empty JSON object to ``file_path``, replacing existing content.

    Note: this re-defines the identically named helper declared earlier in
    the notebook, with the same behaviour.
    """
    empty_payload = json.dumps({})
    with open(file_path, 'w') as out_file:
        out_file.write(empty_payload)

In [15]:
def update_json_content(file_path, pic_dict):
    """Append ``pic_dict`` to the 'frames_list' array stored in the JSON file ``file_path``.

    Args:
        file_path: Path to an existing JSON file containing an object.
        pic_dict: JSON-serialisable entry to append; 'frames_list' is created
            when the file does not have it yet.
    """
    with open(file_path, 'r+') as f:
        processed_data = json.load(f)
        processed_data.setdefault('frames_list', []).append(pic_dict)

        f.seek(0)
        json.dump(processed_data, f, indent=4)
        # Without truncating, bytes of a longer previous version would remain
        # after the new content and corrupt the JSON.
        f.truncate()

In [16]:
def add_video_date(url, file_path):
    """Store the publish date of the YouTube video at ``url`` in the JSON file ``file_path``.

    The date is written under 'video_date' as ``DD-MM-YYYY``. An existing
    'video_date' is left untouched. When the file already has a
    'frames_list' key, the date is inserted directly before it; otherwise it
    is appended as the last key.

    Args:
        url: YouTube video URL.
        file_path: Path to an existing JSON file containing an object.
    """
    try:
        yt = pytubefix.YouTube(url)
    except Exception as error:
        # Nothing to write without a YouTube object. (Previously execution
        # fell through and failed on the unbound `yt`.)
        print("Connection error!", error)
        return

    video_date = yt.publish_date.date().strftime("%d-%m-%Y")

    print(video_date)

    with open(file_path, 'r+') as f:
        processed_data = json.load(f)

        if 'video_date' not in processed_data:
            items = list(processed_data.items())
            # Position of 'frames_list', or the end when that key is absent
            # (the old lookup raised StopIteration in that case).
            insert_at = next(
                (i for i, (key, _) in enumerate(items) if key == 'frames_list'),
                len(items),
            )
            items.insert(insert_at, ('video_date', video_date))
            # Plain dicts keep insertion order, so no OrderedDict is needed.
            processed_data = dict(items)

        print(processed_data)

        f.seek(0)
        json.dump(processed_data, f, indent=4)
        f.truncate()  # This prevents leftover characters from previous write

In [17]:
# Folders for the main video set. `videos_dir` and `imgs_dir` are only
# assigned in this cell, not used by it.
videos_dir = 'videos_db'
imgs_dir = 'vid_frames'

# Per-video metadata files live in data/video_data.
data_folder = 'data'
video_data_folder = 'video_data'
full_vid_data_path = os.path.join(data_folder, video_data_folder)

if not os.path.exists(full_vid_data_path):
    os.makedirs(full_vid_data_path)

# NOTE: `data_dir` and `processed_urls_list` are not defined in this cell;
# they come from the download configuration cell earlier in the notebook.
processed_path = os.path.join(data_dir, processed_urls_list)
urls = get_processed_url_from_file(processed_path)

# One metadata file per processed URL, numbered by the URL's position in the
# processed list (video_1.json, video_2.json, ...).
for url_index, url in enumerate(urls):
    full_data_file_path = os.path.join(full_vid_data_path, f'video_{url_index+1}.json')
    if not os.path.exists(full_data_file_path):
        create_json_file(full_data_file_path)
    add_video_date(url, full_data_file_path)

19-10-2023
{'video_date': '19-10-2023'}
09-02-2023
{'video_date': '09-02-2023'}
02-02-2023
{'video_date': '02-02-2023'}
04-07-2024
{'video_date': '04-07-2024'}
25-04-2024
{'video_date': '25-04-2024'}
01-06-2023
{'video_date': '01-06-2023'}
12-01-2023
{'video_date': '12-01-2023'}
22-09-2022
{'video_date': '22-09-2022'}
08-12-2022
{'video_date': '08-12-2022'}
06-04-2023
{'video_date': '06-04-2023'}
29-08-2024
{'video_date': '29-08-2024'}
22-08-2024
{'video_date': '22-08-2024'}
16-05-2024
{'video_date': '16-05-2024'}
07-03-2024
{'video_date': '07-03-2024'}
08-02-2024
{'video_date': '08-02-2024'}
21-12-2023
{'video_date': '21-12-2023'}
09-11-2023
{'video_date': '09-11-2023'}
29-07-2021
{'video_date': '29-07-2021'}
07-11-2024
{'video_date': '07-11-2024'}
19-12-2024
{'video_date': '19-12-2024'}
21-11-2024
{'video_date': '21-11-2024'}
29-12-2024
{'video_date': '29-12-2024'}


In [32]:
# NOTE: only to be used to process the videos done by Mark Laurence Zammit
# (the "_ml" set). Running this cell overrides `videos_dir`, `imgs_dir` and
# `full_vid_data_path` set for the main video set.
videos_dir = 'videos_db_ml'
imgs_dir = 'vid_frames_ml'

# Per-video metadata files live in data/video_data_ml.
data_folder = 'data'
video_data_folder = 'video_data_ml'
full_vid_data_path = os.path.join(data_folder, video_data_folder)

if not os.path.exists(full_vid_data_path):
    # makedirs (not mkdir) so a missing parent `data` folder is created too,
    # matching the equivalent cell for the main video set.
    os.makedirs(full_vid_data_path)

# NOTE: `data_dir` and `processed_urls_list_ml` are not defined in this cell;
# they come from the "_ml" download configuration cell earlier in the notebook.
processed_path = os.path.join(data_dir, processed_urls_list_ml)
urls = get_processed_url_from_file(processed_path)

# One metadata file per processed URL, numbered by the URL's position in the
# processed list (video_1.json, video_2.json, ...).
for url_index, url in enumerate(urls):
    full_data_file_path = os.path.join(full_vid_data_path, f'video_{url_index+1}.json')
    if not os.path.exists(full_data_file_path):
        create_json_file(full_data_file_path)
    add_video_date(url, full_data_file_path)

26-02-2025
{'video_date': '26-02-2025'}
12-03-2025
{'video_date': '12-03-2025'}


# Interview Name Extraction

In [19]:
# Load the spaCy English transformer pipeline ('en_core_web_trf'); `nlp` is
# used below for named-entity recognition on video authors and titles.
nlp = spacy.load('en_core_web_trf')

In [20]:
# Output file names (written inside `data_folder`) for the main video set.
interviewee_data = 'interviewee_names_data.json'
interviewer_data = 'interviewer_names_data.json'
# Placeholder; replaced by the list of processed URLs once it is loaded below.
processed_data = {}
# Mappings 'video_<n>' -> detected name, filled by the detection cells below.
interviewees_by_video = {}
interviewers_by_video = {}

In [33]:
# NOTE: only to be used to process the videos done by Mark Laurence Zammit
# (the "_ml" set). Running this cell overrides the output file names and
# resets the state set up for the main video set.
interviewee_data = 'interviewee_names_data_ml.json'
interviewer_data = 'interviewer_names_data_ml.json'
# Placeholder; replaced by the list of processed URLs once it is loaded below.
processed_data = {}
# Mappings 'video_<n>' -> detected name, filled by the detection cells below.
interviewees_by_video = {}
interviewers_by_video = {}

In [21]:
# Load the processed URLs of the main video set, provided both the data
# folder and the bookkeeping file exist; otherwise `processed_data` is kept
# as it was.
if os.path.exists(data_folder) and os.path.exists(os.path.join(data_folder, processed_urls_list)):
    with open(os.path.join(data_folder, processed_urls_list), 'r') as f:
        processed_data = json.load(f)['processed_urls']

In [34]:
# NOTE: only to be used to process the videos done by Mark Laurence Zammit
# (the "_ml" set). Loads that set's processed URLs, provided both the data
# folder and the bookkeeping file exist; otherwise `processed_data` is kept
# as it was.
if os.path.exists(data_folder) and os.path.exists(os.path.join(data_folder, processed_urls_list_ml)):
    with open(os.path.join(data_folder, processed_urls_list_ml), 'r') as f:
        processed_data = json.load(f)['processed_urls']

In [22]:
def detect_name_from_text(text):
    """Return the PERSON entities spaCy finds in ``text``, joined by single spaces.

    Prints the joined name followed by "- PERSON", or a notice when nothing
    was found. Returns an empty string when no PERSON entity is detected.
    """
    # Only PERSON entities are kept; ORG entities are ignored.
    person_names = [ent.text for ent in nlp(text).ents if ent.label_ == "PERSON"]

    if not person_names:
        print("No names have been detected from the video title!")
        return ""

    # Several entities are joined to handle partial name detection.
    interviewer = " ".join(person_names)
    print(interviewer, "- PERSON")
    return interviewer

## Perform translation

In [23]:
# Initialise translators:
# - `translator`: `translate` package, fixed source 'mt' and target 'en'.
# - `google_translator`: googletrans client; its translate() call is awaited
#   in translate_text_to_english_google below.
translator = Translator(to_lang="en", from_lang="mt")
google_translator = GoogleTranslator()

In [24]:
# Language detection helper (used in this notebook for titles, authors and descriptions)
def detect_title_language(text):
    """Return the language code langdetect reports for ``text``, or None on failure.

    Any exception raised during detection is printed and swallowed.
    """
    try:
        return detect(text)
    except Exception as err:
        print("An error occurred:", err)
    return None

In [25]:
# Synchronous translation through the `translate` package
def translate_text_to_english(text):
    """Translate ``text`` with the module-level ``translator`` and return the result."""
    return translator.translate(text)

# Asynchronous translation through googletrans
async def translate_text_to_english_google(text):
    """Translate ``text`` to English with ``google_translator`` and return the translated string.

    This is a coroutine function: calling it returns a coroutine that must be
    awaited to obtain the text.
    """
    translation = await google_translator.translate(text, dest="en")
    return translation.text

In [26]:
# Known lowercase first names and surnames used to split concatenated names
first_names = [
    "jon", "mark", "dylan", "chris", "ian",
    "michael", "matthew", "laurence", "joseph", "andrew",
]
surnames = [
    "mallia", "zammit", "fearne", "borg",
    "schembri", "fenech", "galea", "camilleri",
]

def smart_name_split(name_raw):
    """Split a concatenated name such as 'jonborg' into 'Jon Borg'.

    The input is lower-cased and every split position that leaves at least
    two leading and three trailing characters is tried; the first position
    whose halves are a known first name and a known surname wins. When no
    position matches, the lower-cased input is returned with only its first
    letter capitalised.
    """
    lowered = name_raw.lower()

    split_at = 2
    while split_at < len(lowered) - 2:
        head, tail = lowered[:split_at], lowered[split_at:]
        if head in first_names and tail in surnames:
            return " ".join(part.capitalize() for part in (head, tail))
        split_at += 1

    # No known first-name/surname combination found.
    return lowered.capitalize()

def extract_interviewer(description):
    """Extract a person's name from a video description.

    Looks, in order, for "interview with", "produced by", "hosted by",
    "presented by" and "moderated by" followed by one to three words and then
    " and", a comma, a full stop or the end of the text. If none matches,
    the first lowercase hashtag (``#name``) is passed through
    ``smart_name_split``.

    Args:
        description: Description text; ``None`` or an empty string is accepted.

    Returns:
        The extracted name, or '' when nothing is found or there is no text.
    """
    if not description:
        # A missing description used to crash re.search with TypeError.
        return ''

    # NOTE(review): the search runs with re.IGNORECASE, so `[A-Z][a-z]+` also
    # accepts lower-case words (e.g. "interview with the minister" yields
    # "the minister"). Kept as-is to preserve current results.
    name_group = r"([A-Z][a-z]+(?: [A-Z][a-z]+){0,2})(?: and|,|\.|$)"
    lead_ins = (
        "interview with",
        "produced by",
        "hosted by",
        "presented by",
        "moderated by",
    )

    for lead_in in lead_ins:
        match = re.search(lead_in + " " + name_group, description, re.IGNORECASE)
        if match:
            return match.group(1).strip()

    # Fallback to hashtag (lower-case letters only)
    hashtag_match = re.search(r"#([a-z]+)", description)
    if hashtag_match:
        return smart_name_split(hashtag_match.group(1))

    return ''

In [35]:
# One list of removable title words per video; each starts with 'Podcast' and
# is extended below with the words of the detected interviewer name.
phrases_to_remove_videos = [['Podcast'] for _ in processed_data]
print(len(phrases_to_remove_videos))
words_to_ignore = ['Episode', 'with', 'Part']

interviewers_list = []
for index, url in enumerate(processed_data):
    yt = pytubefix.YouTube(url)
    print(f'Detected Author of video_{index+1}: {yt.author}')

    detected_language = detect_title_language(yt.author)
    video_author_en = ""
    video_description_en = ""

    # Translate the author name to English
    if detected_language != 'en':
        video_author_en = translate_text_to_english(yt.author)
        print('Video Title Name translated to English:', video_author_en)
    else:
        video_author_en = yt.author

    # First attempt: spaCy on the channel author.
    # NOTE(review): detection runs on the untranslated `yt.author`, so
    # `video_author_en` is only printed — confirm whether that is intended.
    detected_interviewer = detect_name_from_text(yt.author)
    if detected_interviewer == '':
        # Second attempt: look for the name in the video description.
        # `or ""` guards against a missing description (presumably possible —
        # verify against pytubefix).
        video_description = yt.description or ""
        if video_description:
            detected_desc_language = detect_title_language(video_description)
            # Decide on the description's own language; the author's
            # `detected_language` was checked here by mistake before.
            if detected_desc_language != 'en':
                # The synchronous helper is used because
                # translate_text_to_english_google is a coroutine function:
                # calling it without `await` handed a coroutine object to
                # extract_interviewer, which then failed in re.search.
                video_description_en = translate_text_to_english(video_description)
                print(video_description_en)
            else:
                video_description_en = video_description
        detected_interviewer = extract_interviewer(video_description_en)

    print('No name' if detected_interviewer == '' else detected_interviewer)
    interviewers_list.append(detected_interviewer)
    interviewers_by_video[f'video_{index+1}'] = detected_interviewer
    phrases_to_remove_videos[index].extend(detected_interviewer.split())

    print('------------------------------------------------------------')

2


Detected Author of video_1: Times of Malta
No names have been detected from the video title!
Mark Laurence Zammit
------------------------------------------------------------
Detected Author of video_2: Times of Malta
No names have been detected from the video title!
Mark Laurence Zammit
------------------------------------------------------------


In [36]:
# Detect the interviewee of each processed video from its (English) title.
# Results go to `persons_list` and `interviewees_by_video`; a video without a
# detected PERSON entity gets no entry in either.
persons_list = []
for index, url in enumerate(processed_data):
    yt = pytubefix.YouTube(url)
    print(f'Detected Title Name of video_{index+1}: {yt.title}')

    detected_language = detect_title_language(yt.title)
    video_title_en = ""

    # Translate title to English
    if detected_language != 'en':
        video_title_en = translator.translate(yt.title)
        print('Video Title Name translated to English:', video_title_en)
    else:
        video_title_en = yt.title

    # Remove unwanted phrases ('Podcast' plus the interviewer's name words)
    # from the title. Each phrase removes at most one occurrence.
    # NOTE(review): any phrase equal to the LAST entry of the list is skipped,
    # so the interviewer's final name word is kept — and 'Podcast' itself is
    # never removed when no interviewer was detected. Confirm this is intended.
    title_words = video_title_en.split()
    for word in phrases_to_remove_videos[index]:
        if word != phrases_to_remove_videos[index][-1]:
            if word in title_words:
                title_words.remove(word)
        else:
            pass
    
    # Keep only the part of the title before the first '|'.
    video_title_mod = ' '.join(title_words).split('|')[0]
    print(video_title_mod)

    # Process the text with spaCy
    doc = nlp(video_title_mod)
    detected_names = []

    for ent in doc.ents:
        if ent.label_ in ['PERSON', 'ORG']:  # ORG passes this filter but is dropped by the check below
            # Only PERSON entities are collected.
            if ent.label_ == "PERSON":
                detected_names.append(ent.text)

    if detected_names:
        full_name = " ".join(detected_names)  # Handle partial name detection
        persons_list.append(full_name)
        interviewees_by_video[f'video_{index+1}'] = full_name
        print(full_name, "- PERSON")


    print('------------------------------------------------------------')

Detected Title Name of video_1: Ian Borg, the man with many plans
Ian Borg, the man with many plans
Ian Borg - PERSON
------------------------------------------------------------
Detected Title Name of video_2: What a pandemic taught Chris Fearne
What a pandemic taught Chris Fearne
Chris Fearne - PERSON
------------------------------------------------------------


## Checking if all names have been extracted

In [37]:
# Interviewers: `interviewers_list` holds one entry per video ('' when nothing
# was found), so only non-empty names count as detected. The previous check
# compared the list length with itself and could never fail.
detected_interviewers = [name for name in interviewers_list if name]
print('Number of persons detected:', len(detected_interviewers))

if len(detected_interviewers) == len(processed_data):
    print('All possible names have been detected!')

print(interviewers_by_video)

print('-----------------------------------------------------------------')

# Interviewees: `persons_list` only receives an entry when a name was found.
print('Number of persons detected:', len(persons_list))

if len(persons_list) == len(processed_data):
    print('All possible names have been detected!')

print(interviewees_by_video)

Number of persons detected: 2
All possible names have been detected!
{'video_1': 'Mark Laurence Zammit', 'video_2': 'Mark Laurence Zammit'}
-----------------------------------------------------------------
Number of persons detected: 2
All possible names have been detected!
{'video_1': 'Ian Borg', 'video_2': 'Chris Fearne'}


## Saving names to data folder

In [38]:
# Persist the 'video_<n>' -> interviewer mapping as indented JSON in the data folder.
with open(os.path.join(data_folder, interviewer_data), 'w') as file:
    file.write(json.dumps(interviewers_by_video, indent=4))

In [39]:
# Persist the 'video_<n>' -> interviewee mapping as indented JSON in the data folder.
with open(os.path.join(data_folder, interviewee_data), 'w') as file:
    file.write(json.dumps(interviewees_by_video, indent=4))