In [None]:
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import requests

from API_KEY import API_KEY

class bronze:
    """Bronze (raw) layer: pull channel data from the YouTube Data API v3.

    The class searches for channels matching a query, stores a trimmed copy of
    the search results as JSON, and then fetches per-channel statistics.
    Unless an explicit filename is passed, files are written under
    ``../data/bronze/`` and carry today's date in their name.
    """

    API_KEY = API_KEY
    url = "https://youtube.googleapis.com/youtube/v3/search"

    looger = logging.getLogger(__name__)

    # Seconds to wait for an HTTP response before abandoning a request.
    REQUEST_TIMEOUT = 30
    # Largest page size for a search request; also used as the id batch size
    # for the channels lookup.
    MAX_PAGE_SIZE = 50
    # How long to pause when the quota is exhausted, and how many such pauses
    # are tolerated before giving up (24 hourly waits span a full day).
    QUOTA_WAIT_SECONDS = 3600
    MAX_QUOTA_WAITS = 24
    # ``reason`` values in a 403 error body that are treated as quota or
    # rate-limit conditions worth waiting for.
    QUOTA_ERROR_REASONS = frozenset({
        "quotaExceeded",
        "dailyLimitExceeded",
        "rateLimitExceeded",
        "userRateLimitExceeded",
    })

    def __init__(self, q, regionCode, type, maxresults):
        """
        Store the search parameters used by ``request_data``.
        Args:
            q (str): Search query.
            regionCode (str): Region code sent to the search endpoint.
            type (str): Resource type to search for (e.g. "channel").
            maxresults (int): Requested page size; capped at 50 per request.
        """
        self.API_KEY = bronze.API_KEY
        self.url = bronze.url
        self.part = "snippet"
        self.q = q
        self.type = type
        self.regionCode = regionCode
        self.maxresults = maxresults
        self.total_results = 0
        self.quota_limit = 10000  # Daily quota limit
        self.quota_used = 0  # Not updated by any method in this class.

    @staticmethod
    def _dated_path(stem):
        """Return the default bronze JSON path for ``stem`` stamped with today's date."""
        today_date = datetime.today().strftime('%Y-%m-%d')
        return f"../data/bronze/{stem}_{today_date}.json"

    @staticmethod
    def _write_json(payload, filename):
        """Write ``payload`` as indented JSON, creating the parent directory if needed."""
        path = Path(filename)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open('w', encoding='utf-8') as f:
            json.dump(payload, f, indent=4)

    @staticmethod
    def _error_reasons(response):
        """Return the set of ``reason`` strings found in an API error body (empty if unparsable)."""
        try:
            errors = response.json().get("error", {}).get("errors", [])
        except (ValueError, AttributeError):
            return set()
        return {err.get("reason") for err in errors if isinstance(err, dict)}

    def request_data(self, max_channels: int = 1000000) -> Optional[Dict]:
        """
        Function to request data from YouTube API with pagination support
        Args:
            max_channels (int): Maximum number of channels to retrieve
        Returns:
            Dict: ``{"items": [...]}`` with at most ``max_channels`` items
            combined from all pages, or None if an exception occurred.
        """
        all_items: List[dict] = []
        quota_waits = 0
        params = {
            "part": self.part,
            "q": self.q,
            "type": self.type,
            "regionCode": self.regionCode,
            "maxResults": min(self.MAX_PAGE_SIZE, self.maxresults),  # YouTube API limit is 50 per request
            "key": self.API_KEY
        }

        try:
            while len(all_items) < max_channels:
                self.looger.info("Sending request to YouTube API. Total items so far: %d", len(all_items))
                response = requests.get(self.url, params=params, timeout=self.REQUEST_TIMEOUT)

                if response.status_code == 403:
                    # A 403 is not always a quota problem; waiting only makes
                    # sense when the error body reports a quota/rate limit.
                    if not (self._error_reasons(response) & self.QUOTA_ERROR_REASONS):
                        self.looger.error("Request forbidden (403) for a non-quota reason: %s", response.text)
                        break
                    if quota_waits >= self.MAX_QUOTA_WAITS:
                        self.looger.error("API quota still exhausted after %d waits. Giving up.", quota_waits)
                        break
                    quota_waits += 1
                    self.looger.warning("API quota exhausted. Waiting for quota reset...")
                    time.sleep(self.QUOTA_WAIT_SECONDS)
                    continue

                if response.status_code != 200:
                    self.looger.error("Request failed with status code %s", response.status_code)
                    break

                response_data = response.json()
                items = response_data.get('items', [])

                if not items:
                    self.looger.info("No more items found")
                    break

                all_items.extend(items)
                self.looger.info("Retrieved %d items. Total: %d", len(items), len(all_items))

                # Check the cap before pausing so the final page does not
                # cost an unnecessary sleep.
                if len(all_items) >= max_channels:
                    self.looger.info("Reached maximum number of channels: %s", max_channels)
                    break

                next_page_token = response_data.get('nextPageToken')
                if not next_page_token:
                    self.looger.info("No more pages available")
                    break
                params['pageToken'] = next_page_token

                time.sleep(0.5)

            self.total_results = len(all_items)
            self.looger.info("Total channels retrieved: %d", self.total_results)
            return {"items": all_items[:max_channels]}

        except Exception as e:
            # Pipeline boundary: callers test the result for None instead of
            # catching exceptions, so report the failure and return None.
            self.looger.error(f"An error occurred: {e}")
            return None

    def save_data(self, data, filename=None):
        """
        Function to save data to a JSON file
        Args:
            data (dict): Search results as returned by ``request_data``
                (a dict with an "items" list).
            filename (str): File to write. When omitted, the file is
                ``../data/bronze/bronze_data_<YYYY-MM-DD>.json`` for today.
        Returns: None

        """
        try:
            self.looger.info("Preparing channel records for saving")
            if filename is None:
                filename = self._dated_path("bronze_data")
            channels = []
            for item in data['items']:
                try:
                    snippet = item['snippet']
                    channels.append({
                        "id": item['id']['channelId'],
                        "created_date": snippet['publishedAt'],
                        "title": snippet['title'],
                        "description": snippet['description'],
                        "publishTime": snippet.get('publishTime')
                    })
                except (KeyError, TypeError) as e:
                    # One malformed result should not discard the whole batch.
                    self.looger.warning("Skipping item with unexpected structure: %r", e)
            self._write_json({"channels": channels}, filename)
            self.looger.info(f"Data saved to {filename}")
        except Exception as e:
            self.looger.error(f"An error occurred while saving data: {e}")

    def extract_statictics(self, filename_in=None, filename_out=None):
        """
        Function to extract statistics from YouTube API and save to JSON file
        Args:
            filename_in (str): Input file holding the saved channels. When
                omitted, ``../data/bronze/bronze_data_<YYYY-MM-DD>.json``.
            filename_out (str): Output file for the statistics. When omitted,
                ``../data/bronze/statistics_data_<YYYY-MM-DD>.json``.
        Returns: None
        """
        try:
            if filename_in is None:
                filename_in = self._dated_path("bronze_data")
            if filename_out is None:
                filename_out = self._dated_path("statistics_data")

            self.looger.info("Extracting statistics data from API")
            with open(filename_in, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Search results can repeat a channel; look each one up only once,
            # keeping first-seen order.
            channel_ids = list(dict.fromkeys(channel['id'] for channel in data['channels']))
            stats_url = "https://youtube.googleapis.com/youtube/v3/channels"
            statistics_data = []

            # The channels endpoint accepts a comma-separated id list, so the
            # ids are sent in batches instead of one request per channel.
            for start in range(0, len(channel_ids), self.MAX_PAGE_SIZE):
                batch = channel_ids[start:start + self.MAX_PAGE_SIZE]
                params = {
                    "part": "statistics",
                    "id": ",".join(batch),
                    "maxResults": self.MAX_PAGE_SIZE,
                    "key": self.API_KEY
                }
                response = requests.get(stats_url, params=params, timeout=self.REQUEST_TIMEOUT)
                if response.status_code == 200:
                    # Index by id: do not rely on response order, and ids the
                    # API does not return are simply absent.
                    stats_by_id = {
                        item['id']: item.get('statistics', {})
                        for item in response.json().get('items', [])
                    }
                    for channel_id in batch:
                        if channel_id not in stats_by_id:
                            continue
                        statistics = stats_by_id[channel_id]
                        statistics_data.append({
                            "channel_id": channel_id,
                            "viewCount": statistics.get('viewCount', 0),
                            "subscriberCount": statistics.get('subscriberCount', 0),
                            "videoCount": statistics.get('videoCount', 0)
                        })
                else:
                    self.looger.warning(
                        "Statistics request failed with status code %s for %d channel(s)",
                        response.status_code, len(batch))

                time.sleep(0.1)

            self._write_json({"statistics": statistics_data}, filename_out)
            self.looger.info(f"Statistics data saved to {filename_out}")
        except Exception as e:
            self.looger.error(f"An error occurred while extracting statistics: {e}")

In [44]:
import pandas as pd
from datetime import datetime


class silver:
    """Silver layer: load the bronze JSON files, merge them, and clean the result.

    Inputs are read from ``../data/bronze/`` and CSV outputs are written to
    ``../data/silver/``; default file names carry today's date.
    """

    looger = logging.getLogger(__name__)

    # Columns kept by ``clean_data``.
    CLEAN_COLUMNS = ['id', 'created_date', 'title', 'description',
                     'viewCount', 'subscriberCount', 'videoCount']

    def __init__(self):
        pass

    @staticmethod
    def _today():
        """Return today's date formatted as YYYY-MM-DD."""
        return datetime.today().strftime('%Y-%m-%d')

    def _save_csv(self, df, stem):
        """Write ``df`` to ``../data/silver/<stem>_<today>.csv``; log (not raise) on I/O failure."""
        path = Path(f"../data/silver/{stem}_{self._today()}.csv")
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            df.to_csv(path, index=False)
        except OSError as e:
            # The in-memory result is still valid, so a failed write must not
            # discard it; report the problem and let the caller continue.
            self.looger.error(f"Could not write {path}: {e}")

    @staticmethod
    def _read_records(filename, key):
        """Load a JSON file and flatten the list stored under ``key`` into a DataFrame."""
        with open(filename, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return pd.json_normalize(data[key])

    def load_data_bronze(self, filename=None):
        """
        Function to load bronze data from JSON file into a pandas DataFrame
        Args:
            filename (str): File to load. When omitted, today's
                ``../data/bronze/bronze_data_<YYYY-MM-DD>.json`` is used.
        Returns:
            pd.DataFrame: DataFrame containing the bronze data, or an empty
            DataFrame if the file could not be loaded.
        """
        if filename is None:
            filename = f"../data/bronze/bronze_data_{self._today()}.json"
        try:
            df = self._read_records(filename, 'channels')
        except Exception as e:
            self.looger.error(f"An error occurred while loading bronze data: {e}")
            return pd.DataFrame()
        self.looger.info(f"Bronze data loaded from {filename}")
        return df

    def load_statistics_data(self, filename=None):
        """
        Function to load statistics data from JSON file into a pandas DataFrame
        Args:
            filename (str): File to load. When omitted, today's
                ``../data/bronze/statistics_data_<YYYY-MM-DD>.json`` is used.
        Returns:
            pd.DataFrame: DataFrame containing the statistics data, or an
            empty DataFrame if the file could not be loaded.
        """
        if filename is None:
            filename = f"../data/bronze/statistics_data_{self._today()}.json"
        try:
            df = self._read_records(filename, 'statistics')
        except Exception as e:
            self.looger.error(f"An error occurred while loading statistics data: {e}")
            return pd.DataFrame()
        self.looger.info(f"Statistics data loaded from {filename}")
        return df

    def merge_data(self, bronze_df: pd.DataFrame, statistics_df: pd.DataFrame) -> pd.DataFrame:
        """
        Function to merge bronze data with statistics data
        Args:
            bronze_df (pd.DataFrame): DataFrame containing bronze data
            statistics_df (pd.DataFrame): DataFrame containing statistics data
        Returns:
            pd.DataFrame: Inner join of the two frames on ``id`` ==
            ``channel_id``; an empty DataFrame if the merge fails. The merged
            frame is also written to ``../data/silver/silver_data_<date>.csv``.
        """
        try:
            merged_df = pd.merge(bronze_df, statistics_df, left_on='id', right_on='channel_id', how='inner')
        except Exception as e:
            self.looger.error(f"An error occurred while merging data: {e}")
            return pd.DataFrame()
        self.looger.info("Data merged successfully")
        self._save_csv(merged_df, "silver_data")
        return merged_df

    def clean_data(self, merged_df: pd.DataFrame) -> pd.DataFrame:
        """
        Function to clean the merged DataFrame
        Args:
            merged_df (pd.DataFrame): Merged DataFrame to be cleaned
        Returns:
            pd.DataFrame: The selected columns, de-duplicated by ``id`` and
            restricted to channels with more than 100 videos and more than
            100 subscribers; an empty DataFrame if cleaning fails. The result
            is also written to
            ``../data/silver/cleaned_silver_data_<date>.csv``.
        """
        try:
            self.looger.info("Data cleaning process started")
            # Non in-place de-duplication avoids mutating a slice of the input.
            df = merged_df[self.CLEAN_COLUMNS].drop_duplicates(subset='id', keep='first')

            # Counts arrive as strings; values that are not numeric become NaN
            # and therefore fail the ``> 100`` comparison instead of raising.
            video_count = pd.to_numeric(df['videoCount'], errors='coerce')
            subscriber_count = pd.to_numeric(df['subscriberCount'], errors='coerce')
            df = df[(video_count > 100) & (subscriber_count > 100)]
        except Exception as e:
            self.looger.error(f"An error occurred during data cleaning: {e}")
            return pd.DataFrame()

        self.looger.info("Data cleaning process completed")
        self._save_csv(df, "cleaned_silver_data")
        return df


In [None]:
class gold:
    """Gold layer placeholder: holds a logger and defines no behaviour yet."""

    looger = logging.getLogger(__name__)

    def __init__(self):
        """Create the object; there is no instance state to initialise."""
        return None

    

In [None]:
# Bronze process: configure logging, query the API, and persist the results.

logging.basicConfig(level=logging.INFO)

# Build the extractor and fetch the search results (capped at 100 channels).
bronze_instance = bronze(
    q="Data Engineering",
    regionCode="US",
    type="channel",
    maxresults=50,
)
data = bronze_instance.request_data(max_channels=100)

if not data:
    # request_data returned nothing usable; the message is Spanish for
    # "The data could not be retrieved".
    print("No se pudieron obtener los datos")
else:
    # Save the channels first: extract_statictics reads the file written here.
    bronze_instance.save_data(data)
    bronze_instance.extract_statictics()

logging.info("Bronce process completed")

In [None]:
# Silver process: load today's bronze files, merge them, and clean the result.
#
# The silver loaders and merge_data report failure by returning an EMPTY
# DataFrame (never None), so success is decided by checking emptiness; an
# `is not None` test would log success even when a step failed.

logging.basicConfig(level=logging.INFO)

silver_instance = silver()

logging.info("Extracting the Bronze file")
bronze_df = silver_instance.load_data_bronze()
if bronze_df is not None and not bronze_df.empty:
    logging.info("Bronze file extracted successfully")
else:
    logging.error("Bronze file could not be loaded or contains no rows")

logging.info("Extract the Statistics file")
statistics_df = silver_instance.load_statistics_data()
if statistics_df is not None and not statistics_df.empty:
    logging.info("Statistics extracted succesfully")
else:
    logging.error("Statistics file could not be loaded or contains no rows")

logging.info("Merged process start")
merged_df = silver_instance.merge_data(bronze_df, statistics_df)
if merged_df is not None and not merged_df.empty:
    logging.info("Dataframe merge successfully")
else:
    logging.error("Merge failed or produced no rows")


# Still invoked on an empty frame: clean_data handles that case itself and
# the variables above stay defined for later cells.
silver_instance.clean_data(merged_df)

logging.info("Silver process completed")