# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [1]:
# Authenticate to Google Cloud
from google.colab import auth
auth.authenticate_user()

import os
# Prompt for project ID and set region
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # default region

# Export project ID as environment variable for gcloud
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
# This ensures subsequent gcloud commands use the correct project
!gcloud config set project $GOOGLE_CLOUD_PROJECT
# Verify the project is set
!gcloud config get-value project

# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt467-472519
Project: mgmt467-472519 | Region: us-central1
Updated property [core/project].
mgmt467-472519


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [3]:
# Verify the active project
!gcloud config get-value project

# Echo the set region
import os
print("REGION:", os.environ.get("REGION", "REGION not set"))

mgmt467-472519
REGION: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

Region and Project_ID are set at the top to prevent errors later in the process if we forget to set.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [4]:
# Prompt the user to upload their kaggle.json file
# This file contains your Kaggle API credentials.
# It's important to keep this file secure.
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Save the uploaded file to the correct directory and set permissions
# This ensures the Kaggle CLI can find your credentials.
# Setting permissions to 0600 makes the file readable/writable only by the owner,
# which is crucial for security and reproducibility.
import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only read/write permissions

# Verify the Kaggle CLI is installed and configured correctly
# This helps ensure the next steps involving the Kaggle API will work.
!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [6]:
# Verify Kaggle CLI is ready
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [7]:
# Create a directory to store the raw data
# This ensures a consistent location for downloaded files.
!mkdir -p /content/data/raw

# Download the dataset using the Kaggle CLI
# The dataset is downloaded to the /content/data directory.
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# The -o flag ensures existing files are overwritten if present.
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes
# This provides a quick inventory of the downloaded files.
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 1.03GB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [9]:
# Verify there are exactly six CSV files and print their names
import glob
csv_files = glob.glob('/content/data/raw/*.csv')
print("Found", len(csv_files), "CSV files:")
for f in sorted(csv_files):
    print(f)
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"

Found 6 CSV files:
/content/data/raw/movies.csv
/content/data/raw/recommendation_logs.csv
/content/data/raw/reviews.csv
/content/data/raw/search_logs.csv
/content/data/raw/users.csv
/content/data/raw/watch_history.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [10]:
import uuid, os
# Create a unique bucket name
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket
# Buckets are globally unique and reside in a specific region.
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSV files to the bucket
# Staging data in GCS provides a stable, versionable source for loading into BigQuery.
# This is more reliable than loading directly from a temporary Colab environment.
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name
print("Created and uploaded files to GCS bucket:", bucket_name)

# Verify contents
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating gs://mgmt467-netflix-22ade7ad/...
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-22ade7ad/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-22ade7ad/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-22ade7ad/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-22ade7ad/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-22ade7ad/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-22ade7ad/netflix/watch_history.csv

Average throughput: 23.6MiB/s
Created and uploaded files to GCS bucket: mgmt467-netflix-22ade7ad
gs://mgmt467-netflix-22ade7ad/netflix/movies.csv
gs://mgmt467-netflix-22ade7ad/netflix/recommendation_logs.csv
gs://mgmt467-netflix-22ade7ad/netflix/reviews.csv
gs://mgmt467-netflix-22ade7ad/netflix/search_logs.csv
gs://mgmt467-netfl

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [12]:
# List objects in the bucket with details including size
# This verifies the files were uploaded correctly to the specified prefix.
!gcloud storage ls -l gs://$BUCKET_NAME/netflix/

    115942  2025-10-26T19:20:55Z  gs://mgmt467-netflix-22ade7ad/netflix/movies.csv
   4695557  2025-10-26T19:20:56Z  gs://mgmt467-netflix-22ade7ad/netflix/recommendation_logs.csv
   1861942  2025-10-26T19:20:55Z  gs://mgmt467-netflix-22ade7ad/netflix/reviews.csv
   2250902  2025-10-26T19:20:56Z  gs://mgmt467-netflix-22ade7ad/netflix/search_logs.csv
   1606820  2025-10-26T19:20:55Z  gs://mgmt467-netflix-22ade7ad/netflix/users.csv
   9269425  2025-10-26T19:20:56Z  gs://mgmt467-netflix-22ade7ad/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [14]:
# Load data into BigQuery tables from GCS
# `--skip_leading_rows=1` skips the header row.
# `--autodetect` allows BigQuery to infer the schema.
# `--source_format=CSV` specifies the file format.
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}
import os
DATASET="netflix" # Define DATASET variable
for tbl, fname in tables.items():
  src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
  print(f"Loading {tbl} from {src}")
  !bq load --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}

