# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [None]:
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION # Export the REGION environment variable
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt-467-55510 
Project: mgmt-467-55510 | Region: us-central1
Updated property [core/project].
mgmt-467-55510


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [None]:
# Print the active project and region
!gcloud config get-value project
import os
print("Region:", os.environ.get("REGION")) # Access the exported REGION environment variable

mgmt-467-55510
Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

We would like to establish the project id and region at the top of the code sequence to ensure that the pipeline flows smoothly and no issues occur in the next steps. As the sequence gets more complex when using more products and services, it is important to keep all of the information repeatable and consistent so establishing it ahead of time will help with this.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only

!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [None]:
#Verifying the CLI is ready
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

We require strict '0600' permissions on API tokens so that only the owner of the file can read and write to it. This helps avoid risks that may expose your API token if there is a misconfiguration of your permissions and avoid attacks from unauthorized access.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# # EXAMPLE (from LLM) — Download & unzip (commented)
# # !mkdir -p /content/data/raw
# # !kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
# # !unzip -o /content/data/*.zip -d /content/data/raw
# # # List CSV inventory
# # !ls -lh /content/data/raw/*.csv

In [None]:
# Create directory for raw data
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 610MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [None]:
# Asserting that there are exactly six CSV files and printing their names
import glob
csv_files = glob.glob('/content/data/raw/*.csv')
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"
print("Found the following CSV files:")
for csv_file in csv_files:
    print(csv_file)

Found the following CSV files:
/content/data/raw/movies.csv
/content/data/raw/watch_history.csv
/content/data/raw/search_logs.csv
/content/data/raw/users.csv
/content/data/raw/recommendation_logs.csv
/content/data/raw/reviews.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

It is important to stay organized so it is easier to locate data files later on in the process when you may have a large amount of files to take care of. Additionally distinguishing files by names clearly will help ensure you are using the right data and that it does not disrupt the pipeline. It also to be noted that these files can be uploaded by students but not deleted or edited as easily due to restricted permissions/legal right issues with GCS.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
import uuid
import os

# Create a unique bucket name
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket
print(f"Creating bucket: gs://{bucket_name} in region {os.environ['REGION']}")
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSVs to the bucket
print(f"Uploading CSV files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name
print("\nData staged in GCS bucket:", bucket_name)

# Explain staging benefits
print("""
Benefits of staging data in GCS:
- Durable and highly available storage.
- Versioning for data changes.
- Scalable and cost-effective.
- Acts as a single source of truth for downstream processing (e.g., loading into BigQuery).
""")

# Verify contents
print(f"\nVerifying contents of gs://{bucket_name}/netflix/")
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating bucket: gs://mgmt467-netflix-c7457bd7 in region us-central1
Creating gs://mgmt467-netflix-c7457bd7/...
Uploading CSV files to gs://mgmt467-netflix-c7457bd7/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-c7457bd7/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-c7457bd7/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-c7457bd7/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-c7457bd7/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-c7457bd7/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-c7457bd7/netflix/watch_history.csv

Average throughput: 47.8MiB/s

Data staged in GCS bucket: mgmt467-netflix-c7457bd7

Benefits of staging data in GCS:
- Durable and highly available storage.
- Versioning for data changes.
- Scalable 

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [None]:
# List the contents of the netflix/ prefix and show object sizes (corrected command)
!gcloud storage ls --readable-sizes gs://$BUCKET_NAME/netflix/

gs://mgmt467-netflix-14a64d4b/netflix/movies.csv
gs://mgmt467-netflix-14a64d4b/netflix/recommendation_logs.csv
gs://mgmt467-netflix-14a64d4b/netflix/reviews.csv
gs://mgmt467-netflix-14a64d4b/netflix/search_logs.csv
gs://mgmt467-netflix-14a64d4b/netflix/users.csv
gs://mgmt467-netflix-14a64d4b/netflix/watch_history.csv


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

1. Durability and Accessibility - GCS has highly durable and storage with ease of access. Data stored in GCS is less likely to be lost in the Colab environment and can be easily found through various Google Cloud services and apps

2. Scalable and Efficient - GCS is designed for handling tasks at a massive scale. Loading data from GCS into BigQuery is quicker and efficient since it uses Google Cloud's internal network and directly pull data through optimized transfer systems

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# # EXAMPLE (from LLM) — BigQuery dataset (commented)
# # DATASET="netflix"
# # # Attempt to create; ignore if exists
# # !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [None]:
# Create (idempotently) dataset netflix in US multi-region
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

Dataset 'mgmt-467-55510:netflix' successfully created.


In [None]:
# Load tables from gs://$BUCKET_NAME/netflix/
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}
import os
for tbl, fname in tables.items():
  src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
  dest = f"{DATASET}.{tbl}" # Corrected destination table format
  print("Loading", tbl, "from", src)
  !bq load --skip_leading_rows=1 --autodetect --source_format=CSV $dest $src # Corrected bq load command

