<a href="https://colab.research.google.com/github/MaxMatteucci/mgmt467-analytics-portfolio/blob/main/Unit2_Lab1_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQ.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [1]:
# ===== Google Cloud Auth + Project Setup (Fixed) =====
from google.colab import auth
auth.authenticate_user()

import os

# Your correct and accessible MGMT 467 project
PROJECT_ID = "database-project-467"
REGION = "us-central1"

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION

print(f"✅ Authenticated and configured for project: {PROJECT_ID} | region: {REGION}")

# Confirm project in gcloud CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project


✅ Authenticated and configured for project: database-project-467 | region: us-central1
Updated property [core/project].
database-project-467


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [2]:
# Verify the active project and region
!gcloud config get-value project
import os
print(f"REGION: {os.environ.get('REGION', 'REGION not set')}")

database-project-467
REGION: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

If we do not set a project_id and region, the computer will not know which project it is going to, and where the data is from potentially leading to vastly increased costs.  I faced this issue earlier in the class, getting stucl, but now that I know how to fix it, I am able to get it right every time.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [3]:
# ===== Kaggle Setup in Google Colab =====
from google.colab import files
import os

# Prompt: upload your Kaggle API key (downloaded as kaggle.json from your Kaggle account)
print("📁 Upload your kaggle.json file (Kaggle → Account → Create New API Token)")
uploaded = files.upload()

# Create the hidden .kaggle directory if it doesn't exist
os.makedirs("/root/.kaggle", exist_ok=True)

# Save the uploaded API key file securely
kaggle_json_path = "/root/.kaggle/kaggle.json"
with open(kaggle_json_path, "wb") as f:
    f.write(uploaded[list(uploaded.keys())[0]])

# Restrict permissions to the file (owner-only access)
os.chmod(kaggle_json_path, 0o600)

# Verify Kaggle CLI installation
!kaggle --version

print("✅ Kaggle API is set up and ready to use!")