# Row counts for verification
print("\nVerifying row counts:")
from google.cloud import bigquery
client = bigquery.Client(project=os.environ['GOOGLE_CLOUD_PROJECT'])

for tbl in tables.keys():
  query = f"SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.{tbl}`"
  query_job = client.query(query)
  results = query_job.result()
  for row in results:
    print(f"Table: {row.table_name}, Row Count: {row.n}")

Loading users from gs://mgmt467-netflix-22ade7ad/netflix/users.csv
Waiting on bqjob_r732d0fb5819d49c5_0000019a21f78eb4_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-22ade7ad/netflix/movies.csv
Waiting on bqjob_ra9ffbecbec8faaa_0000019a21f7a5d3_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-22ade7ad/netflix/watch_history.csv
Waiting on bqjob_r7bfaa7f8555b606a_0000019a21f7bacb_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-22ade7ad/netflix/recommendation_logs.csv
Waiting on bqjob_r4a2cd0276ada247b_0000019a21f7d514_1 ... (3s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-22ade7ad/netflix/search_logs.csv
Waiting on bqjob_r5d0be98f7d040bd0_0000019a21f7f259_1 ... (1s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-22ade7ad/netflix/reviews.csv
Waiting on bqjob_r70c8074c81c577a7_0000019a21f80748_1 ... (1s) Current status: DONE   

Verifying row 

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [15]:
from google.cloud import bigquery
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.reviews`
"""

query_job = client.query(query)
results = query_job.result()

print("Row counts for tables in netflix dataset:")
for row in results:
    print(f"Table: {row.table_name}, Row Count: {row.row_count}")

Row counts for tables in netflix dataset:
Table: reviews, Row Count: 154500
Table: recommendation_logs, Row Count: 520000
Table: search_logs, Row Count: 265000
Table: movies, Row Count: 10400
Table: watch_history, Row Count: 1050000
Table: users, Row Count: 103000


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [16]:
from google.cloud import bigquery
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Query 1: Total rows and % missing per column
query_missingness_profile = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_region,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT n,
       ROUND(100*miss_region/n,2) AS pct_missing_region,
       ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
       ROUND(100*miss_age/n,2)    AS pct_missing_age_band
FROM base;
"""

print("Missingness Profile (users table):")
query_job_profile = client.query(query_missingness_profile)
results_profile = query_job_profile.result()

for row in results_profile:
    print(f"Total Rows: {row.n}")
    print(f"Pct Missing Region: {row.pct_missing_region}%")
    print(f"Pct Missing Plan Tier: {row.pct_missing_plan_tier}%")
    print(f"Pct Missing Age Band: {row.pct_missing_age_band}%")

Missingness Profile (users table):
Total Rows: 103000
Pct Missing Region: 0.0%
Pct Missing Plan Tier: 0.0%
Pct Missing Age Band: 11.93%


In [17]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Query 2: % plan_tier missing by region (checking for MAR)
query_mar_by_region = f"""
-- % plan_tier missing by region ordered descending (checking for MAR - Missing At Random)
-- This query helps identify if the missingness of 'plan_tier' is dependent on the 'region'.
SELECT country,
       COUNT(*) AS n,
       ROUND(100*COUNTIF(subscription_plan IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
FROM `{project_id}.netflix.users`
GROUP BY country
ORDER BY pct_missing_plan_tier DESC;
"""

print("\n% Plan Tier Missing by Region:")
query_job_mar = client.query(query_mar_by_region)
results_mar = query_job_mar.result()

for row in results_mar:
    print(f"Region: {row.country}, Total Rows: {row.n}, Pct Missing Plan Tier: {row.pct_missing_plan_tier}%")


% Plan Tier Missing by Region:
Region: Canada, Total Rows: 30960, Pct Missing Plan Tier: 0.0%
Region: USA, Total Rows: 72040, Pct Missing Plan Tier: 0.0%


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [18]:
from google.cloud import bigquery
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Query to print the three missingness percentages
query_missingness_summary = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_region,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT
       ROUND(100*miss_region/n,2) AS pct_missing_region,
       ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
       ROUND(100*miss_age/n,2)    AS pct_missing_age_band
FROM base;
"""

print("Missingness Percentages:")
query_job_summary = client.query(query_missingness_summary)
results_summary = query_job_summary.result()

for row in results_summary:
    print(f"Pct Missing Region: {row.pct_missing_region}%")
    print(f"Pct Missing Plan Tier: {row.pct_missing_plan_tier}%")
    print(f"Pct Missing Age Band: {row.pct_missing_age_band}%")

Missingness Percentages:
Pct Missing Region: 0.0%
Pct Missing Plan Tier: 0.0%
Pct Missing Age Band: 11.93%


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [20]:
duplicate_grp_query = f"""-- Find duplicate groups in Netflix watch history
-- Grouping by (user_id, movie_id, event_ts, device_type)
-- Count how many rows exist per group, then filter to only duplicates
-- Show the top 20 groups with the highest duplicate counts

SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS duplicate_count
FROM
  `{project_id}.netflix.watch_history`
GROUP BY
  user_id, movie_id, watch_date, device_type
HAVING
  COUNT(*) > 1
ORDER BY
  duplicate_count DESC
LIMIT 20;
"""

duplicate_grp_query_job = client.query(duplicate_grp_query)
duplicate_grp_results = duplicate_grp_query_job.result()
for row in duplicate_grp_results:
    print(f"User ID: {row.user_id}")
    print(f"Movie ID: {row.movie_id}")
    print(f"Watch Date: {row.watch_date}")
    print(f"Device Type: {row.device_type}")
    print(f"Duplicate Count: {row.duplicate_count}")
    print("\n")

User ID: user_03310
Movie ID: movie_0640
Watch Date: 2024-09-08
Device Type: Smart TV
Duplicate Count: 40


User ID: user_00391
Movie ID: movie_0893
Watch Date: 2024-08-26
Device Type: Laptop
Duplicate Count: 40


User ID: user_03043
Movie ID: movie_0465
Watch Date: 2024-02-03
Device Type: Laptop
Duplicate Count: 30


User ID: user_04899
Movie ID: movie_0142
Watch Date: 2025-01-20
Device Type: Desktop
Duplicate Count: 30


User ID: user_01143
Movie ID: movie_0166
Watch Date: 2024-05-28
Device Type: Laptop
Duplicate Count: 30


User ID: user_06799
Movie ID: movie_0458
Watch Date: 2024-08-15
Device Type: Desktop
Duplicate Count: 30


User ID: user_09564
Movie ID: movie_0552
Watch Date: 2025-01-11
Device Type: Laptop
Duplicate Count: 30


User ID: user_02126
Movie ID: movie_0642
Watch Date: 2025-02-09
Device Type: Desktop
Duplicate Count: 30


User ID: user_03348
Movie ID: movie_0688
Watch Date: 2024-01-22
Device Type: Desktop
Duplicate Count: 30


User ID: user_00564
Movie ID: movie_0234

In [22]:
dedup_query = f"""
-- Create a deduplicated version of watch_history
-- Keep only one row per (user_id, movie_id, event_ts, device_type)
-- Preference order:
--   1. Higher progress_ratio
--   2. If tie, higher minutes_watched
-- Use QUALIFY with ROW_NUMBER for deterministic selection

CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT
  *
FROM
  `{project_id}.netflix.watch_history`
QUALIFY
  ROW_NUMBER() OVER (
    PARTITION BY user_id, movie_id, watch_date, device_type
    ORDER BY progress_percentage DESC, watch_duration_minutes DESC
  ) = 1;

"""
dedup_query_job = client.query(dedup_query)
dedup_results = dedup_query_job.result()
for row in dedup_results:
    print(row)

### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [23]:
dedup_verification_query = f"""
-- Validation query: compare row counts before and after deduplication
-- Shows how many rows existed originally vs. in the deduplicated table
-- Also calculates how many rows were removed

WITH before_count AS (
  SELECT COUNT(*) AS total_rows
  FROM {project_id}.netflix.`watch_history`
),
after_count AS (
  SELECT COUNT(*) AS total_rows
  FROM {project_id}.netflix.`watch_history_dedup`
)
SELECT
  b.total_rows AS before_count,
  a.total_rows AS after_count,
  b.total_rows - a.total_rows AS rows_removed
FROM before_count b
CROSS JOIN after_count a;
"""

dedup_verification_query_job = client.query(dedup_verification_query)
dedup_verification_results = dedup_verification_query_job.result()
for row in dedup_verification_results:
    print(f"Before Count: {row.before_count}")
    print(f"After Count: {row.after_count}")
    print(f"Rows Removed: {row.rows_removed}")

Before Count: 1050000
After Count: 100000
Rows Removed: 950000


#### **Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise from various sources, both natural and system-generated.

**Natural duplicates** might occur from user behavior, like accidentally submitting a form twice, or from real-world events being recorded multiple times through different channels.

**System-generated duplicates** are often due to errors in data pipelines, ETL processes, or database merges where records are not properly identified and consolidated. Issues like retries on failed API calls or faulty data ingestion scripts can also create duplicates.

Duplicates can significantly corrupt labels and KPIs:

- **Labels:** If you're building a model to predict user engagement (e.g., watch time), duplicate watch history entries will artificially inflate the engagement metric, leading to inaccurate labels and a biased model.
- **KPIs:** Business metrics like "total minutes watched" or "number of active users" will be inflated by duplicates, giving a false picture of user behavior and business performance. This can lead to poor decision-making based on inaccurate data.

Deduplication is a crucial step to ensure the integrity of your data for analysis and modeling.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [25]:
iqr_query = f"""
-- Compute Q1, Q3, and IQR for minutes_watched
-- Derive lower/upper bounds using Tukey's rule (Q1 - 1.5*IQR, Q3 + 1.5*IQR)
-- Report the percentage of rows outside these bounds as outliers

WITH stats AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(25)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(75)] AS q3,
    COUNT(*) AS total_rows
  FROM {project_id}.netflix.`watch_history_dedup`
),
bounds AS (
  SELECT
    q1,
    q3,
    q3 - q1 AS iqr,
    q1 - 1.5 * (q3 - q1) AS lower_bound,
    q3 + 1.5 * (q3 - q1) AS upper_bound,
    total_rows
  FROM stats
),
outliers AS (
  SELECT
    COUNTIF(watch_duration_minutes < lower_bound OR watch_duration_minutes > upper_bound) AS outlier_count
  FROM {project_id}.netflix.`watch_history_dedup`, bounds
)
SELECT
  b.q1,
  b.q3,
  b.iqr,
  b.lower_bound,
  b.upper_bound,
  o.outlier_count,
  b.total_rows,
  SAFE_DIVIDE(o.outlier_count, b.total_rows) * 100 AS pct_outliers
FROM bounds b
CROSS JOIN outliers o;
"""
iqr_query_job = client.query(iqr_query)
iqr_results = iqr_query_job.result()
for row in iqr_results:
    print(f"Q1: {row.q1}")
    print(f"Q3: {row.q3}")
    print(f"IQR: {row.iqr}")
    print(f"Lower Bound: {row.lower_bound}")
    print(f"Upper Bound: {row.upper_bound}")
    print(f"Outlier Count: {row.outlier_count}")
    print(f"Total Rows: {row.total_rows}")
    print(f"Percentage Outliers: {row.pct_outliers}%")
    print("\n")

Q1: 29.1
Q3: 82.4
IQR: 53.300000000000004
Lower Bound: -50.85
Upper Bound: 162.35000000000002
Outlier Count: 3531
Total Rows: 100000
Percentage Outliers: 3.531%




In [27]:
quantile_query = f"""
-- Step 1: Compute P01 and P99 for minutes_watched
-- Step 2: Create watch_history_robust with capped values
-- Step 3: Return quantile summaries before/after capping for comparison

-- Create the robust table with capped minutes_watched
CREATE OR REPLACE TABLE {project_id}.netflix.`watch_history_robust` AS
WITH percentiles AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM {project_id}.netflix.`watch_history_dedup`
)
SELECT
  wh.*,
  LEAST(GREATEST(watch_duration_minutes, p01), p99) AS minutes_watched_capped
FROM {project_id}.netflix.`watch_history_dedup` wh
CROSS JOIN percentiles;

-- Summaries before/after capping (deciles shown for readability)
WITH before AS (
  SELECT 'before' AS stage,
    APPROX_QUANTILES(watch_duration_minutes, 10) AS deciles
  FROM {project_id}.netflix.`watch_history_dedup`
),
after AS (
  SELECT 'after' AS stage,
    APPROX_QUANTILES(minutes_watched_capped, 10) AS deciles
  FROM {project_id}.netflix.`watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
"""
quantile_query_job = client.query(quantile_query)
quantile_results = quantile_query_job.result()
for row in quantile_results:
    print(f"Stage: {row.stage}")
    print(f"Deciles: {row.deciles}")
    print("\n")


Stage: after
Deciles: [4.4, 16.1, 24.9, 33.2, 41.8, 51.3, 61.9, 74.6, 91.6, 120.7, 366.0]


Stage: before
Deciles: [0.2, 16.1, 24.9, 33.1, 41.9, 51.1, 61.5, 74.7, 91.8, 121.1, 799.3]




### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [29]:
binge_query = f"""
-- Flag sessions where minutes_watched exceeds 8 hours (480 minutes)
-- Summarize count and percentage of flagged binge sessions

WITH base AS (
  SELECT
    *,
    minutes_watched_capped > 480 AS flag_binge
  FROM {project_id}.netflix.`watch_history_robust`
)
SELECT
  COUNTIF(flag_binge) AS binge_count,
  COUNT(*) AS total_rows,
  SAFE_DIVIDE(COUNTIF(flag_binge), COUNT(*)) * 100 AS pct_binge
FROM base;
"""
binge_query_job = client.query(binge_query)
binge_results = binge_query_job.result()
for row in binge_results:
    print(f"Binge Count: {row.binge_count}")
    print(f"Total Rows: {row.total_rows}")
    print(f"Percentage Binge: {row.pct_binge}%")

Binge Count: 0
Total Rows: 100000
Percentage Binge: 0.0%


In [40]:
extreme_age_query = f"""
-- Flag extreme ages in users table
-- Extreme defined as age < 10 or age > 100
-- Summarize count and percentage of flagged rows

WITH flagged AS (
  SELECT
    *,
    (age < 10 OR age > 100) AS flag_age_extreme
  FROM {project_id}.netflix.`users`
)
SELECT
  COUNTIF(flag_age_extreme) AS extreme_age_count,
  COUNT(*) AS total_rows,
  SAFE_DIVIDE(COUNTIF(flag_age_extreme), COUNT(*)) * 100 AS pct_extreme_age
FROM flagged;
"""
extreme_age_query_job = client.query(extreme_age_query)
extreme_age_results = extreme_age_query_job.result()
for row in extreme_age_results:
    print(f"Extreme Age Count: {row.extreme_age_count}")
    print(f"Total Rows: {row.total_rows}")
    print(f"Percentage Extreme Age: {row.pct_extreme_age}%")

Extreme Age Count: 1790
Total Rows: 103000
Percentage Extreme Age: 1.7378640776699028%


In [36]:
duration_query = f"""
-- Flag movies with implausible durations (<15 minutes or >480 minutes)
-- Summarize count and percentage of anomalies

WITH flagged AS (
  SELECT
    *,
    (duration_minutes < 15 OR duration_minutes > 480) AS flag_duration_anomaly
  FROM {project_id}.netflix.`movies`
)
SELECT
  COUNTIF(flag_duration_anomaly) AS anomaly_count,
  COUNT(*) AS total_rows,
  SAFE_DIVIDE(COUNTIF(flag_duration_anomaly), COUNT(*)) * 100 AS pct_anomaly
FROM flagged;
"""
duration_query_job = client.query(duration_query)
duration_results = duration_query_job.result()
for row in duration_results:
    print(f"Anomaly Count: {row.anomaly_count}")
    print(f"Total Rows: {row.total_rows}")
    print(f"Percentage Anomaly: {row.pct_anomaly}%")

Anomaly Count: 230
Total Rows: 10400
Percentage Anomaly: 2.2115384615384617%


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [38]:
anomaly_query = f"""
-- Compact anomaly summary: returns flag_name and pct_of_rows for each flag

WITH binge AS (
  SELECT
    'flag_binge' AS flag_name,
    SAFE_DIVIDE(COUNTIF(minutes_watched_capped > 480), COUNT(*)) * 100 AS pct_of_rows
  FROM {project_id}.netflix.`watch_history_robust`
),
age_extreme AS (
  SELECT
    'flag_age_extreme' AS flag_name,
    SAFE_DIVIDE(
      COUNTIF(age < 10 OR age > 100),
      COUNT(*)
    ) * 100 AS pct_of_rows
  FROM {project_id}.netflix.`users`
),
duration_anomaly AS (
  SELECT
    'flag_duration_anomaly' AS flag_name,
    SAFE_DIVIDE(COUNTIF(duration_minutes < 15 OR duration_minutes > 480), COUNT(*)) * 100 AS pct_of_rows
  FROM {project_id}.netflix.`movies`
)
SELECT * FROM binge
UNION ALL
SELECT * FROM age_extreme
UNION ALL
SELECT * FROM duration_anomaly;
"""
anomaly_query_job = client.query(anomaly_query)
anomaly_results = anomaly_query_job.result()
for row in anomaly_results:
    print(f"Flag Name: {row.flag_name}")
    print(f"Percentage of Rows Flagged: {row.pct_of_rows}%")
    print("\n")

Flag Name: flag_binge
Percentage of Rows Flagged: 0.0%


Flag Name: flag_age_extreme
Percentage of Rows Flagged: 1.7378640776699028%


Flag Name: flag_duration_anomaly
Percentage of Rows Flagged: 2.2115384615384617%




**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
