In [1]:
import pandas as pd
import io
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import svm
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
from scipy.sparse import hstack
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:

# Colab-only step: prompt the user to upload files from their machine.
# `uploaded` is indexed by filename below (train_model reads
# uploaded['latest_videos.csv'] and wraps the value in io.BytesIO).
from google.colab import files
uploaded = files.upload()

def train_model():
    """Train an RBF-kernel SVM that predicts a video's political affiliation.

    Reads the uploaded ``latest_videos.csv`` from the module-level ``uploaded``
    mapping, concatenates ``Title`` and ``Description`` into one text field,
    and drops rows where that text is missing. The data is split with
    stratification into train / validation / test (20% held out for test,
    then 25% of the remainder for validation). Features are TF-IDF weights
    plus one standardized character-count column.

    Returns:
        (clf, vectorizer, scaler, X_val_features, y_val, X_test_features, y_test)
    """

    def char_counts(texts):
        # One-column array holding the character length of each text.
        return pd.DataFrame([len(t) for t in texts]).values

    raw = pd.read_csv(io.BytesIO(uploaded['latest_videos.csv']))
    raw['text'] = raw['Title'] + ' ' + raw['Description']
    data = raw.dropna(subset=['text'])
    labels = data['Political Affiliation']

    # First carve off the test set, then split the rest into train/validation.
    X_rest, X_test, y_rest, y_test = train_test_split(
        data['text'], labels, test_size=0.2, random_state=42, stratify=labels
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=0.25, random_state=42, stratify=y_rest
    )

    # Vocabulary and IDF weights are learned from the training split only.
    vectorizer = TfidfVectorizer(use_idf=True, stop_words='english')
    tfidf_train = vectorizer.fit_transform(X_train)
    tfidf_val = vectorizer.transform(X_val)
    tfidf_test = vectorizer.transform(X_test)

    len_train = char_counts(X_train)
    len_val = char_counts(X_val)
    len_test = char_counts(X_test)

    # The length scaler is likewise fitted on the training split only.
    scaler = StandardScaler()
    len_train = scaler.fit_transform(len_train)
    len_val = scaler.transform(len_val)
    len_test = scaler.transform(len_test)

    # Append the scaled length as an extra column after the TF-IDF columns.
    features_train = hstack([tfidf_train, len_train])
    features_val = hstack([tfidf_val, len_val])
    features_test = hstack([tfidf_test, len_test])

    clf = svm.SVC(C=1000, gamma=0.001, kernel='rbf')
    clf.fit(features_train, y_train)

    return clf, vectorizer, scaler, features_val, y_val, features_test, y_test

Saving latest_videos.csv to latest_videos.csv


In [3]:
def predict_political_affiliation(title, description, clf, vectorizer, scaler):
    """Classify a single video from its title and description.

    The two strings are joined with a space, vectorized with ``vectorizer``,
    and extended with the character count of the joined text after it has
    been passed through ``scaler``. The resulting single-row feature matrix
    is handed to ``clf.predict``.

    Returns:
        The first (only) element of the prediction returned by ``clf``.
    """
    combined = ' '.join((title, description))
    tfidf_row = vectorizer.transform([combined])
    # 1x1 array so the scaler sees the same 2-D layout it is given in training.
    scaled_length = scaler.transform(pd.DataFrame([len(combined)]).to_numpy())
    features = hstack([tfidf_row, scaled_length])
    return clf.predict(features)[0]

In [5]:
import time
import pandas as pd
from random import choice
!pip install undetected_chromedriver
from undetected_chromedriver import Chrome, ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import html

import csv
from collections import Counter
from csv import DictWriter
import os


# Train the classifier once; the validation/test splits returned by
# train_model() are not needed by the browsing experiment.
clf, vectorizer, scaler, _, _, _, _ = train_model()

# Titles used as the fallback pool when no Right-labelled recommendation is
# found. Only rows labelled "Right" are kept (previously every title in the
# file was collected, regardless of label). newline='' is what the csv module
# expects so quoted fields containing line breaks are parsed correctly.
with open('latest_videos.csv', 'r', encoding='utf-8', newline='') as infile:
    right_video_titles = [
        row['Title']
        for row in csv.DictReader(infile)
        if row['Political Affiliation'] == 'Right' and row['Title']
    ]