📁 Upload your kaggle.json file (Kaggle → Account → Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5
✅ Kaggle API is set up and ready to use!


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [4]:
!kaggle --help | head -n 20  # Verify Kaggle CLI is installed and display first 20 help lines


usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

We require strict 0600 permissions on API tokens because they contain private credentials that grant full access to your Kaggle (or any other service’s) account. “0600” means only I can read or write the token and no one else, not even other users on the same system, can view it.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [5]:
# ===== Download & Unzip Netflix Dataset =====
# Dataset: sayeeduddin/netflix-2025user-behavior-dataset-210k-records

import os
import shutil
import pandas as pd
import kagglehub
from zipfile import ZipFile

# --- Step 1: Create predictable raw-data folder ---
os.makedirs("/content/data/raw", exist_ok=True)

# --- Step 2: Download latest dataset version from Kaggle ---
print("📥 Downloading dataset...")
path = kagglehub.dataset_download("sayeeduddin/netflix-2025user-behavior-dataset-210k-records")
print("✅ Download complete. Path to cached dataset files:", path)

# --- Step 3: Unzip dataset into /content/data/raw (overwrite OK) ---
for item in os.listdir(path):
    src = os.path.join(path, item)
    dst = os.path.join("/content/data/raw", item)
    if os.path.isdir(src):
        shutil.copytree(src, dst, dirs_exist_ok=True)
    else:
        shutil.copy2(src, dst)

print("\n📂 Files extracted to /content/data/raw")

# --- Step 4: List CSVs with sizes in a neat table ---
csv_files = []
for root, _, files in os.walk("/content/data/raw"):
    for file in files:
        if file.endswith(".csv"):
            file_path = os.path.join(root, file)
            size_mb = os.path.getsize(file_path) / (1024 * 1024)
            csv_files.append({"File": file, "Size (MB)": round(size_mb, 2)})

df = pd.DataFrame(csv_files)
print("\n📄 CSV files found:")
display(df)


📥 Downloading dataset...
Downloading from https://www.kaggle.com/api/v1/datasets/download/sayeeduddin/netflix-2025user-behavior-dataset-210k-records?dataset_version_number=1...


100%|██████████| 4.02M/4.02M [00:00<00:00, 119MB/s]

Extracting files...





✅ Download complete. Path to cached dataset files: /root/.cache/kagglehub/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records/versions/1

📂 Files extracted to /content/data/raw

📄 CSV files found:


Unnamed: 0,File,Size (MB)
0,reviews.csv,1.78
1,search_logs.csv,2.15
2,watch_history.csv,8.84
3,recommendation_logs.csv,4.48
4,users.csv,1.53
5,movies.csv,0.11


### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [6]:

display(df)

Unnamed: 0,File,Size (MB)
0,reviews.csv,1.78
1,search_logs.csv,2.15
2,watch_history.csv,8.84
3,recommendation_logs.csv,4.48
4,users.csv,1.53
5,movies.csv,0.11


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?


CHECK

Transparency & Reproducibility
A clear record of what files were used (and their exact sizes) makes your workflow auditable and repeatable.
You or a teammate can re-run the project knowing which version of each file was used.
If results ever differ, you can confirm whether a dataset changed or if a file was replaced.  

Pipeline Integrity & Debugging
When working with multiple CSVs, clean inventories prevent subtle bugs like missing or duplicate files, accidental overwrites, and incorrect joins from mis-named or truncated data sources.
Quickly checking file sizes can also reveal corruption or incomplete downloads a 0 MB file is a red flag before you even load it into memory.

Automation & Scalability
Many downstream scripts (i.e., preprocessing or model training pipelines) can loop through an inventory rather than hardcoding file names.  If we do not have a clean file inventory, we will struggle to effectively streamline the overall process.


Storage Management & Efficiency
Knowing which files are largest helps with prioritizing compression or sampling, deciding what to archive versus keep in active use, and monitoring disk usage to prevent environments from filling up unexpectedly.  We can automaticaly log file size, spending, and all of that, we can also specifiy region easily.

Context for Data Understanding
A dataset’s structure often reveals itself through its file breakdown.
Filenames can hint at table purpose (i.e., users.csv, ratings.csv, sessions.csv).
File size differences often reflect data granularity (i.e., “transactions” vs. “customers”).
This helps plan join logic, feature engineering, and modeling strategy early on.


---
Essentially it is very useful because it allows for better integration, it helps knowing the size of our files, it also is helpful for not having duplicates or missing files, and streamlines the entire process.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [7]:
import uuid, os

REGION = "us-central1"  # or your desired region
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the bucket
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSVs in your raw folder
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print summary
print(f"\n✅ Bucket created and files staged at: gs://{bucket_name}/netflix/\n")
print("Staging ensures your pipeline is reproducible and scalable — "
      "data is versioned, centralized, and safely accessible from GCP.")


Creating gs://mgmt467-netflix-48f66522/...
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-48f66522/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-48f66522/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-48f66522/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-48f66522/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-48f66522/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-48f66522/netflix/watch_history.csv

Average throughput: 37.6MiB/s

✅ Bucket created and files staged at: gs://mgmt467-netflix-48f66522/netflix/

Staging ensures your pipeline is reproducible and scalable — data is versioned, centralized, and safely accessible from GCP.


### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [8]:
# List all objects under the netflix/ prefix with sizes
!gcloud storage ls -l gs://$BUCKET_NAME/netflix/


    115942  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/movies.csv
   4695557  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/recommendation_logs.csv
   1861942  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/reviews.csv
   2250902  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/search_logs.csv
   1606820  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/users.csv
   9269425  2025-10-23T18:25:43Z  gs://mgmt467-netflix-48f66522/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from

---

local Colab.

Benefit 1: Reproducability, this is accessible for all team members, not just in one colab session.  Furthermore, other pipelines can run my exact same analysis without reuploading or depending on my specific files.

Benefit 2: We can use GCP tools such as VertexAI, BigQuery or other cloud features to give us more tools at our disposal for analysis.  All of these cloud features allow us to better manipulate and analyze data, and having these at our disposal throught GCP radically changes how we can work with the data.  I have personally been using these concepts in other classes with things such as case competitions for easily manipulation and usability.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [9]:
# # Cell A — Create (idempotently) dataset "netflix" in US multi-region
# # If it already exists, print a friendly message

DATASET="netflix"
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "✅ Dataset '$DATASET' already exists — skipping creation."


BigQuery error in mk operation: Dataset 'database-project-467:netflix' already
exists.
✅ Dataset '' already exists — skipping creation.


In [10]:
# # Cell B — Load tables from gs://$BUCKET_NAME/netflix/
# # Loads users, movies, watch_history, recommendation_logs, search_logs, reviews
# # using --skip_leading_rows=1 --autodetect --source_format=CSV
# # Finishes with row-count queries for each table.

import os
os.environ["DATASET"] = "netflix"


tables = {
    "users": "users.csv",
    "movies": "movies.csv",
    "watch_history": "watch_history.csv",
    "recommendation_logs": "recommendation_logs.csv",
    "search_logs": "search_logs.csv",
    "reviews": "reviews.csv",
}

import os, subprocess, textwrap

dataset = os.environ["DATASET"]
bucket = os.environ["BUCKET_NAME"]

# ===== Load each CSV into BigQuery =====
for tbl, fname in tables.items():
    src = f"gs://{bucket}/netflix/{fname}"
    print(f"📥 Loading {tbl} from {src}")
    subprocess.run([
        "bq", "load",
        "--skip_leading_rows=1",
        "--autodetect",
        "--source_format=CSV",
        f"{dataset}.{tbl}",
        src
    ], check=False)

print("\n✅ All loads attempted. Verifying row counts:\n")

# ===== Row-count verification =====
project = os.environ["GOOGLE_CLOUD_PROJECT"]
query = textwrap.dedent(f"""
    SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project}.{dataset}.users`
    UNION ALL
    SELECT 'movies', COUNT(*) AS row_count FROM `{project}.{dataset}.movies`
    UNION ALL
    SELECT 'watch_history', COUNT(*) AS row_count FROM `{project}.{dataset}.watch_history`
    UNION ALL
    SELECT 'recommendation_logs', COUNT(*) AS row_count FROM `{project}.{dataset}.recommendation_logs`
    UNION ALL
    SELECT 'search_logs', COUNT(*) AS row_count FROM `{project}.{dataset}.search_logs`
    UNION ALL
    SELECT 'reviews', COUNT(*) AS row_count FROM `{project}.{dataset}.reviews`
""").strip()

# Run verification query safely (no shell interpolation issues)
result = subprocess.run(
    ["bq", "query", "--nouse_legacy_sql", query],
    text=True, capture_output=True
)
print(result.stdout or result.stderr)


📥 Loading users from gs://mgmt467-netflix-48f66522/netflix/users.csv
📥 Loading movies from gs://mgmt467-netflix-48f66522/netflix/movies.csv
📥 Loading watch_history from gs://mgmt467-netflix-48f66522/netflix/watch_history.csv
📥 Loading recommendation_logs from gs://mgmt467-netflix-48f66522/netflix/recommendation_logs.csv
📥 Loading search_logs from gs://mgmt467-netflix-48f66522/netflix/search_logs.csv
📥 Loading reviews from gs://mgmt467-netflix-48f66522/netflix/reviews.csv

✅ All loads attempted. Verifying row counts:

+---------------------+-----------+
|     table_name      | row_count |
+---------------------+-----------+
| movies              |      3120 |
| reviews             |     46350 |
| recommendation_logs |    156000 |
| search_logs         |     79500 |
| users               |     30900 |
| watch_history       |    315000 |
+---------------------+-----------+



### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [14]:
# ✅ Verification Prompt — Return table_name and row_count for all six tables (as DataFrame)

import os
import subprocess
import textwrap
import pandas as pd
from io import StringIO

project = os.environ["GOOGLE_CLOUD_PROJECT"]
dataset = "netflix"

query = textwrap.dedent(f"""
    SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project}.{dataset}.users`
    UNION ALL
    SELECT 'movies', COUNT(*) AS row_count FROM `{project}.{dataset}.movies`
    UNION ALL
    SELECT 'watch_history', COUNT(*) AS row_count FROM `{project}.{dataset}.watch_history`
    UNION ALL
    SELECT 'recommendation_logs', COUNT(*) AS row_count FROM `{project}.{dataset}.recommendation_logs`
    UNION ALL
    SELECT 'search_logs', COUNT(*) AS row_count FROM `{project}.{dataset}.search_logs`
    UNION ALL
    SELECT 'reviews', COUNT(*) AS row_count FROM `{project}.{dataset}.reviews`
""").strip()

# ✅ Run query safely, output as CSV
result = subprocess.run(
    ["bq", "query", "--nouse_legacy_sql", "--format=csv", query],
    text=True,
    capture_output=True
)

# ✅ Convert to DataFrame
if result.returncode == 0:
    df = pd.read_csv(StringIO(result.stdout))
    display(df)
else:
    print("❌ Error running query:\n", result.stderr)


Unnamed: 0,table_name,row_count
0,movies,3120
1,reviews,46350
2,recommendation_logs,156000
3,search_logs,79500
4,users,30900
5,watch_history,315000


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [15]:
from google.cloud import bigquery
import os

# Get project name from environment variable (optional)
project = os.environ["GOOGLE_CLOUD_PROJECT"]
dataset = "netflix"

# Initialize BigQuery client
client = bigquery.Client(project=project)


In [16]:
query = f"SELECT * FROM `{project}.{dataset}.users` LIMIT 1"
df_preview = client.query(query).to_dataframe()
df_preview.columns


Index(['user_id', 'email', 'first_name', 'last_name', 'age', 'gender',
       'country', 'state_province', 'city', 'subscription_plan',
       'subscription_start_date', 'is_active', 'monthly_spend',
       'primary_device', 'household_size', 'created_at'],
      dtype='object')

In [17]:
# ✅ Verification Prompt — Missingness Profile for Users Table (fixed columns)

from google.cloud import bigquery
import pandas as pd
import os

# ✅ Ensure your project ID is set
project = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-project-id-here")
dataset = "netflix"

client = bigquery.Client(project=project)

# ✅ Query total rows + % missing in country, subscription_plan, and age
query = f"""
WITH base AS (
    SELECT
        COUNT(*) AS n,
        COUNTIF(country IS NULL) AS miss_country,
        COUNTIF(subscription_plan IS NULL) AS miss_subscription_plan,
        COUNTIF(age IS NULL) AS miss_age
    FROM `{project}.{dataset}.users`
)
SELECT
    n AS total_rows,
    ROUND(100 * miss_country / n, 2) AS pct_missing_country,
    ROUND(100 * miss_subscription_plan / n, 2) AS pct_missing_subscription_plan,
    ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM base
"""

df = client.query(query).to_dataframe()
display(df)


Unnamed: 0,total_rows,pct_missing_country,pct_missing_subscription_plan,pct_missing_age
0,30900,0.0,0.0,11.93


In [18]:
# ✅ Verification Prompt — MAR by Country (Missingness in subscription_plan by country)

from google.cloud import bigquery
import pandas as pd
import os

# ✅ Ensure project is set
project = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-project-id-here")
dataset = "netflix"

client = bigquery.Client(project=project)

# ✅ Query: % missing subscription_plan by country
query = f"""
SELECT
    country,
    COUNT(*) AS n,
    ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM `{project}.{dataset}.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC
"""

df = client.query(query).to_dataframe()
display(df)


Unnamed: 0,country,n,pct_missing_subscription_plan
0,USA,21612,0.0
1,Canada,9288,0.0


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [22]:
query = f"SELECT * FROM `{project}.{dataset}.users` LIMIT 1"
df_preview = client.query(query).to_dataframe()
df_preview.columns


Index(['user_id', 'email', 'first_name', 'last_name', 'age', 'gender',
       'country', 'state_province', 'city', 'subscription_plan',
       'subscription_start_date', 'is_active', 'monthly_spend',
       'primary_device', 'household_size', 'created_at'],
      dtype='object')

In [21]:


# ✅ Ensure project and dataset are set
project = os.environ.get("GOOGLE_CLOUD_PROJECT", "your-project-id-here")
dataset = "netflix"

# ✅ Initialize BigQuery client
client = bigquery.Client(project=project)

# ✅ Query: % missing subscription_plan by country
query = f"""
SELECT
  country,
  COUNT(*) AS total_rows,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM `{project}.{dataset}.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC
"""

# ✅ Run query and convert to DataFrame
df_mar_country = client.query(query).to_dataframe()

# ✅ Display results
display(df_mar_country)


Unnamed: 0,country,total_rows,pct_missing_subscription_plan
0,Canada,9288,0.0
1,USA,21612,0.0


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Age is missing a significant amount of the time, about 12%.  It is not MCAR or they would be more alighned, instead it is likely MAR, of course there is a chance that it does depend on customer age, but we cannot prove that.  This is why I think it is MAR, as it depends on observed variables.  Other columns such as subscription plan have no missing data at all.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [None]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

In [25]:
# ✅ Verification Prompt — Detect duplicate groups in watch_history

query_duplicates = f"""
SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS dup_count
FROM `{project}.{dataset}.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20
"""

df_dups = client.query(query_duplicates).to_dataframe()
display(df_dups)


Unnamed: 0,user_id,movie_id,watch_date,device_type,dup_count
0,user_03310,movie_0640,2024-09-08,Smart TV,12
1,user_00391,movie_0893,2024-08-26,Laptop,12
2,user_07738,movie_0793,2025-07-28,Desktop,9
3,user_07617,movie_0785,2024-07-14,Desktop,9
4,user_03140,movie_0205,2025-09-11,Desktop,9
5,user_01292,movie_0231,2024-07-05,Laptop,9
6,user_06417,movie_0590,2024-01-15,Laptop,9
7,user_02976,movie_0987,2024-09-19,Desktop,9
8,user_02126,movie_0642,2025-02-09,Desktop,9
9,user_02950,movie_0928,2025-06-03,Desktop,9


In [26]:
# ✅ Verification Prompt — Create watch_history_dedup (keep higher progress_percentage, then watch_duration_minutes)

query_dedup = f"""
CREATE OR REPLACE TABLE `{project}.{dataset}.watch_history_dedup` AS
SELECT
  * EXCEPT(rk)
FROM (
  SELECT
    h.*,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) AS rk
  FROM `{project}.{dataset}.watch_history` AS h
)
WHERE rk = 1
"""

client.query(query_dedup).result()
print("✅ Table 'watch_history_dedup' created successfully.")


✅ Table 'watch_history_dedup' created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [27]:
# ✅ Verification Prompt — Compare row counts before vs after deduplication

query_compare = f"""
SELECT
  'raw_watch_history' AS table_name,
  COUNT(*) AS row_count
FROM `{project}.{dataset}.watch_history`
UNION ALL
SELECT
  'watch_history_dedup' AS table_name,
  COUNT(*) AS row_count
FROM `{project}.{dataset}.watch_history_dedup`
"""

df_compare = client.query(query_compare).to_dataframe()
display(df_compare)


Unnamed: 0,table_name,row_count
0,watch_history_dedup,100000
1,raw_watch_history,315000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise for natural and system generated reasons.  Natural duplicates can happen for example when users rewatch videos, or begin watching from a different timestamp than the previously did. System duplicates can happen from data ingestion issues, such as delayed data logging, or merging multiple sources.

They can corrupt labels and KPI's by blurrying the data, not knowing what trends seem stronger because they are duplicated, for example just because a user is more likely to pause a show rather than watch it all in one sitting does not mean it is obviously better.  It can also overestimate perfomance metrics and lead to false conclusions in data analysis.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
# # EXAMPLE (from LLM) — IQR outlier rate (commented)
# # WITH dist AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(1)] AS q1,
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(3)] AS q3
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # bounds AS (
# #   SELECT q1, q3, (q3-q1) AS iqr,
# #          q1 - 1.5*(q3-q1) AS lo,
# #          q3 + 1.5*(q3-q1) AS hi
# #   FROM dist
# # )
# # SELECT
# #   COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi) AS outliers,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi)/COUNT(*),2) AS pct_outliers
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h
# # CROSS JOIN bounds b;

