# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).
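
A small guard can catch typos in the value typed into `input()` before it is exported and handed to `gcloud`. This is a minimal sketch, not part of the lab. `check_config` and `PROJECT_ID_PATTERN` are hypothetical names. The pattern assumes the usual project-ID rule (6 to 30 characters of lowercase letters, digits, and hyphens, starting with a letter and not ending with a hyphen), so verify it against the Google Cloud docs.

```python
import re

# Hypothetical guard (not part of the lab). Assumed project-ID rule:
# 6-30 chars, lowercase letters/digits/hyphens, starts with a letter,
# does not end with a hyphen.
PROJECT_ID_PATTERN = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$")


def check_config(project_id: str, region: str) -> tuple[str, str]:
    """Return the cleaned (project_id, region) pair or raise ValueError."""
    project_id = project_id.strip()
    region = region.strip()
    if not PROJECT_ID_PATTERN.match(project_id):
        raise ValueError(f"PROJECT_ID {project_id!r} does not look like a GCP project ID")
    if not region:
        raise ValueError("REGION must not be empty")
    return project_id, region


# In Colab this could wrap the prompt, e.g.
# PROJECT_ID, REGION = check_config(input("Enter your GCP Project ID: "), "us-central1")
print(check_config("  mgmt-467-471819 ", "us-central1"))
```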

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [None]:
# # EXAMPLE (from LLM) — Auth + Project/Region (commented; write your own cell using the prompt)
# # from google.colab import auth
# # auth.authenticate_user()
# #
# # import os
# # PROJECT_ID = input("Enter your GCP Project ID: ").strip()
# # REGION = "us-central1"  # keep consistent; change if instructed
# # os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
# # print("Project:", PROJECT_ID, "| Region:", REGION)
# #
# # # Set active project for gcloud/BigQuery CLI
# # !gcloud config set project $GOOGLE_CLOUD_PROJECT
# # !gcloud config get-value project
# # # Done: Auth + Project/Region set

In [1]:
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt-467-471819
Project: mgmt-467-471819 | Region: us-central1
Updated property [core/project].
mgmt-467-471819


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [2]:
# Verify configuration using gcloud and local variables

# Execute gcloud command to get the currently set project
print("Active Project ID:")
!gcloud config get-value project

# Print the Python variable REGION (assuming it was defined in the previous cell)
print("\nConfigured Region:")
print(REGION)

Active Project ID:
mgmt-467-471819

Configured Region:
us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

REFLECTION: Setting them once lets every later cell reference the same values, and switching to a different project only requires changing them in one place.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.
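
To confirm the `0600` step actually took effect, the permission bits can be inspected with the standard library. This is a minimal sketch that assumes a POSIX filesystem such as the Colab VM. `is_owner_only` is a hypothetical helper, and the demo uses a temporary stand-in file rather than the real `~/.kaggle/kaggle.json`.

```python
import os
import stat
import tempfile


def is_owner_only(path: str) -> bool:
    """True when no group/other permission bits are set (e.g. mode 0600)."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


# Demo on a throwaway file; in Colab, call is_owner_only("/root/.kaggle/kaggle.json")
with tempfile.TemporaryDirectory() as tmp:
    token = os.path.join(tmp, "kaggle.json")
    with open(token, "w") as f:
        f.write("{}")  # placeholder content, not a real token
    os.chmod(token, 0o644)  # readable by group/others: the risky case
    before = is_owner_only(token)
    os.chmod(token, 0o600)  # owner read/write only
    after = is_owner_only(token)

print("0644 owner-only?", before, "| 0600 owner-only?", after)
```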

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
# # EXAMPLE (from LLM) — Kaggle setup (commented)
# # from google.colab import files
# # print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
# # uploaded = files.upload()
# #
# # import os
# # os.makedirs('/root/.kaggle', exist_ok=True)
# # with open('/root/.kaggle/kaggle.json', 'wb') as f:
# #     f.write(uploaded[list(uploaded.keys())[0]])
# # os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only
# #
# # !kaggle --version

In [5]:
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only

!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [6]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

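**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding? `0600` lets only the file's owner read or write the token, which keeps the credential secure: other users or processes on the machine can't read it and use our Kaggle account.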

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`
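
The CSV inventory (names and sizes) can also be built in plain Python, which makes it easy to assert on later. This is a minimal sketch: `csv_inventory` and `print_inventory` are hypothetical helpers, and the demo writes throwaway files to a temporary folder instead of touching `/content/data/raw`.

```python
import tempfile
from pathlib import Path


def csv_inventory(raw_dir: str = "/content/data/raw") -> list[tuple[str, int]]:
    """Return (file name, size in bytes) for each CSV in raw_dir, sorted by name."""
    return sorted((p.name, p.stat().st_size) for p in Path(raw_dir).glob("*.csv"))


def print_inventory(rows: list[tuple[str, int]]) -> None:
    """Print the inventory as a two-column table with right-aligned sizes."""
    width = max((len(name) for name, _ in rows), default=4)
    print(f"{'file':<{width}}  {'bytes':>12}")
    for name, size in rows:
        print(f"{name:<{width}}  {size:>12,}")


# Demo on hypothetical files; in Colab: print_inventory(csv_inventory())
with tempfile.TemporaryDirectory() as demo_dir:
    (Path(demo_dir) / "users.csv").write_text("user_id\nu1\n")
    (Path(demo_dir) / "README.md").write_text("not a CSV")  # ignored by the *.csv glob
    rows = csv_inventory(demo_dir)

print_inventory(rows)
```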

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# # EXAMPLE (from LLM) — Download & unzip (commented)
# # !mkdir -p /content/data/raw
# # !kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
# # !unzip -o /content/data/*.zip -d /content/data/raw
# # # List CSV inventory
# # !ls -lh /content/data/raw/*.csv

In [7]:
!mkdir -p /content/data/raw
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
!unzip -o /content/data/*.zip -d /content/data/raw
# List CSV inventory
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 631MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [8]:
import glob

csv_files = glob.glob('/content/data/raw/*.csv')
num_csv_files = len(csv_files)

assert num_csv_files == 6, f"Expected 6 CSV files, but found {num_csv_files}"

print("Found exactly 6 CSV files:")
for csv_file in csv_files:
    print(csv_file)

Found exactly 6 CSV files:
/content/data/raw/movies.csv
/content/data/raw/watch_history.csv
/content/data/raw/search_logs.csv
/content/data/raw/users.csv
/content/data/raw/recommendation_logs.csv
/content/data/raw/reviews.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream? It shows exactly which files we have and how large they are, so we load and query only the data we need instead of wasting time and resources on files that aren't useful.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.
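
Because bucket names share one global namespace, the random suffix is what makes a collision unlikely (8 hex characters give about 4.3 billion combinations). The sketch below shows one way to generate and sanity-check a name. It assumes a conservative subset of the Cloud Storage naming rules (3 to 63 characters, lowercase letters, digits, and hyphens, starting and ending with a letter or digit). `make_bucket_name` is a hypothetical helper.

```python
import re
import uuid

# Conservative subset of Cloud Storage bucket naming rules (assumption; the full
# rules also allow dots/underscores and forbid some prefixes such as "goog").
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$")


def make_bucket_name(prefix: str = "mgmt467-netflix") -> str:
    """Append 8 random lowercase hex chars to prefix and validate the result."""
    name = f"{prefix}-{uuid.uuid4().hex[:8]}"
    if not BUCKET_RE.match(name):
        raise ValueError(f"Invalid bucket name: {name!r}")
    return name


bucket_name = make_bucket_name()
print(bucket_name)  # e.g. mgmt467-netflix-<8 hex chars>
```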

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
# # EXAMPLE (from LLM) — GCS staging (commented)
# # import uuid, os
# # bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
# # os.environ["BUCKET_NAME"] = bucket_name
# # !gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
# # !gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/
# # print("Bucket:", bucket_name)
# # # Verify contents
# # !gcloud storage ls gs://$BUCKET_NAME/netflix/

In [9]:
import uuid, os
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name
# Use 'US' for multi-region bucket location
!gcloud storage buckets create gs://$BUCKET_NAME --location=US
!gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/
print("Bucket:", bucket_name)
# Verify contents
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating gs://mgmt467-netflix-a38435f3/...
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-a38435f3/netflix/movies.csv
Copying file:///content/data/raw/README.md to gs://mgmt467-netflix-a38435f3/netflix/README.md
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-a38435f3/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-a38435f3/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-a38435f3/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-a38435f3/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-a38435f3/netflix/watch_history.csv

Average throughput: 46.8MiB/s
Bucket: mgmt467-netflix-a38435f3
gs://mgmt467-netflix-a38435f3/netflix/README.md
gs://mgmt467-netflix-a38435f3/netflix/movies.csv
gs://mgmt467-netflix-a38435f3/netflix/recommendation_logs.csv
gs://mgmt467-n

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [10]:
import os
# Verify the contents of the 'netflix/' prefix and show sizes
!gcloud storage ls -l gs://{os.environ['BUCKET_NAME']}/netflix/

      8002  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/README.md
    115942  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/movies.csv
   4695557  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/recommendation_logs.csv
   1861942  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/reviews.csv
   2250902  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/search_logs.csv
   1606820  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/users.csv
   9269425  2025-10-26T22:03:49Z  gs://mgmt467-netflix-a38435f3/netflix/watch_history.csv
TOTAL: 7 objects, 19808590 bytes (18.89MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab. The files stay in the bucket after the Colab runtime resets, so we don't have to re-upload them into Colab every time we reload the data.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).
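
By default, `bq load` appends to an existing table, so a cell that runs twice silently doubles every row count. For an idempotent load, `bq load` accepts `--replace`, which erases the table's existing contents before loading. This is a minimal sketch that builds the six load commands as argument lists. `bq_load_cmd` and the bucket name `my-bucket` are hypothetical, and the `netflix/<table>.csv` layout follows this lab.

```python
import shlex

TABLES = ["users", "movies", "watch_history",
          "recommendation_logs", "search_logs", "reviews"]


def bq_load_cmd(dataset: str, table: str, bucket: str) -> list[str]:
    """Build argv for an idempotent CSV load: --replace overwrites instead of appending."""
    return [
        "bq", "load",
        "--replace",              # erase existing rows first, so reruns don't duplicate
        "--skip_leading_rows=1",  # skip the CSV header row
        "--autodetect",           # infer schema (swap for an explicit schema later)
        "--source_format=CSV",
        f"{dataset}.{table}",                  # destination table
        f"gs://{bucket}/netflix/{table}.csv",  # source URI
    ]


for t in TABLES:
    print(shlex.join(bq_load_cmd("netflix", t, "my-bucket")))

# In Colab, each command could be executed with:
#   import subprocess; subprocess.run(bq_load_cmd("netflix", t, BUCKET_NAME), check=True)
```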

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# # EXAMPLE (from LLM) — BigQuery dataset (commented)
# # DATASET="netflix"
# # # Attempt to create; ignore if exists
# # !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [None]:
# # EXAMPLE (from LLM) — Load tables (commented)
# # tables = {
# #   "users": "users.csv",
# #   "movies": "movies.csv",
# #   "watch_history": "watch_history.csv",
# #   "recommendation_logs": "recommendation_logs.csv",
# #   "search_logs": "search_logs.csv",
# #   "reviews": "reviews.csv",
# # }
# # import os
# # for tbl, fname in tables.items():
# #   src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
# #   print("Loading", tbl, "from", src)
# #   !bq load --replace --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}
# #
# # # Row counts
# # for tbl in tables.keys():
# #   !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.{DATASET}.{tbl}\`"

In [11]:
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

BigQuery error in mk operation: Dataset 'mgmt-467-471819:netflix' already
exists.
Dataset may already exist.


In [12]:
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}
import os
DATASET = "netflix" # Ensure DATASET is defined in this cell or accessible

for tbl, fname in tables.items():
  src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
  print("Loading", tbl, "from", src)
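  # {DATASET}.{tbl} uses IPython {} expansion of the Python variables;
  # --replace overwrites the table, so re-running this cell does not append duplicate rows
  !bq load --replace --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}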

# Row counts
for tbl in tables.keys():
  # Corrected bq query syntax to escape backticks for shell execution
  print(f"Getting row count for {tbl}")
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.{DATASET}.{tbl}\`"

Loading users from gs://mgmt467-netflix-a38435f3/netflix/users.csv
Waiting on bqjob_r2441932a136d22f0_0000019a228d0038_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-a38435f3/netflix/movies.csv
Waiting on bqjob_r3d8e9aa1dc29e2ea_0000019a228d1687_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-a38435f3/netflix/watch_history.csv
Waiting on bqjob_r5b5d0a2e0bcecc5d_0000019a228d2c23_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-a38435f3/netflix/recommendation_logs.csv
Waiting on bqjob_r49516a7a55608b96_0000019a228d4569_1 ... (1s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-a38435f3/netflix/search_logs.csv
Waiting on bqjob_r6dabd7b0eb74ceb0_0000019a228d5bbf_1 ... (1s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-a38435f3/netflix/reviews.csv
Waiting on bqjob_r3e42044d9e1a596f_0000019a228d70f1_1 ... (1s) Current status: DONE   
Getting row co

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.
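
Instead of hand-writing six `SELECT` branches, you can generate the verification query from the table list. This is a minimal sketch, and `row_count_sql` is a hypothetical helper. The trailing `ORDER BY` is there because `UNION ALL` alone does not guarantee the order of the returned rows.

```python
def row_count_sql(project: str, dataset: str, tables: list[str]) -> str:
    """One query returning (table_name, row_count) for every table, sorted by name."""
    branches = [
        f"SELECT '{t}' AS table_name, COUNT(*) AS row_count "
        f"FROM `{project}.{dataset}.{t}`"
        for t in tables
    ]
    return "\nUNION ALL\n".join(branches) + "\nORDER BY table_name"


sql = row_count_sql("my-project", "netflix",
                    ["users", "movies", "watch_history",
                     "recommendation_logs", "search_logs", "reviews"])
print(sql)
# In Colab: bigquery.Client().query(row_count_sql(os.environ["GOOGLE_CLOUD_PROJECT"], "netflix", [...])).to_dataframe()
```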


In [18]:
from google.cloud import bigquery
import os

client = bigquery.Client()

# Compose unified query
query = f"""
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.reviews`
"""

# Run query and convert results to DataFrame
results = client.query(query).to_dataframe()

# Display results
print(results)

            table_name  row_count
0              reviews      61800
1               movies       4160
2        watch_history     420000
3                users      41200
4  recommendation_logs     208000
5          search_logs     106000


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why? Autodetect is fine when we don't yet know the column names and types (e.g., a first look at a new file), but once we know the schema we should declare it explicitly so column types stay consistent across loads.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.
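
The same measurements (overall % missing, an `is_missing_*` indicator, and % missing per group) can be sketched in plain Python on a few rows to show what the SQL computes. If per-group rates differ a lot, missingness depends on an observed variable. That pattern is consistent with MAR, although it cannot rule out MNAR. `pct_missing`, `pct_missing_by`, and `add_missing_indicator` are hypothetical helpers, and the rows are made up rather than taken from the Netflix tables.

```python
from collections import defaultdict


def pct_missing(rows, col):
    """% of rows where col is None, rounded to 2 decimals."""
    n = len(rows)
    return round(100 * sum(r.get(col) is None for r in rows) / n, 2) if n else 0.0


def pct_missing_by(rows, col, group_col):
    """[(group, % missing)] sorted from most to least missing (like GROUP BY ... DESC)."""
    groups = defaultdict(list)
    for r in rows:
        groups[r.get(group_col)].append(r)
    return sorted(((g, pct_missing(rs, col)) for g, rs in groups.items()),
                  key=lambda kv: kv[1], reverse=True)


def add_missing_indicator(rows, col):
    """Return copies of rows with an is_missing_<col> flag, keeping the original value."""
    return [{**r, f"is_missing_{col}": r.get(col) is None} for r in rows]


# Hypothetical toy rows (not from the dataset)
rows = [
    {"country": "USA", "age": 34},
    {"country": "USA", "age": None},
    {"country": "Canada", "age": 51},
    {"country": "Canada", "age": 29},
]
print(pct_missing(rows, "age"))
print(pct_missing_by(rows, "age", "country"))
flagged = add_missing_indicator(rows, "age")
```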

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
# # EXAMPLE (from LLM) — Missingness profile (commented)
# # -- Users: % missing per column
# # WITH base AS (
# #   SELECT COUNT(*) n,
# #          COUNTIF(region IS NULL) miss_region,
# #          COUNTIF(plan_tier IS NULL) miss_plan,
# #          COUNTIF(age_band IS NULL) miss_age
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # )
# # SELECT n,
# #        ROUND(100*miss_region/n,2) AS pct_missing_region,
# #        ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
# #        ROUND(100*miss_age/n,2)    AS pct_missing_age_band
# # FROM base;

In [22]:
from google.cloud import bigquery
import os

client = bigquery.Client()

query1 = f"""
WITH base AS (
  SELECT
    COUNT(*) AS n,
    COUNTIF(country IS NULL) AS miss_country,
    COUNTIF(subscription_plan IS NULL) AS miss_plan,
    COUNTIF(age IS NULL) AS miss_age
  FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`
)
SELECT
  n,
  ROUND(100 * miss_country / n, 2) AS pct_missing_country,
  ROUND(100 * miss_plan / n, 2) AS pct_missing_subscription_plan,
  ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM base
"""

missing_summary = client.query(query1).to_dataframe()
print("Total rows and % missing per column:")
print(missing_summary)



Total rows and % missing per column:
       n  pct_missing_country  pct_missing_subscription_plan  pct_missing_age
0  41200                  0.0                            0.0            11.93


In [None]:
# # EXAMPLE (from LLM) — MAR by region (commented)
# # SELECT region,
# #        COUNT(*) AS n,
# #        ROUND(100*COUNTIF(plan_tier IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # GROUP BY region
# # ORDER BY pct_missing_plan_tier DESC;

In [23]:
query2 = f"""
SELECT
  country,
  COUNT(*) AS n,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC
"""

missing_by_country = client.query(query2).to_dataframe()
print("% subscription_plan missing by country:")
print(missing_by_country)


% subscription_plan missing by country:
  country      n  pct_missing_subscription_plan
0  Canada  12384                            0.0
1     USA  28816                            0.0


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [24]:
query_verify = f"""
SELECT
  ROUND(100 * COUNTIF(country IS NULL) / COUNT(*), 2) AS pct_missing_country,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan,
  ROUND(100 * COUNTIF(age IS NULL) / COUNT(*), 2) AS pct_missing_age
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`
"""

verify_df = client.query(query_verify).to_dataframe()
print("Three missingness percentages:")
print(verify_df)


Three missingness percentages:
   pct_missing_country  pct_missing_subscription_plan  pct_missing_age
0                  0.0                            0.0            11.93


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why. Age is by far the most missing column (11.93%), while country and subscription_plan have no missing values. This makes sense because users don't have to enter their age, while the other fields are required.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).
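
A keep-one policy is only deterministic if ties are broken by something stable. Otherwise, two rows with identical `progress_percentage` and `watch_duration_minutes` can swap between runs. This is a minimal Python sketch of the same policy. It assumes numeric, non-null rank columns and a hypothetical unique `row_id` as the final tie-breaker. The Netflix table may not have such a column; in SQL, an equivalent extra `ORDER BY` term would be needed.

```python
def dedup_keep_best(rows, key_cols, rank_cols, tiebreak_col):
    """Keep one row per key: highest rank_cols first, then smallest tiebreak_col."""
    ordered = sorted(
        rows,
        key=lambda r: (
            tuple(r[c] for c in key_cols),    # group rows by the duplicate key
            tuple(-r[c] for c in rank_cols),  # negate so higher values sort first
            r[tiebreak_col],                  # deterministic final tie-break
        ),
    )
    kept, seen = [], set()
    for r in ordered:
        key = tuple(r[c] for c in key_cols)
        if key not in seen:                   # first row of each group is the "best"
            seen.add(key)
            kept.append(r)
    return kept


# Hypothetical rows (not from the dataset); row_id is an assumed unique id
base = {"movie_id": "movie_0001", "watch_date": "2024-09-08", "device_type": "Laptop"}
rows = [
    {**base, "user_id": "user_1", "progress_percentage": 80.0, "watch_duration_minutes": 40.0, "row_id": 2},
    {**base, "user_id": "user_1", "progress_percentage": 95.0, "watch_duration_minutes": 30.0, "row_id": 3},
    {**base, "user_id": "user_1", "progress_percentage": 95.0, "watch_duration_minutes": 30.0, "row_id": 1},
    {**base, "user_id": "user_2", "progress_percentage": 10.0, "watch_duration_minutes": 5.0, "row_id": 4},
]
kept = dedup_keep_best(
    rows,
    key_cols=["user_id", "movie_id", "watch_date", "device_type"],
    rank_cols=["progress_percentage", "watch_duration_minutes"],
    tiebreak_col="row_id",
)
print([r["row_id"] for r in kept])
```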

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [27]:
from google.cloud import bigquery
import os

client = bigquery.Client()

query1 = f"""
-- Detect duplicate groups on (user_id, movie_id, watch_date, device_type)
SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS dup_count
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20
"""

duplicates_df = client.query(query1).to_dataframe()
print("Top 20 duplicate groups:")
print(duplicates_df)



Top 20 duplicate groups:
       user_id    movie_id  watch_date device_type  dup_count
0   user_03310  movie_0640  2024-09-08    Smart TV         16
1   user_00391  movie_0893  2024-08-26      Laptop         16
2   user_03176  movie_0534  2024-01-06      Laptop         12
3   user_03660  movie_0109  2025-05-20     Desktop         12
4   user_09564  movie_0552  2025-01-11      Laptop         12
5   user_08826  movie_0133  2025-04-11     Desktop         12
6   user_04899  movie_0142  2025-01-20     Desktop         12
7   user_02950  movie_0928  2025-06-03     Desktop         12
8   user_09512  movie_0825  2025-01-07     Desktop         12
9   user_02822  movie_0009  2025-08-30     Desktop         12
10  user_03898  movie_0500  2025-07-29     Desktop         12
11  user_06417  movie_0590  2024-01-15      Laptop         12
12  user_09815  movie_0827  2024-05-25      Laptop         12
13  user_06085  movie_0346  2024-03-04     Desktop         12
14  user_03140  movie_0205  2025-09-11     De

In [None]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

In [28]:
query2 = f"""
-- Keep-one policy: prefer higher progress_percentage, then watch_duration_minutes
CREATE OR REPLACE TABLE `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk)
FROM (
  SELECT
    h.*,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) AS rk
  FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history` h
)
WHERE rk = 1
"""

client.query(query2).result()
print("✅ Table `watch_history_dedup` created successfully.")


✅ Table `watch_history_dedup` created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [29]:
query_verify = f"""
SELECT 'raw_watch_history' AS table_name, COUNT(*) AS row_count
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history`
UNION ALL
SELECT 'watch_history_dedup' AS table_name, COUNT(*) AS row_count
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup`
"""

compare_df = client.query(query_verify).to_dataframe()
print("Before vs after deduplication:")
print(compare_df)


Before vs after deduplication:
            table_name  row_count
0  watch_history_dedup     100000
1    raw_watch_history     420000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs? They can be natural (e.g., one person with two accounts) or system-generated (e.g., a bug that logs the same event more than once). Duplicate rows are counted more than once, which inflates counts and throws off every statistic built on them.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.
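
You can check the IQR rule and percentile capping by hand on a small list. This minimal sketch uses exact quantiles from Python's `statistics.quantiles` (`method="inclusive"`), so the numbers can differ slightly from BigQuery's `APPROX_QUANTILES`. The helper names and sample minutes are hypothetical. Note the indexing difference: `statistics.quantiles(..., n=100)` returns 99 cut points (P1 at index 0, P99 at index 98). By contrast, `APPROX_QUANTILES(x, 100)` returns 101 values, with the minimum at `OFFSET(0)` and P99 at `OFFSET(99)`.

```python
import statistics


def iqr_bounds(values, k=1.5):
    """Tukey fences (Q1 - k*IQR, Q3 + k*IQR) using exact 'inclusive' quartiles."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def pct_outliers(values, k=1.5):
    """Percentage of values outside the IQR fences, rounded to 2 decimals."""
    lo, hi = iqr_bounds(values, k)
    return round(100 * sum(v < lo or v > hi for v in values) / len(values), 2)


def winsorize(values, lower_pct=1, upper_pct=99):
    """Cap at the given percentiles; also return flags so capped rows stay visible."""
    cuts = statistics.quantiles(values, n=100, method="inclusive")  # P1..P99
    lo, hi = cuts[lower_pct - 1], cuts[upper_pct - 1]
    capped = [min(max(v, lo), hi) for v in values]
    flags = [v < lo or v > hi for v in values]
    return capped, flags


minutes = [1, 2, 3, 4, 5, 6, 7, 8, 100]  # hypothetical session lengths
print(iqr_bounds(minutes), pct_outliers(minutes))
capped, flags = winsorize(minutes)
print(capped[0], capped[-1], flags)
```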

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
# # EXAMPLE (from LLM) — IQR outlier rate (commented)
# # WITH dist AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(1)] AS q1,
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(3)] AS q3
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # bounds AS (
# #   SELECT q1, q3, (q3-q1) AS iqr,
# #          q1 - 1.5*(q3-q1) AS lo,
# #          q3 + 1.5*(q3-q1) AS hi
# #   FROM dist
# # )
# # SELECT
# #   COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi) AS outliers,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi)/COUNT(*),2) AS pct_outliers
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h
# # CROSS JOIN bounds b;

In [30]:
from google.cloud import bigquery
import os

client = bigquery.Client()

query1 = f"""
-- Compute IQR bounds for watch_duration_minutes and report % outliers
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(2)] AS q2,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT
    q1,
    q3,
    (q3 - q1) AS iqr,
    q1 - 1.5 * (q3 - q1) AS lo,
    q3 + 1.5 * (q3 - q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100 * COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) / COUNT(*), 2) AS pct_outliers
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
"""

iqr_df = client.query(query1).to_dataframe()
print("IQR-based outlier summary:")
print(iqr_df)


IQR-based outlier summary:
   outliers   total  pct_outliers
0      3462  100000          3.46


In [None]:
# # EXAMPLE (from LLM) — Winsorize + quantiles (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust` AS
# # WITH q AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(1)]  AS p01,
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(99)] AS p99
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # )
# # SELECT
# #   h.*,
# #   GREATEST(q.p01, LEAST(q.p99, h.minutes_watched)) AS minutes_watched_capped
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h, q;
# #
# # -- Quantiles before vs after
# # WITH before AS (
# #   SELECT 'before' AS which, APPROX_QUANTILES(minutes_watched, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # after AS (
# #   SELECT 'after' AS which, APPROX_QUANTILES(minutes_watched_capped, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`
# # )
# # SELECT * FROM before UNION ALL SELECT * FROM after;

In [31]:
query2 = f"""
-- Create robust version of watch_history_dedup with capped watch_duration_minutes
CREATE OR REPLACE TABLE `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_capped
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup` h, q;
"""

client.query(query2).result()
print("✅ Table `watch_history_robust` created successfully.")


✅ Table `watch_history_robust` created successfully.


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [32]:
query_verify = f"""
SELECT 'before' AS which,
  MIN(watch_duration_minutes) AS min_val,
  MAX(watch_duration_minutes) AS max_val
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup`
UNION ALL
SELECT 'after' AS which,
  MIN(watch_duration_capped) AS min_val,
  MAX(watch_duration_capped) AS max_val
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust`;
"""

verify_df = client.query(query_verify).to_dataframe()
print("Min/Max before vs after capping:")
print(verify_df)


Min/Max before vs after capping:
    which  min_val  max_val
0  before      0.2    799.3
1   after      4.4    366.0


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why. Capping can be harmful when the extreme values are real rather than errors, because it removes useful information.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).
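
Each of the three flags is a simple predicate. This minimal Python sketch takes its thresholds from the build prompt: sessions over 8 hours, ages under 10 or over 100, and durations under 15 or over 480 minutes. The function names are hypothetical. Two design points carry over to SQL. First, a missing value is not flagged, which matches how `COUNTIF` treats a `NULL` condition, while the total still counts every row like `COUNT(*)`. Second, the binge flag should be evaluated on raw minutes, because a P99 cap below 480 would make it impossible to trigger.

```python
BINGE_MINUTES = 8 * 60


def flag_binge(minutes):
    """Session longer than 8 hours; None (missing) is never flagged."""
    return minutes is not None and minutes > BINGE_MINUTES


def flag_age_extreme(age):
    """Age below 10 or above 100; None is never flagged."""
    return age is not None and (age < 10 or age > 100)


def flag_duration_anomaly(duration_min):
    """Title shorter than 15 minutes or longer than 480 minutes."""
    return duration_min is not None and (duration_min < 15 or duration_min > 480)


def summarize(flags):
    """(count flagged, total, pct flagged) like the COUNTIF / COUNT(*) summaries."""
    n, k = len(flags), sum(flags)
    return k, n, (round(100 * k / n, 2) if n else 0.0)


binge = summarize([flag_binge(m) for m in [30, 500, None, 480]])
age = summarize([flag_age_extreme(a) for a in [5, 34, None, 120]])
duration = summarize([flag_duration_anomaly(d) for d in [10, 95, 600]])
print(binge, age, duration)
```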

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
# # EXAMPLE (from LLM) — flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [33]:
from google.cloud import bigquery
import os
client = bigquery.Client()

query1 = f"""
-- Flag sessions over 8 hours
SELECT
  COUNTIF(watch_duration_minutes > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100 * COUNTIF(watch_duration_minutes > 8*60) / COUNT(*), 2) AS pct_over_8h
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust`;
"""

flag_binge = client.query(query1).to_dataframe()
print("Flag: Binge sessions (>8h):")
print(flag_binge)


Flag: Binge sessions (>8h):
   sessions_over_8h   total  pct_over_8h
0               639  100000         0.64


In [None]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented)
# # SELECT
# #   COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #           CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #                     CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;
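The example above parses `age_band` with `REGEXP_EXTRACT(age_band, r'\d+')`, which returns only the *first* run of digits. For a band like `25-34`, that is the lower bound, so the `> 100` test fires only when a band's lower bound exceeds 100. The Python sketch below imitates that behaviour with `re.search`. The band labels are hypothetical, since the `users` table in this run exposes a numeric `age` column instead.

```python
import re

# Hypothetical age_band labels for illustration only.
AGE_BANDS = ["18-24", "25-34", "65+", "5-9", "unknown"]

def first_int(band):
    """Mimic REGEXP_EXTRACT(age_band, r'\\d+') + CAST: first digit run as int, else None."""
    match = re.search(r"\d+", band)
    return int(match.group()) if match else None

def is_extreme(age, low=10, high=100):
    # A None (NULL) age is not flagged, just as COUNTIF counts only TRUE conditions.
    return age is not None and (age < low or age > high)

parsed = {band: first_int(band) for band in AGE_BANDS}
flags = {band: is_extreme(age) for band, age in parsed.items()}
print(parsed)
print(flags)
```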

In [34]:
query2 = f"""
-- Flag extreme ages (<10 or >100)
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_extreme_age
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`;
"""

flag_age = client.query(query2).to_dataframe()
print("Flag: Extreme age values (<10 or >100):")
print(flag_age)


Flag: Extreme age values (<10 or >100):
   extreme_age_rows  total  pct_extreme_age
0               716  41200             1.74


In [None]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [36]:
query3 = f"""
-- Flag duration anomalies (<15 or >480 minutes)
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS duration_anomalies,
  COUNT(*) AS total,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_anomalies
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.movies`;
"""

flag_duration = client.query(query3).to_dataframe()
print("Flag: Duration anomalies (<15 or >480 min):")
print(flag_duration)


Flag: Duration anomalies (<15 or >480 min):
   duration_anomalies  total  pct_anomalies
0                  92   4160           2.21


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [38]:
query_verify = f"""
SELECT 'flag_binge' AS flag_name,
       ROUND(100 * COUNTIF(watch_duration_minutes > 8*60) / COUNT(*), 2) AS pct_of_rows
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust`
UNION ALL
SELECT 'flag_age_extreme',
       ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2)
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users`
UNION ALL
SELECT 'flag_duration_anomaly',
       ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2)
FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.movies`;
"""

summary_flags = client.query(query_verify).to_dataframe()
print("Compact flag summary:")
print(summary_flags)


Compact flag summary:
               flag_name  pct_of_rows
0             flag_binge         0.64
1       flag_age_extreme         1.74
2  flag_duration_anomaly         2.21
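The summary query above repeats each flag condition from the three cells before it. A sketch is below that keeps every flag's table and condition in one dictionary and generates the `UNION ALL` from it, so each threshold is changed in one place. It also swaps in BigQuery's `SAFE_DIVIDE`, which returns NULL instead of raising a division-by-zero error on an empty table. `FLAG_DEFS`, `build_flag_summary_sql`, and `"my-project"` are hypothetical names.

```python
# Hypothetical registry: flag name -> (table in the netflix dataset, boolean SQL condition).
FLAG_DEFS = {
    "flag_binge": ("watch_history_robust", "watch_duration_minutes > 8 * 60"),
    "flag_age_extreme": ("users", "age < 10 OR age > 100"),
    "flag_duration_anomaly": ("movies", "duration_minutes < 15 OR duration_minutes > 480"),
}

def build_flag_summary_sql(project_id, flag_defs, dataset="netflix"):
    """Return one query with a (flag_name, pct_of_rows) row per flag."""
    parts = []
    for name, (table, condition) in flag_defs.items():
        parts.append(
            f"SELECT '{name}' AS flag_name,\n"
            f"       ROUND(100 * SAFE_DIVIDE(COUNTIF({condition}), COUNT(*)), 2) AS pct_of_rows\n"
            f"FROM `{project_id}.{dataset}.{table}`"
        )
    return "\nUNION ALL\n".join(parts)

# Placeholder project ID; in the notebook pass os.environ['GOOGLE_CLOUD_PROJECT'].
sql = build_flag_summary_sql("my-project", FLAG_DEFS)
print(sql)
```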


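**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

The duration anomaly is the most common by rate (2.21% of titles). Each rate uses a different denominator (watch sessions, users, titles), though, and by raw count the age flag is the largest (716 rows).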

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.
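For the `.sql` export step, a minimal sketch follows. It assumes the DQ queries are held in Python strings, as in the cells above. The file name `dq_queries.sql`, the helper `export_sql`, and the sample dictionary keys are hypothetical. Because the notebook's queries are f-strings, the exported file will contain your actual project ID.

```python
import tempfile
from pathlib import Path

def export_sql(queries, path="dq_queries.sql"):
    """Write each named query to one .sql file, separated by comment headers."""
    chunks = []
    for name, sql in queries.items():
        body = sql.strip()
        if not body.endswith(";"):
            body += ";"  # terminate each statement so the file runs as a script
        chunks.append(f"-- {name}\n{body}\n")
    out = Path(path)
    out.write_text("\n".join(chunks), encoding="utf-8")
    return out

# Demo with stand-in queries written to a temporary folder; in the notebook you might pass
# {"flag_binge": query1, "flag_age_extreme": query2, ...} and a path inside your repo.
demo_dir = Path(tempfile.mkdtemp())
written = export_sql({"flag_binge": "SELECT 1", "flag_age_extreme": "SELECT 2;"},
                     demo_dir / "dq_queries.sql")
print(written.read_text(encoding="utf-8"))
```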


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
