In [12]:
import pandas as pd
import time
import os
import requests
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
import re
import nltk
from IPython.display import Markdown, display, YouTubeVideo
import google.generativeai as genai
from google.generativeai import caching
from google.auth.credentials import AnonymousCredentials
from google.auth import compute_engine
from IPython.display import Markdown, display
from kaggle_secrets import UserSecretsClient
from nltk.corpus import wordnet

! pip install -q youtube-search-python 
!pip install youtube-transcript-api 
from youtubesearchpython import VideosSearch
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import JSONFormatter



In [13]:
def mock_get_universe_domain(request):
    """Return a fixed universe domain without consulting any metadata server.

    The ``request`` argument is accepted but never used; the answer is always
    the same constant string.
    """
    universe_domain = "googleapis.com"
    return universe_domain

# Swap the metadata module's universe-domain lookup for the local stub so the
# lookup is answered by mock_get_universe_domain instead.
setattr(compute_engine._metadata, "get_universe_domain", mock_get_universe_domain)

# Point GOOGLE_APPLICATION_CREDENTIALS at the key file (replace with the actual path).
os.environ.update(
    {
        "GOOGLE_APPLICATION_CREDENTIALS": "/kaggle/input/google-cloud-key/esoteric-cab-443306-n6-8b6ccf376c34.json",
    }
)

In [14]:
from kaggle_secrets import UserSecretsClient

# Read both secrets from the Kaggle secrets store, in the same order as before:
# secret_value_0 <- "GEMINI-API-KEY", secret_value_1 <- "ncbi_api_key".
user_secrets = UserSecretsClient()
secret_value_0, secret_value_1 = (
    user_secrets.get_secret(secret_name)
    for secret_name in ("GEMINI-API-KEY", "ncbi_api_key")
)

In [15]:
def fetch_pmid_list(query, max_results=100):
    """Search PubMed through the E-utilities ``esearch`` endpoint.

    Args:
        query: Search expression sent as the ``term`` parameter.
        max_results: Maximum number of PMIDs to request (``retmax``).

    Returns:
        A ``(pmids, webenv, query_key)`` tuple. ``pmids`` is a list of PMID
        strings. ``webenv`` and ``query_key`` are the ``WebEnv`` / ``QueryKey``
        values from the response, or ``None`` when the response does not carry
        them. Any failure (network error, non-200 status, unparsable XML)
        yields ``([], None, None)``.
    """
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
    params = {
        "db": "pubmed",
        "term": query,
        "retmax": max_results,
        "usehistory": "y",
    }
    # Send the key as a regular parameter, and only when one is configured, so
    # a missing secret never turns into a literal "api_key=None" in the URL.
    if secret_value_1:
        params["api_key"] = secret_value_1

    try:
        response = requests.get(base_url, params=params, timeout=30)
    except requests.RequestException as exc:
        print(f"Error fetching PMIDs: {exc}")
        return [], None, None
    finally:
        # Pause after every attempt (presumably to respect NCBI rate limits).
        time.sleep(1)

    if response.status_code != 200:
        print("Error fetching PMIDs.")
        return [], None, None

    try:
        root = ET.fromstring(response.content)
    except ET.ParseError as exc:
        print(f"Error fetching PMIDs: {exc}")
        return [], None, None

    pmids = [id_node.text for id_node in root.findall("IdList/Id")]
    # findtext() returns None for a missing element instead of raising.
    return pmids, root.findtext("WebEnv"), root.findtext("QueryKey")
        

def _xml_text(parent, path):
    """Return the stripped text found at ``path`` under ``parent`` (inline markup flattened), or ``None``."""
    node = parent.find(path)
    if node is None:
        return None
    return "".join(node.itertext()).strip() or None