In [None]:
# # EXAMPLE (from LLM) — Winsorize + quantiles (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust` AS
# # WITH q AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(1)]  AS p01,
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(98)] AS p99
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # )
# # SELECT
# #   h.*,
# #   GREATEST(q.p01, LEAST(q.p99, h.minutes_watched)) AS minutes_watched_capped
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h, q;
# #
# # -- Quantiles before vs after
# # WITH before AS (
# #   SELECT 'before' AS which, APPROX_QUANTILES(minutes_watched, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # after AS (
# #   SELECT 'after' AS which, APPROX_QUANTILES(minutes_watched_capped, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`
# # )
# # SELECT * FROM before UNION ALL SELECT * FROM after;

### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [28]:
# ✅ Verification Prompt — Compare min/median/max before vs after capping

query_cap_compare = f"""
WITH stats AS (
  SELECT
    'before_capping' AS stage,
    MIN(progress_percentage) AS min_val,
    APPROX_QUANTILES(progress_percentage, 2)[OFFSET(1)] AS median_val,
    MAX(progress_percentage) AS max_val
  FROM `{project}.{dataset}.watch_history`

  UNION ALL

  SELECT
    'after_capping' AS stage,
    MIN(progress_percentage) AS min_val,
    APPROX_QUANTILES(progress_percentage, 2)[OFFSET(1)] AS median_val,
    MAX(progress_percentage) AS max_val
  FROM `{project}.{dataset}.watch_history_dedup`
)

SELECT * FROM stats
"""