# Row counts
for tbl in tables.keys():
  # Corrected bq query command syntax
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{GOOGLE_CLOUD_PROJECT}.{DATASET}.{tbl}`".format(tbl=tbl)

Loading users from gs://mgmt467-netflix-14a64d4b/netflix/users.csv
Waiting on bqjob_r40cca239d9ae750a_00000199ca8e227b_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-14a64d4b/netflix/movies.csv
Waiting on bqjob_r7a9623f36ae47861_00000199ca8e43ce_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-14a64d4b/netflix/watch_history.csv
Waiting on bqjob_r307a23b11063a895_00000199ca8e651b_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-14a64d4b/netflix/recommendation_logs.csv
Waiting on bqjob_r2a849e2e3db01cb7_00000199ca8e8c80_1 ... (1s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-14a64d4b/netflix/search_logs.csv
Waiting on bqjob_r67f24dc187ea41a5_00000199ca8eaed5_1 ... (1s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-14a64d4b/netflix/reviews.csv
Waiting on bqjob_r4279b8615212452d_00000199ca8ed015_1 ... (1s) Current status: DONE   
/bin/bash: -c:

In [None]:
# # EXAMPLE (from LLM) — Load tables (commented)
# # tables = {
# #   "users": "users.csv",
# #   "movies": "movies.csv",
# #   "watch_history": "watch_history.csv",
# #   "recommendation_logs": "recommendation_logs.csv",
# #   "search_logs": "search_logs.csv",
# #   "reviews": "reviews.csv",
# # }
# # import os
# # for tbl, fname in tables.items():
# #   src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
# #   print("Loading", tbl, "from", src)
# #   !bq load --skip_leading_rows=1 --autodetect --source_format=CSV $DATASET.$tbl $src
# #
# # # Row counts
# # for tbl in tables.keys():
# #   !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `${GOOGLE_CLOUD_PROJECT}.netflix.{tbl}`".format(tbl=tbl)

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [None]:
import os

# Get row counts for all six tables using bq query and string formatting
query = """
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.reviews`
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("temp_query.sql", "w") as f:
    f.write(query)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < temp_query.sql

# Clean up the temporary file
os.remove("temp_query.sql")

+---------------------+-----------+
|     table_name      | row_count |
+---------------------+-----------+
| movies              |      4160 |
| users               |     41200 |
| recommendation_logs |    208000 |
| search_logs         |    106000 |
| watch_history       |    420000 |
| reviews             |     61800 |
+---------------------+-----------+


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

Autodetect is usually acceptable and most useful for initial data exploration and when the data you are observing is clean and well-formatted. It allows the user to get started right away with querying without needing to do any additional cleaning.

Explicit schemas should be enforced when the data is ambigious/inconsistent or if you are working with critical data for long periods of time. This prevents unexpected errors and the correct data types are being used. Additionally, explicit schemas are helpful for providing control and clarified data pipelines where consistency is essential.


## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
# # EXAMPLE (from LLM) — Missingness profile (commented)
# # -- Users: % missing per column
# # WITH base AS (
# #   SELECT COUNT(*) n,
# #          COUNTIF(region IS NULL) miss_region,
# #          COUNTIF(plan_tier IS NULL) miss_plan,
# #          COUNTIF(age_band IS NULL) miss_age
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # )
# # SELECT n,
# #        ROUND(100*miss_region/n,2) AS pct_missing_region,
# #        ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
# #        ROUND(100*miss_age/n,2)    AS pct_missing_age_band
# # FROM base;

In [None]:
# # EXAMPLE (from LLM) — MAR by region (commented)
# # SELECT region,
# #        COUNT(*) AS n,
# #        ROUND(100*COUNTIF(plan_tier IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # GROUP BY region
# # ORDER BY pct_missing_plan_tier DESC;

In [None]:
import os

# Users: Total rows and % missing per column
query_missingness = """
WITH base AS (
  SELECT
    COUNT(*) AS n,
    COUNTIF(country IS NULL) AS miss_country,
    COUNTIF(subscription_plan IS NULL) AS miss_plan,
    COUNTIF(age IS NULL) AS miss_age
  FROM
    `{project_id}.netflix.users`
)
SELECT
  n,
  ROUND(100 * miss_country / n, 2) AS pct_missing_country,
  ROUND(100 * miss_plan / n, 2) AS pct_missing_subscription_plan,
  ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM
  base;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("missingness_query.sql", "w") as f:
    f.write(query_missingness)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < missingness_query.sql

# Clean up the temporary file
os.remove("missingness_query.sql")

+-------+---------------------+-------------------------------+-----------------+
|   n   | pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+-------+---------------------+-------------------------------+-----------------+
| 41200 |                 0.0 |                           0.0 |           11.93 |
+-------+---------------------+-------------------------------+-----------------+


In [None]:
import os

# % subscription_plan missing by country (potential MAR)
query_mar = """
SELECT
  country,
  COUNT(*) AS n,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM
  `{project_id}.netflix.users`
GROUP BY
  country
ORDER BY
  pct_missing_subscription_plan DESC;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("mar_query.sql", "w") as f:
    f.write(query_mar)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < mar_query.sql

# Clean up the temporary file
os.remove("mar_query.sql")

+---------+-------+-------------------------------+
| country |   n   | pct_missing_subscription_plan |
+---------+-------+-------------------------------+
| Canada  | 12384 |                           0.0 |
| USA     | 28816 |                           0.0 |
+---------+-------+-------------------------------+


**Interpreting Missingness by Country (Potential MAR):**

The query above shows the percentage of missing `subscription_plan` values for each `country`. If the percentage of missing values for `subscription_plan` varies significantly across different `country` values, this could be an indicator of **Missing At Random (MAR)**.

MAR suggests that the probability of a value being missing is related to *other* observed variables in the dataset (in this case, `country`), but not to the value of the missing data itself. For example, if users in one country are significantly more likely to have a missing subscription plan than users in another country, the missingness is likely MAR and not MCAR (Missing Completely At Random).

Understanding if data is MAR is important because it can influence how you handle missing values during data cleaning and feature engineering for machine learning models. Ignoring MAR can lead to biased analyses and models.

### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [None]:
import os

# Verify the three missingness percentages
query = """
WITH base AS (
  SELECT
    COUNT(*) AS n,
    COUNTIF(country IS NULL) AS miss_country,
    COUNTIF(subscription_plan IS NULL) AS miss_plan,
    COUNTIF(age IS NULL) AS miss_age
  FROM
    `{project_id}.netflix.users`
)
SELECT
  ROUND(100 * miss_country / n, 2) AS pct_missing_country,
  ROUND(100 * miss_plan / n, 2) AS pct_missing_subscription_plan,
  ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM
  base;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("verification_query.sql", "w") as f:
    f.write(query)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < verification_query.sql

# Clean up the temporary file
os.remove("verification_query.sql")

+---------------------+-------------------------------+-----------------+
| pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+---------------------+-------------------------------+-----------------+
|                 0.0 |                           0.0 |           11.93 |
+---------------------+-------------------------------+-----------------+


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.


Based on the outputs recieved, the age column has the highest precentage of missing values (11.93%).

**MCAR (Missing Completely At Random)**: This would mean the missingness of age is completely random and not related to any other variables in the dataset (including age itself).

**MAR (Missing At Random)**: This would mean the missingness of age is related to other observed variables in the dataset, but not to the age value itself.

**MNAR (Missing Not At Random)**: This would mean the missingness of age is related to the unobserved age value itself.


In order to fully hypothesize MCAR/MAR/MNAR, more analysis would be needed.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [None]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

In [None]:
import os

# Detect duplicate groups on key columns and report top 20
query_detect_duplicates = """
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("detect_duplicates_query.sql", "w") as f:
    f.write(query_detect_duplicates)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < detect_duplicates_query.sql

