<a href="https://colab.research.google.com/github/gabeunix/mgmt467-analytics-portfolio/blob/main/GWang_Unit2_Lab1_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQ.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [1]:
# EXAMPLE (from LLM): Auth + Project/Region (write your own cell using the prompt)
import os
from google.colab import auth

# Authenticate this Colab session so gcloud, GCS, and BigQuery calls run as your Google account
auth.authenticate_user()

PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # set once so every resource lands in the same place (cost/latency)
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID  # later cells and client libraries read this
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set the active project for the gcloud/bq CLIs, then confirm it
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: boxwood-veld-471119-r6
Project: boxwood-veld-471119-r6 | Region: us-central1
Updated property [core/project].
boxwood-veld-471119-r6


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.
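A minimal sketch of such a verification cell, written in plain Python so it also runs outside Colab. It assumes `REGION` may already exist from the setup cell (falling back to `"us-central1"` otherwise), and the helper `active_gcloud_project` is a hypothetical name that returns `None` instead of failing when the `gcloud` CLI isn't installed.

```python
import os
import shutil
import subprocess

def active_gcloud_project():
    """Stdout of `gcloud config get-value project`, or None if gcloud is missing or prints nothing."""
    if shutil.which("gcloud") is None:
        return None
    result = subprocess.run(
        ["gcloud", "config", "get-value", "project"],
        capture_output=True, text=True, timeout=60,
    )
    return result.stdout.strip() or None

# Reuse REGION from the setup cell if it exists; the fallback value is an assumption
REGION = globals().get("REGION", "us-central1")

gcloud_project = active_gcloud_project()
env_project = os.environ.get("GOOGLE_CLOUD_PROJECT")
print("gcloud active project:", gcloud_project)
print("GOOGLE_CLOUD_PROJECT: ", env_project)
print("REGION:               ", REGION)
if gcloud_project and env_project and gcloud_project != env_project:
    print("Warning: gcloud and GOOGLE_CLOUD_PROJECT point at different projects")
```

Comparing the CLI's project with the environment variable catches the most common mistake: a later `gcloud config set project` that silently diverges from what the Python cells use.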


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [5]:
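# EXAMPLE (from LLM): Kaggle setup
import os
from google.colab import files
# Security: the token is uploaded at runtime, never pasted into the notebook
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()
# Save it where the Kaggle CLI looks for it, readable/writable by the owner only
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(next(iter(uploaded.values())))  # bytes of the uploaded file
os.chmod('/root/.kaggle/kaggle.json', 0o600)
# files.upload() also saved a copy in the working directory; delete it so the secret lives in one place
for name in uploaded:
    if os.path.exists(name):
        os.remove(name)
# Reproducibility: record the CLI version used for the download
!kaggle --version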

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [6]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Requiring strict 0600 permissions on API tokens means that only the owner of the file can read and write to it. This is a crucial security measure to protect your sensitive API credentials. By setting these permissions, you are preventing other users on the system (if any) or processes running under different user accounts from accessing or potentially stealing your API key. This helps avoid risks such as unauthorized access to your accounts, data breaches, and potential misuse of your API key, which could lead to unexpected costs or other security compromises.
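The permission requirement can also be checked, not just set. A minimal standard-library sketch: `stat.S_IMODE` strips the file-type bits so the result compares directly with `0o600`. The helpers `is_owner_only` and `describe_mode` are hypothetical names, and the check is only meaningful on POSIX systems such as Colab's Linux VM.

```python
import os
import stat

def is_owner_only(path):
    """True if the permission bits are exactly 0o600: owner read/write, no group/other access."""
    return stat.S_IMODE(os.stat(path).st_mode) == 0o600

def describe_mode(path):
    """Octal permission string such as '0o600' or '0o644'."""
    return oct(stat.S_IMODE(os.stat(path).st_mode))

token = os.path.expanduser("~/.kaggle/kaggle.json")
if os.path.exists(token):
    status = "OK" if is_owner_only(token) else "too permissive: run os.chmod(token, 0o600)"
    print(token, describe_mode(token), status)
```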

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [7]:
#EXAMPLE (from LLM) — Download & unzip (commented)
!mkdir -p /content/data/raw
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
!unzip -o /content/data/*.zip -d /content/data/raw
# List CSV inventory
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 754MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.
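A minimal sketch of that check using `pathlib`. It assumes the unzip step wrote to `/content/data/raw` as above and skips the assertion when the directory doesn't exist (for example, outside Colab); `csv_inventory` is a hypothetical helper name.

```python
from pathlib import Path

RAW_DIR = Path("/content/data/raw")
EXPECTED_CSV_COUNT = 6

def csv_inventory(raw_dir):
    """Sorted (file name, size in bytes) pairs for every CSV directly under raw_dir."""
    return sorted((p.name, p.stat().st_size) for p in Path(raw_dir).glob("*.csv"))

if RAW_DIR.is_dir():
    inventory = csv_inventory(RAW_DIR)
    names = [name for name, _ in inventory]
    assert len(inventory) == EXPECTED_CSV_COUNT, (
        f"Expected {EXPECTED_CSV_COUNT} CSVs, found {len(inventory)}: {names}"
    )
    for name, size in inventory:
        print(f"{name:<26}{size / 1024:>10.1f} KiB")
else:
    print(f"{RAW_DIR} not found; run the download cell first")
```

The glob only matches `*.csv`, so the `README.md` that ships in the archive does not count toward the six.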


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.
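A minimal sketch of the unique-name step with a client-side sanity check. The regex encodes a simplified subset of Cloud Storage's bucket naming rules (3 to 63 characters; lowercase letters, digits, hyphens, and underscores; must start and end with a letter or digit; no `goog` prefix and no `google` substring). Dotted names follow extra rules and are deliberately not handled. `make_bucket_name` and `is_valid_bucket_name` are hypothetical helpers, and global uniqueness is only confirmed when the create call succeeds.

```python
import re
import uuid

# Simplified subset of Cloud Storage bucket-name rules (dotted names not handled)
_BUCKET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")

def is_valid_bucket_name(name):
    return (bool(_BUCKET_NAME_RE.match(name))
            and not name.startswith("goog")
            and "google" not in name)

def make_bucket_name(prefix="mgmt467-netflix"):
    """Prefix plus 8 random hex chars; collisions are unlikely, but only the create call can confirm uniqueness."""
    name = f"{prefix}-{uuid.uuid4().hex[:8]}"
    if not is_valid_bucket_name(name):
        raise ValueError(f"Invalid bucket name: {name!r}")
    return name

print(make_bucket_name())
```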


In [8]:
import uuid
import os

# Create a unique bucket name with a random suffix
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket. No --location flag: passing one raised an error earlier in this
# session, so the bucket uses the default location rather than ${REGION}
!gcloud storage buckets create gs://$BUCKET_NAME

# Upload all CSV files from the raw data directory to the bucket
# Staging data in GCS provides a consistent, versionable source for data loading into BigQuery.
# It decouples data storage from the compute layer and allows for easier auditing and recovery.
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name
print(f"Created and uploaded data to GCS bucket: {bucket_name}")
print("\nBenefits of staging in GCS:")
print("- **Consistent Source:** Provides a stable location for data, unlike temporary Colab storage.")
print("- **Versionable:** GCS supports object versioning for easier rollback and auditing.")
print("- **Decoupled Storage:** Separates data storage from compute, allowing BigQuery to access data efficiently.")
print("- **Auditable:** GCS logs provide an audit trail of data access and modifications.")

Creating gs://mgmt467-netflix-3e491f27/...
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-3e491f27/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-3e491f27/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-3e491f27/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-3e491f27/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-3e491f27/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-3e491f27/netflix/watch_history.csv

Average throughput: 91.1MiB/s
Created and uploaded data to GCS bucket: mgmt467-netflix-3e491f27

Benefits of staging in GCS:
- **Consistent Source:** Provides a stable location for data, unlike temporary Colab storage.
- **Versionable:** GCS supports object versioning for easier rollback and auditing.
- **Decoupled Storage:** Se

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [9]:
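# List objects under the 'netflix/' prefix with sizes: -l (long listing) adds the size column,
# and --readable-sizes prints them as KiB/MiB (without -l, only object names are listed)
import os
bucket_name = os.environ.get("BUCKET_NAME")
if bucket_name:
  !gcloud storage ls -l --readable-sizes gs://$BUCKET_NAME/netflix/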
else:
  print("BUCKET_NAME environment variable is not set.")

gs://mgmt467-netflix-3e491f27/netflix/movies.csv
gs://mgmt467-netflix-3e491f27/netflix/recommendation_logs.csv
gs://mgmt467-netflix-3e491f27/netflix/reviews.csv
gs://mgmt467-netflix-3e491f27/netflix/search_logs.csv
gs://mgmt467-netflix-3e491f27/netflix/users.csv
gs://mgmt467-netflix-3e491f27/netflix/watch_history.csv


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

Persistence and Consistency: Data in local Colab storage is temporary and tied to the Colab session. Staging data in GCS provides a persistent and consistent location for your data that is accessible across different sessions and services, making your data pipeline more robust and reproducible.
Decoupling of Storage and Compute: Loading directly from Colab couples your data source tightly to the Colab environment. Staging in GCS decouples storage from compute, allowing services like BigQuery to access the data efficiently and independently. This is a more scalable and flexible approach for cloud-based data processing.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# # EXAMPLE (from LLM) — BigQuery dataset (commented)
# # DATASET="netflix"
# # # Attempt to create; ignore if exists
# # !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [10]:
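DATASET = "netflix"
# Idempotent create: skip if `bq show` finds the dataset, otherwise create it in the US multi-region.
# Use {DATASET}, not $DATASET: inside the single-quoted echo text, $DATASET was left to the shell and printed as ''
!bq show {PROJECT_ID}:{DATASET} > /dev/null 2>&1 && echo "Dataset '{DATASET}' already exists; nothing to do." || bq --location=US mk -d --description "Netflix dataset for MGMT467" {DATASET}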


BigQuery error in mk operation: Dataset 'boxwood-veld-471119-r6:netflix' already
exists.
Dataset '' may already exist.


In [11]:
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

import os
DATASET="netflix" # Ensure DATASET is defined
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] # Get PROJECT_ID from environment

for tbl, fname in tables.items():
  src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
  print(f"Loading {tbl} from {src}")
  # --replace overwrites any existing table, so re-running this cell reloads instead of appending duplicate rows
  !bq load --replace --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}

# Row counts for verification
print("\nRow counts for loaded tables:")
for tbl in tables.keys():
  # Escape the backticks so the shell passes them to bq literally instead of treating them as command substitution
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM \`{PROJECT_ID}.{DATASET}.{tbl}\`"


Loading users from gs://mgmt467-netflix-3e491f27/netflix/users.csv
Waiting on bqjob_r2d231617720abc79_0000019a22d003c9_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-3e491f27/netflix/movies.csv
Waiting on bqjob_r574cbe027fec1f7c_0000019a22d01c30_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-3e491f27/netflix/watch_history.csv
Waiting on bqjob_r286592e0b816f4ae_0000019a22d031f8_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-3e491f27/netflix/recommendation_logs.csv
Waiting on bqjob_r1c8bb4e3784e62fe_0000019a22d04e6c_1 ... (1s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-3e491f27/netflix/search_logs.csv
Waiting on bqjob_r434724f2a4e21b8f_0000019a22d06461_1 ... (1s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-3e491f27/netflix/reviews.csv
Waiting on bqjob_r55410240437260f5_0000019a22d07c66_1 ... (1s) Current status: DONE   

Row counts fo

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [13]:
!bq query --nouse_legacy_sql 'SELECT table_id AS table_name, row_count FROM `{os.environ["GOOGLE_CLOUD_PROJECT"]}.netflix.__TABLES__` WHERE table_id IN ("users", "movies", "watch_history", "recommendation_logs", "search_logs", "reviews")'


Waiting on bqjob_r7fd975191096af01_0000019a22e925b7_1 ... (0s) Current status: DONE   
+---------------------+-----------+
|     table_name      | row_count |
+---------------------+-----------+
| movies              |      2080 |
| recommendation_logs |    104000 |
| reviews             |     30900 |
| search_logs         |     53000 |
| users               |     20600 |
| watch_history       |    210000 |
+---------------------+-----------+


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

Schema autodetect is best used for convenience and exploration, such as when you're first profiling a new dataset or running quick, one-off analyses. It's also reliable for self-describing formats like Parquet or Avro, which carry their own schema. However, production environments and automated data pipelines should enforce an explicit schema. An explicit schema acts as a strict contract: it ensures type safety (e.g., a STRING value cannot land in an INTEGER column) and rejects malformed data at load time. Autodetect infers types from a sample (for CSV, up to the first 500 rows of a file), so it can guess the wrong type when different values appear later, leading to load failures or silently wrong types. Enforcing a schema prevents these errors and keeps the data consistent and trustworthy across loads.
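One way to move `watch_history` from autodetect to an explicit contract is to write the schema as a JSON file, which `bq load` accepts as its final argument. The column names and types below are copied from the autodetected `bq show --schema` output for `watch_history` in section 5.2. Marking the three id columns `REQUIRED` is an illustrative assumption (a blank id would then fail the load instead of slipping through), and `write_bq_schema` is a hypothetical helper.

```python
import json

# Copied from the autodetected watch_history schema (bq show --schema)
WATCH_HISTORY_COLUMNS = [
    ("session_id", "STRING"), ("user_id", "STRING"), ("movie_id", "STRING"),
    ("watch_date", "DATE"), ("device_type", "STRING"),
    ("watch_duration_minutes", "FLOAT"), ("progress_percentage", "FLOAT"),
    ("action", "STRING"), ("quality", "STRING"), ("location_country", "STRING"),
    ("is_download", "BOOLEAN"), ("user_rating", "INTEGER"),
]
REQUIRED_COLUMNS = {"session_id", "user_id", "movie_id"}  # assumption: ids are never blank

def write_bq_schema(path, columns, required=frozenset()):
    """Write a bq JSON schema file: a list of {name, type, mode} objects."""
    fields = [
        {"name": name, "type": col_type,
         "mode": "REQUIRED" if name in required else "NULLABLE"}
        for name, col_type in columns
    ]
    with open(path, "w") as f:
        json.dump(fields, f, indent=2)
    return fields

write_bq_schema("watch_history_schema.json", WATCH_HISTORY_COLUMNS, REQUIRED_COLUMNS)
```

In Colab, the schema file then takes the place of `--autodetect`, e.g. `!bq load --replace --skip_leading_rows=1 --source_format=CSV netflix.watch_history gs://$BUCKET_NAME/netflix/watch_history.csv ./watch_history_schema.json`.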

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.
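To make "deterministic keys" concrete: a key derived by hashing the business columns is identical on every run, whereas a random id (such as `uuid4`) changes each time and breaks joins and audits after a rerun. A minimal Python sketch follows; in BigQuery the analogous idea would be `TO_HEX(SHA256(...))` or `FARM_FINGERPRINT(...)` over the same columns. The separator character and the `\N` NULL marker are illustrative assumptions, and `row_key` is a hypothetical helper.

```python
import hashlib

def row_key(*fields, sep="\x1f"):
    """Stable SHA-256 hex key over the given fields (same inputs -> same key on every run)."""
    # Encode None explicitly so NULL and the empty string produce different keys;
    # the unit-separator char keeps ("a", "bc") distinct from ("ab", "c")
    parts = ["\\N" if f is None else str(f) for f in fields]
    return hashlib.sha256(sep.join(parts).encode("utf-8")).hexdigest()

k1 = row_key("user_03310", "movie_0640", "2024-09-08", "Smart TV")
k2 = row_key("user_03310", "movie_0640", "2024-09-08", "Smart TV")
print(k1 == k2, k1[:12])
```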


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [15]:
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

# Check the schema of the users table
!bq show --schema --format=prettyjson {project_id}:netflix.users

[
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "email",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "first_name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "last_name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "age",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "gender",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "country",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "state_province",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "city",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "subscription_plan",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "subscription_start_date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "is_active",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "month

In [17]:
# Cell 1: Total rows and % missing in country, subscription_plan, age
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM \`{project_id}.netflix.users\`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base
"""

!bq query --nouse_legacy_sql "{sql_query}"



+-------+---------------------+-------------------------------+-----------------+
|   n   | pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+-------+---------------------+-------------------------------+-----------------+
| 20600 |                 0.0 |                           0.0 |           11.93 |
+-------+---------------------+-------------------------------+-----------------+


In [None]:
# # EXAMPLE (from LLM) — Missingness profile (commented)
# # -- Users: % missing per column
# # WITH base AS (
# #   SELECT COUNT(*) n,
# #          COUNTIF(region IS NULL) miss_region,
# #          COUNTIF(plan_tier IS NULL) miss_plan,
# #          COUNTIF(age_band IS NULL) miss_age
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # )
# # SELECT n,
# #        ROUND(100*miss_region/n,2) AS pct_missing_region,
# #        ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
# #        ROUND(100*miss_age/n,2)    AS pct_missing_age_band
# # FROM base;

In [None]:
# # EXAMPLE (from LLM) — MAR by region (commented)
# # SELECT region,
# #        COUNT(*) AS n,
# #        ROUND(100*COUNTIF(plan_tier IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # GROUP BY region
# # ORDER BY pct_missing_plan_tier DESC;

### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [18]:
# Verification: Print the three missingness percentages
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM \`{project_id}.netflix.users\`
)
SELECT ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base
"""

!bq query --nouse_legacy_sql "{sql_query}"




+---------------------+-------------------------------+-----------------+
| pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+---------------------+-------------------------------+-----------------+
|                 0.0 |                           0.0 |           11.93 |
+---------------------+-------------------------------+-----------------+


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

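Based on the query results, `age` is the only profiled column with missing data (11.93%); `country` and `subscription_plan` have no missing values (0.0%). The profile alone can't tell the mechanisms apart: comparing the age-missing rate across `country` or `subscription_plan` would show whether missingness depends on another column (MAR). Similar rates across groups are consistent with MCAR but cannot rule out MNAR, which depends on the unobserved ages themselves.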

MCAR (Missing Completely at Random): The fact that the data is missing is completely random. It has nothing to do with any other column or the missing age value itself. (e.g., a database glitch randomly deleted 11.93% of age entries).

MAR (Missing at Random): The missingness is related to another column in the dataset. (e.g., users from a specific country or on a free_tier plan are less likely to provide their age).

MNAR (Missing Not at Random): The missingness is related to the value of the missing data itself. (e.g., people who are very young or very old are less likely to provide their age).
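The MAR hypothesis can be screened with a grouped version of the missingness query, using columns that actually exist in `users` (the commented example's `region` and `plan_tier` are not in this schema). A minimal sketch that builds the SQL in Python; `missing_by_group_sql` is a hypothetical helper, and grouping `age` missingness by `subscription_plan` or `country` is one reasonable choice, not the only one.

```python
def missing_by_group_sql(project_id, target_col, group_col, table="users", dataset="netflix"):
    """SQL for the % of rows with target_col NULL within each group_col value (a MAR screen)."""
    return f"""
SELECT {group_col},
       COUNT(*) AS n,
       ROUND(100 * COUNTIF({target_col} IS NULL) / COUNT(*), 2) AS pct_missing_{target_col}
FROM `{project_id}.{dataset}.{table}`
GROUP BY {group_col}
ORDER BY pct_missing_{target_col} DESC
"""

sql = missing_by_group_sql("my-project", "age", "subscription_plan")  # placeholder project id
print(sql)
```

In Colab, passing the SQL as a single argument avoids escaping backticks for the shell: `subprocess.run(["bq", "query", "--nouse_legacy_sql", sql], check=True)`. Groups whose rate sits far above the overall 11.93% would support MAR; roughly equal rates are consistent with MCAR but do not rule out MNAR.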

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.
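An in-memory Python sketch of the keep-one-best policy, useful for reasoning about what `ROW_NUMBER() ... WHERE rk = 1` keeps. Column names follow the actual `watch_history` schema (`watch_date`, `progress_percentage`, `watch_duration_minutes`) rather than the prompt's names. Two details are assumptions added here: NULL scores rank last (matching BigQuery's default for `DESC`), and `session_id` breaks any remaining tie. Without such a tiebreaker, `ROW_NUMBER()` over tied rows is not guaranteed to keep the same row on every run.

```python
KEY_COLS = ("user_id", "movie_id", "watch_date", "device_type")

def preference(row):
    """Smaller is better: higher progress, then longer duration (NULLs last), then smallest session_id."""
    p, d = row["progress_percentage"], row["watch_duration_minutes"]
    return (p is None, -(p or 0.0), d is None, -(d or 0.0), row["session_id"])

def dedup_keep_best(rows, key_cols=KEY_COLS):
    """Keep exactly one row per key group; the winner does not depend on input order."""
    best = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in best or preference(row) < preference(best[key]):
            best[key] = row
    return list(best.values())

base = dict(user_id="u1", movie_id="m1", watch_date="2024-09-08", device_type="Laptop")
rows = [
    dict(base, session_id="s2", progress_percentage=90.0, watch_duration_minutes=50.0),
    dict(base, session_id="s1", progress_percentage=90.0, watch_duration_minutes=50.0),
    dict(base, session_id="s3", progress_percentage=None, watch_duration_minutes=80.0),
]
print([r["session_id"] for r in dedup_keep_best(rows)])  # ['s1']
```

`s1` and `s2` tie on both scores, so the tiebreaker picks `s1`; `s3` loses despite its longer duration because its NULL progress ranks last.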


In [None]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [None]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

In [19]:
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

!bq show --schema --format=prettyjson {project_id}:netflix.watch_history

[
  {
    "mode": "NULLABLE",
    "name": "session_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "movie_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "watch_date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "device_type",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "watch_duration_minutes",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "progress_percentage",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "action",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "quality",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "location_country",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "is_download",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "user_rating",
    "type": "INTEGER"
  }
]


In [20]:
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT user_id,
       movie_id,
       watch_date,
       device_type,
       COUNT(*) AS duplicate_count
FROM \`{project_id}.netflix.watch_history\`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC
LIMIT 20
"""

!bq query --nouse_legacy_sql "{sql_query}"



+------------+------------+------------+-------------+-----------------+
|  user_id   |  movie_id  | watch_date | device_type | duplicate_count |
+------------+------------+------------+-------------+-----------------+
| user_03310 | movie_0640 | 2024-09-08 | Smart TV    |               8 |
| user_00391 | movie_0893 | 2024-08-26 | Laptop      |               8 |
| user_02822 | movie_0009 | 2025-08-30 | Desktop     |               6 |
| user_09972 | movie_0536 | 2025-07-16 | Laptop      |               6 |
| user_01292 | movie_0231 | 2024-07-05 | Laptop      |               6 |
| user_06103 | movie_0113 | 2025-04-08 | Laptop      |               6 |
| user_01807 | movie_0921 | 2025-01-30 | Laptop      |               6 |
| user_07981 | movie_0094 | 2025-11-08 | Laptop      |               6 |
| user_08826 | movie_0133 | 2025-04-11 | Desktop     |               6 |
| user_07738 | movie_0793 | 2025-07-28 | Desktop     |               6 |
| user_01383 | movie_0015 | 2025-04-29 | Desktop   

In [21]:
# Cell 2: Create deduplicated watch_history table
# Strategy: Keep one row per (user_id, movie_id, watch_date, device_type) group
# Preference: Higher progress_percentage first, then higher watch_duration_minutes;
# session_id breaks any remaining tie so ROW_NUMBER() keeps the same row on every rerun
# This removes duplicates while preserving the most complete viewing record
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
CREATE OR REPLACE TABLE \`{project_id}.netflix.watch_history_dedup\` AS
SELECT * EXCEPT(row_num)
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC, session_id
         ) AS row_num
  FROM \`{project_id}.netflix.watch_history\`
)
WHERE row_num = 1
"""

!bq query --nouse_legacy_sql "{sql_query}"

  


Waiting on bqjob_r2f59de04178455c5_0000019a22f6ab43_1 ... (1s) Current status: DONE   
Created boxwood-veld-471119-r6.netflix.watch_history_dedup



### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [22]:
# Verification: Compare row counts before and after deduplication
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  'Original' AS table_name,
  COUNT(*) AS row_count
FROM \`{project_id}.netflix.watch_history\`

UNION ALL

SELECT
  'Deduplicated' AS table_name,
  COUNT(*) AS row_count
FROM \`{project_id}.netflix.watch_history_dedup\`

UNION ALL

SELECT
  'Duplicates Removed' AS table_name,
  (SELECT COUNT(*) FROM \`{project_id}.netflix.watch_history\`) -
  (SELECT COUNT(*) FROM \`{project_id}.netflix.watch_history_dedup\`) AS row_count
"""

!bq query --nouse_legacy_sql "{sql_query}"




+--------------------+-----------+
|     table_name     | row_count |
+--------------------+-----------+
| Deduplicated       |    100000 |
| Duplicates Removed |    110000 |
| Original           |    210000 |
+--------------------+-----------+


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates primarily arise from two sources: system-generated errors and natural user behavior. System-generated duplicates are technical flaws, such as an ETL job failing and re-running, which re-loads the same data, or a streaming system guaranteeing "at-least-once" delivery, which might send the same event twice to prevent data loss. Natural duplicates, on the other hand, are caused by user actions, like a person impatiently clicking a "submit" button multiple times or signing up for an account twice with different email addresses.

This duplication corrupts both analytics and machine learning. For KPIs, duplicates inflate metrics and lead to flawed business decisions: in this dataset, the 210,000 raw watch_history rows would report up to 2.1 times the engagement of the 100,000 deduplicated rows. Because the dedup key uses watch_date (a DATE, not an event timestamp), some of the 110,000 removed rows may be legitimate repeat sessions on the same day, so 2.1x is an upper bound on the inflation. For ML labels, the impact is even more damaging. Duplicates in the training set overweight the repeated examples and encourage memorization instead of generalization. Worse, if duplicates leak into both the training and test sets, the model effectively sees the test answers during training, producing an inflated accuracy score that won't hold up in a real-world scenario.
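A quick way to size that caveat is to count how many raw rows are exact copies once the per-row id is ignored. A minimal sketch that builds the query; it assumes `session_id` is a per-row surrogate id (so it is excluded from the comparison) and uses BigQuery's `TO_JSON_STRING` on a whole row to compare every other column at once. `exact_duplicate_sql` is a hypothetical helper.

```python
def exact_duplicate_sql(project_id, table="watch_history", dataset="netflix",
                        ignore_cols=("session_id",)):
    """SQL comparing total rows with rows that stay distinct across all non-ignored columns."""
    except_clause = f" EXCEPT({', '.join(ignore_cols)})" if ignore_cols else ""
    return f"""
WITH rows_as_json AS (
  SELECT TO_JSON_STRING(t) AS row_json
  FROM (SELECT *{except_clause} FROM `{project_id}.{dataset}.{table}`) AS t
)
SELECT COUNT(*) AS total_rows,
       COUNT(DISTINCT row_json) AS distinct_rows,
       COUNT(*) - COUNT(DISTINCT row_json) AS exact_duplicate_rows
FROM rows_as_json
"""

print(exact_duplicate_sql("my-project"))  # placeholder project id
```

Run it with `subprocess.run(["bq", "query", "--nouse_legacy_sql", sql], check=True)` so the backticks need no shell escaping. If `exact_duplicate_rows` is well below the 110,000 rows removed by the key-based dedup, the gap consists of rows that share `(user_id, movie_id, watch_date, device_type)` but differ elsewhere, i.e., candidates for legitimate same-day sessions rather than system duplicates.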

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.
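A small pure-Python illustration of both steps on toy data, using `statistics.quantiles` with `method="inclusive"`. Its interpolation differs from BigQuery's `APPROX_QUANTILES` (which is approximate), so the exact cut points won't match the SQL; the logic is the same: Tukey fences at 1.5 * IQR, then capping at P01/P99 with a flag. Note one indexing difference: `statistics.quantiles(..., n=100)` returns the 99 interior cut points (index 0 is P1, index 98 is P99), while `APPROX_QUANTILES(x, 100)` returns 101 values including min and max, so P99 is `OFFSET(99)`. The helper names are hypothetical.

```python
import statistics

def iqr_bounds(values, k=1.5):
    """Tukey fences (Q1 - k*IQR, Q3 + k*IQR) using inclusive quartiles."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

def winsorize(values, lower_pct=1, upper_pct=99):
    """Cap values at the given percentiles (1..99) and flag which values were changed."""
    cuts = statistics.quantiles(values, n=100, method="inclusive")  # 99 cut points: P1..P99
    lo, hi = cuts[lower_pct - 1], cuts[upper_pct - 1]
    capped = [float(min(max(v, lo), hi)) for v in values]
    flags = [v < lo or v > hi for v in values]
    return capped, flags

# Toy "minutes watched": 1..100 plus one extreme session
vals = list(range(1, 101)) + [1000]
lo, hi = iqr_bounds(vals)
n_out = sum(v < lo or v > hi for v in vals)
capped, flags = winsorize(vals)
print(f"IQR fences: [{lo}, {hi}] -> {n_out} outlier(s), {100 * n_out / len(vals):.2f}%")
print(f"capped range: {min(capped)} to {max(capped)}, flagged rows: {sum(flags)}")
```

The two rules disagree on purpose: IQR flags only the 1000-minute session, while P01/P99 capping also nudges the smallest value, which is why the flag column matters for explaining what changed.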


In [None]:
# # EXAMPLE (from LLM) — IQR outlier rate (commented)
# # WITH dist AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(1)] AS q1,
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(3)] AS q3
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # bounds AS (
# #   SELECT q1, q3, (q3-q1) AS iqr,
# #          q1 - 1.5*(q3-q1) AS lo,
# #          q3 + 1.5*(q3-q1) AS hi
# #   FROM dist
# # )
# # SELECT
# #   COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi) AS outliers,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi)/COUNT(*),2) AS pct_outliers
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h
# # CROSS JOIN bounds b;

In [None]:
# # EXAMPLE (from LLM) — Winsorize + quantiles (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust` AS
# # WITH q AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(1)]  AS p01,
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(99)] AS p99
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # )
# # SELECT
# #   h.*,
# #   GREATEST(q.p01, LEAST(q.p99, h.minutes_watched)) AS minutes_watched_capped
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h, q;
# #
# # -- Quantiles before vs after
# # WITH before AS (
# #   SELECT 'before' AS which, APPROX_QUANTILES(minutes_watched, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # after AS (
# #   SELECT 'after' AS which, APPROX_QUANTILES(minutes_watched_capped, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`
# # )
# # SELECT * FROM before UNION ALL SELECT * FROM after;

In [23]:
import os

sql_query = f"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\` h
CROSS JOIN bounds b
"""

!bq query --nouse_legacy_sql "{sql_query}"


+----------+--------+--------------+
| outliers | total  | pct_outliers |
+----------+--------+--------------+
|     3521 | 100000 |         3.52 |
+----------+--------+--------------+
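The IQR rule in the query above can be checked locally on a handful of values. This is a minimal sketch, assuming a hypothetical list of session lengths; it uses Python's `statistics.quantiles`, which interpolates exactly, whereas BigQuery's `APPROX_QUANTILES` is approximate, so fences on real data may differ slightly.

```python
import statistics

def iqr_outlier_rate(values, k=1.5):
    """Return (lo, hi, n_outliers, pct_outliers) using Tukey's IQR fences."""
    # quantiles(n=4) returns the three quartile cut points [q1, median, q3]
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lo, hi = q1 - k * iqr, q3 + k * iqr
    # Same condition as COUNTIF(x < lo OR x > hi) in the SQL cell
    n_out = sum(1 for v in values if v < lo or v > hi)
    return lo, hi, n_out, round(100 * n_out / len(values), 2)

# Hypothetical watch durations (minutes) with one extreme session
minutes = [12.0, 25.5, 30.0, 41.0, 44.5, 50.0, 58.0, 61.5, 75.0, 799.3]
lo, hi, n_out, pct = iqr_outlier_rate(minutes)
print(f"lo={lo:.3f} hi={hi:.3f} outliers={n_out} pct={pct}")
```

Because the fences depend only on the middle 50% of the data, the single 799.3-minute session does not widen them, which is why the IQR rule is preferred over a mean plus/minus standard-deviation rule for skewed durations.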


In [24]:
import os

sql_create_table = f"""
CREATE OR REPLACE TABLE \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\` h, q
"""

!bq query --nouse_legacy_sql "{sql_create_table}"

print("\nQuantiles before vs after capping:")

sql_quantiles = f"""
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\`
)
SELECT * FROM before UNION ALL SELECT * FROM after
"""

!bq query --nouse_legacy_sql "{sql_quantiles}"


Waiting on bqjob_r6abdbe335aea0458_0000019a2300b03f_1 ... (1s) Current status: DONE   
Created boxwood-veld-471119-r6.netflix.watch_history_robust


Quantiles before vs after capping:
+--------+---------------------------------------------+
| which  |                      q                      |
+--------+---------------------------------------------+
| after  | ["4.4","24.6","41.5","61.5","92.0","356.3"] |
| before | ["0.2","24.8","41.7","61.2","91.7","799.3"] |
+--------+---------------------------------------------+
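The capping step is a clamp between two percentiles. Below is a minimal local sketch of that winsorizing logic, assuming hypothetical data; the helper name `winsorize` is illustrative, not part of the lab. Note the indexing difference: `APPROX_QUANTILES(x, 100)` returns 101 values, so `OFFSET(1)` and `OFFSET(99)` are p01 and p99, while `statistics.quantiles(n=100)` returns 99 cut points, so the p-th percentile sits at index `p - 1`.

```python
import statistics

def winsorize(values, lower_pct=1, upper_pct=99):
    """Clamp values to their lower/upper percentile cut points (winsorizing)."""
    # quantiles(n=100) returns 99 cut points; index p-1 is the p-th percentile
    cuts = statistics.quantiles(values, n=100, method="inclusive")
    p_lo, p_hi = cuts[lower_pct - 1], cuts[upper_pct - 1]
    # Same logic as GREATEST(p01, LEAST(p99, x)) in the SQL cell
    capped = [max(p_lo, min(p_hi, v)) for v in values]
    return capped, p_lo, p_hi

# Hypothetical minutes: 1..99 plus one extreme 5000-minute session
values = [float(v) for v in range(1, 100)] + [5000.0]
capped, p01, p99 = winsorize(values)
print(f"p01={p01:.2f} p99={p99:.2f} max before={max(values)} max after={max(capped)}")
```

As in the BigQuery output, the median barely moves while the extremes are pulled in, which is the point of capping: tail values stop dominating averages without dropping rows.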


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [25]:
# Verification: Show min/median/max before vs after capping
import os

sql_query = f"""
SELECT
  'Before (Original)' AS version,
  MIN(watch_duration_minutes) AS min_val,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
  MAX(watch_duration_minutes) AS max_val
FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\`

UNION ALL

SELECT
  'After (Capped)' AS version,
  MIN(watch_duration_minutes_capped) AS min_val,
  APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_val,
  MAX(watch_duration_minutes_capped) AS max_val
FROM \`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\`

ORDER BY version DESC
"""

!bq query --nouse_legacy_sql "{sql_query}"


+-------------------+---------+------------+---------+
|      version      | min_val | median_val | max_val |
+-------------------+---------+------------+---------+
| Before (Original) |     0.2 |       50.8 |   799.3 |
| After (Capped)    |     4.4 |       51.4 |   356.3 |
+-------------------+---------+------------+---------+


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping (or winsorizing) can be harmful when the outliers are not data errors but are legitimate, meaningful events. This is especially damaging in anomaly or fraud detection, where the extreme outlier is the signal you're trying to find; capping it effectively deletes the data you need most. It also artificially compresses the data's variance and distorts its true distribution, causing models to underestimate the full range of real-world possibilities and the true level of risk (e.g., in financial or load-capacity models).

Random Forests (and other decision-tree-based models) are much less sensitive to outliers in their input features. Trees split data by rank and order (e.g., "is age > 65?"), not by magnitude (e.g., "how far is age from the mean?"). An extreme outlier (like an age of 500) is simply sorted into the same branch as all other high values and cannot "pull" or "skew" the decision boundaries, unlike in linear regression, where the squared-error loss lets a single extreme point pull the fitted line toward it.
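The rank-versus-magnitude argument can be illustrated with a one-split "decision stump", the building block of a tree. This is a minimal sketch on hypothetical ages and labels, not a real Random Forest: it shows that replacing the largest age with 500 leaves the chosen split and the row partition unchanged, while the mean (a magnitude-based statistic) shifts sharply.

```python
def best_stump(x, y):
    """Fit a one-split decision stump by minimizing misclassified rows.

    Candidate thresholds are midpoints between consecutive distinct sorted values,
    so only the ORDER of x determines which rows can be separated.
    """
    xs = sorted(set(x))
    best = None
    for a, b in zip(xs, xs[1:]):
        t = (a + b) / 2
        left = [yi for xi, yi in zip(x, y) if xi <= t]
        right = [yi for xi, yi in zip(x, y) if xi > t]
        # Each side predicts its majority class; count the rows it gets wrong
        errors = (len(left) - max(left.count(0), left.count(1))
                  + len(right) - max(right.count(0), right.count(1)))
        if best is None or errors < best[0]:
            best = (errors, t)
    errors, t = best
    partition = tuple(xi > t for xi in x)
    return t, errors, partition

# Hypothetical data: same ranking, but the last age becomes an extreme 500
ages = [22, 25, 31, 38, 45, 52, 61, 70]
labels = [0, 0, 0, 0, 1, 1, 1, 1]
ages_with_outlier = ages[:-1] + [500]

t1, e1, part1 = best_stump(ages, labels)
t2, e2, part2 = best_stump(ages_with_outlier, labels)
mean1 = sum(ages) / len(ages)
mean2 = sum(ages_with_outlier) / len(ages_with_outlier)
print(f"threshold {t1} vs {t2}; same partition: {part1 == part2}; mean {mean1} vs {mean2}")
```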

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if the column exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
# # EXAMPLE (from LLM) — flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [None]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented)
# # SELECT
# #   COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #           CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #                     CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;

In [None]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [26]:
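# Cell 1: Summarize flag_binge for sessions > 8 hours (480 minutes)
# Identifies potential binge-watching behavior in watch_history_robust
# Caveat: watch_duration_minutes_capped tops out at p99 (356.3 min), so "> 480" can never
# fire on the capped column; a true binge flag should use the raw watch_duration_minutes.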
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 480) AS binge_sessions,
  COUNT(*) AS total_sessions,
  ROUND(100 * COUNTIF(watch_duration_minutes_capped > 480) / COUNT(*), 2) AS pct_binge,
  AVG(CASE WHEN watch_duration_minutes_capped > 480 THEN watch_duration_minutes_capped END) AS avg_binge_duration,
  MAX(watch_duration_minutes_capped) AS max_binge_duration
FROM \`{project_id}.netflix.watch_history_robust\`
"""

!bq query --nouse_legacy_sql "{sql_query}"


+----------------+----------------+-----------+--------------------+--------------------+
| binge_sessions | total_sessions | pct_binge | avg_binge_duration | max_binge_duration |
+----------------+----------------+-----------+--------------------+--------------------+
|              0 |         100000 |       0.0 |               NULL |              356.3 |
+----------------+----------------+-----------+--------------------+--------------------+
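The 0 binge sessions above follow directly from the order of operations: after capping at p99 (356.3 minutes), no value can exceed 480. A minimal sketch with hypothetical session lengths makes this concrete; the cap bounds are copied from the before/after quantile output (p01 = 4.4, p99 = 356.3), and the helper names are illustrative.

```python
def pct_over(values, threshold):
    """Percentage of rows strictly above threshold, like COUNTIF(x > t) / COUNT(*)."""
    return round(100 * sum(v > threshold for v in values) / len(values), 2)

def cap(values, lo, hi):
    # Same clamp as GREATEST(p01, LEAST(p99, x)) in the winsorizing cell
    return [max(lo, min(hi, v)) for v in values]

BINGE_MIN = 8 * 60  # 480 minutes, the threshold from the build prompt

raw = [35.0, 48.0, 62.0, 90.0, 510.0, 799.3]  # hypothetical session lengths
capped = cap(raw, lo=4.4, hi=356.3)           # bounds from the quantile output

print("raw:", pct_over(raw, BINGE_MIN), "capped:", pct_over(capped, BINGE_MIN))
```

The takeaway is a design rule: compute behavioral flags on the raw column (which `watch_history_robust` still carries via `h.*`) and reserve the capped column for models that are sensitive to extreme magnitudes.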


In [27]:
# Cell 2: Create and summarize flag_age_extreme for users with age <10 or >100
# Flag extreme/suspicious ages that may indicate data quality issues
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_users,
  COUNT(*) AS total_users_with_age,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_extreme_age,
  COUNTIF(age < 10) AS users_under_10,
  COUNTIF(age > 100) AS users_over_100,
  MIN(age) AS min_age,
  MAX(age) AS max_age
FROM \`{project_id}.netflix.users\`
WHERE age IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"


+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+
| extreme_age_users | total_users_with_age | pct_extreme_age | users_under_10 | users_over_100 | min_age | max_age |
+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+
|               358 |                18142 |            1.97 |            336 |             22 |    -7.0 |   109.0 |
+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+


In [28]:
# Cell 3: Compute and summarize flag_duration_anomaly for movies with durations < 15 or > 480 minutes
# Flag movies with potentially anomalous durations (very short or very long)
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS duration_anomalies,
  COUNT(*) AS total_movies,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2)
    AS pct_duration_anomalies,
  COUNTIF(duration_minutes < 15)  AS movies_under_15_min,
  COUNTIF(duration_minutes > 480) AS movies_over_480_min
FROM \`{project_id}.netflix.movies\`
WHERE duration_minutes IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"


+--------------------+--------------+------------------------+---------------------+---------------------+
| duration_anomalies | total_movies | pct_duration_anomalies | movies_under_15_min | movies_over_480_min |
+--------------------+--------------+------------------------+---------------------+---------------------+
|                 46 |         2080 |                   2.21 |                  24 |                  22 |
+--------------------+--------------+------------------------+---------------------+---------------------+


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [29]:
# Compact summary of all data quality and behavioral flags (binge, age, duration)
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT 'flag_binge' AS flag_name,
       ROUND(100 * COUNTIF(watch_duration_minutes_capped > 480) / COUNT(*), 2) AS pct_of_rows
FROM \`{project_id}.netflix.watch_history_robust\`

UNION ALL

SELECT 'flag_age_extreme' AS flag_name,
       ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_of_rows
FROM \`{project_id}.netflix.users\`
WHERE age IS NOT NULL

UNION ALL

SELECT 'flag_duration_anomaly' AS flag_name,
       ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM \`{project_id}.netflix.movies\`
WHERE duration_minutes IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"


+-----------------------+-------------+
|       flag_name       | pct_of_rows |
+-----------------------+-------------+
| flag_binge            |         0.0 |
| flag_age_extreme      |        1.97 |
| flag_duration_anomaly |        2.21 |
+-----------------------+-------------+
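Because the same flag conditions appear in both the per-flag cells and this summary, one option is to keep them in a single spec and generate the `UNION ALL` query from it, so the definitions cannot drift apart. This is a minimal sketch; `FLAG_SPECS`, `build_flag_summary_sql`, and the project ID `my-project` are hypothetical, and the binge condition deliberately uses the raw `watch_duration_minutes` column. The function only builds the SQL string: to pass it through `!bq query "..."`, the backticks must be escaped for the shell as the notebook's cells do, or the string can be sent with the `google-cloud-bigquery` client's `Client.query`.

```python
# (flag_name, table, boolean SQL condition, optional WHERE filter)
FLAG_SPECS = [
    ("flag_binge", "watch_history_robust", "watch_duration_minutes > 480", None),
    ("flag_age_extreme", "users", "age < 10 OR age > 100", "age IS NOT NULL"),
    ("flag_duration_anomaly", "movies",
     "duration_minutes < 15 OR duration_minutes > 480", "duration_minutes IS NOT NULL"),
]

def build_flag_summary_sql(project_id, dataset="netflix", specs=FLAG_SPECS):
    """Build one UNION ALL query returning (flag_name, pct_of_rows) per flag."""
    parts = []
    for name, table, cond, where in specs:
        sql = (f"SELECT '{name}' AS flag_name,\n"
               f"       ROUND(100 * COUNTIF({cond}) / COUNT(*), 2) AS pct_of_rows\n"
               f"FROM `{project_id}.{dataset}.{table}`")
        if where:
            sql += f"\nWHERE {where}"
        parts.append(sql)
    return "\n\nUNION ALL\n\n".join(parts)

print(build_flag_summary_sql("my-project"))
```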


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

The most common anomaly flag is `flag_duration_anomaly`, at 2.21% of movies, just ahead of `flag_age_extreme` at 1.97% of users with a recorded age. Each percentage uses its own table as the denominator, so the rates are only loosely comparable.

Both `flag_duration_anomaly` and `flag_binge` are worth keeping as features, for different reasons:

- **`flag_duration_anomaly`** captures unusual content characteristics. A title with an abnormally short or long duration can skew recommendation models and average watch-time calculations; flagging it lets a model learn whether this "unusualness" itself affects user behavior or satisfaction.
- **`flag_binge`** captures a highly relevant user behavior and is a strong candidate signal for modeling engagement, churn, or subscription upgrades. Its 0.0% rate here is an artifact of computing it on `watch_duration_minutes_capped`: capping at the 99th percentile limits every session to 356.3 minutes, so the 480-minute threshold can never fire. Recomputed on the raw `watch_duration_minutes` column (maximum 799.3 minutes), at least one session exceeds 8 hours.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.
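For the `.sql` export step, a small helper can write the notebook's DQ queries to one file with a comment header per query, ready to commit alongside the notebook. This is a minimal sketch; the file name `dq_queries.sql`, the `export_sql` helper, and the project ID `my-project` are hypothetical placeholders.

```python
from pathlib import Path

def export_sql(queries, path="dq_queries.sql"):
    """Write named queries to one .sql file, each preceded by a comment header."""
    chunks = []
    for name, sql in queries.items():
        # End each statement with exactly one semicolon so the file runs as a script
        chunks.append(f"-- {name}\n{sql.strip().rstrip(';')};\n")
    out = Path(path)
    out.write_text("\n".join(chunks), encoding="utf-8")
    return out

queries = {
    "row_count_users": "SELECT COUNT(*) AS n FROM `my-project.netflix.users`",
    "flag_age_extreme": (
        "SELECT COUNTIF(age < 10 OR age > 100) AS n "
        "FROM `my-project.netflix.users` WHERE age IS NOT NULL"
    ),
}
path = export_sql(queries)
print(path.read_text(encoding="utf-8"))
```

Keeping the queries in a dict also makes it easy to reuse the exact same strings in the notebook cells and in the exported file.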


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