df_cap_compare = client.query(query_cap_compare).to_dataframe()
display(df_cap_compare)


Unnamed: 0,stage,min_val,median_val,max_val
0,before_capping,0.0,49.3,100.0
1,after_capping,0.0,50.4,100.0


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping can backfire when those “outliers” are actually meaningful.  An example of this is users who really do watch way more than average. In that case, capping hides valuable behavior and flattens differences in the data.

Models like Random Forests or XGBoost handle outliers better because they  don't rely on averages or distance.  They split data into groups, so extreme values don't throw everything off.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
# # EXAMPLE (from LLM) — flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [31]:
# ✅ Verification Prompt — flag_binge in watch_history_dedup

query_flag_binge = f"""
SELECT
  COUNTIF(watch_duration_minutes > 8*60) AS sessions_over_8h,
  COUNT(*) AS total_sessions,
  ROUND(100 * COUNTIF(watch_duration_minutes > 8*60) / COUNT(*), 2) AS pct_over_8h
FROM `{project}.{dataset}.watch_history_dedup`
"""

df_binge = client.query(query_flag_binge).to_dataframe()
display(df_binge)


Unnamed: 0,sessions_over_8h,total_sessions,pct_over_8h
0,639,100000,0.64


In [None]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented)
# # SELECT
# #   COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #           CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #                     CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;

