<a href="https://colab.research.google.com/github/cecileloge/cs224v-truthsleuth-trendbender/blob/main/notebooks/TruthSleuth.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Truth Sleuth AI: Fact-Checker Agent for YouTube Videos**
**Cecile Loge ep. Baccari** | ceciloge@stanford.edu | cecileloge@google.com \\
**Mohammad Rehan Ghori** | rghori@stanford.edu | rehang@google.com \\


---

**Motivation:** Misinformation is one of the most pressing threats of our time, and YouTube videos serve as a major platform through which it can spread [1]. Providing fact-checked information to address misleading content has been shown to be more effective than simply removing it [2].

**Project:** Can we build an application that takes a YouTube video as input and not only generates a list of the main claims made in the video but also fact-checks them?

---

* [1] An open letter to YouTube’s CEO from the world’s fact-checkers (on poynter.org), 2022. \\
* [2] Ecker, Ullrich KH, et al. "The effectiveness of short‐format refutational fact‐checks." British journal of psychology 111.1 (2020): 36-54.

---
## **Setting Up Everything**
Choose the YouTube video URL, then install and import the required libraries.

---

In [None]:
# Provide the video url
VIDEO_URL = "https://www.youtube.com/watch?v=ssqucRUoRjs"

In [None]:
# Install & Import Libraries

# Youtube Extractors
!pip install youtube-transcript-api
!pip install pytube
!pip install -U yt-dlp
!apt install -y ffmpeg
from youtube_transcript_api import YouTubeTranscriptApi
from pytube import extract

# Assembly AI
!pip install assemblyai
import assemblyai as aai

# Data & Tools
import pandas as pd
from google.colab import userdata
from PIL import Image
import json
import os
current_dir = os.getcwd()
from datetime import datetime, date
import time

# Google API
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Markdown for the final output
from IPython.display import display, Markdown, Latex
import textwrap

# Gemini API (userdata is already imported above)
!pip install -q -U google-generativeai
import google.generativeai as genai

# LangChain Prompting
!pip install langchain
# PromptTemplate lives in langchain_core, which is installed with langchain
from langchain_core.prompts import PromptTemplate

# For Web Scraping
!pip install requests
!pip install beautifulsoup4
!pip install wikipedia
from bs4 import BeautifulSoup
import requests
import wikipedia
import googlesearch as g

In [None]:
# YouTube API Key (Google for Developers Platform)
DEVELOPER_KEY=userdata.get('DEVELOPER_KEY')

# Google Developer API Key for GenAI
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
genai.configure(api_key=GOOGLE_API_KEY)
model = genai.GenerativeModel(model_name="gemini-1.5-flash")
GEM_SAFETY_SETTINGS = [
    {
      "category": "HARM_CATEGORY_HARASSMENT",
      "threshold": "BLOCK_NONE"
    },
    {
      "category": "HARM_CATEGORY_HATE_SPEECH",
      "threshold": "BLOCK_NONE"
    },
    {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_NONE"
    },
    {
      "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
      "threshold": "BLOCK_NONE"
    }
    ]

# Assembly AI API Key
AAI_API_KEY = userdata.get('AAI_API_KEY')
aai.settings.api_key = AAI_API_KEY

In [None]:
# Prompt Templates
!mkdir -p prompts
!curl -L -o prompts/reformat.prompt "https://drive.google.com/uc?export=download&id=1aykUMXxUR1gWOil5X83aF7aem1s5rjIT"
!curl -L -o prompts/claims.prompt "https://drive.google.com/uc?export=download&id=1AlKLI-IP05jMps4Ol7BN2RPcmAMH2H-J"
!curl -L -o prompts/factcheck.prompt "https://drive.google.com/uc?export=download&id=1cevbk44t7ZWJr3rwpu-b1ncqHYdq-p8x"


---
## **STEP 1 | Extracting info & audio from Video URL**

Functions to process a video from a provided YouTube link. They output a text transcript of the audio, along with the video's metadata (title, channel, tags) and its top comments.

---

In [None]:
def get_comments(video_id):
  """
  Function to get the top comments (up to 100) from a YouTube video.
  Saves them into comments.csv.
  Returns a pandas DataFrame, or None if the API call fails.
  """
  youtube = build("youtube", "v3", developerKey=DEVELOPER_KEY)

  try:
    # Retrieve the first page of comment threads, most relevant first
    response = youtube.commentThreads().list(
        part="snippet",
        videoId=video_id,
        maxResults=100,
        order="relevance"
    ).execute()
  except HttpError as error:
    print(f"An HTTP error {error.resp.status} occurred:\n {error.content}")
    return None

  comments = []
  for item in response.get("items", []):
    snippet = item["snippet"]["topLevelComment"]["snippet"]
    comments.append({"comment": snippet["textDisplay"], "num_of_likes": snippet["likeCount"]})

  # Pagination is not implemented: only the first page is read. Further pages
  # could be requested by passing pageToken=response["nextPageToken"].

  # Build the DataFrame once, after the loop. Explicit columns keep the sort
  # valid even when the video has no comments.
  comments_df = pd.DataFrame(comments, columns=["comment", "num_of_likes"])
  comments_df = comments_df.sort_values(by=["num_of_likes"], ascending=False)
  comments_df.to_csv("comments.csv", index=False)
  return comments_df
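
The function above reads only the first page of comment threads. A minimal sketch of how `nextPageToken` pagination could be layered on top is shown here. It assumes a hypothetical `fetch_page(page_token)` callable that wraps `youtube.commentThreads().list(..., pageToken=page_token).execute()` and returns the raw response dict. The names `collect_comment_pages`, `fetch_page`, `max_pages`, and the fake pages used for the offline run are illustrative and not part of the notebook.

```python
def collect_comment_pages(fetch_page, max_pages=3):
    """Follow nextPageToken links and gather the items from every page.

    fetch_page is a hypothetical callable: it takes a page token (None for
    the first page) and returns a dict shaped like the response of the
    YouTube Data API commentThreads.list method.
    """
    items = []
    page_token = None
    for _ in range(max_pages):
        response = fetch_page(page_token)
        items.extend(response.get("items", []))
        page_token = response.get("nextPageToken")
        if not page_token:
            break  # no further pages
    return items


# Made-up stand-in for the real API call, so the sketch runs offline.
_FAKE_PAGES = {
    None: {"items": ["a", "b"], "nextPageToken": "p2"},
    "p2": {"items": ["c"], "nextPageToken": "p3"},
    "p3": {"items": ["d"]},
}


def fake_fetch_page(page_token):
    return _FAKE_PAGES[page_token]


all_items = collect_comment_pages(fake_fetch_page)
```

Each page is a separate API request, so capping the loop with `max_pages` keeps the quota usage predictable.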

In [None]:
def get_video_details(video_id):
  """
  Function to get details from a YouTube video.
  Returns a tuple: channel, title, tags, likes, views, thumbnail_url, videodate.
  """
  youtube = build('youtube', 'v3', developerKey=DEVELOPER_KEY)
  request = youtube.videos().list(part='snippet,statistics', id=video_id)
  details = request.execute()
  snippet = details['items'][0]['snippet']
  statistics = details['items'][0]['statistics']
  thumbnail_url = snippet['thumbnails']['high']['url']
  channel = snippet['channelTitle']
  title = snippet['title']
  tags = snippet.get('tags')  # None when the video has no tags
  likes = int(statistics.get('likeCount', 0))  # likeCount can be missing (e.g. hidden likes)
  views = int(statistics['viewCount'])
  videodate = snippet['publishedAt']
  return channel, title, tags, likes, views, thumbnail_url, videodate


In [None]:
def get_captions(video_url, video_id):
  """
  Function to get audio captions in 'en' (English) from a YouTube video.
  Either from the YouTube subtitles if they exist, or from Assembly AI.
  Uses Gemini to format the raw audio captions, and returns a string.
  """
  try:
    yt = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
    captions = " ".join(segment['text'] for segment in yt)
  except Exception as e:
    print(f"Error: {e}")
    print("Using Assembly AI instead...")
    # Use the function argument (not the global VIDEO_URL) and quote it for the shell
    !yt-dlp --get-url -f bestaudio "$video_url" > audio.txt
    with open('audio.txt', 'r') as file:
      AUDIO_URL = file.read().strip()  # drop the trailing newline
    config = aai.TranscriptionConfig(auto_highlights=True)
    transcriber = aai.Transcriber()
    transcript = transcriber.transcribe(AUDIO_URL, config)
    captions = transcript.text

  # Using Gemini to format the raw audio captions
  with open("prompts/reformat.prompt", "r") as f:
    text = f.read()
  prompt_template = PromptTemplate.from_template(template=text, template_format="jinja2")
  prompt: str = prompt_template.format(captions=captions)
  response = model.generate_content(prompt, safety_settings=GEM_SAFETY_SETTINGS)
  audio_captions_formatted = response.text

  return audio_captions_formatted

---
## **STEP 2 | Extracting the claims to fact-check from Video audio**

Functions to extract the top claims made in the video. We use Google's Gemini, with prompt templates rendered through the LangChain library.

---

In [None]:
def extract_claims(video_url):
  """
  Function to extract the top claims that should be fact-checked.
  Uses Gemini with the claims.prompt prompt.
  Returns a tuple with: title, channel, thumbnail_url,
      and a json object with fields 'claim', 'questions', 'passage', 'relevance'
  """
  video_id = extract.video_id(video_url)
  channel, title, _, _, _, thumbnail_url, videodate = get_video_details(video_id)
  videodate = datetime.strptime(videodate[:10], '%Y-%m-%d').strftime("%Y-%m-%d")
  audio_captions_formatted = get_captions(video_url, video_id)

  # Using the prompt template and calling Gemini
  with open("prompts/claims.prompt", "r") as f:
    text = f.read()

  prompt_template = PromptTemplate.from_template(template=text, template_format="jinja2")
  claims_prompt: str = prompt_template.format(
      todaydate=date.today().strftime("%Y-%m-%d"),
      videodate=videodate,
      channel=channel,
      title=title,
      captions=audio_captions_formatted,
      )
  response = model.generate_content(claims_prompt, safety_settings=GEM_SAFETY_SETTINGS)

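  json_prompt = ("Make sure the following text can be read directly by json.loads(): <<< " + response.text + " >>>. "
                 "Don't output anything other than your version of the text.")
  response = model.generate_content(json_prompt, safety_settings=GEM_SAFETY_SETTINGS)
  # Gemini may wrap its answer in a Markdown code fence; strip it before parsing
  raw = response.text.strip()
  if raw.startswith("`"):
    raw = raw.strip("`").removeprefix("json").strip()
  claims = json.loads(raw)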

  return title, channel, thumbnail_url, claims
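
Gemini may return its JSON wrapped in a Markdown code fence tagged `json`, which `json.loads()` cannot read directly. A minimal sketch of a fence-tolerant parser follows. The helper name `parse_llm_json` is hypothetical, and the sketch assumes the reply contains nothing besides the JSON and an optional surrounding fence.

```python
import json

FENCE = "`" * 3  # the three-backtick Markdown code fence marker


def parse_llm_json(text):
    """Parse JSON from an LLM reply, with or without a surrounding code fence."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE) and cleaned.endswith(FENCE):
        first_newline = cleaned.find("\n")
        if first_newline == -1:
            # Single-line reply: only remove the fence markers
            cleaned = cleaned[len(FENCE):-len(FENCE)]
        else:
            # Drop the opening line (fence plus optional language tag)
            # and the closing fence
            cleaned = cleaned[first_newline + 1:-len(FENCE)]
    return json.loads(cleaned)


fenced_reply = FENCE + 'json\n{"claims": []}\n' + FENCE
parsed = parse_llm_json(fenced_reply)
```

Unlike a fixed character slice, this does not depend on the exact length of the fence line or on a trailing newline.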

---
## **STEP 3 | Fact-checking the claims by cross-checking reliable sources**

Cross-reference each claim with reliable sources, then classify it as true, partly true, partly false, false, or unsure (ideally with links to the sources).

This step draws on the Google Fact Check Claim Search API, Wikipedia (through the `wikipedia` package), and Google Search (through the `googlesearch` package).

The LLM is called several times throughout this step, with dedicated prompts, to interpret, cross-reference, and ultimately classify the claims.

---

In [None]:
# Our reliable sources for Fact-Checking
# 1 - Google Fact-Check API
GOOGLE_FACT_CHECK_API_KEY = userdata.get('GFC_API_KEY')
GOOGLE_FACT_CHECK_URL = 'https://factchecktools.googleapis.com/v1alpha1/claims:search'

# 2 - Wikipedia
wikipedia.set_lang('en')

# 3 - Google Search
GOOGLE_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"

In [None]:
def call_googlefacts(claim):
  """
  Function to call the Google Fact Check API
  Returns the claim reviews in the Claim Review structured format
  https://developers.google.com/search/docs/appearance/structured-data/factcheck
  """

  params = {
      'query': claim,
      'key': GOOGLE_FACT_CHECK_API_KEY
  }
  response = requests.get(GOOGLE_FACT_CHECK_URL, params=params)
  if response.status_code == 200:
      data = response.json()
      return data.get('claims', [])
  else:
      return None

def check_googlefacts(claim):
  """
  Function to process the Google Fact Check claim reviews
  Returns a string = concatenation of relevant results
  """
  results = call_googlefacts(claim)
  summary = ''
  for i, r in enumerate(results or []):
      # A result can carry several reviews; only the first one is used
      review = (r.get('claimReview') or [{}])[0]
      source = (review.get('publisher') or {}).get('name')  # publisher may be missing
      url = review.get('url')
      claimant = r.get('claimant')
      text = r.get('text')
      truthfulness = review.get('textualRating')
      summary += f"Source #{i+1}: {source} at {url}.\n     Claimant: {claimant}\n     Description: {text}\n     Truthfulness: {truthfulness}\n\n"

  return summary

def check_googlesearch(claim, limit=2):
  """
  Function to run a Google search (via the googlesearch library)
  Calls Gemini to process the results and generate a summary
  Returns a string = concatenation of relevant results
  """
  urls = list(g.search(claim, stop=limit, lang='en'))
  headers = {"user-agent": GOOGLE_USER_AGENT}
  summary = ''
  for url in urls:
    try:
      website = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException:
      continue  # skip pages that cannot be fetched
    web_soup = BeautifulSoup(website.text, 'html.parser')
    summary_prompt = ("I need you to give me the key information contained in a specific webpage article. I will provide you with HTML code. "
                      "Do not describe the webpage or the article. Focus on extracting the key claims, and summarizing the information the page is giving in a precise, comprehensive yet concise way. "
                      "Your answer should ideally help answer the question: " + claim + ".\n"
                      "HTML Code: <<< " + str(web_soup) + " >>>")
    # Up to 3 attempts, pausing 2 seconds after each failure
    for _ in range(3):
      try:
        response = model.generate_content(summary_prompt, safety_settings=GEM_SAFETY_SETTINGS)
        summary += f"Source: {url}.\nDescription: {response.text}\n"
        break
      except Exception:
        time.sleep(2)
  return summary
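
`check_googlesearch` passes the page's full HTML (`str(web_soup)`) to Gemini, which spends prompt tokens on tags, scripts, and styles. A minimal sketch of reducing a page to its visible text first is shown here. It uses only the standard library so that it runs without extra dependencies; with BeautifulSoup already imported, `web_soup.get_text()` would be a comparable shortcut. The names `visible_text` and `max_chars`, and the 20,000-character cap, are assumptions made for illustration.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect text nodes, ignoring the contents of script-like tags."""

    SKIPPED = {"script", "style", "noscript"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.chunks = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIPPED:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIPPED and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep non-empty text found outside the skipped tags
        if self._skip_depth == 0 and data.strip():
            self.chunks.append(data.strip())


def visible_text(html, max_chars=20000):
    """Return the page's visible text, truncated to max_chars characters."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.chunks)[:max_chars]


sample_html = (
    "<html><head><style>p{color:red}</style><script>var x = 1;</script></head>"
    "<body><h1>Title</h1><p>Hello <b>world</b></p></body></html>"
)
page_text = visible_text(sample_html)
```

The result could then replace `str(web_soup)` in the summary prompt, at the cost of losing structural hints such as headings and links.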

def check_wikipedia(claim):
  """
  Function to call the Wikipedia API and process the results
  Returns a string = concatenation/summary of relevant Wikipedia articles
  """
  summary = ''
  search_results = wikipedia.search(claim)
  for r in search_results[:2]:
    try:
      call = wikipedia.page(r)
      summary += f"From the \"{r}\" Wikipedia page ({call.url}): "+ call.content +"\n\n"
    except Exception:
      # Covers DisambiguationError, PageError and any other WikipediaException
      continue
  return summary

In [None]:
def get_claim_summary(claim, questions):
  """
  Function to get the final say on a specific claim.
  Calls Gemini with the factcheck.prompt prompt.
  Returns Gemini's raw text reply, expected to be JSON with fields
  `claim`, `verdict`, `reason`, `sources`, or None if all retries fail.
  """
  summary_gfc = ''
  summary_search = ''
  for q in questions:
    gfc = check_googlefacts(q)
    if gfc:
      summary_gfc += gfc + "\n"
  for q in questions:
    search = check_googlesearch(q)
    summary_search += search + "\n"
  summary_wiki = check_wikipedia(claim)

  with open("prompts/factcheck.prompt", "r") as f:
        text = f.read()
  prompt_template = PromptTemplate.from_template(template=text, template_format="jinja2")
  fact_check_prompt: str = prompt_template.format(
      claim=claim,
      report_GFC=summary_gfc + "\n" + summary_search,
      report_wiki=summary_wiki,
      )

  retry_count = 0
  while retry_count < 3:
      try:
         response = model.generate_content(fact_check_prompt, safety_settings=GEM_SAFETY_SETTINGS)
         return response.text
      except Exception as e:
         #print(f"Error: {e}")
         time.sleep(2)
      retry_count += 1
  return None
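
Both `check_googlesearch` and `get_claim_summary` retry a failed Gemini call up to three times with a two-second pause. A minimal sketch of how that pattern could be factored into one helper is shown here. Note that it swaps the fixed pause for exponential backoff, which is a change from the notebook's behavior. The name `call_with_retries` and its parameters are hypothetical; `sleep` is injectable only so the sketch can be exercised without actually waiting.

```python
import time


def call_with_retries(func, attempts=3, base_delay=2.0, sleep=time.sleep):
    """Call func() until it succeeds or the attempts run out.

    Waits base_delay, then 2 * base_delay, and so on between attempts.
    Returns func()'s result, or None if every attempt raised.
    """
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            # Back off before the next attempt, but not after the last one
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    return None


# Offline demonstration with a function that fails twice, then succeeds.
_calls = {"count": 0}


def flaky_call():
    _calls["count"] += 1
    if _calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


recorded_delays = []
result = call_with_retries(flaky_call, sleep=recorded_delays.append)
```

In the notebook, the wrapped call would be something like `lambda: model.generate_content(fact_check_prompt, safety_settings=GEM_SAFETY_SETTINGS)`.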

---
## **TRUTH SLEUTH AGENT | Putting it all together & Generating the Fact-Check Report**

Generate the final report, using Markdown for formatting. The `skip_unsure` option omits claims whose verdict is "unsure".

---


In [None]:
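def generate_report(video_url, skip_unsure = True):
  """
  Function to run the full pipeline on a YouTube video and display the report.
  Set skip_unsure to False to also print the claims with an "unsure" verdict.
  """
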
  # Extracting the claims
  title, channel, thumbnail_url, claims = extract_claims(video_url)

  # Formatting
  color = {"question": "black", "true": "MediumSpringGreen", "partly true": "LightGreen", "partly false": "lightcoral", "false": "red", "unsure": "grey"}
  capped = {"true": "TRUE", "partly true": "PARTLY TRUE", "partly false": "PARTLY FALSE", "false": "FALSE", "unsure": "UNSURE"}
  delimiter = "\n"+"_"*100+"\n"

  # Generating Report Header
  thumb = Image.open(requests.get(thumbnail_url, stream=True).raw)
  formatted_title1 = f"<font size='+2' color='Bisque'><blockquote>📓📓 🔍 **TRUTH SLEUTH FACT-CHECK REPORT** 🔍 📓📓</blockquote></font>"
  formatted_title2 = f"<font size='+2' color='white'><blockquote>**{title}** by {channel}</blockquote></font>"
  display(Markdown(formatted_title1 + formatted_title2))
  display(thumb)

  # Getting each claim's verdict + Formatting again
  for c in claims["claims"]:
    text = get_claim_summary(c['claim'], c['questions'])
    if text is None:
      continue
    # "$" pairs would otherwise render as LaTeX math in the Markdown output
    text = text.replace("$", "USD ")

    try:
      verdict = json.loads(text)
    except json.JSONDecodeError:
      # Fall back to stripping a Markdown code fence around the JSON
      verdict = json.loads(text.strip().strip("`").removeprefix("json"))
    label = verdict.get('verdict', 'unsure')
    if (label == "unsure" and skip_unsure):
      continue

    formatted_claim = f"<font size='+1' color='white'><blockquote>**• {c['claim']}**</blockquote></font>"
    # Unknown labels fall back to grey and to the upper-cased label itself
    formatted_verdict = f"<font size='+1' color='{color.get(label, 'grey')}'><blockquote>**{capped.get(label, str(label).upper())}**\n\n</blockquote></font>"
    links = ""
    for link in verdict.get('sources') or []:
      links += "\n\n• " + link
    display(Markdown(delimiter + formatted_claim + formatted_verdict + verdict.get('reason', '') + links))
  display(Markdown(delimiter))

  return None
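
`generate_report` looks up the model's verdict label in the `color` and `capped` dictionaries, so a reply such as "Partly True" or "partly-false" would not match any key. A minimal sketch of normalizing the label before the lookup follows. The helper name `normalize_verdict` is hypothetical, and mapping unrecognized labels to "unsure" is an assumption about the desired behavior rather than something the notebook specifies.

```python
# The five labels used as keys in the report's color and capped dictionaries
VERDICT_LABELS = ("true", "partly true", "partly false", "false", "unsure")


def normalize_verdict(label):
    """Map a free-form verdict label onto one of VERDICT_LABELS."""
    if not isinstance(label, str):
        return "unsure"
    # Lowercase, treat hyphens and underscores as spaces, collapse whitespace
    cleaned = " ".join(label.replace("-", " ").replace("_", " ").lower().split())
    return cleaned if cleaned in VERDICT_LABELS else "unsure"


example_label = normalize_verdict("  Partly-True ")
```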

In [None]:
generate_report(VIDEO_URL, True)