# Clean up the temporary file
os.remove("detect_duplicates_query.sql")

+------------+------------+------------+-------------+-----------+
|  user_id   |  movie_id  | watch_date | device_type | dup_count |
+------------+------------+------------+-------------+-----------+
| user_03310 | movie_0640 | 2024-09-08 | Smart TV    |        16 |
| user_00391 | movie_0893 | 2024-08-26 | Laptop      |        16 |
| user_01292 | movie_0231 | 2024-07-05 | Laptop      |        12 |
| user_03176 | movie_0534 | 2024-01-06 | Laptop      |        12 |
| user_01807 | movie_0921 | 2025-01-30 | Laptop      |        12 |
| user_03140 | movie_0205 | 2025-09-11 | Desktop     |        12 |
| user_06799 | movie_0458 | 2024-08-15 | Desktop     |        12 |
| user_01143 | movie_0166 | 2024-05-28 | Laptop      |        12 |
| user_02976 | movie_0987 | 2024-09-19 | Desktop     |        12 |
| user_03660 | movie_0109 | 2025-05-20 | Desktop     |        12 |
| user_09973 | movie_0342 | 2025-03-22 | Desktop     |        12 |
| user_08681 | movie_0332 | 2024-06-13 | Laptop      |        

In [None]:
import os

