In [1]:
import pandas as pd

In [3]:
df = pd.read_csv("dataset/TMDB_movie_dataset_v11.csv")
duplicate_ids = df['id'].duplicated().sum()
print(f"Number of duplicate IDs: {duplicate_ids}")

Number of duplicate IDs: 1036


In [4]:
import pandas as pd

# Učitaj CSV
df = pd.read_csv("dataset/TMDB_movie_dataset_v11.csv")

print("=" * 60)
print("ANALIZA DUPLIKATA U CSV-u")
print("=" * 60)

# Osnovne statistike
total_rows = len(df)
unique_ids = df['id'].nunique()
duplicate_ids = df['id'].duplicated().sum()

print(f"\nOsnovne statistike:")
print(f"  Ukupno redova: {total_rows:,}")
print(f"  Jedinstvenih ID-ova: {unique_ids:,}")
print(f"  Duplikata: {duplicate_ids:,}")
print(f"  Procenat duplikata: {(duplicate_ids/total_rows)*100:.2f}%")

# Proveri da li postoje NULL ili prazni ID-ovi
null_ids = df['id'].isna().sum()
empty_ids = (df['id'] == '').sum()

print(f"\nProblematični ID-ovi:")
print(f"  NULL ID-ovi: {null_ids:,}")
print(f"  Prazni ID-ovi: {empty_ids:,}")

# Prikaži najčešće duplicirane ID-ove
print(f"\nTop 10 najčešće dupliciranih ID-ova:")
duplicate_counts = df[df['id'].duplicated(keep=False)].groupby('id').size().sort_values(ascending=False)
print(duplicate_counts.head(10))

# Prikaži primere duplikata
print(f"\nPrimeri duplikata (prvi 5 ID-ova sa duplikatima):")
duplicate_ids_list = df[df['id'].duplicated(keep=False)]['id'].unique()[:5]

for dup_id in duplicate_ids_list:
    print(f"\n--- ID: {dup_id} ---")
    dup_rows = df[df['id'] == dup_id][['id', 'title', 'release_date', 'imdb_id']]
    print(dup_rows.to_string(index=False))

# Proveri IMDB ID duplikate
print(f"\n{'=' * 60}")
print("ANALIZA IMDB ID DUPLIKATA")
print("=" * 60)

imdb_duplicate_ids = df['imdb_id'].duplicated().sum()
unique_imdb_ids = df['imdb_id'].nunique()

print(f"\nIMDB ID statistike:")
print(f"  Jedinstvenih IMDB ID-ova: {unique_imdb_ids:,}")
print(f"  IMDB ID duplikata: {imdb_duplicate_ids:,}")
print(f"  Procenat duplikata: {(imdb_duplicate_ids/total_rows)*100:.2f}%")

# Proveri koliko redova nema IMDB ID
missing_imdb = df['imdb_id'].isna().sum()
empty_imdb = (df['imdb_id'] == '').sum()

print(f"\nProblematični IMDB ID-ovi:")
print(f"  NULL IMDB ID-ovi: {missing_imdb:,}")
print(f"  Prazni IMDB ID-ovi: {empty_imdb:,}")

# Ako ima puno duplikata IMDB ID-a, prikaži top 10
if imdb_duplicate_ids > 1000:
    print(f"\nTop 10 najčešće dupliciranih IMDB ID-ova:")
    imdb_duplicate_counts = df[df['imdb_id'].duplicated(keep=False)].groupby('imdb_id').size().sort_values(ascending=False)
    print(imdb_duplicate_counts.head(10))

print(f"\n{'=' * 60}")
print("PREPORUKA ZA REŠENJE")
print("=" * 60)

if duplicate_ids < 2000:
    print("\n✓ Broj duplikata po TMDB ID je prihvatljiv (<2000)")
    print("  Strategija: Koristi TMDB ID + dodatni identifikator")
    print("\n  Opcija 1: TMDB ID + IMDB ID")
    print("    _id = f'tmdb_{movie_id}_imdb_{imdb_id}'")
    print("\n  Opcija 2: TMDB ID + Release Date")
    print("    _id = f'tmdb_{movie_id}_{release_date}'")
    print("\n  Opcija 3: Zadrži samo prvi (keep='first') ili poslednji (keep='last')")