In [32]:
# ✅ Verification Prompt — flag_binge in watch_history_dedup

query_flag_binge = f"""
SELECT
  COUNTIF(watch_duration_minutes > 8*60) AS sessions_over_8h,
  COUNT(*) AS total_sessions,
  ROUND(100 * COUNTIF(watch_duration_minutes > 8*60) / COUNT(*), 2) AS pct_over_8h
FROM `{project}.{dataset}.watch_history_dedup`
"""

df_binge = client.query(query_flag_binge).to_dataframe()
display(df_binge)


Unnamed: 0,sessions_over_8h,total_sessions,pct_over_8h
0,639,100000,0.64


In [None]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [34]:
# ✅ Verification Prompt — flag_duration_anomaly in movies

query_flag_duration_anomaly = f"""
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS anomaly_titles,
  COUNT(*) AS total_titles,
  ROUND(
    100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*),
    2
  ) AS pct_anomaly
FROM `{project}.{dataset}.movies`
"""

df_duration = client.query(query_flag_duration_anomaly).to_dataframe()
display(df_duration)


Unnamed: 0,anomaly_titles,total_titles,pct_anomaly
0,69,3120,2.21


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [36]:
# ✅ Verification Prompt — Summary of all flags (binge, age_extreme, duration_anomaly)

