In [5]:
import pandas as pd
from pymongo import MongoClient
from sqlalchemy import create_engine

def _channel_document_to_frames(document):
    """Flatten one channel document into channel, video and comment frames.

    Args:
        document: Mapping fetched from the ``channels`` collection. The
            channel-level keys read below must be present. ``videos`` and
            each video's ``comments`` are read as mappings whose values are
            the per-item dicts; a missing or empty mapping yields no rows.
            NOTE(review): the stored schema is not visible here -- confirm
            that an absent ``videos``/``comments`` key really means "none".

    Returns:
        A ``(channel_df, video_df, comment_df)`` tuple. The video and
        comment frames always carry their full column set, even with zero
        rows, so later key lookups cannot fail on an empty frame.

    Raises:
        KeyError: If a required channel, video or comment field is missing.
    """
    channel_df = pd.DataFrame({
        'channel_id': [document['channel_id']],
        'channel_name': [document['channel_name']],
        'subscription_count': [document['subscription_count']],
        'channel_views': [document['channel_views']],
        'video_count': [document['video_count']],
        'channel_description': [document['channel_description']],
        'playlist_id': [document['playlist_id']],
    })

    video_columns = [
        'video_id', 'playlist_id', 'video_name', 'video_description',
        'published_date', 'view_count', 'like_count', 'favorite_count',
        'comment_count', 'duration', 'thumbnail', 'caption_status',
    ]
    comment_columns = [
        'comment_id', 'video_id', 'comment_text', 'comment_author',
        'comment_published_at',
    ]

    video_rows = []
    comment_rows = []
    # Only the mapping values are used; the keys are never read.
    for video_info in (document.get('videos') or {}).values():
        video_rows.append({
            'video_id': video_info['video_id'],
            'playlist_id': document['playlist_id'],
            'video_name': video_info['video_name'],
            'video_description': video_info['video_description'],
            'published_date': video_info['published_at'],
            'view_count': video_info['view_count'],
            'like_count': video_info['like_count'],
            'favorite_count': video_info['favorite_count'],
            'comment_count': video_info['comment_count'],
            'duration': video_info['duration'],
            'thumbnail': video_info['thumbnail'],
            'caption_status': video_info['caption_status'],
        })
        comment_rows.extend(
            {
                'comment_id': comment_info['comment_id'],
                'video_id': video_info['video_id'],
                'comment_text': comment_info['comment_text'],
                'comment_author': comment_info['comment_author'],
                'comment_published_at': comment_info['comment_published_at'],
            }
            for comment_info in (video_info.get('comments') or {}).values()
        )

    # Explicit columns keep the frames well-formed when there are no rows.
    video_df = pd.DataFrame(video_rows, columns=video_columns)
    comment_df = pd.DataFrame(comment_rows, columns=comment_columns)
    return channel_df, video_df, comment_df


def _create_mysql_engine(database):
    """Build the SQLAlchemy engine for the local MySQL ``database``.

    The password is taken from the ``MYSQL_PASSWORD`` environment variable
    when it is set, otherwise the value this code has always used.
    """
    import os
    from urllib.parse import quote_plus

    host = 'localhost'
    port = 3306
    user = 'root'
    # NOTE(security): a plaintext password in source control is a credential
    # leak. The literal is kept only as a fallback so existing runs keep
    # working; set MYSQL_PASSWORD and remove it.
    password = os.environ.get('MYSQL_PASSWORD', 'Ajith568.')
    # Escape the password so characters such as '@' or '/' cannot corrupt
    # the connection URL.
    return create_engine(
        f'mysql+pymysql://{user}:{quote_plus(password)}@{host}:{port}/{database}'
    )


def _insert_missing_rows(frame, table, key, engine, database):
    """Append to ``table`` the rows of ``frame`` whose ``key`` is not stored yet.

    Args:
        frame: Candidate rows; must contain the ``key`` column.
        table: Target table name inside ``database``.
        key: Column used to detect rows that are already stored.
        engine: SQLAlchemy engine used for both the lookup and the insert.
        database: Schema name used to qualify the lookup query.

    Returns:
        ``(inserted, skipped)`` row counts. Both are 0 for an empty frame,
        in which case the database is not queried at all.
    """
    if frame.empty:
        return 0, 0

    stored = pd.read_sql_query(f"SELECT {key} FROM {database}.{table}", engine)
    fresh = frame.loc[~frame[key].isin(stored[key])]
    if not fresh.empty:
        fresh.to_sql(table, engine, if_exists='append', index=False)
    return len(fresh), len(frame) - len(fresh)