else:
    print("\n⚠️  VELIKI broj duplikata - potrebna detaljna analiza!")

print("\n" + "=" * 60)

ANALIZA DUPLIKATA U CSV-u

Osnovne statistike:
  Ukupno redova: 1,282,561
  Jedinstvenih ID-ova: 1,281,525
  Duplikata: 1,036
  Procenat duplikata: 0.08%

Problematični ID-ovi:
  NULL ID-ovi: 0
  Prazni ID-ovi: 0

Top 10 najčešće dupliciranih ID-ova:
id
1525030    4
1199539    4
1199214    4
1195933    4
1192968    4
1192967    4
1192966    4
1192965    4
1192964    4
1192962    4
dtype: int64

Primeri duplikata (prvi 5 ID-ova sa duplikatima):

--- ID: 1398836 ---
     id        title release_date imdb_id
1398836 I love caffè   2024-12-03     NaN
1398836 I love caffè          NaN     NaN

--- ID: 1426233 ---
     id       title release_date imdb_id
1426233 Poker Night   2024-11-28     NaN
1426233 Poker Night          NaN     NaN

--- ID: 1329292 ---
     id title release_date imdb_id
1329292  9/11   2023-09-12     NaN
1329292  9/11          NaN     NaN

--- ID: 1326022 ---
     id              title release_date   imdb_id
1326022 Yoink of the yukon   1995-07-30 tt1216995
1326022 Yoink 

In [3]:
print(df.dtypes)
print(df.shape)

id                        int64
title                    object
vote_average            float64
vote_count                int64
status                   object
release_date             object
revenue                   int64
runtime                   int64
adult                      bool
backdrop_path            object
budget                    int64
homepage                 object
imdb_id                  object
original_language        object
original_title           object
overview                 object
popularity              float64
poster_path              object
tagline                  object
genres                   object
production_companies     object
production_countries     object
spoken_languages         object
keywords                 object
dtype: object
(1281555, 24)


In [4]:
for i in df.columns:
    print(i, df[i].nunique())

id 1280521
title 1093289
vote_average 5025
vote_count 3598
status 6
release_date 43258
revenue 14490
runtime 794
adult 2
backdrop_path 326155
budget 6184
homepage 125097
imdb_id 634688
original_language 177
original_title 1129251
overview 970024
popularity 20186
poster_path 844854
tagline 171644
genres 14226
production_companies 221776
production_countries 10565
spoken_languages 7295
keywords 189721


In [5]:
for i in df.columns:
    print(i, df[i].unique())

id [  27205  157336     155 ...  803881  803883 1539893]
title ['Inception' 'Interstellar' 'The Dark Knight' ... 'Max and Maxine'
 "Erste Christmas: Edgar's Christmas" 'Permission 2']
vote_average [8.364 8.417 8.512 ... 0.8   0.    0.75 ]
vote_count [34495 32571 30619 ...     2     1     0]
status ['Released' 'Planned' 'Rumored' 'Post Production' 'In Production'
 'Canceled']
release_date ['2010-07-15' '2014-11-05' '2008-07-16' ... '1918-11-19' '1947-11-16'
 '1928-01-17']
