In [2]:
import csv
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from pathlib import Path
from google.cloud import storage
import os
import time
import re

# Google Cloud project and storage details
project_id = "dylanericsp25"
region = "us-central1"
bucket_name = "movies-entertainment"
gcs_in_folder = "initial-loads/imdb-reviews/"  # prefix used when listing the input .txt review blobs
local_folder = "out-csv"  # local directory for raw model responses and the combined CSV
gcs_out_folder = "initial-loads/imdb-reviews/out"  # destination prefix for the uploaded combined CSV
model_name = "gemini-1.5-flash-001"  # model identifier passed to GenerativeModel

# Ensure local folder exists (runs at import time, before any file is written)
os.makedirs(local_folder, exist_ok=True)

# Prompt to extract structured review data.
# It is sent to the model together with each review's text. The six fields it
# requests, in this order, are the columns format_row() pads/truncates to and
# that main() writes after the leading "filename" column.
prompt = """
You are extracting structured IMDb movie review data. Extract **only the following fields**:

1. **Movie Name**: Extract the correct movie name. If unclear, use the filename without ".txt".
2. **Sentiment**: "Positive", "Negative", or "Neutral".
3. **Key Themes**: Identify up to 3 themes ("Cinematography", "Acting", "Plot", "Soundtrack"); separate using semicolons (`;`).
4. **Named Entities**: Extract actors, directors, and studios (separate using `;`).
5. **Emotional Tone**: Choose from: "Excitement", "Disappointment", "Nostalgia", "Frustration", or "Satisfaction".
6. **Star Rating**: Extract **only numerical ratings** (e.g., `"7/10"`, `"4 stars"`, `"3.5/5"`). If missing, return `"N/A"`.

### **STRICT CSV FORMAT**
- **Return EXACTLY in this format:**
  `"movie_name","sentiment","key_themes","named_entities","emotional_tone","star_rating"`
- **Use DOUBLE QUOTES (`""`) around all text fields.**
- **Commas (`,`) must only separate fields, never inside values.**
- **If a field is missing, return `"N/A"` (not empty).**
- **DO NOT include extra explanations, markdown, or new lines.**

### **Example Correct Output**
For *Inception* (2010):
"Inception","Positive","Cinematography;Plot;Acting","Leonardo DiCaprio (Actor); Christopher Nolan (Director); Warner Bros (Studio)","Excitement","9/10"
Ensure perfect adherence to this format.
"""








# A number (optionally decimal) followed by either "/<number>" or the word
# "star"/"stars". Compiled once because the function is called for every row.
_STAR_RATING_RE = re.compile(
    r"\d+(?:\.\d+)?(?:\s*/\s*\d+(?:\.\d+)?|\s+stars?\b)?",
    re.IGNORECASE,
)


def clean_star_rating(rating):
    """Return the first numerical rating found in *rating*, or "N/A".

    Recognised forms are a bare number ("8"), a fraction with any numeric
    denominator ("7/10", "3.5/5") and a star count in singular or plural,
    regardless of letter case ("1 star", "4 Stars"). The matched text is
    returned as it appears in the input.

    Args:
        rating: Raw star-rating field as produced by the model.

    Returns:
        The matched rating substring, or "N/A" when *rating* is not a string
        or contains no number.
    """
    # A missing field may arrive as None or another non-string; treat it as
    # "no rating" instead of letting the regex call raise TypeError.
    if not isinstance(rating, str):
        return "N/A"

    match = _STAR_RATING_RE.search(rating)
    return match.group(0) if match else "N/A"

def format_row(row, filename):
    """Parse one CSV line returned by Gemini into a fixed-width output row.

    The line is parsed with the csv module, forced to exactly six fields
    (short rows are right-padded with "N/A", long rows are cut off), and the
    last field is normalised through clean_star_rating().

    Args:
        row: A single line of CSV text from the model response.
        filename: Name of the source review file; becomes the first column.

    Returns:
        A seven-item list: filename followed by the six review fields. If
        anything goes wrong while parsing, the error is printed and every
        review field is "N/A".
    """
    expected_fields = 6
    try:
        reader = csv.reader([row], quotechar='"', delimiter=',', skipinitialspace=True)
        fields = next(reader)

        # Pad generously, then clip: yields exactly six columns either way.
        fields = (fields + ["N/A"] * expected_fields)[:expected_fields]

        # The star rating is always the final column.
        fields[-1] = clean_star_rating(fields[-1])

        return [filename, *fields]

    except Exception as exc:
        print(f"❌ Error formatting row: {row} | {exc}")
        return [filename] + ["N/A"] * expected_fields  # Default fallback




def _is_header_line(line):
    """Return True when *line* is a CSV header row rather than review data."""
    # The prompt's header starts with "movie_name"; the combined CSV written
    # by main() starts with "filename". Only the first field is inspected so
    # a review that merely mentions either word is not discarded.
    first_field = line.split(",", 1)[0].strip().strip('"').strip().lower()
    return first_field in ("movie_name", "filename")


