In [1]:
pip install pytube moviepy openai-whisper requests transformers ultralytics torch opencv-python

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/800.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m37.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting ultralytics
  Downloading ultralytics-8.3.28-py3-none-any.whl.metadata (35 kB)
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2.0.0 (from openai-whisper)
  Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Collecting ultralytics-thop>=2.0.0 

In [6]:
!pip install yt-dlp

Collecting yt-dlp
  Downloading yt_dlp-2024.11.4-py3-none-any.whl.metadata (172 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/172.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.1/172.1 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading yt_dlp-2024.11.4-py3-none-any.whl (3.2 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m90.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: yt-dlp
Successfully installed yt-dlp-2024.11.4


In [15]:
import os
import yt_dlp
from moviepy.editor import *
import whisper
import requests
import cv2
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForTokenClassification
from ultralytics import YOLO
from torchvision import models, transforms
from torch.nn.functional import softmax
from difflib import SequenceMatcher
import yt_dlp
from moviepy.editor import VideoFileClip

# Step 1: Download YouTube Video
def download_youtube_video(url, output_dir='/content/downloaded_videos'):
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Define the full path to where the video will be saved
    video_path = os.path.join(output_dir, 'downloaded_video.mp4')

    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # Download best quality available
        'outtmpl': video_path,                 # Use specified directory and filename
        'merge_output_format': 'mp4',          # Force merge in .mp4 format
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
            # Check if the video was downloaded successfully
            if os.path.exists(video_path):
                print(f"Downloaded video to: {video_path}")
                return video_path
            else:
                print("Download failed: File not found.")
                return None
    except Exception as e:
        print("Download failed:", e)
        return None

# Step 2: Extract Audio from Video
def extract_audio_from_video(video_path):
    # Check if video file exists before proceeding
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return None

    # Define audio path
    audio_path = video_path.replace(".mp4", ".wav")

    try:
        # Extract audio
        video = VideoFileClip(video_path)
        video.audio.write_audiofile(audio_path)
        print(f"Extracted audio to: {audio_path}")
        return audio_path
    except Exception as e:
        print("Audio extraction failed:", e)
        return None






# Step 3: Transcribe Audio to Text Using Whisper (Large model for multilingual support)
def transcribe_audio(audio_path):
    model = whisper.load_model("large")  # Large model for better multilingual support
    result = model.transcribe(audio_path)
    return result["text"], result["language"]

# Step 4: Extract Medical Terms Using Multilingual Model (e.g., XLM-RoBERTa fine-tuned for biomedical terms)
def extract_medical_terms_multilingual(text, language):
    model_name = "xlm-roberta-large"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)

    tokens = tokenizer(text, return_tensors="pt", truncation=True)
    output = model(**tokens)
    predictions = np.argmax(output.logits.detach().numpy(), axis=2)

    medical_terms = []
    for i, token in enumerate(tokens["input_ids"][0]):
        label = predictions[0][i]
        if label > 0:  # B- and I- tags for medical terms
            term = tokenizer.decode([token])
            medical_terms.append(term.strip())

    return list(set(medical_terms))

# Step 5: Multilingual Fact-Checking Function
def fact_check_medical_content_all(medical_terms, language):
    flagged_terms = []

    # Updated database endpoints to include more multilingual or regional databases
    databases = {
        "Primary Care": ["ClinicalTrials.gov", "WHO", "Europe PMC"],
        "Surgical Specialties": ["ClinicalTrials.gov", "MedlinePlus", "Medscape"],
        "Medical Specialties": ["PubMed", "ClinicalTrials.gov", "Europe PMC"],
        "Diagnostic Specialties": ["PubMed", "ClinicalTrials.gov"],
        "Mental Health": ["Psychiatric Database", "PubMed", "Europe PMC"],
        "Sports Medicine": ["Sports Medicine Research Journals", "PubMed"],
        "Medical Genetics": ["GeneOntology", "NCBI Genetic Databases", "Europe PMC"],
        "Rehabilitation and Pain Management": ["PubMed", "ClinicalTrials.gov"],
        "Other Specialties": ["PubMed", "ClinicalTrials.gov"]
    }

    for term in medical_terms:
        validated = False

        for db_type, db_list in databases.items():
            for db in db_list:
                if db == "ClinicalTrials.gov":
                    response = requests.get(f"https://clinicaltrials.gov/api/query/full_studies?expr={term}&min_rnk=1&max_rnk=1&fmt=json")
                    if response.status_code == 200 and len(response.json()['FullStudiesResponse']['FullStudies']) > 0:
                        validated = True

                elif db == "WHO":
                    response = requests.get(f"https://www.who.int/api/v1/search?q={term}&lang={language}")
                    if response.status_code == 200 and len(response.json().get('results', [])) > 0:
                        validated = True

                elif db == "Europe PMC":
                    response = requests.get(f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query={term}&format=json")
                    if response.status_code == 200 and len(response.json().get('resultList', {}).get('result', [])) > 0:
                        validated = True

                elif db == "PubMed":
                    response = requests.get(f"https://pubmed.ncbi.nlm.nih.gov/?term={term}")
                    if response.status_code == 200:
                        validated = True

        if not validated:
            flagged_terms.append(term)

    return flagged_terms

# Step 6: Analyze Video Frames Using YOLOv8 and CNN for Enhanced Medical Object Detection
def analyze_video_frames_yolo_cnn(video_path):
    # Load YOLO and CNN models
    yolo_model = YOLO('yolov8m_finetuned.pt')  # YOLO model fine-tuned for medical objects
    cnn_model = models.resnet50(pretrained=True)
    cnn_model.eval()

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    cap = cv2.VideoCapture(video_path)
    medical_objects = ["stethoscope", "scalpel", "ECG", "syringe", "dental-tool", "herb", "surgical-mask"]
    detected_items = []
    frame_skip = 5  # Process every 5th frame for efficiency

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % frame_skip == 0:
            # YOLO object detection
            results = yolo_model(frame)
            for result in results:
                detected_labels = [yolo_model.names[int(cls)] for cls in result.boxes.cls]
                detected_medical_objects = [label for label in detected_labels if label in medical_objects]

                # If YOLO detects a relevant medical object, verify with CNN
                for detected_object in detected_medical_objects:
                    cnn_input = transform(frame).unsqueeze(0)  # Preprocess the frame for CNN input
                    with torch.no_grad():
                        output = cnn_model(cnn_input)
                        prob = softmax(output, dim=1)
                        confidence = prob.max().item()
                        if confidence > 0.6:  # Confidence threshold for CNN
                            detected_items.append(detected_object)

        frame_count += 1

    cap.release()
    return list(set(detected_items))

# Step 7: Flag Video for Misinformation with Enhanced Scoring
def flag_video(medical_terms, flagged_terms, visual_detections):
    print("Extracted Medical Terms:", medical_terms)
    print("Flagged Terms:", flagged_terms)
    print("Detected Visuals:", visual_detections)

    misinformation_score = len(flagged_terms) + (0 if visual_detections else 1)

    if misinformation_score > 0:
        print("Video contains potential medical misinformation.")
    else:
        print("No misinformation detected.")
    return flagged_terms

# Main Function
def main():
    url = input("Enter the YouTube video URL: ")
    video_path = download_youtube_video(url)

    if video_path:
        audio_path = extract_audio_from_video(video_path)
        transcription, language = transcribe_audio(audio_path)

        # Multilingual Medical Term Extraction
        medical_terms = extract_medical_terms_multilingual(transcription, language)

        # Multilingual Fact-Checking Across All Medical Fields
        flagged_terms = fact_check_medical_content_all(medical_terms, language)

        # YOLO and CNN Hybrid Detection for Medical Imagery
        visual_detections = analyze_video_frames_yolo_cnn(video_path)

        # Flag Video
        flag_video(medical_terms, flagged_terms, visual_detections)

if __name__ == "__main__":
    main()


Enter the YouTube video URL: https://youtu.be/otbjLNdx7T4?si=UZinRp5fvaWNJsLV
[youtube] Extracting URL: https://youtu.be/otbjLNdx7T4?si=UZinRp5fvaWNJsLV
[youtube] otbjLNdx7T4: Downloading webpage
[youtube] otbjLNdx7T4: Downloading ios player API JSON
[youtube] otbjLNdx7T4: Downloading mweb player API JSON
[youtube] otbjLNdx7T4: Downloading m3u8 information
[info] otbjLNdx7T4: Downloading 1 format(s): 313+251
[download] /content/downloaded_videos/downloaded_video.mp4 has already been downloaded
Downloaded video to: /content/downloaded_videos/downloaded_video.mp4
MoviePy - Writing audio in /content/downloaded_videos/downloaded_video.wav




MoviePy - Done.
Extracted audio to: /content/downloaded_videos/downloaded_video.wav



Some weights of XLMRobertaForTokenClassification were not initialized from the model checkpoint at xlm-roberta-large and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


FileNotFoundError: [Errno 2] No such file or directory: 'yolov8m_finetuned.pt'

In [None]:
import yt_dlp
import os

# Function to download YouTube video
def download_youtube_video(url, output_dir='/content/downloaded_videos'):
    os.makedirs(output_dir, exist_ok=True)
    video_path = os.path.join(output_dir, 'downloaded_video.mp4')

    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',
        'outtmpl': video_path,
        'merge_output_format': 'mp4',  # Ensure the video is in .mp4 format
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
            if os.path.exists(video_path):
                print(f"Downloaded video to: {video_path}")
                return video_path
            else:
                print("Download failed: File not found.")
                return None
    except Exception as e:
        print("Download failed:", e)
        return None

# Example Usage
url = "https://www.youtube.com/watch?v=otbjLNdx7T4"  # Replace with your URL
video_path = download_youtube_video(url)

if video_path:
    print("Video downloaded at:", video_path)
else:
    print("Failed to download video.")


In [None]:
from moviepy.editor import VideoFileClip

def extract_audio_from_video(video_path):
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return None

    audio_path = video_path.replace(".mp4", ".wav")
    try:
        video = VideoFileClip(video_path)
        video.audio.write_audiofile(audio_path)
        print(f"Extracted audio to: {audio_path}")
        return audio_path
    except Exception as e:
        print("Audio extraction failed:", e)
        return None

# Example usage
if video_path:
    audio_path = extract_audio_from_video(video_path)
    if audio_path:
        print("Audio extracted:", audio_path)
    else:
        print("Audio extraction failed.")


In [None]:
import whisper

def transcribe_audio(audio_path):
    model = whisper.load_model("medium")
    result = model.transcribe(audio_path)
    return result["text"]

# Example usage
if audio_path:
    transcription = transcribe_audio(audio_path)
    print("Transcription:\n", transcription)
else:
    print("Audio file not found.")


In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch
import numpy as np

def extract_medical_terms_with_biobert(text):
    model_name = "dmis-lab/biobert-base-cased-v1.1"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)

    tokens = tokenizer(text, return_tensors="pt")
    output = model(**tokens)
    predictions = np.argmax(output.logits.detach().numpy(), axis=2)

    medical_terms = []
    for i, token in enumerate(tokens["input_ids"][0]):
        label = predictions[0][i]
        if label > 0:  # B- and I- tags for medical terms
            term = tokenizer.decode([token])
            medical_terms.append(term.strip())

    return list(set(medical_terms))

# Example usage
if transcription:
    medical_terms = extract_medical_terms_with_biobert(transcription)
    print("Extracted Medical Terms:", medical_terms)
else:
    print("No transcription found.")


In [None]:
from ultralytics import YOLO
import cv2

# Load YOLO model (make sure to upload or download the fine-tuned model first)
model = YOLO('yolov8m.pt')  # Change to your custom fine-tuned model if available

def analyze_video_frames_yolo(video_path):
    cap = cv2.VideoCapture(video_path)
    detected_items = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform object detection using YOLO
        results = model(frame)

        # Process results and extract detected labels
        detected_labels = [model.names[int(cls)] for cls in results[0].boxes.cls]
        detected_items.extend(detected_labels)

    cap.release()
    return list(set(detected_items))

# Example usage
if video_path:
    detected_items = analyze_video_frames_yolo(video_path)
    print("Detected Items:", detected_items)
else:
    print("Video not found.")


In [None]:
def flag_video(medical_terms, flagged_terms, visual_detections):
    print("Extracted Medical Terms:", medical_terms)
    print("Flagged Terms:", flagged_terms)
    print("Detected Visuals:", visual_detections)

    if flagged_terms or not visual_detections:
        print("Video contains potential medical misinformation.")
    else:
        print("No misinformation detected.")
    return flagged_terms

# Example usage
if medical_terms and detected_items:
    flagged_terms = []  # This should be populated based on your fact-checking logic
    flag_video(medical_terms, flagged_terms, detected_items)


**BIOBERT MODEL**

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch
import numpy as np

def extract_medical_terms_with_biobert(text):
    model_name = "dmis-lab/biobert-base-cased-v1.1"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)

    tokens = tokenizer(text, return_tensors="pt")
    output = model(**tokens)
    predictions = np.argmax(output.logits.detach().numpy(), axis=2)

    medical_terms = []
    for i, token in enumerate(tokens["input_ids"][0]):
        label = predictions[0][i]
        if label > 0:  # B- and I- tags for medical terms
            term = tokenizer.decode([token])
            medical_terms.append(term.strip())

    return list(set(medical_terms))

