In [1]:
from googleapiclient.errors import HttpError

In [2]:
# Install the Google auth helpers with the %pip magic so they land in the
# environment of the running kernel rather than whichever `pip` is on PATH.
%pip install google-auth google-auth-oauthlib google-auth-httplib2



In [4]:
# googletrans 3.1.0a0 pins httpx==0.13.3 (plus old httpcore / h11 / idna / chardet),
# so this install may downgrade packages other libraries in the environment rely on.
%pip install googletrans==3.1.0a0

Collecting googletrans==3.1.0a0
  Downloading googletrans-3.1.0a0.tar.gz (19 kB)
  Preparing metadata (setup.py) ... done
Collecting httpx==0.13.3 (from googletrans==3.1.0a0)
  Downloading httpx-0.13.3-py3-none-any.whl.metadata (25 kB)
Collecting hstspreload (from httpx==0.13.3->googletrans==3.1.0a0)
  Downloading hstspreload-2025.1.1-py3-none-any.whl.metadata (2.1 kB)
Collecting chardet==3.* (from httpx==0.13.3->googletrans==3.1.0a0)
  Downloading chardet-3.0.4-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting idna==2.* (from httpx==0.13.3->googletrans==3.1.0a0)
  Downloading idna-2.10-py2.py3-none-any.whl.metadata (9.1 kB)
Collecting rfc3986<2,>=1.3 (from httpx==0.13.3->googletrans==3.1.0a0)
  Downloading rfc3986-1.5.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting httpcore==0.9.* (from httpx==0.13.3->googletrans==3.1.0a0)
  Downloading httpcore-0.9.1-py3-none-any.whl.metadata (4.6 kB)
Collecting h11<0.10,>=0.8 (from httpcore==0.9.*->httpx==0.13.3->googletrans==3.1.0a0)

In [5]:
import time
import logging
import os
import pandas as pd
from datetime import datetime, timedelta
from googleapiclient.discovery import build
from textblob import TextBlob
from googletrans import Translator  # Add this import
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import os.path


In [6]:
# Route INFO-and-above records to script.log (relative to the working directory),
# each line prefixed with the time it was logged.
logging.basicConfig(
    filename='script.log',
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
)

In [7]:
def get_trending_videos(api_key, max_results=200, region_code="IN"):
    """Fetch the "mostPopular" video chart from the YouTube Data API.

    Args:
        api_key: API key handed to the client as ``developerKey``.
        max_results: Upper bound on the number of videos returned.
        region_code: Region whose chart is requested. Defaults to "IN", the
            value that used to be hard-coded here.

    Returns:
        A list of at most ``max_results`` dicts, one per video, with the keys
        video_id, title, description, published_at, channel_id, channel_title,
        category_id, tags, duration, definition, caption and the integer
        counters view_count, like_count, dislike_count, favorite_count,
        comment_count (a counter missing from the response becomes 0).
    """
    # Nothing to fetch: skip building a client and calling the API at all.
    if max_results <= 0:
        return []

    youtube = build('youtube', 'v3', developerKey=api_key)
    videos = []
    request = youtube.videos().list(
        part="snippet,contentDetails,statistics",
        chart="mostPopular",
        regionCode=region_code,
        # 50 is the page size this code has always requested; ask for fewer
        # when the caller wants fewer so no surplus items are transferred.
        maxResults=min(50, max_results)
    )
    while request and len(videos) < max_results:
        response = request.execute()
        for item in response.get('items', []):
            snippet = item['snippet']
            content = item['contentDetails']
            # Tolerate an item without a statistics section instead of failing the page.
            stats = item.get('statistics', {})
            videos.append({
                'video_id': item['id'],
                'title': snippet['title'],
                'description': snippet['description'],
                'published_at': snippet['publishedAt'],
                'channel_id': snippet['channelId'],
                'channel_title': snippet['channelTitle'],
                'category_id': snippet.get('categoryId', None),
                'tags': snippet.get('tags', []),
                'duration': content['duration'],
                'definition': content['definition'],
                'caption': content.get('caption', 'false'),
                'view_count': int(stats.get('viewCount', 0)),
                'like_count': int(stats.get('likeCount', 0)),
                # NOTE(review): the public API reportedly no longer exposes
                # dislikeCount, so this is probably always 0 - confirm.
                'dislike_count': int(stats.get('dislikeCount', 0)),
                'favorite_count': int(stats.get('favoriteCount', 0)),
                'comment_count': int(stats.get('commentCount', 0))
            })
        # list_next returns None once there are no further pages, ending the loop.
        request = youtube.videos().list_next(request, response)
    # The last page may overshoot the limit, so trim to exactly max_results.
    return videos[:max_results]