def remove_non_bmp_characters(text):
    """Return ``text`` with every character above U+FFFF removed.

    Only code points inside the Basic Multilingual Plane are kept, so
    characters such as most emoji are dropped.
    """
    return ''.join(filter(lambda ch: ord(ch) <= 0xFFFF, text))



In [6]:
def check_and_record_recommendations(driver):
    """Classify the video titles currently shown and log the results.

    Waits up to 20 s for title elements on the current page, predicts a
    political affiliation for each non-empty title using the module-level
    ``clf``, ``vectorizer`` and ``scaler`` (the title is passed as both title
    and description, since no description is collected here), appends each
    title to ``watched_videos.txt``, and appends one row of counts to
    ``numbers.csv``.

    Any exception is reported with ``print`` and swallowed; whatever was
    collected up to that point is still returned.

    Args:
        driver: Selenium WebDriver positioned on the page to inspect.

    Returns:
        (recommendations, political_affiliation_counts) where
        ``recommendations`` is a list of dicts with keys ``"Title"`` and
        ``"Political Affiliation"`` and the counts are a ``Counter`` keyed by
        affiliation label.
    """
    recommendations = []
    political_affiliation_counts = Counter()
    try:
        video_elements = WebDriverWait(driver, 20).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, "yt-formatted-string#video-title"))
        )

        # Open the log once for the whole batch instead of once per title.
        with open('watched_videos.txt', 'a', encoding='utf-8') as log_file:
            log_file.write("---- Homepage Recommendations ----\n")

            for index, video in enumerate(video_elements):
                title = video.text.strip()
                if not title:
                    continue

                political_affiliation = predict_political_affiliation(title, title, clf, vectorizer, scaler)
                political_affiliation_counts[political_affiliation] += 1

                print(f"Video title {index + 1}: {title} (Political Affiliation: {political_affiliation})")
                log_file.write(f"{title} (Political Affiliation: {political_affiliation})\n")

                recommendations.append({
                    "Title": title,
                    "Political Affiliation": political_affiliation
                })

        print("Number of videos for each Political Affiliation:")
        for affiliation, count in political_affiliation_counts.items():
            print(f"{affiliation}: {count}")

        # Save the counts to a CSV file. The header is written only when the
        # file is new or empty; writing it on every call interleaved header
        # rows with the data rows.
        numbers_path = 'numbers.csv'
        needs_header = not os.path.exists(numbers_path) or os.path.getsize(numbers_path) == 0
        with open(numbers_path, 'a', newline='', encoding='utf-8') as csv_file:
            csv_writer = csv.writer(csv_file)
            if needs_header:
                csv_writer.writerow(["Left", "Right", "Non-political"])
            csv_writer.writerow([
                political_affiliation_counts["Left"],
                political_affiliation_counts["Right"],
                political_affiliation_counts["Non-political"]
            ])

    except Exception as e:
        print("Error while collecting recommendations:", e)

    return recommendations, political_affiliation_counts

In [7]:
def get_video_description(driver, video_title):
    """Look up a video on YouTube in a temporary tab and return its description.

    A new tab is opened, the title is searched, the first result is clicked
    and the description element's text is read. The temporary tab is always
    closed and focus is returned to the tab that was active on entry, even
    when navigation or the search itself fails.

    Args:
        driver: Selenium WebDriver.
        video_title: Text typed into the YouTube search box.

    Returns:
        The description text, or ``""`` if it could not be retrieved (the
        failure is reported with ``print``).
    """
    # Remember the caller's tab so focus returns to it rather than to
    # whichever handle happens to be first in the list.
    original_handle = driver.current_window_handle
    description = ""

    # Open new tab
    driver.execute_script("window.open('');")
    driver.switch_to.window(driver.window_handles[-1])

    try:
        # Navigate to YouTube
        driver.get("https://www.youtube.com")

        # Search for video
        search_box = driver.find_element(By.CSS_SELECTOR, 'input#search')
        search_box.send_keys(video_title)
        search_box.send_keys(Keys.RETURN)
        time.sleep(2)

        # Attempt to get the video description
        video_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "ytd-video-renderer.ytd-item-section-renderer"))
        )
        video_element.click()
        time.sleep(2)

        description_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "yt-formatted-string.content.style-scope.ytd-video-secondary-info-renderer"))
        )

        description = description_element.text
    except TimeoutException:
        description = ""
        print(f"TimeoutException: Could not retrieve description for video {video_title}")
    except Exception as e:
        description = ""
        print(f"An error occurred while retrieving description for video {video_title}: {e}")
    finally:
        # Close the temporary tab and switch back to the original tab, no
        # matter how the lookup ended.
        driver.close()
        driver.switch_to.window(original_handle)

    return description

