In [12]:
import pymongo
import pandas as pd
from dotenv import load_dotenv
import os
import logging

logging.basicConfig(level=logging.INFO)

In [16]:
class MongoDataExtractor:
    """Load movie-related MongoDB collections and reshape them into flat DataFrames.

    The database handle is opened once in ``__init__`` and stored on ``self.db``.
    Each ``process_*`` method takes the raw DataFrame of one collection and returns
    the renamed / de-duplicated table(s) derived from it. When the input frame is
    empty the methods return empty frames that still carry the output columns
    instead of raising ``KeyError``.
    """

    # Source columns copied into the ``movie`` table (``id`` is renamed afterwards).
    _MOVIE_COLUMNS = [
        'id', 'title', 'budget', 'homepage', 'overview', 'popularity',
        'release_date', 'revenue', 'runtime', 'status', 'tagline',
        'vote_average', 'vote_count',
    ]

    # Numeric gender codes and the labels they are replaced with in actor details.
    _GENDER_LABELS = {0: 'Not set / not specified', 1: 'Female', 2: 'Male', 3: 'Non-binary'}

    def __init__(self):
        """Load environment variables from ``.env`` and open the database handle."""
        load_dotenv()
        self.db = self.connect_to_mongo()

    def connect_to_mongo(self):
        """Connect to MongoDB and return the configured database.

        Reads ``MONGO_URI`` for the connection string and ``MONGODB_DATABASE``
        (default ``'default_db_name'``) for the database name; spaces in the
        database name are replaced with underscores.

        Returns:
            The ``pymongo`` database object for the resolved name.
        """
        mongo_uri = os.getenv('MONGO_URI')
        if not mongo_uri:
            # MongoClient(None) does not fail; it targets pymongo's default host,
            # so make the misconfiguration visible instead of silent.
            logging.warning("MONGO_URI is not set; pymongo will use its default host.")
        client = pymongo.MongoClient(mongo_uri)
        db_name = os.getenv('MONGODB_DATABASE', 'default_db_name').replace(' ', '_')
        return client[db_name]

    def load_collection_as_dataframe(self, collection_name):
        """Load every document of ``collection_name`` into a DataFrame.

        Returns:
            A DataFrame with one row per document, or an empty, column-less
            DataFrame (after logging a warning) when the collection has no
            documents.
        """
        data = list(self.db[collection_name].find({}))
        if not data:
            logging.warning(f"No data found in collection: {collection_name}")
            return pd.DataFrame()
        return pd.DataFrame(data)

    def mark_movie_genres_processed(self):
        """Upsert the ``processing_flags`` document marking movie_genres as processed."""
        self.db['processing_flags'].update_one(
            {'collection': 'movie_genres'}, {'$set': {'processed': True}}, upsert=True
        )

    def is_movie_genres_processed(self):
        """Return True only if the movie_genres flag document has ``processed: True``.

        Matching on ``processed`` as well as ``collection`` means a flag document
        that exists with ``processed`` set to a falsy value is not treated as done.
        """
        flag = self.db['processing_flags'].find_one(
            {'collection': 'movie_genres', 'processed': True}
        )
        return flag is not None

    @staticmethod
    def _select_and_rename(df, columns, renames):
        """Return ``df[columns]`` with ``renames`` applied; empty input yields an empty frame with the output columns."""
        if df.empty:
            # An empty collection is loaded as a column-less frame, so df[columns]
            # would raise KeyError; return the expected schema instead.
            return pd.DataFrame(columns=[renames.get(col, col) for col in columns])
        return df[columns].rename(columns=renames)

    def process_movie_genres(self, df):
        """Drop the ``_id`` column (if present) from the genres frame; empty frames pass through unchanged."""
        return df.drop(columns=['_id'], errors='ignore') if not df.empty else df

    def process_movie_details(self, df):
        """Split raw movie details into a ``movie`` table and a ``movie_genre`` link table.

        Args:
            df: Raw movie details. Each ``genres`` cell is iterated and ``['id']``
                is read from every element; cells that are not a list or tuple
                (e.g. missing values) contribute no link rows.

        Returns:
            ``(movie, movie_genre)`` where ``movie`` holds the scalar movie columns
            keyed by ``movie_id`` and ``movie_genre`` holds unique
            ``(movie_id, genre_id)`` pairs. Both are empty frames with the
            expected columns when ``df`` is empty.
        """
        movie = self._select_and_rename(df, self._MOVIE_COLUMNS, {'id': 'movie_id'})
        if df.empty:
            return movie, pd.DataFrame(columns=['movie_id', 'genre_id'])

        # Walk the two columns directly instead of iterrows(): same pairs, far
        # less per-row overhead.
        genre_links = [
            {'movie_id': movie_id, 'genre_id': genre['id']}
            for movie_id, genres in zip(df['id'], df['genres'])
            if isinstance(genres, (list, tuple))
            for genre in genres
        ]
        # De-duplicate the links like the movie table, so repeated movie documents
        # do not produce repeated pairs; keep a fresh 0..n-1 index.
        movie_genre = (
            pd.DataFrame(genre_links, columns=['movie_id', 'genre_id'])
            .drop_duplicates()
            .reset_index(drop=True)
        )
        return movie.drop_duplicates(), movie_genre

    def process_movie_actor_credits(self, df):
        """Return unique actor credits with ``id`` -> ``actor_id`` and ``movie_tmdb_id`` -> ``movie_id``."""
        return self._select_and_rename(
            df,
            ['id', 'character', 'order', 'movie_tmdb_id'],
            {'id': 'actor_id', 'movie_tmdb_id': 'movie_id'},
        ).drop_duplicates()

    def process_movie_director_credits(self, df):
        """Return unique director credits with ``id`` -> ``director_id`` and ``movie_tmdb_id`` -> ``movie_id``."""
        return self._select_and_rename(
            df,
            ['id', 'known_for_department', 'movie_tmdb_id'],
            {'id': 'director_id', 'movie_tmdb_id': 'movie_id'},
        ).drop_duplicates()

    def process_actor_details(self, df):
        """Return unique actor rows keyed by ``actor_id`` with gender codes replaced by labels."""
        return self._select_and_rename(
            df,
            ['id', 'name', 'gender', 'birthday', 'deathday', 'popularity', 'place_of_birth'],
            {'id': 'actor_id'},
        ).replace(
            # Nested dict: only the 'gender' column is touched; codes that are
            # not listed are left as they are.
            {'gender': self._GENDER_LABELS}
        ).drop_duplicates()

    def process_all_collections(self):
        """Load and process collections from MongoDB.

        Returns:
            Dict of output table name -> DataFrame. Keys, in order: ``genre``
            (omitted when movie_genres is already flagged as processed),
            ``movie``, ``movie_genre``, ``movie_actor_credits``,
            ``actor_details`` and ``movie_direction``.
        """
        # (source collection, processor, output table names)
        pipeline = [
            ('movie_genres', self.process_movie_genres, ('genre',)),
            ('movie_details', self.process_movie_details, ('movie', 'movie_genre')),
            ('movie_actor_credits', self.process_movie_actor_credits, ('movie_actor_credits',)),
            ('actor_details', self.process_actor_details, ('actor_details',)),
            # Director credits are exposed under the name 'movie_direction'.
            ('movie_director_credits', self.process_movie_director_credits, ('movie_direction',)),
        ]

        transformed_data = {}

        for collection_name, process_func, output_names in pipeline:
            is_genres = collection_name == 'movie_genres'
            # Check the flag before querying so a skipped collection costs no read.
            if is_genres and self.is_movie_genres_processed():
                logging.info(f"{collection_name} already processed, skipping.")
                continue

            df = self.load_collection_as_dataframe(collection_name)
            processed = process_func(df)
            tables = processed if isinstance(processed, tuple) else (processed,)
            transformed_data.update(zip(output_names, tables))

            # Only flag genres once rows were actually seen; flagging an empty
            # collection would make every later run skip it.
            if is_genres and not df.empty:
                self.mark_movie_genres_processed()

        return transformed_data

In [17]:
# Build the extractor: __init__ loads the .env file and opens the MongoDB handle.
extractor = MongoDataExtractor()
# Load and transform every configured collection into a dict of DataFrames.
# The 'genre' entry is omitted when movie_genres is already flagged as processed.
transformed_data = extractor.process_all_collections()  

INFO:root:movie_genres already processed, skipping.


In [18]:
# Name of the transformed table to preview; must be a key of `transformed_data`.
specific_collection = 'movie_direction'

if specific_collection in transformed_data:
    df = transformed_data[specific_collection]
    print(f"DataFrame for collection: {specific_collection}, Data shape: {df.shape}")
    display(df.head())
else:
    # A missing key used to make this cell silently show nothing (e.g. 'genre'
    # is absent once movie_genres has been flagged as processed); say so and
    # list what can be previewed instead.
    print(f"No table named {specific_collection!r}; available tables: {sorted(transformed_data)}")

DataFrame for collection: movie_direction, Data shape: (59, 3)


Unnamed: 0,director_id,known_for_department,movie_id
0,4347786,Directing,1148172
1,2968474,Editing,1327649
2,24,Directing,940139
3,1183515,Directing,1063877
4,1183516,Directing,1063877