def get_video_categories(api_key, region_code="IN"):
    """Look up the video categories of a region.

    Args:
        api_key: API key handed to the client as ``developerKey``.
        region_code: Region passed to the API as ``regionCode``.

    Returns:
        A dict mapping each category id to its title, built from the
        ``items`` of the response.
    """
    client = build('youtube', 'v3', developerKey=api_key)
    payload = client.videoCategories().list(
        part="snippet",
        regionCode=region_code
    ).execute()

    id_to_title = {}
    for entry in payload['items']:
        id_to_title[entry['id']] = entry['snippet']['title']
    return id_to_title

def get_comments(api_key, video_id, max_comments=5):
    """Return the text of up to ``max_comments`` top-level comments of a video.

    Failures while fetching (for example a video whose comments are disabled)
    are logged and produce whatever was collected so far - usually an empty
    list - instead of aborting the caller's loop over many videos.

    NOTE: a later cell of this notebook defines ``get_comments`` again; when the
    notebook is run top to bottom that later definition is the one in effect.
    This version is kept behaviourally in line with it.

    Args:
        api_key: API key handed to the client as ``developerKey``.
        video_id: Id of the video whose comment threads are listed.
        max_comments: Passed to the API as ``maxResults``.

    Returns:
        A list of ``textDisplay`` strings, one per returned comment thread.
    """
    youtube = build('youtube', 'v3', developerKey=api_key)
    comments = []
    try:
        request = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            maxResults=max_comments
        )
        response = request.execute()
        for item in response.get('items', []):
            comments.append(item['snippet']['topLevelComment']['snippet']['textDisplay'])
    except Exception as e:
        # Disabled comments are expected for some videos; log them more quietly
        # than genuine failures.
        if "commentsDisabled" in str(e):
            logging.warning(f"Comments are disabled for video {video_id}. Skipping.")
        else:
            logging.error(f"Error fetching comments for video {video_id}: {e}")
    return comments

In [8]:
def get_comments(api_key, video_id, max_comments=5):
    """Return the text of up to ``max_comments`` top-level comments of a video.

    Any error while fetching is logged (disabled comments as a warning,
    everything else as an error) and the comments collected so far are
    returned, so one problematic video does not stop a batch run.

    Args:
        api_key: API key handed to the client as ``developerKey``.
        video_id: Id of the video whose comment threads are listed.
        max_comments: Desired number of comments. Values above 100 are capped
            to 100 for the single request made here; values <= 0 return an
            empty list without calling the API.

    Returns:
        A list of ``textDisplay`` strings, one per returned comment thread.
        NOTE(review): textDisplay may contain HTML markup unless a plain-text
        format is requested - confirm before feeding it to text analysis.
    """
    # A non-positive limit can never yield comments; avoid a pointless request.
    if max_comments <= 0:
        return []

    youtube = build('youtube', 'v3', developerKey=api_key)
    comments = []
    try:
        request = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            # The API reference gives 1-100 as the accepted range for maxResults;
            # sending more would fail the request and return nothing at all.
            maxResults=min(max_comments, 100)
        )
        response = request.execute()
        for item in response.get('items', []):
            comments.append(item['snippet']['topLevelComment']['snippet']['textDisplay'])
    except Exception as e:
        # Handle the "commentsDisabled" error
        if "commentsDisabled" in str(e):
            logging.warning(f"Comments are disabled for video {video_id}. Skipping.")
        else:
            logging.error(f"Error fetching comments for video {video_id}: {e}")
    return comments