query_flag_summary = f"""
WITH
-- Flag 1: Sessions over 8 hours
flag_binge AS (
  SELECT
    'flag_binge' AS flag_name,
    ROUND(100 * COUNTIF(watch_duration_minutes > 8*60) / COUNT(*), 2) AS pct_of_rows
  FROM `{project}.{dataset}.watch_history_dedup`
),

-- Flag 2: Extreme ages (<10 or >100)
flag_age_extreme AS (
  SELECT
    'flag_age_extreme' AS flag_name,
    ROUND(
      100 * COUNTIF(age < 10 OR age > 100) / COUNT(*),
      2
    ) AS pct_of_rows
  FROM `{project}.{dataset}.users`
),

-- Flag 3: Duration anomalies (<15 or >480 min)
flag_duration_anomaly AS (
  SELECT
    'flag_duration_anomaly' AS flag_name,
    ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
  FROM `{project}.{dataset}.movies`
)

-- Combine all three flags
SELECT * FROM flag_binge
UNION ALL
SELECT * FROM flag_age_extreme
UNION ALL
SELECT * FROM flag_duration_anomaly
"""

df_flag_summary = client.query(query_flag_summary).to_dataframe()
display(df_flag_summary)


Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.64
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Flag_duration_anomaly is the most common, being seen in 2.2% of the data.  I would keep this flag because extra long or extra short media may reflect either data issues or niche content types (shorts, trailers etc.).  The other two flags are more towads individual user behaviors and thus are less useful in practice.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