def fetch_article_details(pmids, webenv, query_key, retstart=0, retmax=100):
    """Fetch PubMed records through the E-utilities ``efetch`` endpoint.

    Args:
        pmids: List of PMID strings; sent comma-joined as ``id`` when non-empty.
        webenv: ``WebEnv`` value from a previous search (may be ``None``).
        query_key: ``QueryKey`` value from a previous search (may be ``None``).
        retstart: Offset of the first record to return.
        retmax: Maximum number of records to return.

    Returns:
        A list of dicts with the keys ``pmid``, ``title``, ``source``,
        ``authors`` (list of ``"ForeName LastName"`` strings) and ``abstract``.
        Fields missing from a record are ``None`` (``authors`` is ``[]``).
        A structured abstract is returned as one string with each labelled
        section on its own line (``"LABEL: text"``). Returns ``[]`` on any
        failure or when there is nothing to fetch.
    """
    # Nothing to look up: skip the request entirely.
    if not pmids and not (webenv and query_key):
        return []

    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
    params = {
        "db": "pubmed",
        "retstart": retstart,
        "retmax": retmax,
        "WebEnv": webenv,
        "query_key": query_key,
        "rettype": "xml",
        "retmode": "xml",
    }
    if pmids:
        params["id"] = ",".join(pmids)
    if secret_value_1:
        params["api_key"] = secret_value_1

    try:
        response = requests.get(base_url, params=params, timeout=30)
    except requests.RequestException as exc:
        print(f"Error fetching article details: {exc}")
        return []
    finally:
        # Pause after every attempt (presumably to respect NCBI rate limits).
        time.sleep(1)

    if response.status_code != 200:
        print("Error fetching article details.")
        return []

    try:
        root = ET.fromstring(response.content)
    except ET.ParseError as exc:
        print(f"Error fetching article details: {exc}")
        return []

    articles = []
    for record in root.findall("PubmedArticle"):
        citation = record.find("MedlineCitation")
        if citation is None:
            continue

        authors = []
        for author in citation.findall("Article/AuthorList/Author"):
            last_name = _xml_text(author, "LastName")
            fore_name = _xml_text(author, "ForeName")
            # Entries without both name parts are skipped.
            if last_name and fore_name:
                authors.append(f"{fore_name} {last_name}")

        # Keep every AbstractText section, not just the first one.
        sections = []
        for section in citation.findall("Article/Abstract/AbstractText"):
            section_text = "".join(section.itertext()).strip()
            if not section_text:
                continue
            label = section.get("Label")
            sections.append(f"{label}: {section_text}" if label else section_text)

        articles.append({
            "pmid": _xml_text(citation, "PMID"),
            "title": _xml_text(citation, "Article/ArticleTitle"),
            "source": _xml_text(citation, "Article/Journal/Title"),
            "authors": authors,
            "abstract": "\n".join(sections) if sections else None,
        })
    return articles
        

def fetch_content(pmid):
    """Download the PubMed web page for ``pmid``.

    The NCBI API key is deliberately not appended here: this URL is the public
    PubMed web page rather than an E-utilities endpoint, so sending the key
    would only expose the secret in request URLs.

    Args:
        pmid: PubMed identifier used to build the page URL.

    Returns:
        The raw response body (bytes) on HTTP 200, otherwise ``None``
        (including when the request itself fails).
    """
    url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException:
        return None
    finally:
        # Pause after every attempt, as the other fetch helpers do.
        time.sleep(1)

    if response.status_code == 200:
        return response.content
    return None

In [16]:
def extract_article_sections(query, max_results):
    """Search PubMed and return the matching articles as a DataFrame.

    For every article returned by ``fetch_article_details`` the PubMed page is
    downloaded with ``fetch_content`` and each ``<p>`` element whose stripped
    text starts with one of the section names below is stored on the article
    under that name. A later paragraph with the same prefix overwrites an
    earlier one.

    Args:
        query: Search expression passed to ``fetch_pmid_list``.
        max_results: Maximum number of PMIDs to request.

    Returns:
        A ``pandas.DataFrame`` with one row per article. It is empty when the
        search yields nothing. Section columns only exist when at least one
        article had a matching paragraph.
    """
    # Checked in this order; the first matching prefix wins for a paragraph.
    section_names = ('Introduction', 'Clinical case', 'Methods', 'Results', 'Conclusion')

    pmids, webenv, query_key = fetch_pmid_list(query, max_results=max_results)
    if not pmids:
        return pd.DataFrame()

    articles = fetch_article_details(pmids, webenv, query_key)

    for article in articles:
        pmid = article.get('pmid')
        if not pmid:
            continue

        html_content = fetch_content(pmid)
        # fetch_content returns None on failure; parsing None would raise.
        if not html_content:
            continue

        soup = BeautifulSoup(html_content, 'html.parser')
        for para in soup.find_all('p'):
            para_content = para.text.strip()
            for section_name in section_names:
                if para_content.startswith(section_name):
                    article[section_name] = para_content
                    break

    return pd.DataFrame(articles)