def translate_comments(comments, translator=None):
    """Translate each comment to English, keeping the original text on failure.

    Args:
        comments: Iterable of comment strings.
        translator: Object with a ``translate(text, dest=...)`` method whose
            result has a ``.text`` attribute. When omitted, a ``Translator``
            is created here.

    Returns:
        A list with one entry per input comment: the translated text, or the
        untouched comment if translating it raised.
    """
    if not comments:
        return []

    if translator is None:
        # This function used to read a module-level `translator` that the
        # notebook never creates; the resulting NameError was swallowed by the
        # except below, so no comment was ever translated.
        try:
            translator = Translator()
        except Exception as e:
            logging.error(f"Could not create translator: {e}")
            return list(comments)

    translated_comments = []
    for comment in comments:
        try:
            translated_comments.append(translator.translate(comment, dest='en').text)
        except Exception as e:
            logging.error(f"Error translating comment: {comment}. Error: {e}")
            translated_comments.append(comment)  # Keep the original comment if translation fails
    return translated_comments



In [9]:
def get_trending_videos_with_details(api_key, max_results=200):
    """Collect trending videos and enrich each one in place.

    Every video dict gains category_name, comments, sentiments,
    average_sentiment, sentiment_label and captions. Videos without comments
    get an empty sentiment list, ``None`` as average and the label "Unknown";
    captions are fetched only when the video's ``caption`` field is 'true'.

    NOTE(review): ``authenticate_oauth`` and ``get_captions`` are not defined
    in the visible part of this notebook, and a later cell defines this
    function name again without the captions step - confirm which version is
    intended.
    """
    videos = get_trending_videos(api_key, max_results=max_results)
    categories = get_video_categories(api_key)
    credentials = authenticate_oauth()  # OAuth 2.0 credentials, handed to get_captions below

    for video in videos:
        video['category_name'] = categories.get(video['category_id'], "Unknown")

        fetched = get_comments(api_key, video['video_id'], max_comments=5)
        video['comments'] = fetched
        if not fetched:
            video['sentiments'] = []
            video['average_sentiment'] = None
            video['sentiment_label'] = "Unknown"
        else:
            scores = analyze_sentiment(fetched)
            video['sentiments'] = scores
            video['average_sentiment'] = sum(scores) / len(scores)
            video['sentiment_label'] = sentiment_label(video['average_sentiment'])

        # Captions are only requested for videos that advertise them.
        has_captions = video['caption'] == 'true'
        video['captions'] = get_captions(video['video_id'], credentials) if has_captions else []
    return videos

In [10]:
def analyze_sentiment(comments):
    """Return the TextBlob polarity of every comment, in input order."""
    return [TextBlob(text).polarity for text in comments]

def sentiment_label(polarity):
    """Turn a polarity score into a coarse label.

    Scores above 0.2 are "Good", scores below -0.2 are "Bad", and everything
    in between (both bounds included) is "Neutral".
    """
    if polarity > 0.2:
        return "Good"
    return "Bad" if polarity < -0.2 else "Neutral"

In [11]:
def get_trending_videos_with_details(api_key, max_results=200):
    """Collect trending videos and enrich each one in place.

    Every video dict gains category_name, comments (up to 5), sentiments,
    average_sentiment and sentiment_label. Videos without comments get an
    empty sentiment list, ``None`` as average and the label "Unknown".

    NOTE(review): comments are scored exactly as returned by get_comments;
    translate_comments is defined in this notebook but is not applied here.
    """
    videos = get_trending_videos(api_key, max_results=max_results)
    categories = get_video_categories(api_key)

    for video in videos:
        video['category_name'] = categories.get(video['category_id'], "Unknown")

        fetched = get_comments(api_key, video['video_id'], max_comments=5)
        video['comments'] = fetched
        if not fetched:
            video['sentiments'] = []
            video['average_sentiment'] = None
            video['sentiment_label'] = "Unknown"
            continue

        scores = analyze_sentiment(fetched)
        video['sentiments'] = scores
        video['average_sentiment'] = sum(scores) / len(scores)
        video['sentiment_label'] = sentiment_label(video['average_sentiment'])
    return videos

