In [6]:
!pip install boto3
!pip install yt_dlp
!pip install --upgrade google-api-python-client
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


In [54]:
import boto3
import hashlib
import json
import os
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from decimal import Decimal
import yt_dlp
from datetime import datetime
from dotenv import load_dotenv


def current_timestamp():
    """Returns the current timestamp formatted for readability."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def get_env_variables():
    """Fetch all necessary configurations from environment variables."""
    return {
        'DEVELOPER_KEY': os.getenv('DEVELOPER_KEY'),
        'AWS_ACCESS_KEY_ID': os.getenv('AWS_ACCESS_KEY_ID'),
        'AWS_SECRET_ACCESS_KEY': os.getenv('AWS_SECRET_ACCESS_KEY'),
        'SEARCH_CACHE_TABLE': os.getenv('SEARCH_CACHE_TABLE'),
        'RESULTS_TABLE_NAME': os.getenv('RESULTS_TABLE_NAME'),
        'SEARCH_QUERY': os.getenv('SEARCH_QUERY'),
        'MAX_RESULTS': int(os.getenv('MAX_RESULTS', 3)),
        'ORDER': os.getenv('ORDER', 'viewCount'),
        'VIDEO_DURATION': os.getenv('VIDEO_DURATION', 'medium'),
        'PUBLISHED_AFTER': os.getenv('PUBLISHED_AFTER', '2010-01-01T00:00:00Z'),
        'PUBLISHED_BEFORE': os.getenv('PUBLISHED_BEFORE', '2024-12-31T23:59:59Z'),
        'RELEVANCE_LANGUAGE': os.getenv('RELEVANCE_LANGUAGE', 'en'),
        # 'VIDEO_CATEGORY_ID': os.getenv('VIDEO_CATEGORY_ID', '10'),
        'AWS_REGION': os.getenv('AWS_REGION')  # Add AWS region to the environment variables
    }

def open_aws_dynamodb_session(options):
    """
    Configures a boto3 session using AWS credentials read from environment variable.
    Returns a boto3 DynamoDB resource configured with these credentials.
    """
    try:

        # Configure the boto3 session with the read credentials
        session = boto3.Session(
            aws_access_key_id=options['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=options['AWS_SECRET_ACCESS_KEY'],
            region_name=options['AWS_REGION']  # Specify your AWS region
        )
        dynamodb_resource = session.resource('dynamodb')
        print(f"Successfully open_aws_dynamodb_session with environment variables. at {current_timestamp()}\n")
        # Return the configured DynamoDB resource
        return dynamodb_resource

    except Exception as e:
        print(f"Error open_aws_dynamodb_session from environment variables: {e} at {current_timestamp()}\n")
        return None


def check_cache(options, dynamodb):
    """Check if search options are in the cache on AWS DynamoDB."""
    print(f"Checking cache at {current_timestamp()}\n")
    non_search_terms = ['DEVELOPER_KEY', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'SEARCH_CACHE_TABLE', 'RESULTS_TABLE_NAME', 'AWS_REGION', 'MAX_RESULTS']
    search_query_options = {k: v for k, v in options.items() if k not in non_search_terms}

    # Ensure consistent ordering by sorting keys
    cache_key = json.dumps(search_query_options, sort_keys=True)

    try:
        cache_table = dynamodb.Table(options['SEARCH_CACHE_TABLE'])
        response = cache_table.get_item(
            Key={
                'CacheKey': cache_key
            }
        )
        if 'Item' in response:
            print("Cache hit.")
            return json.loads(response['Item']['Results'])
    except Exception as e:
        print(f"Error accessing DynamoDB due to 'check_cache' malfunctioniong: {e} at {current_timestamp()}\n")
    print("Cache miss.\n")
    return None

def update_cache(options, dynamodb, results):
    """Update the cache with new search results, including the search time."""
    print(f"Updating cache at {current_timestamp()}\n")
    non_search_terms = ['DEVELOPER_KEY', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'SEARCH_CACHE_TABLE', 'RESULTS_TABLE_NAME', 'AWS_REGION', 'MAX_RESULTS']
    search_query_options = {k: v for k, v in options.items() if k not in non_search_terms}
    # Serialize search_query_options to a string to use as a cache key
    # Ensure consistent ordering by sorting keys
    cache_key = json.dumps(search_query_options, sort_keys=True)

    # Assuming results is a dict or list; serialize it as well
    serialized_results = json.dumps(results)

    timestamp = current_timestamp()  # Capture the search timestamp
    
    try:
        search_cache_table = options['SEARCH_CACHE_TABLE']
        cache_table = dynamodb.Table(search_cache_table)
        cache_table.put_item(
            Item={
                'CacheKey': cache_key,
                'Results': serialized_results,
                'Timestamp': timestamp  # Store the search time
            }
        )
        print("Cache updated successfully.\n")
    except Exception as e:
        print(f"Error updating DynamoDB due to 'update_cache' malfunctioniong: {e} at {current_timestamp()}\n")

def youtube_search_all_videos(options, dynamodb):
    """Perform a search on YouTube Data APIand return all videos based on the options, with caching and paging."""
    print(f"Performing YouTube search at {current_timestamp()}\n")
    cached_results = check_cache(options, dynamodb)
    if cached_results is not None:
        print(f"Function 'youtube_search_all_videos' was run at {current_timestamp()}\n")
        return cached_results

    developer_key = options['DEVELOPER_KEY']
    # CREATE YOUTUBE OBJECT calling the youtbue data api 
    youtube = build('youtube', 'v3', developerKey=developer_key)
    
    all_videos = []
    page_token = None
    max_iterations = 1  # Adjust based on how many pages you want to retrieve (50 results max with 25 per page)

    try:
        for _ in range(max_iterations):
            search_response = youtube.search().list(
                q=options['SEARCH_QUERY'],
                part='id,snippet',
                maxResults=options['MAX_RESULTS'],
                order=options['ORDER'],
                type='video',
                videoDuration=options['VIDEO_DURATION'],
                publishedAfter=options['PUBLISHED_AFTER'],
                publishedBefore=options['PUBLISHED_BEFORE'],
                relevanceLanguage=options['RELEVANCE_LANGUAGE'],
                # videoCategoryId=options['VIDEO_CATEGORY_ID'], # When there is a category ID assigned, if number 10, then results are mostly music.
                pageToken=page_token
            ).execute()

            all_videos.extend(search_response.get('items', []))
    
            video_ids = [item['id']['videoId'] for item in search_response.get('items', [])]
    
            # Retrieve additional details for each video by their IDs, excluding restricted parts
            if video_ids:
                details_response = youtube.videos().list(
                    id=','.join(video_ids),
                    part='contentDetails,statistics,status,topicDetails,recordingDetails'
                ).execute()
    
                all_videos.extend(details_response.get('items', []))
    
    except HttpError as e:
        print(f"An HTTP error occurred: {e.resp.status} {e.content} at {current_timestamp()}\n")
    return all_videos

def download_subtitles(video_id):
    """Download subtitles for a given YouTube video ID."""
    video_url = f'https://www.youtube.com/watch?v={video_id}'
    ydl_opts = {
        'writeautomaticsub': True,
        'subtitleslangs': ['en'],
        'skip_download': True,
        'outtmpl': f'subtitles/{video_id}.%(ext)s',
        'quiet': True
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
            subtitle_file = f'subtitles/{video_id}.en.vtt'
            if os.path.exists(subtitle_file):
                with open(subtitle_file, 'r', encoding='utf-8') as file:
                    subtitle_text = file.read()
                    print(f"Downloaded video subtitles for video ID: {video_id} into the file {subtitle_file} at {current_timestamp()}\n")
                return subtitle_file, subtitle_text
            return None, None
    except Exception as e:
        print(f"Failed to download subtitles for video ID: {video_id}: {e} at {current_timestamp()}\n")
        return None, None

def convert_floats_to_decimals(obj):
    """Recursively convert float values to decimals in the given object."""
    if isinstance(obj, float):
        return Decimal(str(obj))
    elif isinstance(obj, dict):
        return {k: convert_floats_to_decimals(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_floats_to_decimals(v) for v in obj]
    return obj

def send_to_dynamodb(options, dynamodb, video_details):
    """Send Youtube Data API results on the video and subtitle transcripts to DynamoDB."""
    try:
        # Convert float values to decimals
        video_details_decimal = convert_floats_to_decimals(video_details)

        results_table_name = options['RESULTS_TABLE_NAME']
        results_table = dynamodb.Table(results_table_name)
        response = results_table.put_item(Item=video_details_decimal)
        print(f"Successfully inserted API results and converted transcripts into DynamoDB at {current_timestamp()}\n", response)
    except Exception as e:
        print(f"Error inserting into DynamoDB: {e}")

def process_videos_and_store(data, parent_key='', result_dict={}):
    """Process each video from the search results and store them."""
    if isinstance(data, dict):
        for k, v in data.items():
            # Only pass the current key, not the entire path
            extract_last_key_values_to_dict(v, k, result_dict)
    elif isinstance(data, list):
        for i, item in enumerate(data):
            # For list items, keep the index in the key but also indicate it's an item in a list
            current_key = f"{parent_key}[{i}]"
            extract_last_key_values_to_dict(item, parent_key, result_dict)
    else:
        # Directly use parent_key as the final key
        result_dict[parent_key] = data
    send_to_dynamodb(options, dynamodb, result_dict)
    print(f"\nFunction 'process_videos_and_store' was run at {current_timestamp()} \n")
    return result_dict
        

def main():

    # Load the .env file
    load_dotenv()
    
    # Ensure the subtitles directory exists}
    os.makedirs('subtitles', exist_ok=True)
    
    # Get environment variables
    options = get_env_variables()
    
    # Only proceed if options were successfully retrieved
    if options:
        dynamodb = open_aws_dynamodb_session(options)
        
        # Only proceed if DynamoDB was successfully configured
        if dynamodb:
    
            all_videos = youtube_search_all_videos(options, dynamodb)
            formatted_result = json.dumps(all_videos, indent=4, sort_keys=True)
            print(formatted_result)
            # Only proceed if videos were successfully retrieved
            if all_videos:
                process_videos_and_store(all_videos, parent_key='', result_dict={})
            else:
                print(f"Failed to retrieve videos at {current_timestamp()}. Exiting...\n")
        else:
            print(f"Failed to configure boto3 with Docker secrets at {current_timestamp()}. Exiting...\n")
    else:
        print(f"Failed to retrieve environment variables at {current_timestamp()} Exiting...\n")

if __name__ == "__main__":
    main()


Successfully open_aws_dynamodb_session with environment variables. at 2024-02-24 14:01:56

Performing YouTube search at 2024-02-24 14:01:56

Checking cache at 2024-02-24 14:01:56

Error accessing DynamoDB due to 'check_cache' malfunctioniong: An error occurred (UnrecognizedClientException) when calling the GetItem operation: The security token included in the request is invalid. at 2024-02-24 14:01:57

Cache miss.

[
    {
        "etag": "jGmLZdvgw4Xyr1nqkM0e_6QWexg",
        "id": {
            "kind": "youtube#video",
            "videoId": "fG1oNm2tCro"
        },
        "kind": "youtube#searchResult",
        "snippet": {
            "channelId": "UCBqFKDipsnzvJdt6UT0lMIg",
            "channelTitle": "Sandeep Maheshwari",
            "description": "Positive daily affirmations are very powerful... when these affirmations are repeated over and over again, they begin to take ...",
            "liveBroadcastContent": "none",
            "publishTime": "2020-09-16T06:56:10Z",
      

In [53]:
import boto3

# Assuming you have your AWS credentials set up in your environment or AWS credentials file
# Create a DynamoDB resource
dynamodb = boto3.resource('dynamodb', region_name='us-east-2') # Replace 'your-region' with the AWS region your table is in

# Reference your table
table_name = 'YoutubeSearchResults'  # Replace with your table's name
table = dynamodb.Table(table_name)

# Example: Putting an item into the table
response = table.put_item(
    Item={
        'VideoID': 'firstTest',  # Your partition key value
        # Add other attributes here
    }
)

print(response)

ClientError: An error occurred (UnrecognizedClientException) when calling the PutItem operation: The security token included in the request is invalid.