def _report_bulk_load(noun, inserted, skipped):
    """Print the outcome of loading the ``noun`` rows ('video' or 'comment')."""
    if inserted and skipped:
        print(f"{inserted} new {noun}s inserted into MySQL; {skipped} existing {noun}s skipped.")
    elif inserted:
        print(f"{noun.capitalize()} details inserted into MySQL.")
    elif skipped:
        print(f"Some {noun}s already exist in the MySQL table. Skipping {noun} insertion.")
    else:
        print(f"No {noun}s found in the document. Skipping {noun} insertion.")


def transform_mongodb_to_mysql(channel_name):
    """Copy one channel document from MongoDB into the MySQL tables.

    Looks up the document whose ``channel_name`` matches in the
    ``youtube_data_lake.channels`` collection, flattens it into channel,
    playlist, video and comment rows, and appends to the ``channel``,
    ``playlist``, ``video`` and ``comment`` tables of ``youtube_data`` every
    row whose ID is not already stored. Progress is reported with ``print``.

    Args:
        channel_name: Exact channel name to look up in MongoDB.

    Returns:
        The single-row channel DataFrame, or ``None`` when no document
        matches ``channel_name``.

    Raises:
        KeyError: If the document lacks one of the required fields.
    """
    # The context manager closes the client on every exit path, including
    # the "not found" return and any exception.
    with MongoClient('mongodb://localhost:27017/') as client:
        collection = client['youtube_data_lake']['channels']
        document = collection.find_one({'channel_name': channel_name})

    if not document:
        print(f"No document found for channel name: {channel_name}")
        return None

    channel_df, video_df, comment_df = _channel_document_to_frames(document)
    playlist_df = pd.DataFrame({
        'playlist_id': [document['playlist_id']],
        'channel_id': [document['channel_id']],
    })

    database = 'youtube_data'
    engine = _create_mysql_engine(database)
    try:
        # Load order is channel -> playlist -> video -> comment, the same
        # order the tables reference each other's IDs.
        inserted, _ = _insert_missing_rows(
            channel_df, 'channel', 'channel_id', engine, database)
        if inserted:
            print("Channel document inserted into MySQL.")
        else:
            print(f"Document with channel ID {channel_df['channel_id'][0]} already exists in MySQL. Skipping channel insertion.")

        inserted, _ = _insert_missing_rows(
            playlist_df, 'playlist', 'playlist_id', engine, database)
        if inserted:
            print("Playlist details inserted into MySQL.")
        else:
            print(f"Document with playlist ID {channel_df['playlist_id'][0]} already exists in MySQL. Skipping playlist insertion.")

        # Videos and comments are filtered row by row, so items added since
        # the previous run are still loaded when older ones already exist.
        _report_bulk_load('video', *_insert_missing_rows(
            video_df, 'video', 'video_id', engine, database))
        _report_bulk_load('comment', *_insert_missing_rows(
            comment_df, 'comment', 'comment_id', engine, database))
    finally:
        # Release pooled MySQL connections even when a load step fails.
        engine.dispose()

    return channel_df


# Example run: prompt for a channel name, then load that channel into MySQL.
# `df` holds the returned channel DataFrame (None when nothing matched).
channel_name = input('Enter channel name: ')
df = transform_mongodb_to_mysql(channel_name=channel_name)

Enter channel name: Detective Mehul Tamil
Document with channel ID UCAKknX9QPzMG-PPgcyBURXg already exists in MySQL. Skipping channel insertion.
Document with playlist ID UUAKknX9QPzMG-PPgcyBURXg already exists in MySQL. Skipping playlist insertion.
Some videos already exist in the MySQL table. Skipping video insertion.
Some comments already exist in the MySQL table. Skipping comment insertion.
