In [None]:
pip install isodate

In [None]:
from googleapiclient.discovery import build
import isodate
import pandas as pd

# Export channel metadata plus the latest uploads of one channel to a CSV file.
# NOTE: both values below are placeholders and must be replaced before running.
# Prefer loading the key from an environment variable over committing it.
API_KEY = "your Youtube API KEY"
CHANNEL_ID = "your Channel id"  # Example: Google Developers

# Build YouTube API service
youtube = build("youtube", "v3", developerKey=API_KEY)

# Get channel details
channel_response = youtube.channels().list(
    part="snippet,contentDetails,statistics",
    id=CHANNEL_ID
).execute()

# A channel ID that matches nothing yields no usable "items"; fail with a
# clear message instead of an opaque KeyError/IndexError.
channel_items = channel_response.get("items") or []
if not channel_items:
    raise ValueError(f"No channel found for CHANNEL_ID={CHANNEL_ID!r}")

channel_data = channel_items[0]
channel_snippet = channel_data["snippet"]
channel_stats = channel_data.get("statistics", {})

channel_title = channel_snippet["title"]
channel_description = channel_snippet["description"]
channel_country = channel_snippet.get("country", "N/A")
channel_thumbnail = (
    channel_snippet.get("thumbnails", {}).get("high", {}).get("url", "N/A")
)
channel_subscribers = channel_stats.get("subscriberCount", "0")
channel_video_count = channel_stats.get("videoCount", "0")

# Get Uploads playlist ID
uploads_id = channel_data["contentDetails"]["relatedPlaylists"]["uploads"]

# Get videos from uploads playlist
videos = youtube.playlistItems().list(
    part="snippet,contentDetails",
    playlistId=uploads_id,
    maxResults=50  # fetch latest 50 videos
).execute()

video_ids = [
    item["contentDetails"]["videoId"] for item in videos.get("items", [])
]

# Fetch the details of every video in ONE request: videos.list accepts a
# comma-separated ID list, and the playlist page above holds at most 50 IDs.
details_by_id = {}
if video_ids:
    details_response = youtube.videos().list(
        part="snippet,contentDetails,statistics,status",
        id=",".join(video_ids)
    ).execute()
    details_by_id = {
        entry["id"]: entry for entry in details_response.get("items", [])
    }

video_list = []
print("🎥 Latest Videos:\n")

# Walk the IDs in playlist order so the CSV keeps the playlist's ordering.
for vid_id in video_ids:
    data = details_by_id.get(vid_id)
    if data is None:
        # No details were returned for this ID; skip it.
        continue

    snippet = data["snippet"]
    stats = data.get("statistics", {})

    title = snippet["title"]
    desc = snippet["description"][:100] + "..."
    published_at = snippet["publishedAt"]
    category_id = snippet.get("categoryId", "N/A")
    default_language = snippet.get("defaultLanguage", "N/A")
    # ISO 8601 duration (e.g. "PT4M13S") converted to seconds.
    duration = isodate.parse_duration(data["contentDetails"]["duration"]).total_seconds()
    thumbnails = snippet.get("thumbnails", {}).get("high", {}).get("url", "N/A")
    views = stats.get("viewCount", "0")
    likes = stats.get("likeCount", "0")
    comments = stats.get("commentCount", "0")
    privacy_status = data.get("status", {}).get("privacyStatus", "N/A")

    print(f"Video ID: {vid_id}")
    print(f"Title: {title}")
    print(f"Published At: {published_at}")
    print(f"Views: {views}")
    print(f"Likes: {likes}")
    print(f"Comments: {comments}\n")

    # Channel-level fields are repeated on every row so the CSV is flat.
    video_list.append({
        "Video ID": vid_id,
        "Title": title,
        "Description": desc,
        "Published At": published_at,
        "Category ID": category_id,
        "Default Language": default_language,
        "Thumbnail (High)": thumbnails,
        "Duration (seconds)": duration,
        "View Count": views,
        "Like Count": likes,
        "Comment Count": comments,
        "Privacy Status": privacy_status,
        "Channel ID": CHANNEL_ID,
        "Channel Title": channel_title,
        "Channel Description": channel_description,
        "Channel Country": channel_country,
        "Channel Thumbnail": channel_thumbnail,
        "Channel Subscribers": channel_subscribers,
        "Channel Video Count": channel_video_count
    })

