In [None]:
import s3fs
import os
import pandas as pd
import re
import tqdm

from datetime import datetime
from src.analysis import GPT
from src.scrapping import IMDb
from src.utils.db import PostgreSQLDatabase

In [None]:
# Movie to process. To run the notebook on another title, replace the active
# assignment with one of the commented-out alternatives below.
movie_id = "0219822"  # Human Nature
# movie_id = "0089885"  # Re-Animator
# movie_id = "0101414"  # Beauty and the Beast
# movie_id = "0029583"  # Snow White (1937)
# movie_id = "6208148"  # Snow White (2025)

# Scraping

In [None]:
# Start a scraping session for the selected movie.
scrapper = IMDb()

# Timestamp (YYYYMMDD_HHMMSS) taken just before the scraping calls below; it is
# stored with the movie row later in the notebook.
movie_scrap_time = f"{datetime.now():%Y%m%d_%H%M%S}"

# Movie-level metadata first, then the reviews themselves. The review count is
# passed on to get_reviews.
movie_title, release_date = scrapper.get_movie(movie_id)
total_reviews = scrapper.get_number_of_reviews(movie_id)
reviews_df = scrapper.get_reviews(movie_id, total_reviews)

In [None]:
# Reviews hidden behind spoiler markup come back with no text (null or
# whitespace only); their text has to be fetched one review at a time.
text_is_missing = reviews_df["text"].isna() | (reviews_df["text"].str.strip() == "")
empty_reviews = reviews_df[text_is_missing]
n_missing = len(empty_reviews)

if n_missing:
    print(f"[WARNING] Missing text for {n_missing} reviews")
    print("[INFO] Getting text behind spoiler markups")

    # Only the review id is needed per row, so iterate over that column and
    # write the fetched text back into the main frame at the same index label.
    pending_ids = empty_reviews["review_id"].items()
    for index, review_id in tqdm.tqdm(pending_ids, total=n_missing, desc="Processing empty reviews"):
        reviews_df.at[index, "text"] = scrapper.get_spoiler(review_id)

In [None]:
# Second pass: after the spoiler back-fill, count the reviews whose text or
# title is still null or whitespace-only.
missing_text = reviews_df["text"].isna() | reviews_df["text"].str.strip().eq("")
missing_title = reviews_df["title"].isna() | reviews_df["title"].str.strip().eq("")
empty_reviews = reviews_df[missing_text | missing_title].shape[0]

if empty_reviews == 0:
    print("[INFO] No reviews missing text or title")
else:
    print(f"[WARNING] Still missing text or title for {empty_reviews} reviews")

In [None]:
# Vote counts above 999 are scraped in a rounded form ending in "K".
# Flag every review where either counter is rounded and fetch the exact values.
upvotes_rounded = reviews_df['upvotes'].astype(str).str.endswith('K')
downvotes_rounded = reviews_df['downvotes'].astype(str).str.endswith('K')
mask = upvotes_rounded | downvotes_rounded
print(f"[INFO] Found {mask.sum()} reviews with rounded votes")

# Both counters are refreshed for a flagged review, even if only one was rounded.
for index, review_id in reviews_df.loc[mask, 'review_id'].items():
    exact_upvotes, exact_downvotes = scrapper.get_votes(review_id)
    reviews_df.loc[index, 'upvotes'] = exact_upvotes
    reviews_df.loc[index, 'downvotes'] = exact_downvotes

# With no rounded values left, both columns can be cast to integers.
reviews_df['upvotes'] = reviews_df['upvotes'].astype(int)
reviews_df['downvotes'] = reviews_df['downvotes'].astype(int)

In [None]:
# No later cell in this notebook calls the scraper, so it is shut down here.
# NOTE(review): presumably releases the browser/HTTP session held by IMDb — confirm in src.scrapping.
scrapper.close()

In [None]:
# Open the database connection used by the remaining cells (upserts, queries, backups).
# NOTE(review): PostgreSQLDatabase() is built without arguments, so the connection
# settings presumably come from its own configuration — confirm in src.utils.db.
db = PostgreSQLDatabase()
db.connect()

In [None]:
# Upsert the scraped movie into the movies table. The helper takes a list of
# row tuples, so the single movie row is wrapped in a one-element list.
movie_row = (movie_id, movie_title, release_date, total_reviews, movie_scrap_time)
movie_data = [movie_row]
db.upsert_movie_data(movie_data)

In [None]:
# Update table for reviews
def _str_or_none(value):
    """Return ``str(value)``, or ``None`` when ``value`` is missing (None/NaN/NaT).

    Calling ``str()`` directly on a missing value produces the literal text
    'nan' or 'None'. The NaN -> None pass further down cannot recognise those
    strings, so they would be stored in the database as if they were real
    content.
    """
    return None if pd.isna(value) else str(value)


# Create a variable to identify reviews needing sentiment analysis
reviews_df['to_process'] = 1