In [8]:
def watch_video(driver, video_title, channel_name, watch_seconds=600):
    """Search YouTube for a video, open it, and let it play.

    The title is HTML-unescaped and both title and channel name are stripped
    of characters above U+FFFF before being typed into the search box. The
    first two results are compared against the title (case-insensitive
    substring match); the first match is clicked, an ad is skipped if
    offered, the video plays for ``watch_seconds`` seconds and is then
    paused.

    If no result matches, the function reports it and returns immediately
    instead of waiting out the full watch time on the results page.

    Args:
        driver: Selenium WebDriver currently on a YouTube page.
        video_title: Title to search for. Non-string values (e.g. NaN from an
            empty CSV cell) are treated as missing.
        channel_name: Channel name prepended to the search query. Non-string
            values are treated as an empty string.
        watch_seconds: How long to let the video play before pausing.
            Defaults to 600 (10 minutes).

    Returns:
        None. Errors are reported with ``print`` and swallowed.
    """
    # pandas yields NaN (a float) for empty cells; html.unescape and the
    # character filter both require str, so normalise before cleaning.
    if not isinstance(video_title, str):
        video_title = ''
    if not isinstance(channel_name, str):
        channel_name = ''

    video_title = remove_non_bmp_characters(html.unescape(video_title))  # decode HTML entities
    channel_name = remove_non_bmp_characters(channel_name)

    if not video_title:
        # An empty title would match every search result below.
        print("Error watching video: missing video title")
        return

    try:
        # Clear the search bar if there is any existing text
        search_box = driver.find_element(By.CSS_SELECTOR, 'input#search')
        driver.execute_script("arguments[0].value = '';", search_box)  # Clear using JavaScript

        # Search for the video using its title and channel name
        search_query = f"{channel_name} {video_title}".strip()
        search_box.send_keys(search_query)
        search_box.send_keys(Keys.RETURN)
        time.sleep(2)

        # Find the video elements with a matching title
        video_elements = WebDriverWait(driver, 15).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, "ytd-video-renderer.ytd-item-section-renderer")))

        wanted = video_title.lower()
        matched_video = None
        for video_element in video_elements[:2]:  # Compare against the first 2 videos
            title_element = video_element.find_element(By.CSS_SELECTOR, "#video-title")
            if wanted in title_element.text.lower():
                matched_video = video_element
                break

        if matched_video is None:
            print(f"Video not found: {video_title} by {channel_name}")
            return

        matched_video.click()

        # Skip ad if present
        skip_ad(driver)

        # Let the video play for the requested duration
        time.sleep(watch_seconds)

        # Pause the video
        pause_button = driver.find_element(By.CSS_SELECTOR, '.ytp-play-button')
        pause_button.click()
    except Exception as e:
        print("Error watching video:", e)

In [9]:
def skip_ad(driver):
    """Click the "skip ad" button if it becomes clickable within 15 seconds.

    When the wait times out the function returns quietly; any other
    exception propagates to the caller.
    """
    skip_button_locator = (By.CSS_SELECTOR, "button.ytp-ad-skip-button")
    try:
        WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable(skip_button_locator)
        ).click()
    except TimeoutException:
        return None