# Create a new table watch_history_dedup with one row per duplicate group
# Policy: Prefer higher progress_percentage, then higher watch_duration_minutes
query_dedup = """
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("dedup_query.sql", "w") as f:
    f.write(query_dedup)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < dedup_query.sql

# Clean up the temporary file
os.remove("dedup_query.sql")

Replaced mgmt-467-55510.netflix.watch_history_dedup



### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [None]:
import os

# Query to compare row counts before and after deduplication
query = """
SELECT 'Original watch_history' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'Deduplicated watch_history_dedup' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history_dedup`;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("dedup_verification_query.sql", "w") as f:
    f.write(query)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < dedup_verification_query.sql

# Clean up the temporary file
os.remove("dedup_verification_query.sql")

+----------------------------------+-----------+
|            table_name            | row_count |
+----------------------------------+-----------+
| Original watch_history           |    420000 |
| Deduplicated watch_history_dedup |    100000 |
+----------------------------------+-----------+


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise from many sources.

Natural duplicates - Based on a user performing the same action multiple times (clicking something quickly repeatedly).

System-generated - Occurs due to errors in the overall pipeline or process. Network issues often appear due to this.

Duplicates can corrupt labels and KPIs through inflated counts of events, skewed distribution metrics, and misleading KPIs that are over or understated.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
import os

# Compute IQR bounds for watch_duration_minutes and report % outliers
query_outliers = """
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{project_id}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("outliers_query.sql", "w") as f:
    f.write(query_outliers)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < outliers_query.sql

# Clean up the temporary file
os.remove("outliers_query.sql")

+----------+--------+--------------+
| outliers | total  | pct_outliers |
+----------+--------+--------------+
|     3409 | 100000 |         3.41 |
+----------+--------+--------------+


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [None]:
import os

# Create watch_history_robust with minutes_watched_capped capped at P01/P99
# This step creates the table needed for the quantile summaries
query_create_robust = """
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS minutes_watched_capped
FROM `{project_id}.netflix.watch_history_dedup` h, q;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("create_robust_query.sql", "w") as f:
    f.write(query_create_robust)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < create_robust_query.sql

# Clean up the temporary file
os.remove("create_robust_query.sql")

print("--- Quantile Summaries Before/After Capping ---")

# Query to show min/median/max before vs after capping
query_quantile_summary = """
WITH
  before_capping AS (
    SELECT
      'Before Capping' AS stage,
      MIN(watch_duration_minutes) AS min_minutes,
      APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(50)] AS median_minutes,
      MAX(watch_duration_minutes) AS max_minutes
    FROM
      `{project_id}.netflix.watch_history_dedup`
  ),
  after_capping AS (
    SELECT
      'After Capping' AS stage,
      MIN(minutes_watched_capped) AS min_minutes,
      APPROX_QUANTILES(minutes_watched_capped, 100)[OFFSET(50)] AS median_minutes,
      MAX(minutes_watched_capped) AS max_minutes
    FROM
      `{project_id}.netflix.watch_history_robust`
  )
SELECT * FROM before_capping
UNION ALL
SELECT * FROM after_capping;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("quantile_summary_query.sql", "w") as f:
    f.write(query_quantile_summary)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < quantile_summary_query.sql

# Clean up the temporary file
os.remove("quantile_summary_query.sql")

Created mgmt-467-55510.netflix.watch_history_robust

--- Quantile Summaries Before/After Capping ---
+----------------+-------------+----------------+-------------+
|     stage      | min_minutes | median_minutes | max_minutes |
+----------------+-------------+----------------+-------------+
| Before Capping |         0.2 |           51.2 |       799.3 |
| After Capping  |         4.4 |           51.2 |       366.0 |
+----------------+-------------+----------------+-------------+


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping is useful in handling extreme values to adjust data within a model. It can be harmful when the outliers are valuable to the data, the distribution is naturally heavy-tailed, and when you are aiming to interpret the impact of the original variable.

A less sensitive type of model would be anything tree-based. These models partition data based on feature values and focus on rank order or relative differences rather than the magnitude. They do not rely on calculating means, variances, or distances the same way since they bin the data in a way that capping wouldn't impact the model as much.


### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [2]:
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION # Export the REGION environment variable
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt-467-55510
Project: mgmt-467-55510 | Region: us-central1
Updated property [core/project].
mgmt-467-55510


In [3]:
import os

# 1. Compute and summarize flag_binge for sessions > 8 hours in watch_history_robust
# This flag identifies potentially long viewing sessions.
query_binge = """
SELECT
  COUNTIF(minutes_watched_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(minutes_watched_capped > 8*60)/COUNT(*),2) AS pct_sessions_over_8h
FROM `{project_id}.netflix.watch_history_robust`;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("binge_query.sql", "w") as f:
    f.write(query_binge)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < binge_query.sql

# Clean up the temporary file
os.remove("binge_query.sql")

+------------------+--------+----------------------+
| sessions_over_8h | total  | pct_sessions_over_8h |
+------------------+--------+----------------------+
|                0 | 100000 |                  0.0 |
+------------------+--------+----------------------+


In [4]:
import os

# 2. Compute and summarize flag_age_extreme if age is <10 or >100 in the users table
# This flag identifies users with potentially extreme age values.
query_age_extreme = """
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_extreme_age
FROM `{project_id}.netflix.users`
WHERE age IS NOT NULL; -- Only consider rows where age is not missing
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("age_extreme_query.sql", "w") as f:
    f.write(query_age_extreme)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < age_extreme_query.sql

# Clean up the temporary file
os.remove("age_extreme_query.sql")

+------------------+-------+-----------------+
| extreme_age_rows | total | pct_extreme_age |
+------------------+-------+-----------------+
|              716 | 36284 |            1.97 |
+------------------+-------+-----------------+


In [5]:
import os

# 3. Compute and summarize flag_duration_anomaly where duration_minutes < 15 or > 480 in the movies table
# This flag identifies movies with potentially anomalous durations.
query_duration_anomaly = """
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS duration_anomaly_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`
WHERE duration_minutes IS NOT NULL; -- Only consider rows where duration_minutes is not missing
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("duration_anomaly_query.sql", "w") as f:
    f.write(query_duration_anomaly)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < duration_anomaly_query.sql