# Save to CSV
df = pd.DataFrame(video_list)
df.to_csv("youtube_videos_with_channel.csv", index=False)

print("\n✅ Data saved to youtube_videos_with_channel.csv successfully!")


In [None]:
import requests
import csv
import concurrent.futures

# SearchAPI credential; fetch_transcript sends it as a Bearer token.
# This rebinds the API_KEY name assigned earlier in this file for the YouTube
# client. NOTE(review): placeholder value — replace before running, and
# consider reading it from an environment variable instead of hard-coding it.
API_KEY = "your search api key"
# CSV that process_videos reads the video IDs from (placeholder path).
INPUT_CSV = " your file path"
# CSV that process_videos writes, with the columns video_id and transcript.
OUTPUT_CSV = "output_transcripts.csv"

def fetch_transcript(video_id, timeout=30):
    """Fetch the English transcript of one video from SearchAPI.

    Args:
        video_id: YouTube video ID to request the transcript for.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block its worker
            thread forever.

    Returns:
        A ``(video_id, text)`` tuple. ``text`` is the transcript segments
        joined by single spaces on success; otherwise it is a marker string
        beginning with "❌ Error" (non-200 status), "⚠ No transcript
        available" (empty or missing transcript), or "🚨 Error:" (any
        exception raised while fetching or parsing).
    """
    url = "https://www.searchapi.io/api/v1/search"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    params = {
        "engine": "youtube_transcripts",
        "video_id": video_id,
        "lang": "en",
        "transcript_type": "auto"
    }

    try:
        response = requests.get(
            url, headers=headers, params=params, timeout=timeout
        )
        if response.status_code != 200:
            return video_id, f"❌ Error {response.status_code}"
        data = response.json()

        if "transcripts" not in data or not data["transcripts"]:
            return video_id, "⚠ No transcript available"

        transcript_text = " ".join(seg["text"] for seg in data["transcripts"])
        return video_id, transcript_text.strip()

    except Exception as e:
        # Deliberately broad: this runs inside a worker thread, and one
        # failing video (timeout, bad JSON, unexpected payload shape) must be
        # reported in the output rather than abort the whole batch.
        return video_id, f"🚨 Error: {e}"

def process_videos():
    """Fetch transcripts for every video ID in INPUT_CSV and save them.

    Reads video IDs from INPUT_CSV, fetches the transcripts concurrently with
    fetch_transcript, and writes one ``video_id, transcript`` row per video to
    OUTPUT_CSV in the same order as the input file.

    Raises:
        KeyError: If INPUT_CSV has none of the recognised ID columns.
    """
    # "Video ID" is the header written by the metadata export in this file;
    # "id" is the original expectation and is still tried first.
    id_columns = ("id", "Video ID", "video_id")

    # newline="" lets the csv module handle quoted fields with line breaks.
    with open(INPUT_CSV, "r", encoding="utf-8", newline="") as infile:
        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames or []
        id_column = next((c for c in id_columns if c in fieldnames), None)
        if id_column is None:
            raise KeyError(
                f"{INPUT_CSV!r} has none of the ID columns {id_columns}; "
                f"found {fieldnames}"
            )

        video_ids = []
        for row in reader:
            # Short rows yield None for missing fields; skip those and blanks
            # instead of crashing or requesting an empty ID.
            vid = (row[id_column] or "").strip()
            if vid:
                video_ids.append(vid)

    print(f"🚀 Fetching transcripts for {len(video_ids)} videos in parallel...")

    # Pre-sized so each result lands at its input position regardless of the
    # order in which the worker threads finish.
    results = [None] * len(video_ids)

    # Use up to 10 threads for speed
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(fetch_transcript, vid): position
            for position, vid in enumerate(video_ids)
        }
        for future in concurrent.futures.as_completed(futures):
            video_id, transcript = future.result()
            results[futures[future]] = {
                "video_id": video_id,
                "transcript": transcript,
            }
            print(f"✅ {video_id} processed")

    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=["video_id", "transcript"])
        writer.writeheader()
        writer.writerows(results)

    print("\n🎉 All transcripts saved successfully to:", OUTPUT_CSV)