In [17]:
# Function to get the transcript of a video
def get_transcript(video_id):
    """Return the transcript of a YouTube video as one space-joined string.

    Works with both flavours of ``youtube-transcript-api``: the older static
    ``YouTubeTranscriptApi.get_transcript`` call and, when that attribute is
    absent, the instance-based ``YouTubeTranscriptApi().fetch(...)`` call.

    Args:
        video_id: YouTube video identifier.

    Returns:
        The transcript text, or ``None`` when it could not be fetched. The
        underlying error is printed so real failures are not mislabelled as
        "no transcript available".
    """
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            entries = YouTubeTranscriptApi.get_transcript(video_id)
        else:
            entries = YouTubeTranscriptApi().fetch(video_id).to_raw_data()
        return " ".join(entry['text'] for entry in entries)
    except Exception as e:
        # Deliberately broad: any failure means "no transcript" for the caller,
        # but the actual cause is reported.
        print(f"Error fetching transcript for video {video_id}: {type(e).__name__}: {e}\n")
        return None

In [18]:
# Function to process transcripts and find claims with fact-checking keywords
def find_top_claims_and_keywords(video):
    """Extract health claims from a video's transcript and keywords for each claim.

    The transcript is split into fixed-size chunks, the model is asked for
    claims per chunk, the numbered ``**bold**`` items are parsed out of the
    combined answers, and the model is then asked for fact-checking keywords
    one claim at a time.

    Args:
        video: Mapping with at least the keys ``'title'``, ``'link'`` and ``'id'``.

    Returns:
        ``None`` when no transcript is available. Otherwise a dict with:
        ``'title'``, ``'link'``, ``'claims'`` (the model's raw claim text for
        all chunks, newline-joined), ``'keywords'`` (claim -> comma-separated
        keyword string) and ``'claim_list'`` (parsed claims, duplicates removed).
    """
    model = genai.GenerativeModel("gemini-1.5-flash")

    title = video['title']
    video_link = video['link']
    video_id = video['id']

    print(f"Processing video: {title} (ID: {video_id})")

    # Get transcript for the video
    transcript = get_transcript(video_id)
    if not transcript:
        print(f"No transcript found for video: {title}")
        return None

    # Split the transcript into chunks of at most max_chunk_size characters
    max_chunk_size = 50000
    chunks = [transcript[i:i + max_chunk_size] for i in range(0, len(transcript), max_chunk_size)]

    chunk_outputs = []
    for i, chunk in enumerate(chunks):
        claims_prompt = (
            f"Extract up to 3 unique, health-related, evidence-based claims from the following transcript chunk.:\n\n{chunk}"
        )

        claims_response = model.generate_content([claims_prompt])
        try:
            claims_text = claims_response.text.strip()
        except ValueError:
            # NOTE(review): `.text` is expected to raise ValueError when the
            # response carries no text part (e.g. blocked) - treat as empty.
            claims_text = ""

        if claims_text:
            chunk_outputs.append(claims_text)
        else:
            print(f"No claims found in chunk {i + 1}")

    # Combine the claims from all chunks
    all_claims = "\n".join(chunk_outputs)

    # Items look like "1. **Headline** details" and run until the next number.
    pattern = r"\d+\.\s\*\*(.*?)\*\*\s*(.*?)(?=\n\d+\.|\Z)"
    claims = re.findall(pattern, all_claims, re.DOTALL)
    # dict.fromkeys drops duplicates (possible across chunks) but keeps order.
    claims_list = list(dict.fromkeys(f"{headline} {details}" for headline, details in claims))

    keyword_dict = {}
    for claim in claims_list:
        time.sleep(1)

        # Ask for keywords for THIS claim only, so each claim's search terms
        # are not polluted by the other claims.
        keywords_prompt = (
            f"Identify up to 3 highly relevant and specific keywords or key phrases that are essential for "
            f"accurately fact-checking the following claim:\n\n{claim}\n\nFocus on the following: \n"
            f"1. Terms that directly address the core subject of the claim.\n"
            f"2. Phrases that are likely to lead to reliable and precise information when searching in academic or scientific sources.\n"
            f"3. Keywords that would yield articles, studies, or data relevant to the claim, especially those related to "
            f"the topic's evidence, expert opinions, and public health implications.\n\n"
            f"Ensure the selected keywords are both specific and comprehensive for effective fact-checking."
        )

        keywords_response = model.generate_content([keywords_prompt])
        try:
            keywords_text = keywords_response.text.strip()
        except ValueError:
            keywords_text = ""

        # Only double-quoted phrases in the answer are treated as keywords.
        keywords = re.findall(r'"([^"]+)"', keywords_text)
        keyword_dict[claim] = ', '.join(keywords)

    # Display results
    if len(claims_list) > 0:
        display(Markdown(
            f"### {title}\n"
            f"[Watch Video]({video_link})\n\n"
            f"**Top Claims:**\n\n" +
            "\n\n".join([f"{idx + 1}. {claim}" for idx, claim in enumerate(claims_list)])
        ))
    else:
        print("No claim found.")

    return {
        "title": title,
        "link": video_link,
        "claims": all_claims,
        "keywords": keyword_dict,
        'claim_list': claims_list
    }

