<a href="https://colab.research.google.com/github/BandaAkshith/YouTube-Data-Scraper/blob/main/youtube_data_scraping.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
!pip install youtube-transcript-api
!pip install pytz



In [18]:
import os
from getpass import getpass
# Prompt for the YouTube Data API key without echoing it to the notebook
# output, and keep it only in this process's environment. The scraper code
# reads it back through os.environ.get("YOUTUBE_DATA_API_v3", ...).
os.environ["YOUTUBE_DATA_API_v3"] = getpass()

··········


In [20]:
import os
import pandas as pd
import datetime
import logging
import pytz
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound

# Set up logging: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
# API key taken from the environment; falls back to the literal placeholder
# "YOUR_API_KEY" when the variable is not set.
# NOTE(review): with the placeholder, requests are presumably rejected by the
# API later on rather than failing here - confirm and consider failing fast.
YOUTUBE_API = os.environ.get("YOUTUBE_DATA_API_v3", "YOUR_API_KEY")
# Service name and version passed to googleapiclient's build() below.
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# Initialize YouTube API client
# Created once at module level and shared by fetch_video_ids() and
# fetch_video_details().
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=YOUTUBE_API)


def fetch_video_ids(genre, max_results=10):
    """
    Fetches video IDs for a given genre using the YouTube Data API.

    Pages through ``search().list`` results (at most 50 per request) until
    ``max_results`` IDs are collected, the API stops returning a next-page
    token, a page comes back empty, or an HTTP error occurs.

    Args:
        genre: Search query string, passed to the API as ``q``.
        max_results: Upper bound on the number of video IDs to return.

    Returns:
        A list of video ID strings. It may be shorter than ``max_results``
        and is empty when ``max_results`` is not positive or the first
        request fails.
    """
    video_ids = []
    next_page_token = None

    while len(video_ids) < max_results:
        # Keep the try body limited to the network call so that only API
        # failures are handled here.
        try:
            search_response = youtube.search().list(
                q=genre,
                type="video",
                part="id,snippet",
                maxResults=min(50, max_results - len(video_ids)),
                pageToken=next_page_token,
                order="relevance"
            ).execute()
        except HttpError as e:
            if e.resp.status == 403:
                # 403 covers more than quota exhaustion (e.g. an invalid or
                # restricted key), so keep the hint but include the details.
                logging.error(
                    "Request forbidden (HTTP 403) - the quota may be exceeded. "
                    "Try again later or reduce the max_results value. "
                    f"Details: {e}"
                )
            else:
                logging.error(f"An error occurred: {e}")
            break

        items = search_response.get('items', [])
        if not items:
            # An empty page yields no progress; stop instead of spending
            # quota on further requests.
            break

        for item in items:
            # Skip any result without a video ID rather than raising KeyError.
            video_id = item.get('id', {}).get('videoId')
            if video_id:
                video_ids.append(video_id)

        next_page_token = search_response.get('nextPageToken')
        if not next_page_token:
            break

    return video_ids


def fetch_video_details(video_id):
    """
    Fetches details for a single video using the YouTube Data API.

    Requests the snippet, statistics, contentDetails, topicDetails and
    recordingDetails parts for ``video_id`` and flattens them, together with
    the result of ``check_captions``, into one dictionary.

    Args:
        video_id: YouTube video ID string.

    Returns:
        A dict with one entry per output column ("Video URL", "Title", ...,
        "Location of Recording"), or ``None`` when the API request fails or
        returns no item for the ID. "View Count" and "Comment Count" are
        always integers (0 when the API omits the value).
    """
    try:
        video_response = youtube.videos().list(
            part="snippet,statistics,contentDetails,topicDetails,recordingDetails",
            id=video_id
        ).execute()
    except HttpError as e:
        logging.error(f"An error occurred while fetching video details: {e}")
        return None

    # .get() so a response without an 'items' key is treated like an empty one.
    items = video_response.get('items')
    if not items:
        return None

    video_data = items[0]
    snippet = video_data.get('snippet', {})
    statistics = video_data.get('statistics', {})
    content_details = video_data.get('contentDetails', {})
    topic_details = video_data.get('topicDetails', {})
    recording_details = video_data.get('recordingDetails', {})

    def _count(key):
        # Counters arrive as strings when present, while the fallback is the
        # integer 0; normalise to int so the column has a single type.
        try:
            return int(statistics.get(key, 0))
        except (TypeError, ValueError):
            return 0

    captions_available, captions_text = check_captions(video_id)

    return {
        "Video URL": f"https://www.youtube.com/watch?v={video_id}",
        "Title": snippet.get("title", ""),
        "Description": snippet.get("description", ""),
        "Channel Title": snippet.get("channelTitle", ""),
        # `or []` covers both a missing key and an explicit empty/None value.
        "Keyword Tags": ",".join(snippet.get("tags") or []),
        "YouTube Video Category": snippet.get("categoryId", ""),
        "Topic Details": ",".join(topic_details.get("topicCategories") or []),
        "Video Published at": snippet.get("publishedAt", ""),
        "Video Duration": content_details.get("duration", ""),
        "View Count": _count("viewCount"),
        "Comment Count": _count("commentCount"),
        "Captions Available": captions_available,
        "Caption Text": captions_text,
        "Location of Recording": recording_details.get("locationDescription", "")
    }