In [10]:
def login_and_clear_history(driver, email=None, password=None):
    """Sign in to a Google account from the YouTube page and clear watch history.

    Credentials are no longer fixed in the code path: pass them explicitly,
    or set the ``YOUTUBE_EMAIL`` / ``YOUTUBE_PASSWORD`` environment
    variables. If neither is supplied, the original literal placeholders
    ``"EMAIL"`` / ``"PASSWORD"`` are used so existing callers behave as
    before.

    Args:
        driver: Selenium WebDriver currently showing YouTube.
        email: Account identifier typed into the sign-in form.
        password: Account password typed into the sign-in form.

    Returns:
        None. Any exception during the flow is reported with ``print`` and
        swallowed, so the caller continues even if login or clearing failed.
    """
    if email is None:
        email = os.environ.get("YOUTUBE_EMAIL", "EMAIL")
    if password is None:
        password = os.environ.get("YOUTUBE_PASSWORD", "PASSWORD")

    try:
        login_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//a[@aria-label="Sign in"]')))
        login_button.click()
        time.sleep(3)

        username_input = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//*[@id="identifierId"]')))
        username_input.send_keys(email)

        next_button = driver.find_element(By.XPATH, '//*[@id="identifierNext"]/div/button/span')
        next_button.click()
        time.sleep(3)

        password_input = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//*[@id="password"]/div[1]/div/div[1]/input')))
        password_input.send_keys(password)

        login_button = driver.find_element(By.XPATH, '//*[@id="passwordNext"]/div/button/span')
        login_button.click()
        time.sleep(3)

        # The history controls are clicked through JavaScript rather than
        # WebElement.click().
        history_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//*[text()="History"]')))
        driver.execute_script("arguments[0].click();", history_button)
        time.sleep(3)

        clear_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//*[text()="Clear all watch history"]')))
        driver.execute_script("arguments[0].click();", clear_button)

        confirm_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//*[text()="Clear watch history"]')))
        driver.execute_script("arguments[0].click();", confirm_button)

    except Exception as e:
        print("Error:", e)