In [19]:
# Function to perform fact-checking using Gemini AI
def fact_check_claims_with_confidence(articles_df, claim):
    """Ask the model to fact-check ``claim`` against the given articles.

    Args:
        articles_df: Table of articles; iterated with ``iterrows()`` and read
            through ``row.get`` for the fields ``pmid``, ``title``,
            ``abstract``, ``Methods``, ``Results`` and ``Conclusion``.
            Missing or NaN fields are rendered as ``N/A``.
        claim: The claim text to check.

    Returns:
        ``{claim: {'result': str, 'confidence_score': float}}``. When the
        verdict or score cannot be read from the model's answer the entry is
        ``{'result': 'Error', 'confidence_score': 0.0, 'error': <message>}``.
    """
    model = genai.GenerativeModel("gemini-1.5-flash")

    def field(row, key):
        # Absent columns give None; NaN (the only value unequal to itself)
        # marks a missing cell. Both should read as "N/A", not "None"/"nan".
        value = row.get(key)
        if value is None or (isinstance(value, float) and value != value):
            return 'N/A'
        return value

    # Extract relevant sections from the articles (e.g., abstract, results, and conclusion)
    article_blocks = []
    for _, row in articles_df.iterrows():
        article_blocks.append(
            f"PMID: {field(row, 'pmid')}\n"
            f"Title: {field(row, 'title')}\n"
            f"Abstract: {field(row, 'abstract')}\n"
            f"Methods: {field(row, 'Methods')}\n"
            f"Results: {field(row, 'Results')}\n"
            f"Conclusion: {field(row, 'Conclusion')}\n"
            "\n"
        )
    articles_text = "".join(article_blocks)

    # Pause before the model call (presumably for rate limiting).
    time.sleep(1)

    prompt = f"""
    Fact-check the following claim based on web data and provided articles. 
    Provide the fact-check result (True/False/Not able to validate/Conflicting results reported) 
    and a confidence score between 0 and 1:

    Claim: '{claim}'

    Articles:
    {articles_text}

    Please respond with:
    1. The fact-check result: True/False/Not able to validate/Conflicting results reported
    2. The confidence score: A numerical value between 0 and 1
    """

    # Generate response from Gemini AI
    response = model.generate_content([prompt])

    try:
        result_text = response.text.strip()
    except ValueError as e:
        # NOTE(review): `.text` is expected to raise ValueError when the
        # response has no text part (e.g. blocked) - report it as an error.
        return {claim: {'result': 'Error', 'confidence_score': 0.0, 'error': str(e)}}

    # Headline = text before the first colon, or the whole claim without one.
    claim_label = claim.partition(':')[0]
    print(f"\n ### Fact check results for claim: {claim_label}.")
    print(f"### Fact Check Analysis:\n{result_text}\n")

    # The answer follows the numbered layout requested above, optionally with
    # markdown emphasis, e.g. "1. **Fact-check result:** True".
    result_match = re.search(
        r"fact[-\s]?check result[^A-Za-z0-9\n]*"
        r"(True|False|Not able to validate|Conflicting results reported)",
        result_text,
        re.IGNORECASE,
    )
    score_match = re.search(
        r"confidence score[^0-9.\n]*(\d*\.?\d+)",
        result_text,
        re.IGNORECASE,
    )

    fact_check_results = {}
    if result_match and score_match:
        fact_check_results[claim] = {
            'result': result_match.group(1),
            'confidence_score': float(score_match.group(1)),
        }
    else:
        # Handle cases where the response format is not as expected
        fact_check_results[claim] = {
            'result': 'Error',
            'confidence_score': 0.0,
            'error': 'Could not parse fact-check result and confidence score from the response.',
        }

    return fact_check_results

