In [4]:
import os
import pickle
import warnings
from datetime import datetime, timedelta, timezone

import google.auth.exceptions
import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors
import isodate  # To handle durations in ISO 8601 format
from google.auth.transport.requests import Request
from yt_dlp import YoutubeDL  # YouTubeDL module

warnings.filterwarnings("ignore")
# Hide warnings

# Google API credentials and YouTube Data API service configuration
scopes = ["https://www.googleapis.com/auth/youtube.readonly"]

def get_authenticated_service():
    credentials = None
    if os.path.exists("token.pickle"):
        with open("token.pickle", "rb") as token:
            credentials = pickle.load(token)

    if not credentials or not credentials valid:
        if credentials and credentials.expired and credentials.refresh_token:
            try:
                credentials.refresh(Request())
            except google.auth.exceptions.RefreshError:
                print("Token refresh error. Starting a new authentication process.")
                os.remove("token.pickle")  # Delete old token if there's an error
                flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
                    "client_secret.json", scopes)
                credentials = flow.run_local_server(port=0)
        else:
            flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
                "client_secret.json", scopes)
            credentials = flow.run_local_server(port=0)

        with open("token.pickle", "wb") as token:
            pickle.dump(credentials, token)

    return googleapiclient.discovery.build("youtube", "v3", credentials=credentials)

def get_video_details(youtube, video_ids):
    # Retrieve video details, including language and duration
    request = youtube.videos().list(
        part="snippet,contentDetails",  # contentDetails will get us the duration
        id=",".join(video_ids)
    )
    response = request.execute()
    return response['items']

def is_video_short_enough(video):
    # YouTube returns the duration in ISO 8601 format (e.g., PT30S)
    duration = video['contentDetails']['duration']
    
    # Convert duration to seconds
    duration_seconds = parse_duration_to_seconds(duration)
    
    # Filter out videos longer than 30 seconds
    return duration_seconds <= 30

def parse_duration_to_seconds(duration):
    # Function to convert ISO 8601 duration from YouTube (e.g., PT1M30S) into seconds
    return isodate.parse_duration(duration).total_seconds()

def is_video_in_english(video):
    # Language information is in the snippet section (defaultAudioLanguage or defaultLanguage)
    language = video['snippet'].get('defaultAudioLanguage', video['snippet'].get('defaultLanguage', ''))
    return language == 'en'

def get_trending_entertainment_shorts_videos_in_usa(youtube, max_results=10):
    # Calculate the date one day ago
    one_day_ago = datetime.utcnow() - timedelta(days=1)
    published_after = one_day_ago.isoformat("T") + "Z"
    
    # Make a search request to get the short video IDs
    request = youtube.search().list(
        part="snippet",
        type="video",
        order="viewCount",  # Get the most-watched videos
        videoDuration="short",  # Only get Shorts videos
        publishedAfter=published_after,
        regionCode="US",  # Content from the USA
        relevanceLanguage="en",  # Content in English
        videoCategoryId="10",  # Category ID for Entertainment
        maxResults=max_results * 2  # Get more results for filtering
    )
    response = request.execute()

    # List the video IDs
    video_ids = [item['id']['videoId'] for item in response['items']]
    
    # Retrieve video details including duration and language
    video_details = get_video_details(youtube, video_ids)
    
    # Filter videos by duration and language (English)
    short_english_videos = [video for video in video_details if is_video_short_enough(video) and is_video_in_english(video)]

    return short_english_videos[:max_results]

def download_video(video_id, output_filename="video1.mp4"):
    # If a file with the same name exists, delete it
    if os.path.exists(output_filename):
        os.remove(output_filename)
        print(f"Previous {output_filename} file deleted.")

    video_url = f"https://www.youtube.com/watch?v={video_id}"
    
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',  # Select the best video and audio quality
        'outtmpl': output_filename,  # Set the output file name and extension
    }
    
    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
    print(f"Video downloaded and saved as {output_filename}!")

if __name__ == "__main__":
    youtube = get_authenticated_service()
    
    # Get the most-watched English short videos from the USA
    trending_videos = get_trending_entertainment_shorts_videos_in_usa(youtube, max_results=10)

    # Print video details and download the videos
    for idx, video in enumerate(trending_videos):
        video_id = video['id']
        title = video['snippet']['title']
        duration = video['contentDetails']['duration']
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        print(f"{idx + 1}. Video Title: {title}")
        print(f"Duration: {duration}")
        print(f"Video Link: {video_url}\n")
        
        # Download the video
        download_video(video_id, output_filename="video1.mp4")


1. Video Title: we are so excited to be performing for our EYEKONS again 💗 #mallofamerica #KATSEYE
Duration: PT6S
Video Link: https://www.youtube.com/watch?v=rYDI0Vgh_3Q

[youtube] Extracting URL: https://www.youtube.com/watch?v=rYDI0Vgh_3Q
[youtube] rYDI0Vgh_3Q: Downloading webpage
[youtube] rYDI0Vgh_3Q: Downloading ios player API JSON
[youtube] rYDI0Vgh_3Q: Downloading player a62d836d


         n = 0RVwsZBilV7AMara- ; player = https://www.youtube.com/s/player/a62d836d/player_ias.vflset/en_US/base.js
         n = xzUHaZam5AtGt4GEb ; player = https://www.youtube.com/s/player/a62d836d/player_ias.vflset/en_US/base.js


[youtube] rYDI0Vgh_3Q: Downloading m3u8 information
[info] Testing format 616
[info] rYDI0Vgh_3Q: Downloading 1 format(s): 616+140
[hlsnative] Downloading m3u8 manifest
[hlsnative] Total fragments: 1
[download] Destination: video1.f616.mp4
[download] 100% of    2.95MiB in 00:00:01 at 2.47MiB/s                 
[download] Destination: video1.f140.m4a
[download] 100% of   92.59KiB in 00:00:00 at 553.21KiB/s 
[Merger] Merging formats into "video1.mp4"
Deleting original file video1.f140.m4a (pass -k to keep)
Deleting original file video1.f616.mp4 (pass -k to keep)
Video başarıyla indirildi ve video1.mp4 olarak kaydedildi!


In [4]:
import re

def get_video_hashtags(description):
    # Using a simple regex to identify hashtags
    hashtags = re.findall(r"#\w+", description)
    return hashtags

def get_video_details_with_hashtags(youtube, video_ids):
    # Retrieve video details including description and other information
    request = youtube.videos().list(
        part="snippet",
        id=",".join(video_ids)
    )
    response = request.execute()
    return response['items']

if __name__ == "__main__":
    youtube = get_authenticated_service()
    
    # Example: using a video ID here
    video_ids = ["ZNt_GoOBHq8"]  # Enter a video ID here
    video_details = get_video_details_with_hashtags(youtube, video_ids)
    
    for video in video_details:
        title = video['snippet']['title']
        description = video['snippet']['description']
        hashtags = get_video_hashtags(description)
        
        print(f"Video Title: {title}")
        print(f"Hashtags: {hashtags}")


Video Title: Human vs Jet Engine
Hashtags: []