revenue [ 825532764  701729206 1004558444 ...      55390        895     542849]
runtime [  148   169   152   162   143   108   149   139   121   154   142   126
   165   181   136   194   122   179   201   138   180   141   147   130
   135   115   153   133   161   137   124   123   140   131    95   127
   157    96   117   175   116   100   101   132   146   105    98    92
    81    89   155   103   112   120   144   111   113   104   189    90
   129   102   107   109   119   125   151   195    91    99   188   11

- id - Unique identifier for each movie. (type: int)
- title Title of the movie. (type: str)
- vote_average - Average vote or rating given by viewers. (type: float)
- vote_count - Total count of votes received for the movie. (type: int)
- status - The status of the movie (e.g., Released, Rumored, Post Production, etc.). (type: str)
- release_date - Date when the movie was released. (type: str)
- revenue - Total revenue generated by the movie. (type: int)
- runtime - Duration of the movie in minutes. (type: int)
- adult - Indicates if the movie is suitable only for adult audiences. (type: bool)
- backdrop_path - URL of the backdrop image for the movie. (type: str)
- budget - Budget allocated for the movie. (type: int)
- homepage - Official homepage URL of the movie. (type: str)
- imdb_id - IMDb ID of the movie. (type: str)
- original_language - Original language in which the movie was produced. (type: str)
- original_title - Original title of the movie. (type: str)
- overview - Brief description or summary of the movie. (type: str)
- popularity - Popularity score of the movie. (type: float)
- poster_path - URL of the movie poster image. (type: str)
- tagline - Catchphrase or memorable line associated with the movie. (type: str)
- genres - List of genres the movie belongs to. (type: str)
- production_companies - List of production companies involved in the movie. (type: str)
- production_countries - List of countries involved in the movie production. (type: str)
- spoken_languages - List of languages spoken in the movie. (type: str)
- keywords - Keywords associated with the movie. Do `.split(", ")` to convert to a list. (type: str)

- id - Jedinstveni identifikator za svaki film.
- title - Naslov filma.
- vote_average - Prosečan glas ili ocena koju su dali gledaoci.
- vote_count - Ukupan broj glasova dobijenih za film.
- status - Status filma (npr. Objavljen, Glasine, Postprodukcija, itd.).
- release_date - Datum objavljivanja filma.
- revenue - Ukupan prihod koji je film generisao.
- runtime - Trajanje filma u minutima.
- adult - Označava da li je film pogodan samo za odraslu publiku.
- backdrop_path - URL slike pozadine za film.
- budget - Budžet dodeljen za film.
- homepage - Zvanični URL početne stranice filma.
- imdb_id - IMDb ID filma.
- original_language - Originalni jezik na kojem je film snimljen.
- original_title - Originalni naslov filma.
- overview - Kratak opis ili rezime filma.
- popularity - Ocena popularnosti filma.
- poster_path - URL slike postera filma.
- tagline - Krilatica ili nezaboravna rečenica povezana sa filmom.
- genres - Lista žanrova kojima film pripada.
- production_companies - Lista produkcijskih kuća uključenih u film.
- production_countries - Lista zemalja uključenih u produkciju filma.
- spoken_languages - Lista jezika koji se govore u filmu.
- keywords - Ključne reči povezane sa filmom. 


In [None]:
from pymongo import MongoClient
from datetime import datetime
import re
import traceback
from pymongo.errors import BulkWriteError

def transform_movie_document(doc):
    """Transformiše flat dokument u nested strukturu"""
    
    # Parsiranje datuma
    release_date_obj = {}
    if doc.get('release_date'):
        try:
            if isinstance(doc['release_date'], str):
                date_parts = doc['release_date'].split('-')
                release_date_obj = {
                    "year": int(date_parts[0]),
                    "month": int(date_parts[1]),
                    "day": int(date_parts[2]),
                    "full_date": doc['release_date']
                }
            elif hasattr(doc['release_date'], 'year'):  # datetime objekat
                release_date_obj = {
                    "year": doc['release_date'].year,
                    "month": doc['release_date'].month,
                    "day": doc['release_date'].day,
                    "full_date": doc['release_date'].strftime('%Y-%m-%d')
                }
        except (ValueError, IndexError, AttributeError):
            release_date_obj = {"full_date": str(doc.get('release_date', ''))}
    
    # Parsiranje string lista u nizove
    def parse_string_to_array(field_value):
        if not field_value or field_value == "":
            return []
        return [item.strip() for item in str(field_value).split(', ')]
    
    # Kreiranje jedinstvenog _id - kombinacija imdb_id i originalnog _id
    original_mongo_id = str(doc.get('_id', ''))
    imdb_id = doc.get('imdb_id', '')
    
    # Kreiraj jedinstveni _id
    if imdb_id and imdb_id != '':
        unique_id = f"{imdb_id}_{original_mongo_id}"
    else:
        unique_id = original_mongo_id
    
    # Kreiranje novog dokumenta
    new_doc = {
        "_id": unique_id,  # Jedinstveni _id
        "original_mongo_id": original_mongo_id,  # Čuvamo originalni MongoDB _id
        "movie_id": doc.get('id'),
        "title": doc.get('title', ''),
        "original_title": doc.get('original_title', ''),
        "overview": doc.get('overview', ''),
        "tagline": doc.get('tagline', ''),
        
        "ratings": {
            "vote_average": doc.get('vote_average', 0),
            "vote_count": doc.get('vote_count', 0),
            "popularity": doc.get('popularity', 0)
        },
        
        "release_info": {
            "status": doc.get('status', ''),
            "release_date": release_date_obj,
            "original_language": doc.get('original_language', ''),
            "spoken_languages": parse_string_to_array(doc.get('spoken_languages', ''))
        },
        
        "content_info": {
            "adult": doc.get('adult', False),
            "runtime": doc.get('runtime', 0),
            "genres": parse_string_to_array(doc.get('genres', ''))
        },
        
        "financial": {
            "budget": doc.get('budget', 0),
            "revenue": doc.get('revenue', 0)
        },
        
        "production": {
            "companies": parse_string_to_array(doc.get('production_companies', '')),
            "countries": parse_string_to_array(doc.get('production_countries', ''))
        },
        
        "media": {
            "poster_path": doc.get('poster_path', ''),
            "backdrop_path": doc.get('backdrop_path', ''),
            "homepage": doc.get('homepage', ''),
            "imdb_id": doc.get('imdb_id', '')
        },
        
        "keywords": parse_string_to_array(doc.get('keywords', ''))
    }
    
    return new_doc

def transform_mongodb_collection():
    """Glavna funkcija za transformaciju kolekcije"""
    
    try:
        # Konekcija na MongoDB
        print("Povezujem se na MongoDB...")
        client = MongoClient('mongodb://localhost:27017/')  #! Prilagodi svoj connection string
        db = client['SBP_DB']  # Ime tvoje baze
        
        # Test konekcije
        client.admin.command('ping')
        print("✓ Uspešno povezano na MongoDB")
        
    except Exception as e:
        print(f"❌ GREŠKA PRI POVEZIVANJU NA MONGODB:")
        print(f"   Tip greške: {type(e).__name__}")
        print(f"   Poruka: {str(e)}")
        print(f"   Proverite da li je MongoDB pokrenut i dostupan na localhost:27017")
        return False
    
    try:
        # Originalna i nova kolekcija
        old_collection = db['SBP_COL']  # Stara kolekcija
        new_collection = db['movies_restructured']  # Nova kolekcija
        
        # Proveri da li stara kolekcija postoji
        if 'SBP_COL' not in db.list_collection_names():
            print(f"❌ GREŠKA: Kolekcija 'SBP_COL' ne postoji u bazi 'SBP_DB'")
            print(f"   Dostupne kolekcije: {db.list_collection_names()}")
            return False
        
        # Kreiranje backup-a postojeće 'movies' kolekcije
        print("Pravljem backup postojećih dokumenata...")
        if 'movies' in db.list_collection_names():
            existing_docs = list(db['movies'].find())
            if existing_docs:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_collection_name = f'movies_backup_{timestamp}'
                db[backup_collection_name].insert_many(existing_docs)
                print(f"✓ Backup kreiran u '{backup_collection_name}' sa {len(existing_docs)} dokumenata")
        
        # Obriši novu kolekciju ako postoji
        new_collection.drop()
        print("✓ Nova kolekcija pripremljena")
        
    except Exception as e:
        print(f"❌ GREŠKA PRI PRIPREMI KOLEKCIJA:")
        print(f"   Tip greške: {type(e).__name__}")
        print(f"   Poruka: {str(e)}")
        print(f"   Detalji:")
        print(traceback.format_exc())
        return False
    
    # Transformacija
    print("Počinje transformacija...")
    batch_size = 1000
    processed = 0
    batch = []
    duplicate_errors = 0
    other_errors = 0
    
    try:
        total_docs = old_collection.count_documents({})
        print(f"Ukupno dokumenata za transformaciju: {total_docs}")
        
        for doc_index, doc in enumerate(old_collection.find(), 1):
            try:
                transformed_doc = transform_movie_document(doc)
                batch.append(transformed_doc)
                
                # Batch insert
                if len(batch) >= batch_size:
                    try:
                        new_collection.insert_many(batch, ordered=False)
                        processed += len(batch)
                        print(f"✓ Transformisano {processed}/{total_docs} dokumenata...")
                        batch = []
                        
                    except BulkWriteError as bulk_error:
                        # Rukovanje sa BulkWriteError - ne prekidaj proces
                        details = bulk_error.details
                        write_errors = details.get('writeErrors', [])
                        
                        # Broji tipove grešaka
                        current_duplicates = sum(1 for err in write_errors if err.get('code') == 11000)
                        current_other_errors = len(write_errors) - current_duplicates
                        
                        duplicate_errors += current_duplicates
                        other_errors += current_other_errors
                        
                        # Dodaj broj uspešno unesenih dokumenata
                        inserted_count = details.get('nInserted', 0)
                        processed += inserted_count
                        
                        print(f"⚠️  Batch greške: {current_duplicates} duplikata, {current_other_errors} drugih")
                        print(f"✓ Uspešno uneseno {inserted_count} od {len(batch)} dokumenata")
                        print(f"✓ Ukupno transformisano {processed}/{total_docs} dokumenata...")
                        
                        # Nastavi sa sledećim batch-om
                        batch = []
                    
            except Exception as e:
                print(f"\n⚠️  GREŠKA PRI TRANSFORMACIJI DOKUMENTA {doc_index}:")
                print(f"   Document ID: {doc.get('_id', 'nepoznat')}")
                print(f"   IMDB ID: {doc.get('imdb_id', 'nepoznat')}")
                print(f"   Naslov: {doc.get('title', 'nepoznat')}")
                print(f"   Tip greške: {type(e).__name__}")
                print(f"   Poruka: {str(e)}")
                other_errors += 1
                # Nastavi sa sledećim dokumentom
                continue
        
        # Insert poslednji batch
        if batch:
            try:
                new_collection.insert_many(batch, ordered=False)
                processed += len(batch)
                print(f"✓ Poslednji batch: {len(batch)} dokumenata")
            except BulkWriteError as bulk_error:
                details = bulk_error.details
                write_errors = details.get('writeErrors', [])
                current_duplicates = sum(1 for err in write_errors if err.get('code') == 11000)
                current_other_errors = len(write_errors) - current_duplicates
                
                duplicate_errors += current_duplicates
                other_errors += current_other_errors
                
                inserted_count = details.get('nInserted', 0)
                processed += inserted_count
                
                print(f"⚠️  Poslednji batch greške: {current_duplicates} duplikata, {current_other_errors} drugih")
                print(f"✓ Uspešno uneseno {inserted_count} dokumenata iz poslednjeg batch-a")
        
        print(f"\n✓ Transformacija završena!")
        print(f"✓ Uspešno transformisano: {processed} dokumenata")
        if duplicate_errors > 0:
            print(f"⚠️  Preskočeno zbog duplikata: {duplicate_errors}")
        if other_errors > 0:
            print(f"⚠️  Druge greške: {other_errors}")
        
    except Exception as e:
        print(f"\n❌ KRITIČNA GREŠKA TOKOM TRANSFORMACIJE:")
        print(f"   Tip greške: {type(e).__name__}")
        print(f"   Poruka: {str(e)}")
        print(f"   Obrađeno dokumenata do greške: {processed}")
        print(f"   Stack trace:")
        print(traceback.format_exc())
        return False
    
    try:
        # Kreiranje indeksa
        print("Kreiranje indeksa...")
        new_collection.create_index([("ratings.vote_average", -1)])
        new_collection.create_index([("release_info.release_date.year", -1)])
        new_collection.create_index([("content_info.genres", 1)])
        new_collection.create_index([("financial.revenue", -1)])
        new_collection.create_index([("keywords", 1)])
        new_collection.create_index([("media.imdb_id", 1)])  # Indeks za IMDB ID pretragu
        print("✓ Indeksi kreirani")
        
        # Zameni staru kolekciju novom
        print("Zamenjujem kolekcije...")
        if 'movies' in db.list_collection_names():
            db['movies'].drop()  # Obriši staru
        new_collection.rename('movies')  # Preimenuj novu
        print("✓ Kolekcije zamenjene")
        
        print("\n🎉 Transformacija kompletno završena!")
        
        # Prikaži primer dokumenta
        sample_doc = db['movies'].find_one()
        print("\nPrimer transformisanog dokumenta:")
        print("="*50)
        print(sample_doc)
        
        return True
        
    except Exception as e:
        print(f"\n❌ GREŠKA PRI FINALIZACIJI:")
        print(f"   Tip greške: {type(e).__name__}")
        print(f"   Poruka: {str(e)}")
        print(f"   Stack trace:")
        print(traceback.format_exc())
        return False

def verify_transformation():
    """Verifikacija transformacije"""
    try:
        client = MongoClient('mongodb://localhost:27017/')
        db = client['SBP_DB']
        
        print("\n=== VERIFIKACIJA TRANSFORMACIJE ===")
        
        # Brojanje dokumenata
        old_count = db['SBP_COL'].count_documents({})
        new_count = db['movies'].count_documents({})
        print(f"Stara kolekcija: {old_count} dokumenata")
        print(f"Nova kolekcija: {new_count} dokumenata")
        
        if old_count != new_count:
            difference = old_count - new_count
            print(f"⚠️  Razlika: {difference} dokumenata (verovatno duplikati)")
        else:
            print(f"✓ Broj dokumenata se slaže")
        
        # Proveri duplikate po IMDB ID
        pipeline = [
            {"$group": {
                "_id": "$media.imdb_id",
                "count": {"$sum": 1},
                "titles": {"$push": "$title"}
            }},
            {"$match": {"count": {"$gt": 1}}},
            {"$limit": 5}  # Prikaži samo prvih 5
        ]
        
        duplicates = list(db['movies'].aggregate(pipeline))
        if duplicates:
            print(f"\n⚠️  Pronađeni duplikati po IMDB ID:")
            for dup in duplicates:
                print(f"   IMDB ID: {dup['_id']} -> {dup['count']} puta")
                print(f"   Naslovi: {dup['titles'][:3]}...")  # Prikaži prva 3
        else:
            print("✓ Nema duplikata po IMDB ID u finalnoj kolekciji")
        
        # Proveri strukture
        sample = db['movies'].find_one()
        if sample:
            print(f"\nStruktura novog dokumenta:")
            print(f"- Glavna polja: {list(sample.keys())}")
            print(f"- ratings polja: {list(sample.get('ratings', {}).keys())}")
            print(f"- genres tip: {type(sample.get('content_info', {}).get('genres', []))}")
            print(f"- keywords tip: {type(sample.get('keywords', []))}")
        else:
            print("❌ Nema dokumenata u novoj kolekciji!")
            
    except Exception as e:
        print(f"\n❌ GREŠKA PRI VERIFIKACIJI:")
        print(f"   Tip greške: {type(e).__name__}")
        print(f"   Poruka: {str(e)}")
        print(f"   Stack trace:")
        print(traceback.format_exc())

if __name__ == "__main__":
    # Pokreni transformaciju
    success = transform_mongodb_collection()
    
    if success:
        # Verifikuj rezultate
        verify_transformation()
    else:
        print("\n💥 TRANSFORMACIJA PREKINUTA ZBOG KRITIČNE GREŠKE!")
        print("Proverite greške iznad i pokušajte ponovo.")

Povezujem se na MongoDB...
✓ Uspešno povezano na MongoDB
Pravljem backup postojećih dokumenata...
✓ Nova kolekcija pripremljena
Počinje transformacija...
Ukupno dokumenata za transformaciju: 1282561
Ukupno dokumenata za transformaciju: 1282561
✓ Transformisano 1000/1282561 dokumenata...
✓ Transformisano 2000/1282561 dokumenata...
✓ Transformisano 3000/1282561 dokumenata...
✓ Transformisano 4000/1282561 dokumenata...
✓ Transformisano 1000/1282561 dokumenata...
✓ Transformisano 2000/1282561 dokumenata...
✓ Transformisano 3000/1282561 dokumenata...
✓ Transformisano 4000/1282561 dokumenata...
✓ Transformisano 5000/1282561 dokumenata...
✓ Transformisano 6000/1282561 dokumenata...
✓ Transformisano 7000/1282561 dokumenata...
✓ Transformisano 8000/1282561 dokumenata...
✓ Transformisano 9000/1282561 dokumenata...
✓ Transformisano 10000/1282561 dokumenata...
✓ Transformisano 5000/1282561 dokumenata...
✓ Transformisano 6000/1282561 dokumenata...
✓ Transformisano 7000/1282561 dokumenata...
✓ Trans