In [2]:
import os
# Step up one directory level so that later relative paths resolve from the
# parent folder (presumably the project root - confirm against the repo layout).
# NOTE: the move is relative to the current working directory, so re-running
# this cell without restarting the kernel moves up one more level each time.
os.chdir("../")

In [3]:
%pwd

'/Users/anjalijha/Python/Project/YouTubeChannel-Analyzer'

In [20]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """
    Immutable settings for the data-transformation stage.

    Instances are frozen: assigning to a field after construction raises an
    error, so the configuration cannot drift while the stage runs.
    """

    # Directory that belongs to the data-transformation stage.
    root_dir: Path
    # Directory used for the stage's data files.
    data_dir: Path

In [30]:
from YouTubeChannelAnalyzer.constants import *
from YouTubeChannelAnalyzer.utils.common import create_directories, read_yaml
from YouTubeChannelAnalyzer.logging import logger
from pymongo import MongoClient
import certifi, re
import pandas as pd

In [None]:
class ConfigurationManager():
    """
    Central access point for the project's YAML configuration and for the
    secrets the pipeline needs.

    Secrets (YouTube API key, MongoDB connection string) are read from
    environment variables only. They must never be written into this file:
    any credential that was ever committed here should be treated as
    compromised and rotated.
    """

    def __init__(self, config_filepath = CONFIG_FILE_PATH, params_filepath = PARAMS_FILE_PATH):
        """
        Load the configuration and parameter files and create the artifacts root.

        :param config_filepath: Location of the main configuration YAML.
        :param params_filepath: Location of the parameters YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        # Create necessary directories
        create_directories([self.config.artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """
        Build the settings object for the data-transformation stage.

        Creates the stage's root directory as a side effect.

        :return: DataTransformationConfig filled from the `data_transformation`
                 section of the loaded configuration.
        """
        config = self.config.data_transformation  # Fetching the data_transformation section
        create_directories([config.root_dir])
        # Values are passed through exactly as stored in the configuration so
        # that existing consumers of `root_dir` / `data_dir` keep working.
        return DataTransformationConfig(
            root_dir=config.root_dir,
            data_dir=config.data_dir
        )

    @staticmethod
    def _require_env(var_name: str, label: str) -> str:
        """
        Return the value of environment variable `var_name`.

        :param var_name: Name of the environment variable to read.
        :param label: Human-readable name used in the log and error messages.
        :raises ValueError: If the variable is unset or empty.
        """
        value = os.getenv(var_name)
        if not value:
            logger.error(f"{label} is missing from the environment variables.")
            raise ValueError(f"{label} is missing from environment variables.")
        return value

    def get_youtube_api_key(self) -> str:
        """
        Retrieves the YouTube API key from the YOUTUBE_API_KEY environment variable.

        :raises ValueError: If the variable is unset or empty (an error is logged first).
        """
        return self._require_env('YOUTUBE_API_KEY', "YouTube API key")

    def get_mongodb_connection(self) -> str:
        """
        Retrieves the MongoDB connection string from the MONGODB_URI environment variable.

        :raises ValueError: If the variable is unset or empty (an error is logged first).
        """
        return self._require_env('MONGODB_URI', "MongoDB URI")


In [32]:
class MongoDBStorage:
    """
    Thin wrapper around a MongoClient and one database handle.

    The instance owns exactly one client at a time: `self.client` is the
    client that `self.db` belongs to, so `close()` always releases the
    connection that is actually in use.
    """

    def __init__(self, connection_string, db_name='Project1'):
        """
        Initialize the MongoDB storage handler.
        :param connection_string: MongoDB connection string.
        :param db_name: MongoDB database name (default is 'Project1').
        """
        self.connection_string = connection_string
        self.db_name = db_name
        self.client = None
        self.db = None
        self.connect()

    def _open(self, connection_string, db_name):
        """
        Create a client for `connection_string`, bind `self.client`/`self.db`
        to it and close the client that was bound before, if any.
        """
        new_client = MongoClient(connection_string, tls=True, tlsCAFile=certifi.where())
        previous_client = self.client
        self.client = new_client
        self.db = new_client[db_name]
        # Release the replaced client only after the new one is in place, so a
        # failure while creating the new client leaves the old one usable.
        if previous_client is not None:
            previous_client.close()

    def connect(self):
        """
        Establish a connection to MongoDB using the stored connection string
        and database name.

        NOTE(review): per the PyMongo documentation MongoClient connects in the
        background, so success here means the client was created, not that the
        server was reached - confirm if an eager check is wanted.
        :raises Exception: Re-raises whatever client creation raised, after logging it.
        """
        try:
            self._open(self.connection_string, self.db_name)
            logger.info(f"MongoDB connection successful. Connected to database: {self.db_name}")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise

    def insert_or_update_channel_data(self, collection_name, channel_data):
        """
        Insert or update the channel data in the specified collection.
        :param collection_name: Name of the MongoDB collection.
        :param channel_data: The channel data to be inserted or updated;
                             must contain a 'channel_id' key.
        :raises Exception: Re-raises any failure (including a missing
                           'channel_id') after logging it.
        """
        try:
            collection = self.db[collection_name]
            # Use the 'channel_id' as the unique identifier for the update operation
            collection.update_one(
                {'channel_id': channel_data['channel_id']},
                {'$set': channel_data},
                upsert=True
            )
            logger.info(f"Data for channel {channel_data['channel_id']} successfully inserted/updated.")
        except Exception as e:
            logger.error(f"Failed to insert/update channel data: {e}")
            raise

    def mongo_connect(self, connection_string, db_name='Project1'):
        """
        Re-point this handler at another connection string / database.

        The new client replaces `self.client` (the previous one is closed), so
        a later `close()` releases the connection behind the returned handle.
        :param connection_string: MongoDB connection string.
        :param db_name: MongoDB database name (default is 'Project1').
        :return: The database handle now stored in `self.db`.
        :raises Exception: Re-raises whatever client creation raised, after logging it.
        """
        try:
            self._open(connection_string, db_name)
            # Keep the stored settings in sync with the live connection.
            self.connection_string = connection_string
            self.db_name = db_name
            logger.info(f"Successfully connected to MongoDB database: {db_name}")
            return self.db
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise

    def close(self):
        """
        Close the MongoDB connection when done. Does nothing if no client was created.
        """
        # Explicit None check: PyMongo asks callers to compare its handles with
        # None rather than rely on truth-value testing.
        if self.client is not None:
            self.client.close()
            logger.info("MongoDB connection closed.")



In [39]:
class DataTrasformation():
    """
    Exports channel summary fields from MongoDB to a flat CSV file.

    The class name keeps its original spelling so existing callers keep working.
    """

    # Collection that is read and name of the CSV file that is written.
    SOURCE_COLLECTION = 'youtube_channel_data'
    OUTPUT_FILENAME = "Raw_Youtube_API_DATA.csv"
    # Columns (after flattening) that are rewritten as YYYY-MM-DD strings.
    DATE_COLUMNS = ('channel_start_date', 'inception_date')

    def __init__(self, config : "DataTransformationConfig", db_storage : "MongoDBStorage"):
        """
        :param config: Settings object; only its `data_dir` attribute is read
                       (a str or a Path both work).
        :param db_storage: Storage handler whose `db` attribute gives access
                           to the collections.
        """
        self.config = config
        self.db_storage = db_storage

    def _build_frame(self, records):
        """
        Flatten the fetched documents into a DataFrame ready for export.

        :param records: Non-empty list of documents (dicts) as returned by the query.
        :return: DataFrame with the 'channel_details_' prefix removed from the
                 column names, date columns formatted as YYYY-MM-DD and the
                 '_id' column dropped.
        """
        # Normalize nested data for DataFrame
        frame = pd.json_normalize(records, sep='_')

        # Rename columns to remove 'channel_details_' prefix
        frame.columns = frame.columns.str.replace('channel_details_', '', regex=False)

        for column in self.DATE_COLUMNS:
            # A field that no document carries produces no column; skip it
            # instead of failing the whole export with a KeyError.
            if column not in frame.columns:
                continue
            # Strip fractional seconds before parsing, then keep only the date part.
            without_fraction = frame[column].str.replace(r'\.\d+', '', regex=True)
            frame[column] = pd.to_datetime(without_fraction).dt.strftime('%Y-%m-%d')

        # Drop the '_id' column if it exists
        return frame.drop(columns='_id', errors='ignore')

    def data_processing(self):
        """
        Read the channel documents, flatten them and write them to
        `<data_dir>/Raw_Youtube_API_DATA.csv`.

        The output directory is created when it does not exist. When the
        collection returns no documents a warning is logged and no file is
        written.

        :raises Exception: Any failure is logged and re-raised so that the
                           caller does not continue without the CSV.
        """
        try:
            collection = self.db_storage.db[self.SOURCE_COLLECTION]
            # Define the fields to retrieve
            projection = {
                'channel_id': 1,
                'channel_details.channel_name': 1,
                'channel_details.channel_start_date': 1,
                'channel_details.inception_date': 1,
                'channel_details.total_no_of_videos': 1,
                'channel_details.total_no_short_videos': 1,
                'channel_details.total_no_long_videos': 1,
                'channel_details.total_views': 1,
                'channel_details.total_likes': 1,
                'channel_details.total_comments': 1,
                'channel_details.total_subscribers': 1
            }

            # Fetch documents and project the required fields
            records = list(collection.find({}, projection))
            if not records:
                logger.warning(f"No documents found in '{self.SOURCE_COLLECTION}'; no CSV written.")
                return

            export_frame = self._build_frame(records)

            # Join with Path so the result is correct whether or not data_dir
            # ends with a separator, and whether it is a str or a Path.
            output_dir = Path(self.config.data_dir)
            output_dir.mkdir(parents=True, exist_ok=True)

            # Save to CSV
            export_frame.to_csv(output_dir / self.OUTPUT_FILENAME, index=False)
        except Exception as e:
            logger.error(f"Error processing data: {e}")
            raise

In [43]:
# Load the YAML configuration through the configuration manager.
config_manager = ConfigurationManager()
# Fetch the secrets through the configuration manager's getters.
api_key = config_manager.get_youtube_api_key()  # Getter raises ValueError when it has no value; api_key is not used further in this cell
db_connection_string = config_manager.get_mongodb_connection()  # Getter raises ValueError when it has no value

# Open the MongoDB storage handler with the connection string obtained above
db_storage = MongoDBStorage(db_connection_string)

# Build the DataTransformationConfig using the config manager
data_transformation_config = config_manager.get_data_transformation_config()


# Run the transformation stage: reads from MongoDB and writes the CSV export
data_transformation = DataTrasformation(config=data_transformation_config, db_storage=db_storage)
data_transformation.data_processing()


[2025-01-19 20:09:10,679: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-01-19 20:09:10,681: INFO: common: yaml file: params.yaml loaded successfully]
[2025-01-19 20:09:10,682: INFO: common: created directory at: artifacts]
[2025-01-19 20:09:10,833: INFO: 419572557: MongoDB connection successful. Connected to database: Project1]
[2025-01-19 20:09:10,835: INFO: common: created directory at: artifacts/data_transformation]