def check_captions(video_id):
    """
    Checks if captions are available and fetches them if possible.

    Works with both generations of the youtube-transcript-api package: the
    older static ``YouTubeTranscriptApi.get_transcript`` and, when that is
    no longer present, the instance-based ``YouTubeTranscriptApi().fetch``.

    Args:
        video_id: YouTube video ID string.

    Returns:
        A ``(captions_available, captions_text)`` tuple. ``captions_text`` is
        the transcript segments joined with single spaces, or ``""`` when no
        transcript could be retrieved (in which case the flag is ``False``).
    """
    try:
        legacy_fetch = getattr(YouTubeTranscriptApi, "get_transcript", None)
        if legacy_fetch is not None:
            captions = legacy_fetch(video_id)
        else:
            # Newer releases dropped the static helper; fetch() returns a
            # transcript object whose to_raw_data() gives the same list of
            # {'text': ..., ...} dicts the old call produced.
            captions = YouTubeTranscriptApi().fetch(video_id).to_raw_data()
        captions_text = " ".join(item['text'] for item in captions)
        return True, captions_text
    except (TranscriptsDisabled, NoTranscriptFound):
        # Expected outcome for videos without captions; not worth logging.
        return False, ""
    except Exception as e:
        # Best effort: a caption failure must not abort the whole scrape.
        logging.warning(f"An error occurred while fetching captions for {video_id}: {e}")
        return False, ""


def main():
    """
    Interactive entry point: scrape videos for a genre and save them as CSV.

    Prompts for a genre and a video count, collects the video IDs and their
    details, and writes the rows to
    ``<genre>_videos_<yy-mm-dd_HH-MM-SS>.csv`` (timestamp in IST) in the
    current working directory. Invalid counts and empty results are logged
    and end the run without writing a file.
    """
    import re  # local import: only needed to sanitise the output file name

    # Input genre and number of videos dynamically
    genre = input("Enter the genre: ")
    raw_count = input("Enter the number of videos to fetch: ")
    try:
        max_results = int(raw_count)
    except ValueError:
        logging.error(f"Invalid number of videos: {raw_count!r}")
        return
    if max_results <= 0:
        logging.error("The number of videos to fetch must be a positive integer.")
        return

    # Fetch video IDs
    video_ids = fetch_video_ids(genre, max_results=max_results)
    logging.info(f"Fetched {len(video_ids)} video IDs for genre: {genre}")

    # Collect details for all videos
    video_data = []
    for idx, video_id in enumerate(video_ids):
        # Broad catch on purpose: one bad video must not abort the whole run.
        try:
            logging.info(f"Processing video {idx + 1}/{len(video_ids)}: {video_id}")
            details = fetch_video_details(video_id)
            if details:
                video_data.append(details)
        except Exception as e:
            logging.error(f"Failed to process video {video_id}: {e}")

    if not video_data:
        logging.warning("No video details were collected; nothing to save.")
        return

    # Save data to CSV
    ist = pytz.timezone('Asia/Kolkata')  # Set IST timezone
    current_time = datetime.datetime.now(ist)  # Get current time in IST
    # Replace whitespace, path separators and other unsafe characters so the
    # genre cannot point outside the working directory or break the name.
    safe_genre = re.sub(r'[^\w.-]+', '_', genre.strip()) or "videos"
    # Dashes instead of colons in the time part: ':' is not allowed in
    # Windows file names.
    timestamp = current_time.strftime('%y-%m-%d_%H-%M-%S')
    output_file = f"{safe_genre}_videos_{timestamp}.csv"
    df = pd.DataFrame(video_data)
    # utf-8-sig writes a BOM at the start of the file.
    df.to_csv(output_file, index=False, encoding='utf-8-sig')

    logging.info(f"Data saved to {output_file}")


# Start the interactive scraper only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


Enter the genre: music
Enter the number of videos to fetch: 500