In [11]:
def sample_videos(df, n, random_state=None):
    """Draw a shuffled sample skewed towards Left-labelled videos.

    The target mix is 80% "Left", 10% "Right" and 10% "Non-political" of
    ``n`` (integer division). If a group has fewer rows than its quota, all
    of its rows are taken instead of raising ``ValueError``.

    Args:
        df: DataFrame with a ``"Political Affiliation"`` column.
        n: Nominal total sample size.
        random_state: Optional seed forwarded to every ``DataFrame.sample``
            call so the selection and its order can be reproduced.

    Returns:
        A DataFrame holding the sampled rows in random order.
    """
    quotas = (
        ("Left", n * 8 // 10),
        ("Right", n // 10),
        ("Non-political", n // 10),
    )
    parts = []
    for affiliation, quota in quotas:
        group = df[df["Political Affiliation"] == affiliation]
        # sample() without replacement cannot exceed the group size.
        parts.append(group.sample(min(quota, len(group)), random_state=random_state))
    sampled_videos = pd.concat(parts)
    return sampled_videos.sample(frac=1, random_state=random_state)


def record_title_to_file(title):
    """Append ``title`` as one line to ``watched_videos.txt`` (UTF-8)."""
    with open('watched_videos.txt', mode='a', encoding='utf-8') as log:
        line = title + '\n'
        log.write(line)

In [12]:
def watch_recommended_video(driver, recommendations, right_video_titles, target_affiliation='Right'):
    """Watch the first recommendation with the target affiliation, else a fallback.

    Args:
        driver: Selenium WebDriver.
        recommendations: List of dicts with at least ``'Title'`` and
            ``'Political Affiliation'``; ``'Channel Name'`` is optional.
        right_video_titles: Titles to pick from at random when no
            recommendation matches.
        target_affiliation: Affiliation label to look for.

    Returns:
        None.
    """
    for video in recommendations:
        if video['Political Affiliation'] == target_affiliation:
            # The recommendation dicts built in this file carry no
            # 'Channel Name' key, so indexing it directly raised KeyError.
            watch_video(driver, video['Title'], video.get('Channel Name', ''))
            return

    print(f"No video with affiliation {target_affiliation} found in recommendations.")
    random_right_video = choice(right_video_titles)
    print(f"Suggested video from Right dataset: {random_right_video}")
    watch_video(driver, random_right_video, '')

In [16]:
def _first_with_affiliation(recommendations, affiliation):
    """Return the first recommendation dict labelled ``affiliation``, or None."""
    return next(
        (video for video in recommendations if video['Political Affiliation'] == affiliation),
        None,
    )


def main(n=100, iterations=100, max_refreshes=3):
    """Run the browsing experiment end to end.

    Steps:
      1. Log in, clear watch history, and watch a sample of ``n`` videos
         drawn by ``sample_videos`` to establish a viewing bias.
      2. For each of ``iterations`` rounds, classify the homepage
         recommendations and append the counts to ``affiliation_counts.csv``.
      3. If no Right-labelled recommendation is present, refresh up to
         ``max_refreshes`` times, re-checking after every refresh.
      4. Watch the Right-labelled recommendation if one was found; otherwise
         watch a random title from the module-level ``right_video_titles``.

    Args:
        n: Number of videos used to establish the bias.
        iterations: Number of check/watch rounds. The original loop
            (``i = 0; while i <= 100``) ran 101 rounds; the default is now
            100.
        max_refreshes: Maximum homepage refreshes per round.
    """
    # Initialize ChromeOptions
    options = ChromeOptions()
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_argument("--mute-audio")

    # Initialize CSV file for counts only if it doesn't exist
    for csv_file_path in ['affiliation_counts.csv', 'numbers.csv']:
        if not os.path.exists(csv_file_path):
            with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
                csv_writer = DictWriter(csv_file, fieldnames=['Left', 'Right', 'Non-political'])
                csv_writer.writeheader()

    with Chrome(options=options) as driver:
        # Navigate to YouTube and login
        driver.get("https://www.youtube.com")
        login_and_clear_history(driver)

        # Read the DataFrame. Empty cells are read as NaN (a float), which
        # html.unescape and the title clean-up cannot handle: drop rows
        # without a title and blank out missing channel names.
        df = pd.read_csv("latest_videos.csv")
        df = df.dropna(subset=["Title"]).fillna({"Channel Name": ""})
        sampled_videos = sample_videos(df, n)

        # Step 1: Establish Bias
        for _, row in sampled_videos.iterrows():
            decoded_title = html.unescape(str(row["Title"]))
            watch_video(driver, decoded_title, row["Channel Name"])
            record_title_to_file(decoded_title)

        for _round in range(iterations):
            # Step 2: First Check
            driver.get("https://www.youtube.com")
            time.sleep(3)
            recommendations, counts = check_and_record_recommendations(driver)

            # Append counts to CSV (header was written when the file was created)
            with open('affiliation_counts.csv', 'a', newline='', encoding='utf-8') as csv_file:
                csv_writer = DictWriter(csv_file, fieldnames=['Left', 'Right', 'Non-political'])
                csv_writer.writerow({
                    'Left': counts.get('Left', 0),
                    'Right': counts.get('Right', 0),
                    'Non-political': counts.get('Non-political', 0)
                })

            # Step 3: Refresh Mechanism
            # Check the current page first, then re-check after every
            # refresh. Previously the recommendations gathered after the
            # final refresh were never examined.
            opposing_video = _first_with_affiliation(recommendations, 'Right')
            refreshes_done = 0
            while opposing_video is None and refreshes_done < max_refreshes:
                driver.refresh()
                time.sleep(2)
                recommendations, _refresh_counts = check_and_record_recommendations(driver)
                opposing_video = _first_with_affiliation(recommendations, 'Right')
                refreshes_done += 1

            # Step 4: Iterative View & Check
            if opposing_video is not None:
                print(f"Found opposing view: {opposing_video['Title']}")
                watch_video(driver, opposing_video.get('Title', ''), opposing_video.get('Channel Name', ''))

            else:
                # Step 5: Active Search
                print("No opposing view found. Actively searching...")
                fallback_right_title = choice(right_video_titles)
                watch_video(driver, fallback_right_title, '')

# Entry point: start the experiment when this code is executed directly
# rather than imported as a module.
if __name__ == "__main__":
    main()

TypeError: ignored