# Run the batch only when this code is executed directly (script or notebook
# cell), not when it is imported. The dunder names need double underscores;
# the single-underscore spelling raised NameError.
if __name__ == "__main__":
    process_videos()

In [None]:
# 🚀 Final FIXED VERSION — Ensures transcript is always in one line, fully CSV-safe

import pandas as pd
import re
import csv
from google.colab import files

# --- Step 1: File paths ---
# NOTE(review): the /content/ prefix looks like a Colab working directory —
# confirm the paths when running anywhere else.
META_CSV = "/content/youtube_videos_with_channel.csv"         # Metadata file
TRANSCRIPT_CSV = "/content/latest_video_transcripts.csv"  # Transcript file
FINAL_CSV = "Final_YouTube_Dataset.csv"    # Output file

# --- Step 2: Load both CSVs ---
# meta_df is later joined on its "Video ID" column and trans_df on its
# "video_id" column; trans_df must also carry the transcript in a "text" column.
print("📂 Loading CSV files...")
meta_df = pd.read_csv(META_CSV)
trans_df = pd.read_csv(TRANSCRIPT_CSV)

# --- Step 3: Clean transcript text thoroughly ---
def clean_text(text):
    """Return *text* flattened onto a single line.

    Missing values (None/NaN) become an empty string. Anything else is
    converted with ``str()``; line breaks, tabs and Unicode line/paragraph
    separators become spaces, runs of whitespace collapse to one space, and
    the ends are trimmed.
    """
    if pd.isna(text):
        return ""
    # First pass: swap explicit line-breaking characters for spaces.
    single_line = re.sub(r'[\r\n\t\u2028\u2029]+', ' ', str(text))
    # Second pass: squeeze any remaining whitespace runs, then trim the ends.
    return re.sub(r'\s+', ' ', single_line).strip()

# The transcript fetcher in this file writes its text under "transcript";
# accept that header too so its output can be merged without manual renaming.
if "text" not in trans_df.columns and "transcript" in trans_df.columns:
    trans_df = trans_df.rename(columns={"transcript": "text"})

trans_df["text"] = trans_df["text"].apply(clean_text)

# --- Step 4: Merge metadata with transcripts ---
# Left join: every metadata row is kept; videos without a transcript row get
# NaN in "text".
print("🔄 Merging metadata with transcripts...")
final_df = pd.merge(meta_df, trans_df, left_on="Video ID", right_on="video_id", how="left")

# --- Step 5: Drop duplicate column ---
final_df.drop(columns=["video_id"], inplace=True, errors="ignore")

# --- Step 6: Add 'is_transcript_available' column ---
# Prefixes of the failure strings the transcript fetcher stores in place of a
# transcript; they must not be counted as available text.
_ERROR_PREFIXES = ("❌ Error", "🚨 Error")


def _has_transcript(value):
    """Return True only for non-empty text that is not a failure marker."""
    if not isinstance(value, str) or not value:
        return False
    if "No transcript available" in value:
        return False
    return not value.startswith(_ERROR_PREFIXES)


final_df["is_transcript_available"] = final_df["text"].apply(_has_transcript)

# --- Step 7: Reorder columns (availability before transcript) ---
cols = list(final_df.columns)
if "is_transcript_available" in cols and "text" in cols:
    cols.remove("is_transcript_available")
    cols.insert(cols.index("text"), "is_transcript_available")
    final_df = final_df[cols]

# --- Step 8: Save final CSV safely with all fields quoted ---
final_df.to_csv(FINAL_CSV, index=False, encoding="utf-8", quoting=csv.QUOTE_ALL)

# --- Step 9: Show preview and download file ---
print("\n✅ Final dataset created successfully!")
print(f"📁 Saved as: {FINAL_CSV}\n")
print("🔍 Preview of cleaned dataset:")
print(final_df.head())

# Triggers a browser download; only works inside Google Colab.
files.download(FINAL_CSV)