# Clean up the temporary file
os.remove("duration_anomaly_query.sql")

+-----------------------+-------+----------------------+
| duration_anomaly_rows | total | pct_duration_anomaly |
+-----------------------+-------+----------------------+
|                    92 |  4160 |                 2.21 |
+-----------------------+-------+----------------------+


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [7]:
import os

# Generate a single compact summary query that returns two columns per flag: flag_name, pct_of_rows
query_summary = """
SELECT 'flag_binge' AS flag_name, ROUND(100*COUNTIF(minutes_watched_capped > 8*60)/COUNT(*),2) AS pct_of_rows FROM `{project_id}.netflix.watch_history_robust`
UNION ALL
SELECT 'flag_age_extreme' AS flag_name, ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_of_rows FROM `{project_id}.netflix.users` WHERE age IS NOT NULL
UNION ALL
SELECT 'flag_duration_anomaly' AS flag_name, ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_of_rows FROM `{project_id}.netflix.movies` WHERE duration_minutes IS NOT NULL;
""".format(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

# Save the query to a temporary file
with open("anomaly_summary_query.sql", "w") as f:
    f.write(query_summary)

# Execute the query using bq query with the file
!bq query --nouse_legacy_sql --quiet < anomaly_summary_query.sql

# Clean up the temporary file
os.remove("anomaly_summary_query.sql")

+-----------------------+-------------+
|       flag_name       | pct_of_rows |
+-----------------------+-------------+
| flag_binge            |         0.0 |
| flag_age_extreme      |        1.97 |
| flag_duration_anomaly |        2.21 |
+-----------------------+-------------+


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Based on the summary query output:

*   `flag_binge`: 0.0%
*   `flag_age_extreme`: 1.97%
*   `flag_duration_anomaly`: 2.21%

The most common anomaly flag is flag_duration_anomaly, indicating that a small percentage of movies have durations that fall outside the typical range (less than 15 minutes or more than 8 hours).

Which flag to keep as a feature depends on the specific business problem or machine learning task. However, `flag_age_extreme` and `flag_duration_anomaly` seem potentially more directly useful as features for many tasks compared to `flag_binge` (which is 0% in this dataset).

*   **`flag_age_extreme`:** This could be a useful feature for understanding user behavior or targeting. Users with extreme ages might have different viewing habits or content preferences. It could also be an indicator of potential data entry errors if the extreme ages are not plausible.
*   **`flag_duration_anomaly`:** This could be a valuable feature for recommendation systems or content analysis. Movies with unusually short or long durations might appeal to different audiences or indicate different content types (e.g., short films, documentaries).


Likely keep **`flag_age_extreme`** and **`flag_duration_anomaly`** as features, as they identify potentially distinct groups of users and content that could influence various downstream tasks.


## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


Here is a checklist for saving and submitting your work:

- [ ] Save this notebook to your team's shared Google Drive folder. Make sure the filename is clear and includes your team name or initials.
- [ ] Export your Data Quality (DQ) SQL queries into a single `.sql` file. You can copy the relevant SQL statements from the code cells in sections 5.1, 5.2, 5.3, and 5.4.
- [ ] Save the `.sql` file in your local repository.
- [ ] Push both the notebook (`.ipynb` file) and the `.sql` file to your team's GitHub repository. Write a descriptive commit message summarizing the work done (e.g., "Completed DQ profiling and cleaning for Netflix dataset").
- [ ] Add or update your team's README file in the GitHub repository. Include the following information:
    - Your GCP `PROJECT_ID`.
    - The `REGION` used for your resources.
    - The name of your GCS bucket (`BUCKET_NAME`).
    - The name of your BigQuery dataset (`DATASET`).
    - The row counts for all six tables in the `netflix` dataset obtained from the verification step in section 4.
    - A brief summary of the data quality issues found (missingness, duplicates, outliers, anomalies) and how you addressed them.

## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