def main():
    """Process IMDb reviews from GCS and extract structured insights using Gemini AI.

    Every ``.txt`` blob under ``gcs_in_folder`` in ``bucket_name`` is
    downloaded and sent to the Gemini model together with ``prompt``. The raw
    response is saved under ``local_folder`` for debugging, each data line of
    the response is normalised with format_row(), and all rows are written to
    ``{local_folder}/all_imdb_reviews.csv``.

    Files that are empty, or whose model call raises, are reported and
    skipped. Nothing is written when no rows were collected.
    """
    vertexai.init(project=project_id, location=region)
    model = GenerativeModel(model_name)
    storage_client = storage.Client()

    # Fetch all .txt files in GCS
    blobs = [
        blob
        for blob in storage_client.list_blobs(bucket_name, prefix=gcs_in_folder)
        if blob.name.endswith(".txt")
    ]
    print(f"✅ Found {len(blobs)} review files in GCS bucket.")

    if not blobs:
        print("❌ No valid review files found. Check the GCS bucket.")
        return

    all_reviews = []

    # Files are handled one at a time; the per-request sleep below is what
    # throttles the API usage, so no additional batching is needed.
    for blob in blobs:
        print(f"Processing file: {blob.name}")
        filename = blob.name.split("/")[-1]
        movie_name = filename.replace(".txt", "").replace("_", " ").strip()

        print(f"🎬 Extracting insights for: {movie_name}")

        # Read IMDb review text from GCS
        review_text = blob.download_as_text().strip()
        if not review_text:
            print(f"❌ Skipping {movie_name} (empty file).")
            continue

        # Send to Gemini model for processing
        text_input = Part.from_text(review_text)
        try:
            time.sleep(2)  # Prevent exceeding API quota
            response = model.generate_content([text_input, prompt])
            # Strip markdown code fences the model sometimes wraps around CSV.
            csv_output = response.text.replace("```csv", "").replace("```", "").strip()

            # Save raw Gemini response for debugging
            with open(f"{local_folder}/raw_{filename}.txt", "w", encoding="utf-8") as f:
                f.write(csv_output)

            print(f"🔹 Gemini Full Response for {filename}: {csv_output}")  # Print full response
        except Exception as e:
            # Best-effort processing: one failing file must not abort the run.
            if "Quota exceeded" in str(e):
                print(f"❌ Skipping {filename} due to quota limits. Consider increasing quota.")
            else:
                print(f"❌ Error processing {filename}: {e}")
            continue

        # Keep only data lines: blank lines would otherwise become rows made
        # entirely of "N/A", and an echoed header would be stored as a review.
        for line in csv_output.splitlines():
            line = line.strip()
            if not line or _is_header_line(line):
                continue
            all_reviews.append(format_row(line, filename))

    # Prevent saving an empty file
    if not all_reviews:
        print("❌ No valid reviews processed. Skipping CSV upload.")
        return

    # Save all reviews in one CSV file
    output_csv_path = f"{local_folder}/all_imdb_reviews.csv"
    with open(output_csv_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["filename", "movie_name", "sentiment", "key_themes", "named_entities", "emotional_tone", "star_rating"])
        writer.writerows(all_reviews)




def copy_to_GCS():
    """Upload the combined IMDb review CSV from the local folder to GCS.

    The file ``{local_folder}/all_imdb_reviews.csv`` is uploaded to
    ``{gcs_out_folder}/all_imdb_reviews.csv`` in ``bucket_name``. When the
    local file does not exist or has zero bytes, a message is printed and
    nothing is uploaded.
    """
    client = storage.Client()
    target_bucket = client.bucket(bucket_name)

    local_csv = f"{local_folder}/all_imdb_reviews.csv"
    remote_name = f"{gcs_out_folder}/all_imdb_reviews.csv"

    # Only upload a file that is present and actually contains data.
    has_content = os.path.exists(local_csv) and os.path.getsize(local_csv) > 0
    if not has_content:
        print("❌ CSV file is missing or empty. Skipping upload.")
        return

    target_bucket.blob(remote_name).upload_from_filename(local_csv)

    print(f"✅ Uploaded single combined file: {remote_name}")


# Script entry point: build the combined CSV locally, then upload it to GCS.
# NOTE(review): copy_to_GCS() runs even when main() returned early without
# writing a CSV; it only checks that the file exists and is non-empty, so a
# CSV left over from an earlier run would still be uploaded.
if __name__ == "__main__":
    main()
    copy_to_GCS()


✅ Found 1256 review files in GCS bucket.
Processing file: initial-loads/imdb-reviews/neg/1000_3.txt
🎬 Extracting insights for: 1000 3
🔹 Gemini Full Response for 1000_3.txt: "N/A","Negative","Plot;Acting","N/A","Disappointment","N/A"
Processing file: initial-loads/imdb-reviews/neg/100_4.txt
🎬 Extracting insights for: 100 4
🔹 Gemini Full Response for 100_4.txt: "N/A","Negative","Acting;Plot","Adrian Paul (Actor);","Disappointment","N/A"
Processing file: initial-loads/imdb-reviews/neg/104_1.txt
🎬 Extracting insights for: 104 1
🔹 Gemini Full Response for 104_1.txt: "N/A","Negative","Plot;Acting;Cinematography","N/A","Frustration","N/A"
Processing file: initial-loads/imdb-reviews/neg/105_3.txt
🎬 Extracting insights for: 105 3
🔹 Gemini Full Response for 105_3.txt: "The Forgotten (AKA: Don't Look In The Basement)","Negative","Acting;Plot;Cinematography","N/A","Disappointment","N/A"
Processing file: initial-loads/imdb-reviews/neg/106_3.txt
🎬 Extracting insights for: 106 3
🔹 Gemini Full Respons