def save_data_to_file(data, output_dir="C:\\Users\\9999\\cmtyoutube"):
    """Write the fetched records to a dated CSV file.

    The file is named ``youtube_trending_videos_<YYYY-MM-DD>.csv``, so a
    second run on the same day overwrites the earlier file. A ``fetch_date``
    column holding the time of the save is added to every row.

    Args:
        data: Records accepted by ``pd.DataFrame`` (here: a list of dicts).
        output_dir: Directory to write into; created if missing. The default
            is a machine-specific Windows path - pass your own directory when
            running elsewhere.

    Returns:
        The path of the CSV file that was written.
    """
    # exist_ok=True replaces the exists()/makedirs() pair, which could fail if
    # the directory appeared between the check and the creation.
    os.makedirs(output_dir, exist_ok=True)

    # Take one timestamp so the date in the file name and the fetch_date column
    # can never disagree (e.g. when the run straddles midnight).
    fetched_at = datetime.now()
    current_date = fetched_at.strftime("%Y-%m-%d")
    filename = os.path.join(output_dir, f"youtube_trending_videos_{current_date}.csv")

    df = pd.DataFrame(data)
    df['fetch_date'] = fetched_at.strftime("%Y-%m-%d %H:%M:%S")

    df.to_csv(filename, index=False)
    return filename

In [13]:
def wait_until(target_time):
    """
    Block until ``target_time`` is reached.

    ``target_time`` is compared against ``datetime.now()``. If it already lies
    in the past it is moved forward in whole-day steps until it is no longer
    behind the current time, so the wait ends at the next occurrence of that
    time of day. The wait and the resolved target are logged before sleeping.
    """
    now = datetime.now()
    # A single "+1 day" left any target older than one day still in the past,
    # and time.sleep() raises ValueError for the resulting negative duration.
    while now > target_time:
        target_time += timedelta(days=1)

    # Calculate the time difference in seconds
    time_to_wait = (target_time - now).total_seconds()
    logging.info(f"Waiting for {time_to_wait:.2f} seconds until {target_time}...")
    time.sleep(time_to_wait)

def main():
    """Wait for the scheduled time, then fetch trending videos and save them.

    The YouTube API key is read from the ``YOUTUBE_API_KEY`` environment
    variable; if it is missing the function logs an error and returns before
    waiting. Errors during fetching or saving are logged with their traceback
    and do not propagate.
    """
    logging.info("Script started")

    # Credentials must not live in the notebook source. The key that used to be
    # embedded here should be treated as exposed and rotated.
    api_key = os.environ.get("YOUTUBE_API_KEY")
    if not api_key:
        # Fail before the (possibly hours-long) wait rather than after it.
        logging.error("YOUTUBE_API_KEY environment variable is not set; aborting.")
        return

    # Set the target time (02:33 local time)
    target_time = datetime.now().replace(hour=2, minute=33, second=0, microsecond=0)
    logging.info(f"Target time set to: {target_time}")

    # Wait until the target time
    wait_until(target_time)
    logging.info("Target time reached. Fetching data...")

    # Fetch and save trending videos
    try:
        logging.info("Fetching trending videos...")
        trending_videos = get_trending_videos_with_details(api_key, max_results=50)
        logging.info(f"Fetched {len(trending_videos)} videos.")

        logging.info("Saving data to file...")
        filename = save_data_to_file(trending_videos)
        logging.info(f"Data fetched and saved successfully to {filename}.")
    except Exception as e:
        # logging.exception records the message at ERROR level together with
        # the active traceback, replacing the manual traceback.format_exc() call.
        logging.exception(f"An error occurred: {e}")

In [None]:
# Entry point: run the pipeline when this code executes as the top-level module.
# main() blocks inside wait_until() until the scheduled time before fetching.
if __name__ == "__main__":
    main()