In [20]:
def search_academic_articles(keywords):
    """Search Semantic Scholar for papers matching the given keywords.

    Args:
        keywords: Either an iterable of keyword strings or a single
            comma-separated string. Previously a string was joined character
            by character, producing a meaningless query.

    Returns:
        The list under the response's ``'data'`` key (at most 5 entries are
        requested), or ``[]`` when there are no usable keywords, the request
        fails, or the response has no ``'data'``.
    """
    if isinstance(keywords, str):
        keywords = keywords.split(",")
    terms = [str(term).strip() for term in keywords]
    terms = [term for term in terms if term]
    if not terms:
        return []

    # Graph API search endpoint; the query goes through `params` so that it is
    # URL-encoded correctly.
    url = "https://api.semanticscholar.org/graph/v1/paper/search"
    try:
        response = requests.get(
            url,
            params={"query": " ".join(terms), "limit": 5},
            timeout=30,
        )
        articles = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not valid JSON.
        return []

    if not isinstance(articles, dict):
        return []
    return articles.get('data') or []

In [21]:
search = VideosSearch('health', limit=1)  # Adjust the limit as needed
results = search.result()

# claim -> fact-check entry, accumulated over every processed video so that
# results are not overwritten on each iteration.
all_fact_check_results = {}

# For each video: extract claims, look up PubMed articles per keyword, then
# fact-check every claim that has supporting articles.
for video in results['result']:
    claim_results = find_top_claims_and_keywords(video)
    if not claim_results:
        continue

    for claim in claim_results['claim_list']:
        articles_df = pd.DataFrame()
        keywords = claim_results['keywords'].get(claim)
        if not keywords:
            continue

        # Keywords arrive as one ", "-joined string; strip each item so the
        # PubMed query does not start with a stray space.
        keyword_terms = [item.strip() for item in keywords.split(',') if item.strip()]

        article_frames = []
        for item in keyword_terms:
            new_articles_df = extract_article_sections(query=item, max_results=20)
            if len(new_articles_df) > 0:
                article_frames.append(new_articles_df)
                print(f"Fetched {len(new_articles_df)} articles with keywrods: {item}.")

        if article_frames:
            # Concatenate once instead of growing the frame inside the loop.
            articles_df = pd.concat(article_frames, ignore_index=True)
            if 'pmid' in articles_df.columns:
                # The same article can match several keywords; send it once.
                articles_df = articles_df.drop_duplicates(subset='pmid')
            fact_check_results = fact_check_claims_with_confidence(articles_df, claim)
            all_fact_check_results.update(fact_check_results)
        else:
            print(f"No articles found for the claim with keywords: {claim_results['keywords'][claim]}.\n")
            # Pass a list of terms (not the raw string) and keep the result.
            fallback_articles = search_academic_articles(keyword_terms)

Processing video: America’s Health Crisis EXPOSED - Why Toxic Food Industry FEARS RFK Jr. (ID: vfI5xQo7XiY)


### America’s Health Crisis EXPOSED - Why Toxic Food Industry FEARS RFK Jr.
[Watch Video](https://www.youtube.com/watch?v=vfI5xQo7XiY)

**Top Claims:**

1. Higher US healthcare spending correlates with lower life expectancy: The transcript highlights that the US spends a significantly higher percentage of its GDP on healthcare than other countries, yet has a lower life expectancy. This is a widely documented fact supported by various international health data comparisons (e.g., from the OECD, WHO).


2. Higher rates of avoidable deaths in the US: The transcript points to the US having a substantially higher rate of avoidable deaths (due to factors like smoking, excessive alcohol consumption, and poor diet) compared to peer nations.  This claim can be verified through mortality data analyzing causes of death and comparing across countries.


3. High US rates of infant and maternal mortality: The transcript cites significantly higher rates of infant and maternal mortality in the US compared to OECD averages. This is a well-established disparity supported by data from organizations like the CDC and WHO.

Fetched 20 articles with keywrods: avoidable.

 ### Fact check results for claim: Higher US healthcare spending correlates with lower life expectancy.
### Fact Check Analysis:
1. **Fact-check result:** True

2. **Confidence score:** 0.8

**Explanation:**

The provided articles do not directly address the correlation between US healthcare spending and life expectancy.  However, the claim itself is widely supported by numerous readily available data sources from organizations like the OECD and WHO.  These sources consistently show that the US spends far more on healthcare per capita and as a percentage of GDP than many other developed nations, yet its life expectancy is comparatively lower.  Therefore, while the articles are irrelevant, the claim's veracity is easily verifiable through publicly available data, making "True" the appropriate fact-check result.  The confidence score is not 1.0 because the supporting evidence is not directly within the provided text; it relies on external, w