# 🎬 CymbalFlix Discover - Database Setup

Welcome to the data engineering portion of CymbalFlix Discover! In this notebook, you'll set up your AlloyDB database with everything needed for an AI-powered movie discovery application.

## What We're Building

By the end of this notebook, your database will contain:

| Table | Records | Purpose |
|-------|---------|--------|
| `movies` | ~9,700 | Core catalog with AI-searchable summaries and vector embeddings |
| `genres` | 20 | Genre lookup table |
| `movie_genres` | ~21,000 | Many-to-many junction for movie genres |
| `users` | 610 | User profiles extracted from ratings data |
| `ratings` | 100,836 | Historical ratings for analytics |
| `tags` | 3,683 | User-generated tags for semantic analysis |
| `links` | ~9,700 | External IDs (IMDb, TMDb) for integration |
| `watchlist` | 0 | Ready for user watchlist operations |

## AlloyDB Extensions We'll Enable

- **`vector`** - PostgreSQL vector data type for embeddings
- **`alloydb_scann`** - Google's ScaNN index for lightning-fast vector search
- **`google_ml_integration`** - Direct Vertex AI access from SQL

## Security: IAM Authentication

Notice something missing? **No database passwords!** We're using IAM authentication, which means:
- Your Google Cloud identity is your database identity
- No passwords to manage, rotate, or accidentally commit to Git
- The AlloyDB Python Connector handles secure authentication automatically

Let's get started! 🚀

---
## Step 1: Configure Your Environment

First, let's set up the configuration for your specific AlloyDB cluster. Fill in the form fields below with values from your lab instructions.

**Tip:** The form fields appear when you click on this cell. Just fill them in and run the cell!

In [None]:
# @title Configuration - Fill in your lab details { display-mode: "form" }
# @markdown Enter your project and cluster information from the lab instructions:

PROJECT_ID = ""  # @param {type:"string"}
REGION = ""  # @param {type:"string"}
USER_EMAIL = ""  # @param {type:"string"}
CLUSTER_ID = "cymbalflix-cluster"  # @param {type:"string"}
INSTANCE_ID = "cymbalflix-primary"  # @param {type:"string"}

# Database name we'll create
DB_NAME = "cymbalflix"

# GCS bucket with our MovieLens data
DATA_BUCKET = "gs://class-demo/ml-latest-small"

# Validate configuration: these fields have no defaults and are all required
missing = [
    name for name, value in
    [("PROJECT_ID", PROJECT_ID), ("REGION", REGION), ("USER_EMAIL", USER_EMAIL)]
    if not value.strip()
]

if missing:
    print(f"❌ Please fill in these form fields above: {', '.join(missing)}")
    print("   You can find the values in the lab instructions or Cloud Console.")
else:
    print("✅ Configuration set!")
    print(f"   Project:  {PROJECT_ID}")
    print(f"   Region:   {REGION}")
    print(f"   User:     {USER_EMAIL}")
    print(f"   Cluster:  {CLUSTER_ID}")
    print(f"   Instance: {INSTANCE_ID}")
    print("\n🔐 Using IAM authentication (no password required!)")

---
## Step 2: Install Dependencies & Connect to AlloyDB

We'll use the **AlloyDB Python Connector** with **pg8000** to establish a secure connection. This connector:

- Handles IAM authentication automatically
- Creates encrypted connections without manual certificate management  
- Works seamlessly in Colab, Cloud Shell, or any Python environment
- Is the recommended approach for production applications

For the bulk data load in Step 6, we'll go a step further: we'll stage the data in Cloud Storage and use AlloyDB's server-side import, which is much faster than inserting rows one at a time from the notebook.

In [None]:
# Install required packages
# (the extra is quoted so the shell doesn't treat [pg8000] as a glob pattern)
!pip install -q "google-cloud-alloydb-connector[pg8000]" \
    pandas google-cloud-storage sqlalchemy

print("✅ Dependencies installed!")

In [None]:
import pandas as pd
from google.cloud import storage
from google.cloud.alloydb.connector import Connector, IPTypes
import pg8000
import sqlalchemy
from sqlalchemy import text
import io
import re
import json
from datetime import datetime

# Build the instance URI for the connector
INSTANCE_URI = f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{CLUSTER_ID}/instances/{INSTANCE_ID}"

# Initialize the AlloyDB connector
connector = Connector()

def get_connection(database="postgres"):
    """
    Create a connection to AlloyDB using the Python Connector.

    With enable_iam_auth=True, your Google Cloud identity is used
    for authentication - no password needed!
    """
    conn = connector.connect(
        INSTANCE_URI,
        "pg8000",
        user=USER_EMAIL,
        db=database,
        enable_iam_auth=True,
        ip_type=IPTypes.PUBLIC,
    )
    return conn

# Test the connection
print(f"🔗 Connecting to: {INSTANCE_URI}")
print("⏳ Establishing secure connection...")

try:
    conn = get_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    version = cursor.fetchone()[0]
    cursor.execute("SELECT current_user;")
    current_user = cursor.fetchone()[0]
    cursor.close()
    conn.close()

    print("\n✅ Successfully connected to AlloyDB!")
    print(f"\n🔐 Authenticated as: {current_user}")
    print(f"\n📊 Database version:")
    print(f"   {version[:60]}...")
except Exception as e:
    print(f"\n❌ Connection failed: {e}")
    print("\n🔍 Troubleshooting tips:")
    print("   1. Verify your PROJECT_ID is correct (check the form above)")
    print("   2. Make sure your AlloyDB cluster shows 'Ready' in Cloud Console")
    print("   3. Confirm the cluster and instance names match your Terraform output")
    print("   4. Check that your user has the AlloyDB IAM Database User role")

---
## Step 3: Create the CymbalFlix Database

We'll create a dedicated database for CymbalFlix rather than using the default `postgres` database. This is a best practice: it keeps your application data isolated and makes it easier to manage permissions, backups, and migrations.

In [None]:
# Create the cymbalflix database
# We need to use autocommit mode for CREATE DATABASE
conn = get_connection("postgres")
conn.autocommit = True
cursor = conn.cursor()

# Check if database exists
cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (DB_NAME,))
exists = cursor.fetchone()

if not exists:
    cursor.execute(f"CREATE DATABASE {DB_NAME}")
    print(f"✅ Created database: {DB_NAME}")
else:
    print(f"ℹ️  Database '{DB_NAME}' already exists - continuing...")

cursor.close()
conn.close()

---
## Step 4: Enable Extensions

This is where AlloyDB becomes more than just PostgreSQL! We'll enable three powerful extensions:

| Extension | What It Does |
|-----------|-------------|
| `vector` | Adds the VECTOR data type for storing embeddings |
| `alloydb_scann` | Enables Google's ScaNN algorithm for fast similarity search |
| `google_ml_integration` | Connects AlloyDB directly to Vertex AI |

In [None]:
# Enable AlloyDB extensions
conn = get_connection(DB_NAME)
conn.autocommit = True
cursor = conn.cursor()

extensions = [
    ("vector", "Vector data type for embeddings"),
    ("alloydb_scann", "ScaNN index for lightning-fast vector similarity search"),
    ("google_ml_integration", "Direct Vertex AI integration for AI SQL functions")
]

print("🔧 Enabling AlloyDB extensions...\n")

for ext_name, description in extensions:
    try:
        cursor.execute(f"CREATE EXTENSION IF NOT EXISTS {ext_name}")
        print(f"✅ {ext_name}")
        print(f"   └─ {description}")
    except Exception as e:
        print(f"⚠️  Could not enable {ext_name}: {e}")

cursor.close()
conn.close()

print("\n🎉 Extensions enabled!")

---
## Step 5: Create the Database Schema

Our schema is designed for both transactional operations (watchlists, ratings) and analytical queries (trending movies, genre analysis).

**Key design decisions:**

- **Normalized genres** - Instead of storing "Action|Comedy|Sci-Fi" as text, we use a proper junction table
- **Vector column** - The `movies.summary_embedding` stores 3072-dimensional vectors for semantic search
- **Foreign keys** - Enforce data integrity across related tables
- **Timestamps** - Enable temporal analysis and audit trails

```
┌─────────────┐       ┌──────────────┐       ┌─────────────┐
│   movies    │───────│ movie_genres │───────│   genres    │
│ (+ vector)  │       │  (junction)  │       │  (lookup)   │
└─────────────┘       └──────────────┘       └─────────────┘
       │
       ├────────────────────┬─────────────────────┐
       │                    │                     │
       ▼                    ▼                     ▼
┌─────────────┐       ┌─────────────┐       ┌─────────────┐
│   ratings   │       │    tags     │       │   links     │
└─────────────┘       └─────────────┘       └─────────────┘
       │                    │
       └────────┬───────────┘
                ▼
          ┌─────────────┐
          │    users    │
          └─────────────┘
                │
                ▼
          ┌─────────────┐
          │  watchlist  │
          └─────────────┘
```
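One consequence of this schema is that `watchlist` uses the composite primary key `(user_id, movie_id)`. A given movie can therefore appear on a user's watchlist only once. Below is a minimal sketch of an idempotent "add to watchlist". It uses Python's built-in `sqlite3` as a stand-in for AlloyDB, since PostgreSQL accepts the same `INSERT ... ON CONFLICT ... DO NOTHING` form. The placeholder style differs: `?` here, `%s` with pg8000 as used in this notebook. The helper `add_to_watchlist` is hypothetical and not part of the lab's application.

```python
import sqlite3

# In-memory SQLite database standing in for AlloyDB (illustration only)
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    """
    CREATE TABLE watchlist (
        user_id INTEGER NOT NULL,
        movie_id INTEGER NOT NULL,
        added_at TEXT DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, movie_id)
    )
    """
)


def add_to_watchlist(conn, user_id, movie_id):
    """Add a movie to a user's watchlist; re-adding the same pair is a no-op."""
    cur = conn.execute(
        "INSERT INTO watchlist (user_id, movie_id) VALUES (?, ?) "
        "ON CONFLICT (user_id, movie_id) DO NOTHING",
        (user_id, movie_id),
    )
    conn.commit()
    return cur.rowcount == 1  # True only when a new row was inserted


first = add_to_watchlist(demo_conn, 1, 318)
second = add_to_watchlist(demo_conn, 1, 318)  # duplicate: the primary key blocks it
count = demo_conn.execute("SELECT COUNT(*) FROM watchlist").fetchone()[0]
print(first, second, count)  # True False 1
```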

In [None]:
# Define our database schema
schema_sql = """
-- Core movie catalog with vector embeddings for semantic search
CREATE TABLE IF NOT EXISTS movies (
    movie_id INTEGER PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    year INTEGER,
    summary TEXT,
    summary_embedding VECTOR(3072)
);

-- Genre lookup table
CREATE TABLE IF NOT EXISTS genres (
    genre_id SERIAL PRIMARY KEY,
    genre_name VARCHAR(50) UNIQUE NOT NULL
);

-- Many-to-many junction table for movie genres
CREATE TABLE IF NOT EXISTS movie_genres (
    movie_id INTEGER REFERENCES movies(movie_id) ON DELETE CASCADE,
    genre_id INTEGER REFERENCES genres(genre_id) ON DELETE CASCADE,
    PRIMARY KEY (movie_id, genre_id)
);

-- User profiles (extracted from ratings data)
CREATE TABLE IF NOT EXISTS users (
    user_id INTEGER PRIMARY KEY,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Historical ratings for analytics
CREATE TABLE IF NOT EXISTS ratings (
    rating_id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id) ON DELETE CASCADE,
    movie_id INTEGER REFERENCES movies(movie_id) ON DELETE CASCADE,
    rating NUMERIC(2,1) NOT NULL CHECK (rating >= 0.5 AND rating <= 5.0),
    rated_at TIMESTAMP
);

-- User-generated tags for semantic analysis
CREATE TABLE IF NOT EXISTS tags (
    tag_id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id) ON DELETE CASCADE,
    movie_id INTEGER REFERENCES movies(movie_id) ON DELETE CASCADE,
    tag_text VARCHAR(255) NOT NULL,
    tagged_at TIMESTAMP
);

-- User watchlists (for transactional operations)
CREATE TABLE IF NOT EXISTS watchlist (
    user_id INTEGER REFERENCES users(user_id) ON DELETE CASCADE,
    movie_id INTEGER REFERENCES movies(movie_id) ON DELETE CASCADE,
    added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, movie_id)
);

-- External database links (IMDb, TMDb)
CREATE TABLE IF NOT EXISTS links (
    movie_id INTEGER PRIMARY KEY REFERENCES movies(movie_id) ON DELETE CASCADE,
    imdb_id VARCHAR(20),
    tmdb_id INTEGER
);
"""

# Execute the schema
conn = get_connection(DB_NAME)
cursor = conn.cursor()
cursor.execute(schema_sql)
conn.commit()
cursor.close()
conn.close()

print("✅ Database schema created!")
print("\n📋 Tables created:")
print("   • movies (with VECTOR(3072) for embeddings)")
print("   • genres")
print("   • movie_genres (junction table)")
print("   • users")
print("   • ratings")
print("   • tags")
print("   • watchlist")
print("   • links (IMDb/TMDb IDs)")

In [None]:
# Grant permissions to the application service account (for Cloud Run deployment)
app_sa_db_user = f"cymbalflix-app@{PROJECT_ID}.iam"

grant_sql = f'''
GRANT USAGE ON SCHEMA public TO "{app_sa_db_user}";
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO "{app_sa_db_user}";
'''

conn = get_connection(DB_NAME)
conn.autocommit = True
cursor = conn.cursor()
cursor.execute(grant_sql)
cursor.close()
conn.close()

print(f"✅ Granted database permissions to {app_sa_db_user}")
print("   (This enables Cloud Run deployment later)")

---
## Step 6: Load Data via GCS Import

Now comes the fun part: loading our MovieLens data! We'll use **AlloyDB's native GCS import** for blazing-fast bulk loading:

1. **Transform** - Clean and prepare data in pandas DataFrames
2. **Stage** - Upload transformed CSVs to a GCS bucket
3. **Import** - Use `gcloud alloydb clusters import` for server-side loading

**Why GCS import?** It's the fastest way to load data into AlloyDB because:
- Data flows directly from GCS to AlloyDB (no client bottleneck)
- Uses optimized server-side COPY operations
- Can load 100K+ rows in **seconds** instead of minutes
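Because the import reads plain CSV files, the way Python values are serialized matters. Here is a minimal sketch with the standard `csv` module. It assumes the import follows PostgreSQL's `COPY ... CSV` conventions, as described above. Under those conventions, a missing value becomes an empty, unquoted field, which is read as NULL by default. Fields containing commas, such as summaries or embedding arrays, are quoted. The helper name `rows_to_copy_csv` is hypothetical. The notebook itself uses pandas' `to_csv`, which also writes missing values as empty fields by default.

```python
import csv
import io


def rows_to_copy_csv(rows):
    """Serialize rows as header-less CSV for a PostgreSQL-style CSV import.

    None becomes an empty unquoted field (NULL under PostgreSQL's CSV
    defaults); values containing commas or quotes are quoted automatically.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        writer.writerow(["" if v is None else v for v in row])
    return buf.getvalue()


sample_rows = [
    (1, "Toy Story", 1995, "A toy, a rival, and a road trip", "[0.1,0.2]"),
    (2, "Untitled", None, None, None),
]
print(rows_to_copy_csv(sample_rows))
```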

Let's start by setting up our staging bucket and helper functions:

In [None]:
# Create a staging bucket for our transformed data
# Bucket names are global and first-come, first-served; naming the bucket after
# the (globally unique) project ID makes a collision very unlikely.
STAGING_BUCKET = PROJECT_ID

# Get the project number, which is part of the AlloyDB service agent's email
import subprocess
result = subprocess.run(
    ['gcloud', 'projects', 'describe', PROJECT_ID, '--format=value(projectNumber)'],
    capture_output=True, text=True,
    check=True,  # fail loudly instead of continuing with an empty project number
)
PROJECT_NUMBER = result.stdout.strip()
ALLOYDB_SA = f"service-{PROJECT_NUMBER}@gcp-sa-alloydb.iam.gserviceaccount.com"

print(f"📦 Staging bucket: gs://{STAGING_BUCKET}")
print(f"🔐 AlloyDB service account: {ALLOYDB_SA}")

In [None]:
%%bash -s "$STAGING_BUCKET" "$ALLOYDB_SA" "$REGION"
BUCKET=$1
SA=$2
REGION=$3

# Create bucket if it doesn't exist
if ! gcloud storage buckets describe gs://$BUCKET &>/dev/null; then
    echo "📦 Creating staging bucket gs://$BUCKET..."
    gcloud storage buckets create gs://$BUCKET --location=$REGION
else
    echo "📦 Bucket gs://$BUCKET already exists"
fi

# Grant AlloyDB service account read access
echo "🔐 Granting AlloyDB service account access..."
gcloud storage buckets add-iam-policy-binding gs://$BUCKET \
    --member="serviceAccount:$SA" \
    --role="roles/storage.objectViewer" \
    --quiet

echo "✅ Bucket ready for staging!"

In [None]:
def load_csv_from_gcs(bucket_path, filename):
    """Load a CSV file from GCS into a pandas DataFrame."""
    path = bucket_path
    if path.startswith("gs://"):
        path = path[5:]

    if "/" in path:
        parts = path.split("/", 1)
        bucket_name = parts[0]
        blob_path = f"{parts[1]}/{filename}"
    else:
        bucket_name = path
        blob_path = filename

    client = storage.Client(project=PROJECT_ID)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_path)

    content = blob.download_as_text()
    return pd.read_csv(io.StringIO(content))


def upload_csv_to_gcs(df, filename, columns=None):
    """Upload a DataFrame as CSV to our staging bucket."""
    client = storage.Client(project=PROJECT_ID)
    bucket = client.bucket(STAGING_BUCKET)
    blob = bucket.blob(f"cymbalflix/{filename}")

    # Select columns if specified
    if columns:
        df = df[columns]

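    # Convert to CSV without the index or a header row; fields are matched to
    # table columns by position, using the --columns list passed to the import
    csv_data = df.to_csv(index=False, header=False)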
    blob.upload_from_string(csv_data, content_type='text/csv')

    return f"gs://{STAGING_BUCKET}/cymbalflix/{filename}"


def import_csv_to_alloydb(gcs_uri, table_name, columns):
    """Import a CSV from GCS into AlloyDB using gcloud."""
    import subprocess

    # Build column list for --columns flag
    columns_str = ','.join(columns)

    cmd = [
        'gcloud', 'alloydb', 'clusters', 'import',
        CLUSTER_ID,
        f'--region={REGION}',
        f'--gcs-uri={gcs_uri}',
        f'--database={DB_NAME}',
        f'--user={USER_EMAIL}',
        '--csv',
        f'--table={table_name}',
        f'--columns={columns_str}',
        '--quiet'
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        print(f"❌ Error importing {table_name}: {result.stderr}")
        raise RuntimeError(result.stderr)

    return True


print("✅ Helper functions ready!")

### 6.1 Load and Transform Movies

The MovieLens dataset stores the year in the title (e.g., "Jumanji (1995)"). We'll extract it into a separate column for better querying and analytics.

In [None]:
# Load movies from GCS
print("📥 Loading movies.csv from GCS...")
movies_df = load_csv_from_gcs(DATA_BUCKET, "movies.csv")
print(f"   Loaded {len(movies_df):,} movies")

# Extract year from title using regex
def extract_year_and_clean_title(title):
    match = re.search(r'\s*\((\d{4})\)\s*$', str(title))
    if match:
        year = int(match.group(1))
        clean_title = re.sub(r'\s*\(\d{4}\)\s*$', '', title).strip()
        return clean_title, year
    return title, None

movies_df[['clean_title', 'year']] = movies_df['title'].apply(
    lambda x: pd.Series(extract_year_and_clean_title(x))
)
movies_df['title'] = movies_df['clean_title']
movies_df = movies_df.drop(columns=['clean_title'])

# Store genres for later processing
movies_with_genres = movies_df[['movieId', 'genres']].copy()

print("\n✅ Movies processed!")
display(movies_df[['movieId', 'title', 'year']].head())

### 6.2 Load and Merge Summaries

The summaries were generated using Gemini to provide rich, searchable descriptions of each movie.

In [None]:
# Load summaries
print("📥 Loading summaries.csv from GCS...")
summaries_df = load_csv_from_gcs(DATA_BUCKET, "summaries.csv")
print(f"   Loaded {len(summaries_df):,} summaries")

# Merge summaries into movies
movies_df = movies_df.merge(summaries_df, on='movieId', how='left')

print("\n✅ Summaries merged!")

sample_movie = movies_df.iloc[0]
if pd.notna(sample_movie.get('summary')):
    print(f"\n📝 Sample summary for '{sample_movie['title']}':")
    print(f"   {sample_movie['summary'][:250]}...")

### 6.3 Load and Merge Embeddings

The embeddings are 3072-dimensional vectors from Gemini's embedding model, enabling semantic similarity search.
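Each embedding arrives as text, so it is worth validating before it reaches a `VECTOR(3072)` column. Here is a minimal sketch. It assumes the `embedding` column holds JSON-style arrays, which is how the cell below parses it with `json.loads`. The helper names are hypothetical. The `[x,y,z]` text form it produces is the input format the `vector` type accepts.

```python
import json
import math

EXPECTED_DIM = 3072  # dimension of the movies.summary_embedding column


def parse_embedding(text, expected_dim=EXPECTED_DIM):
    """Parse a JSON-array embedding string and validate it before loading.

    Raises ValueError for a wrong length or for NaN/infinite values,
    both of which a VECTOR(n) column would reject.
    """
    values = [float(v) for v in json.loads(text)]
    if len(values) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(values)}")
    if not all(math.isfinite(v) for v in values):
        raise ValueError("embedding contains NaN or infinity")
    return values


def to_vector_literal(values):
    """Format floats in the '[x,y,z]' text form accepted by the vector type."""
    return "[" + ",".join(repr(v) for v in values) + "]"


demo = parse_embedding("[0.25, -0.5, 1.0]", expected_dim=3)
print(to_vector_literal(demo))  # [0.25,-0.5,1.0]
```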

In [None]:
# Load embeddings
print("📥 Loading embeddings.csv from GCS...")
embeddings_df = load_csv_from_gcs(DATA_BUCKET, "embeddings.csv")
print(f"   Loaded {len(embeddings_df):,} embeddings")

# Merge embeddings into movies
movies_df = movies_df.merge(embeddings_df, on='movieId', how='left')

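print("\n✅ Embeddings merged!")

# Sanity-check the dimension of one stored embedding (a JSON-style array string)
sample_embedding = movies_df.iloc[0].get('embedding')
if pd.notna(sample_embedding):
    try:
        embedding_values = json.loads(sample_embedding)
        print(f"   Dimensions: {len(embedding_values)}")
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        print("   (Could not parse the sample embedding to check its dimensions)")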

### 6.4 Prepare All Data for Import

Now we'll load the remaining data (ratings, tags, links) and prepare all DataFrames for CSV export.
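Two of the transformations in this step are easy to get subtly wrong, so here they are in isolation. MovieLens stores IMDb IDs as bare integers and timestamps as seconds since the Unix epoch (UTC). The sketch below uses hypothetical helper names. It restores IMDb's `tt` prefix, zero-padded to at least seven digits, so longer IDs stay intact. It also converts a timestamp to a timezone-aware UTC `datetime`. The notebook's `pd.to_datetime(..., unit='s')` produces the same instant as a timezone-naive value, which matches the `TIMESTAMP` columns in the schema.

```python
from datetime import datetime, timezone


def format_imdb_id(raw_id):
    """MovieLens stores IMDb ids as integers; IMDb uses 'tt' + at least 7 digits."""
    return f"tt{int(raw_id):07d}"


def unix_to_utc(seconds):
    """Convert a MovieLens timestamp (seconds since the epoch, UTC) to a datetime."""
    return datetime.fromtimestamp(int(seconds), tz=timezone.utc)


print(format_imdb_id(114709))   # tt0114709
print(unix_to_utc(964982703))   # 2000-07-30 18:45:03+00:00
```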

In [None]:
# Load ratings
print("📥 Loading ratings.csv from GCS...")
ratings_df = load_csv_from_gcs(DATA_BUCKET, "ratings.csv")
print(f"   Loaded {len(ratings_df):,} ratings")

# Load tags
print("📥 Loading tags.csv from GCS...")
tags_df = load_csv_from_gcs(DATA_BUCKET, "tags.csv")
print(f"   Loaded {len(tags_df):,} tags")

# Load links
print("📥 Loading links.csv from GCS...")
links_df = load_csv_from_gcs(DATA_BUCKET, "links.csv")
print(f"   Loaded {len(links_df):,} links")

print("\n✅ All source data loaded!")

In [None]:
# Prepare movies DataFrame
movies_load_df = movies_df[['movieId', 'title', 'year', 'summary', 'embedding']].copy()
movies_load_df.columns = ['movie_id', 'title', 'year', 'summary', 'summary_embedding']
# Handle NaN years - use empty string for CSV
movies_load_df['year'] = movies_load_df['year'].apply(
    lambda x: int(x) if pd.notna(x) else ''
)

# Prepare genres DataFrame
all_genres = set()
for genres_str in movies_with_genres['genres']:
    if pd.notna(genres_str) and genres_str != '(no genres listed)':
        all_genres.update(genres_str.split('|'))
genres_load_df = pd.DataFrame({'genre_name': sorted(all_genres)})
# genre_id is a SERIAL column filled in during the import, so this lookup assumes
# a freshly created, empty genres table: ids 1..N assigned in the CSV's row order
genre_lookup = {name: idx + 1 for idx, name in enumerate(sorted(all_genres))}

# Prepare movie_genres junction table
junction_records = []
for _, row in movies_with_genres.iterrows():
    if pd.notna(row['genres']) and row['genres'] != '(no genres listed)':
        movie_id = int(row['movieId'])
        for genre in row['genres'].split('|'):
            if genre in genre_lookup:
                junction_records.append({
                    'movie_id': movie_id,
                    'genre_id': genre_lookup[genre]
                })
movie_genres_load_df = pd.DataFrame(junction_records)

# Prepare users DataFrame
users_load_df = pd.DataFrame({'user_id': sorted(ratings_df['userId'].unique())})

# Prepare ratings DataFrame
ratings_load_df = ratings_df[['userId', 'movieId', 'rating', 'timestamp']].copy()
ratings_load_df.columns = ['user_id', 'movie_id', 'rating', 'rated_at']
ratings_load_df['rated_at'] = pd.to_datetime(ratings_load_df['rated_at'], unit='s')

# Prepare tags DataFrame
tags_load_df = tags_df[['userId', 'movieId', 'tag', 'timestamp']].copy()
tags_load_df.columns = ['user_id', 'movie_id', 'tag_text', 'tagged_at']
tags_load_df['tagged_at'] = pd.to_datetime(tags_load_df['tagged_at'], unit='s')

# Prepare links DataFrame
links_load_df = links_df[['movieId', 'imdbId', 'tmdbId']].copy()
links_load_df.columns = ['movie_id', 'imdb_id', 'tmdb_id']
links_load_df['imdb_id'] = links_load_df['imdb_id'].apply(
    lambda x: f"tt{int(x):07d}" if pd.notna(x) else ''
)
links_load_df['tmdb_id'] = links_load_df['tmdb_id'].apply(
    lambda x: int(x) if pd.notna(x) else ''
)

print("📊 Prepared DataFrames:")
print(f"   movies:       {len(movies_load_df):>8,} rows")
print(f"   genres:       {len(genres_load_df):>8,} rows")
print(f"   movie_genres: {len(movie_genres_load_df):>8,} rows")
print(f"   users:        {len(users_load_df):>8,} rows")
print(f"   ratings:      {len(ratings_load_df):>8,} rows")
print(f"   tags:         {len(tags_load_df):>8,} rows")
print(f"   links:        {len(links_load_df):>8,} rows")
print("\n✅ All data prepared for import!")

### 6.5 Upload to Staging Bucket

Now we'll upload all the prepared CSVs to our staging bucket.

In [None]:
print("📤 Uploading transformed data to GCS...\n")

# Upload each DataFrame
uploads = [
    ('movies.csv', movies_load_df, ['movie_id', 'title', 'year', 'summary', 'summary_embedding']),
    ('genres.csv', genres_load_df, ['genre_name']),
    ('movie_genres.csv', movie_genres_load_df, ['movie_id', 'genre_id']),
    ('users.csv', users_load_df, ['user_id']),
    ('ratings.csv', ratings_load_df, ['user_id', 'movie_id', 'rating', 'rated_at']),
    ('tags.csv', tags_load_df, ['user_id', 'movie_id', 'tag_text', 'tagged_at']),
    ('links.csv', links_load_df, ['movie_id', 'imdb_id', 'tmdb_id']),
]

gcs_uris = {}
for filename, df, columns in uploads:
    uri = upload_csv_to_gcs(df, filename, columns)
    table_name = filename.replace('.csv', '')
    gcs_uris[table_name] = (uri, columns)
    print(f"   ✅ {filename}: {len(df):,} rows")

print(f"\n📦 All files staged at gs://{STAGING_BUCKET}/cymbalflix/")

### 6.6 Import Data into AlloyDB

Now for the fast part! We'll use `gcloud alloydb clusters import` to load data directly from GCS into AlloyDB. This is **much faster** than client-side inserts because:

- Data flows directly from GCS to AlloyDB
- No network bottleneck through our notebook
- Uses optimized server-side COPY operations

⏱️ **Expected time:** ~90 seconds for all 7 tables (vs 13+ minutes with client inserts!)
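The load order in the cell below is written by hand. The same constraint can also be derived from the foreign keys in the Step 5 schema using Python's standard `graphlib` module: every table must be loaded after the tables it references. The sketch below does exactly that. The `FK_DEPENDENCIES` mapping is transcribed from the schema and includes `watchlist`, even though that table starts empty.

```python
from graphlib import TopologicalSorter

# Each table maps to the tables its foreign keys reference (from the Step 5 schema)
FK_DEPENDENCIES = {
    "movies": set(),
    "genres": set(),
    "users": set(),
    "movie_genres": {"movies", "genres"},
    "ratings": {"users", "movies"},
    "tags": {"users", "movies"},
    "links": {"movies"},
    "watchlist": {"users", "movies"},
}


def load_order(dependencies):
    """Return an order in which every table comes after the tables it references."""
    return list(TopologicalSorter(dependencies).static_order())


order = load_order(FK_DEPENDENCIES)
print(order)
```

Any order this produces is valid. Adding a new table then only requires declaring its references rather than re-deriving the list by hand.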

In [None]:
import time

# Import order matters due to foreign key constraints
# Tables with no dependencies first, then dependent tables
import_order = [
    ('movies', ['movie_id', 'title', 'year', 'summary', 'summary_embedding']),
    ('genres', ['genre_name']),
    ('movie_genres', ['movie_id', 'genre_id']),
    ('users', ['user_id']),
    ('ratings', ['user_id', 'movie_id', 'rating', 'rated_at']),
    ('tags', ['user_id', 'movie_id', 'tag_text', 'tagged_at']),
    ('links', ['movie_id', 'imdb_id', 'tmdb_id']),
]

print("🚀 Importing data into AlloyDB...\n")
total_start = time.time()

for table_name, columns in import_order:
    uri = f"gs://{STAGING_BUCKET}/cymbalflix/{table_name}.csv"
    print(f"   📥 Importing {table_name}...", end=" ", flush=True)

    start = time.time()
    import_csv_to_alloydb(uri, table_name, columns)
    elapsed = time.time() - start

    print(f"✅ ({elapsed:.1f}s)")

total_elapsed = time.time() - total_start
print(f"\n🎉 All data imported in {total_elapsed:.1f} seconds!")

---
## Step 7: Verify Your Data

Let's make sure everything loaded correctly with some verification queries.

In [None]:
# Verification queries
conn = get_connection(DB_NAME)
cursor = conn.cursor()

verification_queries = [
    ("movies", "SELECT COUNT(*) FROM movies"),
    ("  └─ with summaries", "SELECT COUNT(*) FROM movies WHERE summary IS NOT NULL"),
    ("  └─ with embeddings", "SELECT COUNT(*) FROM movies WHERE summary_embedding IS NOT NULL"),
    ("genres", "SELECT COUNT(*) FROM genres"),
    ("movie_genres", "SELECT COUNT(*) FROM movie_genres"),
    ("users", "SELECT COUNT(*) FROM users"),
    ("ratings", "SELECT COUNT(*) FROM ratings"),
    ("tags", "SELECT COUNT(*) FROM tags"),
    ("links", "SELECT COUNT(*) FROM links"),
]

print("📊 Data Verification Report")
print("=" * 45)

for name, query in verification_queries:
    cursor.execute(query)
    count = cursor.fetchone()[0]
    print(f"   {name}: {count:,}")

cursor.close()
conn.close()

print("=" * 45)
print("\n✅ All data loaded successfully!")

In [None]:
# Sample query: Top-rated movies with their genres
# Joining ratings and movie_genres repeats each rating once per genre, so we
# count DISTINCT rating_ids. AVG is unaffected, because every rating of a movie
# is repeated the same number of times.
sample_query = """
SELECT
    m.title,
    m.year,
    ROUND(AVG(r.rating)::numeric, 2) as avg_rating,
    COUNT(DISTINCT r.rating_id) as num_ratings,
    STRING_AGG(DISTINCT g.genre_name, ', ' ORDER BY g.genre_name) as genres
FROM movies m
JOIN ratings r ON m.movie_id = r.movie_id
JOIN movie_genres mg ON m.movie_id = mg.movie_id
JOIN genres g ON mg.genre_id = g.genre_id
GROUP BY m.movie_id, m.title, m.year
HAVING COUNT(DISTINCT r.rating_id) >= 50
ORDER BY avg_rating DESC, num_ratings DESC
LIMIT 10;
"""

conn = get_connection(DB_NAME)
result_df = pd.read_sql(sample_query, conn)
conn.close()

print("🏆 Top 10 Highest-Rated Movies (minimum 50 ratings):")
display(result_df)

---
## Step 8: Create the ScaNN Index

Now for the feature that makes AlloyDB special for AI workloads: the **ScaNN index**.

**What is ScaNN?** ScaNN (Scalable Nearest Neighbors) is a vector similarity search algorithm developed at Google Research. It is designed for fast approximate nearest-neighbor search over very large collections of vectors.

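**Why do we need it?** Without an index, finding similar movies means comparing your query vector against every single movie: about 9,700 comparisons of 3072-dimensional vectors per query. A ScaNN index groups the vectors into partitions ("leaves") and searches only the partitions closest to the query, so most vectors are never compared.

| Without ScaNN | With ScaNN |
|--------------|------------|
| Compare against all ~9,700 movies | Compare against partition centroids, then only the vectors in the nearest partitions |
| Exact search; cost grows linearly with table size, O(n) | Approximate search; scans only a fraction of the table |
| Always returns the true nearest neighbors | Trades a small amount of recall for much lower latency |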
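ScaNN's real algorithm is more sophisticated than the sketch below. It trains its partitions and uses quantized distance computations, which is what `quantizer = 'sq8'` below refers to. This pure-Python sketch only illustrates the partition-then-scan idea behind `num_leaves`. Vectors are grouped around centroids ("leaves"), and a query scans only the leaves whose centroids are closest. All names and data are hypothetical, and the centroids are sampled from the data rather than trained.

```python
import math
import random


def cosine_distance(a, b):
    """1 - cosine similarity, the metric the index below is built with."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


def build_leaves(vectors, centroids):
    """Assign each vector id to its nearest centroid; each centroid defines one leaf."""
    leaves = {c: [] for c in range(len(centroids))}
    for vid, vec in vectors.items():
        nearest = min(range(len(centroids)), key=lambda c: cosine_distance(vec, centroids[c]))
        leaves[nearest].append(vid)
    return leaves


def leaf_search(query, vectors, centroids, leaves, leaves_to_search=1, k=3):
    """Approximate top-k: rank leaves by centroid distance, then scan only the best few."""
    ranked = sorted(range(len(centroids)), key=lambda c: cosine_distance(query, centroids[c]))
    candidates = [vid for c in ranked[:leaves_to_search] for vid in leaves[c]]
    candidates.sort(key=lambda vid: cosine_distance(query, vectors[vid]))
    return candidates[:k], len(candidates)


# Hypothetical data: random 8-dimensional vectors stand in for movie embeddings
rng = random.Random(42)
DIM, N, NUM_LEAVES = 8, 1000, 10
vectors = {i: [rng.gauss(0, 1) for _ in range(DIM)] for i in range(N)}
# Real indexes train their centroids; sampling existing vectors keeps the sketch short
centroids = [vectors[i] for i in rng.sample(range(N), NUM_LEAVES)]
leaves = build_leaves(vectors, centroids)

query = [rng.gauss(0, 1) for _ in range(DIM)]
approx, scanned = leaf_search(query, vectors, centroids, leaves, leaves_to_search=2)
exact = sorted(vectors, key=lambda vid: cosine_distance(query, vectors[vid]))[:3]
print(f"Scanned {scanned} of {N} vectors")
print("Approximate top-3:", approx)
print("Exact top-3:      ", exact)
```

Scanning more leaves raises recall at the cost of more comparisons. Searching every leaf degenerates to exact brute-force search.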

In [None]:
# Create the ScaNN index
conn = get_connection(DB_NAME)
conn.autocommit = True
cursor = conn.cursor()

print("🔧 Creating ScaNN index on movie embeddings...")
print("   This may take a moment...\n")

try:
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS movies_embedding_scann_idx
        ON movies USING scann (summary_embedding cosine)
        WITH (num_leaves = 50, quantizer = 'sq8');
    """)
    print("✅ ScaNN index created!")
    print("\n📊 Index configuration:")
    print("   • Distance metric: cosine (measures angle between vectors)")
    print("   • num_leaves: 50 (partitions for efficient search)")
    print("   • quantizer: sq8 (8-bit scalar quantization for speed)")
except Exception as e:
    if "already exists" in str(e).lower():
        print("ℹ️  ScaNN index already exists")
    else:
        print(f"⚠️  Could not create index: {e}")

cursor.close()
conn.close()

---
## Step 9: Semantic Search Demo üéØ

This is the payoff! Let's see semantic search in action.

**How it works:**
1. Your search query gets converted to a 3072-dimensional vector using Gemini's embedding model
2. AlloyDB uses the ScaNN index to find movies with similar vectors
3. Results are ranked by cosine similarity, reported as 1 minus the cosine distance (1.0 = pointing in the same direction, 0.0 = unrelated, -1.0 = opposite)
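In the search SQL, `<=>` is the vector extension's cosine-distance operator, and the query reports `1 - distance` as the similarity. The minimal pure-Python sketch below shows the range of values you can expect. The helper name is hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors, in the range [-1.0, 1.0]."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


# Only direction matters, not length
same_direction = cosine_similarity([1.0, 0.0], [2.0, 0.0])  # 1.0
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])      # 0.0
opposite = cosine_similarity([1.0, 0.0], [-1.0, 0.0])       # -1.0

# Cosine distance (what <=> computes) and the SQL's 1 - distance round trip
distance = 1.0 - cosine_similarity([3.0, 4.0], [4.0, 3.0])
similarity = 1.0 - distance
print(same_direction, orthogonal, opposite, round(similarity, 3))  # 1.0 0.0 -1.0 0.96
```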


In [None]:
def semantic_search(query, limit=5):
    """
    Search for movies using semantic similarity.

    This converts your natural language query into a vector with the
    google_ml_integration embedding() function, then finds movies whose
    summary embeddings are closest by cosine distance.
    """
    search_sql = """
    WITH query_embedding AS (
        SELECT embedding(
            'gemini-embedding-001',   -- no registration needed
            %s                        -- the query text from Python
        )::vector AS embedding
    )
    SELECT
        m.title,
        m.year,
        ROUND((1 - (m.summary_embedding <=> q.embedding))::numeric, 3) AS similarity,
        LEFT(m.summary, 150) || '...' AS summary_preview
    FROM movies m
    CROSS JOIN query_embedding q
    WHERE m.summary_embedding IS NOT NULL
    ORDER BY m.summary_embedding <=> q.embedding
    LIMIT %s;
    """

    conn = get_connection(DB_NAME)
    try:
        return pd.read_sql(search_sql, conn, params=(query, limit))
    finally:
        conn.close()  # close the connection even if the query fails

print("✅ Semantic search function ready!")

In [None]:
# Demo 1: Conceptual search
print("🔍 Search: 'A movie about artificial intelligence becoming self-aware'")
print("=" * 70)
results = semantic_search("A movie about artificial intelligence becoming self-aware")
display(results)

In [None]:
# Demo 2: Emotional/thematic search
print("🔍 Search: 'Heartwarming story about unlikely friendship'")
print("=" * 70)
results = semantic_search("Heartwarming story about unlikely friendship")
display(results)

In [None]:
# Demo 3: Compare semantic vs. what keyword search would find
print("🔍 Search: 'space adventure'")
print("=" * 70)
print("\n📊 Semantic Search Results (finds movies by MEANING):")
results = semantic_search("space adventure")
display(results)

# Now show what a simple keyword search would find
print("\n📊 Traditional Keyword Search (finds movies by EXACT WORDS):")
conn = get_connection(DB_NAME)
keyword_results = pd.read_sql("""
    SELECT title, year, LEFT(summary, 100) || '...' as summary_preview
    FROM movies
    WHERE LOWER(title) LIKE '%space%'
       OR LOWER(summary) LIKE '%space adventure%'
    LIMIT 5;
""", conn)
conn.close()
display(keyword_results)

print("\n💡 Notice how semantic search finds thematically similar movies")
print("   even if 'space adventure' doesn't appear in the text!")

---
## Step 10: Verify Columnar Engine

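AlloyDB's columnar engine can accelerate analytical queries by up to 100x. When it is enabled on the instance (the `google_columnar_engine.enabled` flag), AlloyDB can identify analytical query patterns and automatically populate optimized in-memory columnar representations of the data those queries use.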
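Here is a toy illustration of why a columnar layout helps aggregates. It is only an illustration: AlloyDB's in-memory column store is far more involved, and the names and values here are made up. `AVG(rating)` needs just one column, and a column-oriented copy lets it scan a single list instead of visiting every full record.

```python
from statistics import mean

# Made-up ratings in row form: each record keeps all of its fields together
rows = [
    {"user_id": 1, "movie_id": 10, "rating": 4.0},
    {"user_id": 2, "movie_id": 10, "rating": 3.0},
    {"user_id": 3, "movie_id": 20, "rating": 5.0},
]


def to_columns(records):
    """Pivot row records into one list per column (a columnar layout)."""
    return {key: [r[key] for r in records] for key in records[0]}


columns = to_columns(rows)

# AVG(rating) over rows visits every record; over columns it reads one list
avg_from_rows = mean(r["rating"] for r in rows)
avg_from_columns = mean(columns["rating"])
print(columns["rating"], avg_from_columns)  # [4.0, 3.0, 5.0] 4.0
```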

Let's verify it's enabled on your instance:

In [None]:
# Check columnar engine settings
conn = get_connection(DB_NAME)
cursor = conn.cursor()

print("🔧 Columnar Engine Configuration")
print("=" * 50)

cursor.execute("""
    SELECT name, setting, short_desc
    FROM pg_settings
    WHERE name LIKE '%columnar%'
    ORDER BY name;
""")

results = cursor.fetchall()
for name, setting, desc in results:
    print(f"   {name}: {setting}")

# Boolean settings are reported as 'on' / 'off' in pg_settings
enabled = any(name == "google_columnar_engine.enabled" and setting == "on"
              for name, setting, _ in results)
if enabled:
    print("\n✅ Columnar engine is enabled!")
    print("   Eligible analytical queries can be accelerated automatically.")
elif results:
    print("\n⚠️  Columnar settings found, but google_columnar_engine.enabled is not 'on'.")
else:
    print("   No columnar settings found (may be auto-configured)")

cursor.close()
conn.close()

---
## 🎉 Congratulations!

Your CymbalFlix database is fully operational! Here's what you've accomplished:

### Database Setup
- ✅ Connected to AlloyDB using **IAM authentication** (no passwords!)
- ✅ Created a dedicated `cymbalflix` database
- ✅ Enabled vector, ScaNN, and ML integration extensions

### Data Loading
- ✅ Bulk loaded ~9,700 movies with AI-generated summaries using **server-side GCS import**
- ✅ Added 3072-dimensional vector embeddings for semantic search
- ✅ Normalized genres into a proper relational structure
- ✅ Loaded 100,000+ ratings and 3,600+ tags at high speed
- ✅ Added external links (IMDb, TMDb)

### AI Features
- ✅ Created a ScaNN index for lightning-fast vector similarity
- ✅ Generated query embeddings directly in SQL with `embedding()` and Vertex AI
- ✅ Tested semantic search that finds movies by meaning
- ✅ Verified columnar engine for analytical acceleration

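### Performance Highlight ⚡

By staging transformed CSVs in Cloud Storage and loading them with `gcloud alloydb clusters import` instead of inserting rows from the notebook, we:
- Brought the expected load time for all 7 tables down to ~90 seconds (vs 13+ minutes with client-side inserts)
- Kept the notebook out of the data path: rows flow from GCS straight into AlloyDB
- Let AlloyDB run the load server-side with optimized COPY operations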

### Security Highlight üîê

Notice how we never handled a database password? That's **IAM authentication** in action:
- Your Google Cloud identity IS your database identity
- The Python Connector handles secure token exchange automatically
- No credentials to rotate, leak, or accidentally commit to Git

This is the **production-ready** way to handle database authentication in Google Cloud.

---

### What's Next?

Return to the lab instructions for **Task 4**, where you'll build the CymbalFlix Discover web application using Streamlit. You'll create a user interface that lets anyone search for movies semantically and explore AI-powered recommendations!

🎬 Your database is ready to power an AI-driven movie discovery experience! 🤖

In [None]:
# Cleanup: Close the connector when done
# Uncomment the line below when you're finished with the notebook
# connector.close()
# print("✅ Connector closed.")