# Convert data to a list of tuples, one per review. Text-like fields are
# converted to strings but missing values stay missing (None).
reviews_list = reviews_df.apply(lambda row: (
    _str_or_none(row['movie_id']), _str_or_none(row['review_id']),
    _str_or_none(row['author']), _str_or_none(row['title']),
    _str_or_none(row['text']), row['rating'],
    _str_or_none(row['date']), row['upvotes'],
    row['downvotes'], row['last_update'], row['to_process']
), axis=1).tolist()

# Replace NaN with None to avoid errors with postgreSQL
# (covers the fields that are passed through unconverted, e.g. rating)
reviews_list = [tuple(None if pd.isna(x) else x for x in row) for row in reviews_list]

# Upserting
db.upsert_review_data(reviews_list)

# Sentiment analysis

In [None]:
# Fetch every review still flagged for sentiment analysis.
reviews_to_process = db.query_data('reviews_raw', condition='to_process = 1')

# Count the distinct values of the first field of each row.
# NOTE(review): the first field is presumably movie_id, matching the order of
# the tuples upserted into reviews_raw — confirm against the table schema.
# A set is used instead of a temporary DataFrame so that an empty result
# reports "0 reviews ... 0 movies" rather than raising KeyError on column 0.
movies_to_process = {review[0] for review in reviews_to_process}
print(f"[INFO] Found {len(reviews_to_process)} reviews to analyze for {len(movies_to_process)} movies")

In [None]:
# Analyze the flagged reviews one at a time. Each result is written and the
# review's to-process flag is reset before moving on to the next review.
analyzer = GPT()
progress = tqdm.tqdm(reviews_to_process, desc="Analyzing reviews sentiment", unit="review")
for review in progress:
    review_id = review[1]  # second field of the row is the review id
    GPT_results = analyzer.sentiment(review)

    # One row per review: the id followed by the analyzer's output fields.
    data = [(review_id,) + tuple(GPT_results)]
    db.update_sentiment_data(data)
    db.reset_indicator(review_id)

# Backup

In [None]:
# S3 endpoint host comes from the environment; a missing AWS_S3_ENDPOINT
# raises KeyError here rather than failing later during the upload.
S3_ENDPOINT_URL = f"https://{os.environ['AWS_S3_ENDPOINT']}"
fs = s3fs.S3FileSystem(client_kwargs=dict(endpoint_url=S3_ENDPOINT_URL))

# Remote location that receives the backup files.
bucket_name = 'maeldieudonne'
destination = f'{bucket_name}/diffusion/'

In [None]:
# Back up each of the three project tables through the database wrapper.
# NOTE(review): the upload step later looks in data/backups for files whose
# names start with the table name, so backup_table presumably writes there — confirm.
for table in ['movies', 'reviews_raw', 'reviews_sentiments']:
    db.backup_table(table)

In [None]:
def get_latest_local_backup(table_name, backup_dir="data/backups"):
    """Return the path of the most recent local backup file for a table.

    Parameters
    ----------
    table_name : str
        Backup files are matched by name: every regular file in ``backup_dir``
        whose name starts with ``table_name`` is a candidate. Because this is
        a prefix match, a table name that is a prefix of another table's name
        would match that table's files too.
    backup_dir : str, optional
        Directory to search. Defaults to ``"data/backups"``.

    Returns
    -------
    str or None
        ``os.path.join(backup_dir, name)`` for the candidate with the newest
        modification time, or ``None`` (after printing an ``[INFO]`` message)
        when the directory does not exist or contains no matching file.
    """
    # A missing backup directory simply means there is nothing to upload yet.
    try:
        entries = os.listdir(backup_dir)
    except FileNotFoundError:
        entries = []

    # Keep regular files only, so a sub-directory with a matching name is never
    # returned as a "backup file".
    candidates = [
        os.path.join(backup_dir, name)
        for name in entries
        if name.startswith(table_name) and os.path.isfile(os.path.join(backup_dir, name))
    ]

    if not candidates:
        print(f"[INFO] No local backup found for {table_name}")
        return None

    # Rank by modification time. ctime is the metadata-change time on Unix, so
    # a chmod or rename of an old backup would wrongly make it the newest.
    return max(candidates, key=os.path.getmtime)

In [None]:
# Upload the newest local backup of each table, then delete the local copy.
for table in ('movies', 'reviews_raw', 'reviews_sentiments'):
    file_path = get_latest_local_backup(table)

    # Nothing to upload for this table.
    if file_path is None:
        continue

    try:
        fs.put(file_path, destination, content_type="parquet", encoding="utf-8")
        # The local file is removed only after the upload call has returned.
        os.remove(file_path)
    except Exception as exc:
        # Keep going with the remaining tables; the failure is only reported.
        print(f"[ERROR] Failed uploading {file_path} to {destination}: {exc}")
    else:
        print(f"[INFO] Successfully uploaded {file_path} to {destination}")

In [None]:
# Final step of the notebook: close the connection opened earlier with db.connect().
db.close_connection()