<a href="https://colab.research.google.com/github/bulut19/mgmt467-analytics-portfolio/blob/main/Lab4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [3]:
# # EXAMPLE (from LLM) — Auth + Project/Region (commented; write your own cell using the prompt)
# # from google.colab import auth
# # auth.authenticate_user()
# #
# # import os
# # PROJECT_ID = input("Enter your GCP Project ID: ").strip()
# # REGION = "us-central1"  # keep consistent; change if instructed
# # os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
# # print("Project:", PROJECT_ID, "| Region:", REGION)
# #
# # # Set active project for gcloud/BigQuery CLI
# # !gcloud config set project $GOOGLE_CLOUD_PROJECT
# # !gcloud config get-value project
# # # Done: Auth + Project/Region set

In [7]:
# Authenticates the Colab environment to Google Cloud, allowing access to GCP services.
from google.colab import auth
auth.authenticate_user()

# Prompts the user to enter their Google Cloud Project ID.
import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
# Sets the default region for Google Cloud services.
REGION = "us-central1"

# Exports the PROJECT_ID and REGION as environment variables.
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION


# Sets the active project for the gcloud command-line tool.
!gcloud config set project $GOOGLE_CLOUD_PROJECT
# Prints the configured project and region for verification.
print(f"Project: {PROJECT_ID} | Region: {REGION}")

# Done: Auth + Project/Region set

Enter your GCP Project ID: boxwood-veld-471119-r6
Updated property [core/project].
Project: boxwood-veld-471119-r6 | Region: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [8]:
# Print the active project
!gcloud config get-value project

# Echo the set region
import os
print(f"Region: {os.environ.get('REGION')}")

boxwood-veld-471119-r6
Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

For consistency, to make sure all later commands use the same project and region. If not, commands might default to wrong projects or regions, causing errors when resources aren't found where expected, creating resources in wrong locations, and issues if reproducibility when you try to run the code in a different environment.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
# # EXAMPLE (from LLM) — Kaggle setup (commented)
# # from google.colab import files
# # print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
# # uploaded = files.upload()
# #
# # import os
# # os.makedirs('/root/.kaggle', exist_ok=True)
# # with open('/root/.kaggle/kaggle.json', 'wb') as f:
# #     f.write(uploaded[list(uploaded.keys())[0]])
# # os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only
# #
# # !kaggle --version

In [9]:
# Prompt the user to upload their kaggle.json file
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Create the .kaggle directory and save the kaggle.json file
import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])

# Set restrictive permissions (owner-only read/write) for security
os.chmod('/root/.kaggle/kaggle.json', 0o600)

# Verify the Kaggle installation by printing the version
!kaggle --version

# This setup ensures your Kaggle API key is stored securely and the environment is ready for reproducible downloads.

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [10]:
# Verify Kaggle CLI is ready
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

So that only the owner of the file can read and write to it. API tokens grant access to your account so you are avoiding the risk of unauthroized access by attackers, accidental exposure, and credential compromise from someone else performing action on your behalf.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# # EXAMPLE (from LLM) — Download & unzip (commented)
# # !mkdir -p /content/data/raw
# # !kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
# # !unzip -o /content/data/*.zip -d /content/data/raw
# # # List CSV inventory
# # !ls -lh /content/data/raw/*.csv

In [11]:
# Create the directory to store raw data
!mkdir -p /content/data/raw

# Download the dataset using the Kaggle CLI to /content/data
# The -p flag specifies the download path
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded file into the raw data directory
# The -o flag allows overwriting existing files
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a human-readable format
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 562MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [12]:
import glob
import os

csv_files = glob.glob('/content/data/raw/*.csv')

# Assert that there are exactly 6 CSV files
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"

print("Found exactly 6 CSV files:")
for csv_file in csv_files:
    print(os.path.basename(csv_file))

Found exactly 6 CSV files:
recommendation_logs.csv
search_logs.csv
movies.csv
watch_history.csv
reviews.csv
users.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

So that there is a clean recoord of the raw data we start working with, and that we can easily reproduce results by using the same input data. It's also a good practice for easier troubleshooting and documentation.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
# # EXAMPLE (from LLM) — GCS staging (commented)
# # import uuid, os
# # bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
# # os.environ["BUCKET_NAME"] = bucket_name
# # !gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
# # !gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/
# # print("Bucket:", bucket_name)
# # # Verify contents
# # !gcloud storage ls gs://$BUCKET_NAME/netflix/

In [13]:
import uuid
import os

# Generate a unique bucket name with a random suffix
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket in the specified region
print(f"Creating bucket: {bucket_name} in region: {os.environ['REGION']}")
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSV files from the raw data directory to the 'netflix' folder in the bucket
print(f"Uploading CSVs to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name for verification
print("\nSuccessfully created bucket and uploaded files.")
print("Bucket Name:", bucket_name)

# Explain the benefits of staging data in GCS
print("""
Benefits of staging data in GCS:
- **Centralized Storage:** Provides a single, accessible location for your data.
- **Version Control:** GCS offers object versioning, allowing you to track changes and revert if needed.
- **Scalability and Durability:** GCS is highly scalable and designed for high durability.
- **Integration with GCP Services:** Seamlessly integrates with services like BigQuery, Dataproc, and AI Platform.
- **Cost-Effective:** Generally cost-effective for storing large amounts of data.
- **Reproducibility:** Provides a stable and addressable source for data, improving workflow reproducibility.
""")

Creating bucket: mgmt467-netflix-61e49b23 in region: us-central1
Creating gs://mgmt467-netflix-61e49b23/...
Uploading CSVs to gs://mgmt467-netflix-61e49b23/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-61e49b23/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-61e49b23/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-61e49b23/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-61e49b23/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-61e49b23/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-61e49b23/netflix/watch_history.csv

Average throughput: 60.8MiB/s

Successfully created bucket and uploaded files.
Bucket Name: mgmt467-netflix-61e49b23

Benefits of staging data in GCS:
- **Centralized Storage:** Provides a single, accessible 

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [14]:
# List the contents of the netflix/ prefix in the GCS bucket with object sizes
!gcloud storage ls -l gs://$BUCKET_NAME/netflix/

    115942  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/movies.csv
   4695557  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/recommendation_logs.csv
   1861942  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/reviews.csv
   2250902  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/search_logs.csv
   1606820  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/users.csv
   9269425  2025-10-09T19:40:20Z  gs://mgmt467-netflix-61e49b23/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

1. Loading data into BigQuery directly from GCS is faster and more scalable, especially for large datasets.
2. Staging in GCP is also better becaus eyou are loading from a stable location instead of temporary file on a locak machine, so the loading process is more reproducible and easier to audit.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# # EXAMPLE (from LLM) — BigQuery dataset (commented)
# # DATASET="netflix"
# # # Attempt to create; ignore if exists
# # !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [None]:
# # EXAMPLE (from LLM) — Load tables (commented)
# # tables = {
# #   "users": "users.csv",
# #   "movies": "movies.csv",
# #   "watch_history": "watch_history.csv",
# #   "recommendation_logs": "recommendation_logs.csv",
# #   "search_logs": "search_logs.csv",
# #   "reviews": "reviews.csv",
# # }
# # import os
# # for tbl, fname in tables.items():
# #   src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
# #   print("Loading", tbl, "from", src)
# #   !bq load --skip_leading_rows=1 --autodetect --source_format=CSV $DATASET.$tbl $src
# #
# # # Row counts
# # for tbl in tables.keys():
# #   !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `${GOOGLE_CLOUD_PROJECT}.netflix.{tbl}`".format(tbl=tbl)

In [15]:
DATASET="netflix"
# Attempt to create; ignore if exists and print a friendly message
!bq --location=US mk -d --description "Netflix dataset for MGMT467" $DATASET 2> /dev/null || echo "Dataset '$DATASET' may already exist."

Dataset 'boxwood-veld-471119-r6:netflix' successfully created.


In [31]:
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

import os
DATASET="netflix" # Ensure DATASET is defined
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] # Get PROJECT_ID from environment

for tbl, fname in tables.items():
  src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
  print(f"Loading {tbl} from {src}")
  # Use bq load command with specified flags
  !bq load --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}

# Row counts for verification
print("\nRow counts for loaded tables:")
for tbl in tables.keys():
  # Escape the backticks with backslashes
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM \`{PROJECT_ID}.{DATASET}.{tbl}\`"

Loading users from gs://mgmt467-netflix-61e49b23/netflix/users.csv
Waiting on bqjob_ref15040e8af8b8b_00000199ca92971e_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-61e49b23/netflix/movies.csv
Waiting on bqjob_r2237168453284a19_00000199ca92ada2_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-61e49b23/netflix/watch_history.csv
Waiting on bqjob_r70a7ab0c1aabe68e_00000199ca92c56b_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-61e49b23/netflix/recommendation_logs.csv
Waiting on bqjob_r2d10d32f905ded70_00000199ca92e0d3_1 ... (1s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-61e49b23/netflix/search_logs.csv
Waiting on bqjob_r2538b176059f26cf_00000199ca92fa80_1 ... (1s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-61e49b23/netflix/reviews.csv
Waiting on bqjob_rf8b4c504e84aba8_00000199ca93109c_1 ... (1s) Current status: DONE   

Row counts for 

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [32]:
# Query BigQuery metadata to get row counts for all tables in the dataset
!bq query --nouse_legacy_sql 'SELECT table_id AS table_name, row_count FROM `{os.environ["GOOGLE_CLOUD_PROJECT"]}.netflix.__TABLES__` WHERE table_id IN ("users", "movies", "watch_history", "recommendation_logs", "search_logs", "reviews")'

Waiting on bqjob_r26b9c124e03daa2f_00000199ca939851_1 ... (0s) Current status: DONE   
+---------------------+-----------+
|     table_name      | row_count |
+---------------------+-----------+
| movies              |      5200 |
| recommendation_logs |    260000 |
| reviews             |     77250 |
| search_logs         |    132500 |
| users               |     51500 |
| watch_history       |    525000 |
+---------------------+-----------+


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

Autodetect is acceptable in initial exploration when you are not fully sure of the schema yet. It is also okey when you data source has a very predictable structure where autodetect can correctly infer the schema.

Though you should enforce explicit schemas when producing data pipelines and critical datasets to ensure data quality and consistency without incorrect type inference errors. Autodetect can also sometimes make incorrect inferences, especially with mixed data types or ambiguous date formats. Hence explicit schemas give you control over how these situations are handled.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
# # EXAMPLE (from LLM) — Missingness profile (commented)
# # -- Users: % missing per column
# # WITH base AS (
# #   SELECT COUNT(*) n,
# #          COUNTIF(region IS NULL) miss_region,
# #          COUNTIF(plan_tier IS NULL) miss_plan,
# #          COUNTIF(age_band IS NULL) miss_age
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # )
# # SELECT n,
# #        ROUND(100*miss_region/n,2) AS pct_missing_region,
# #        ROUND(100*miss_plan/n,2)   AS pct_missing_plan_tier,
# #        ROUND(100*miss_age/n,2)    AS pct_missing_age_band
# # FROM base;

In [None]:
# # EXAMPLE (from LLM) — MAR by region (commented)
# # SELECT region,
# #        COUNT(*) AS n,
# #        ROUND(100*COUNTIF(plan_tier IS NULL)/COUNT(*),2) AS pct_missing_plan_tier
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`
# # GROUP BY region
# # ORDER BY pct_missing_plan_tier DESC;

In [34]:
# First, let's check what columns actually exist in the users table
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

# Check the schema of the users table
!bq show --schema --format=prettyjson {project_id}:netflix.users

[
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "email",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "first_name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "last_name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "age",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "gender",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "country",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "state_province",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "city",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "subscription_plan",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "subscription_start_date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "is_active",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "month

In [40]:
# Cell 1: Total rows and % missing in country, subscription_plan, age
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM \\`{project_id}.netflix.users\\`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base
"""

!bq query --nouse_legacy_sql "{sql_query}"

+-------+---------------------+-------------------------------+-----------------+
|   n   | pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+-------+---------------------+-------------------------------+-----------------+
| 51500 |                 0.0 |                           0.0 |           11.93 |
+-------+---------------------+-------------------------------+-----------------+


In [39]:
# Cell 2: % subscription_plan missing by country (ordered descending)
# Investigate if missingness depends on another variable (country) - potential MAR
# MAR (Missing At Random): If subscription_plan missingness varies by country,
# it suggests the probability of being missing depends on the observed country value.
# This would indicate MAR rather than MCAR (Missing Completely At Random).
import os

sql_query = f"""
SELECT country,
       COUNT(*) AS n,
       ROUND(100*COUNTIF(subscription_plan IS NULL)/COUNT(*),2) AS pct_missing_subscription_plan
FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.users\\`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC
"""

!bq query --nouse_legacy_sql "{sql_query}"

+---------+-------+-------------------------------+
| country |   n   | pct_missing_subscription_plan |
+---------+-------+-------------------------------+
| USA     | 36020 |                           0.0 |
| Canada  | 15480 |                           0.0 |
+---------+-------+-------------------------------+


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [41]:
# Verification: Print the three missingness percentages
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM \\`{project_id}.netflix.users\\`
)
SELECT ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base
"""

!bq query --nouse_legacy_sql "{sql_query}"

+---------------------+-------------------------------+-----------------+
| pct_missing_country | pct_missing_subscription_plan | pct_missing_age |
+---------------------+-------------------------------+-----------------+
|                 0.0 |                           0.0 |           11.93 |
+---------------------+-------------------------------+-----------------+


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Age columns has the most missing values with 11.93% missing, country and subscription_plan have 0% missing.

**MCAR:** Could be MCAR if the missingness is completely random, could be random system error or data entry issue.

**MAR:** It could be MAR if the missingness is related to other observed variables. For example, maybe users with a certain subscription_plan or from a specific country are less likely to provide their age.

**MNAR:** It could be MNAR if the missingness is related to the unobserved age value itself. For example, maybe users who are very young or very old are less likely to report their age. This is a common scenario with sensitive demographic data.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
# # SELECT user_id, movie_id, event_ts, device_type, COUNT(*) AS dup_count
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history`
# # GROUP BY user_id, movie_id, event_ts, device_type
# # HAVING dup_count > 1
# # ORDER BY dup_count DESC
# # LIMIT 20;

In [None]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` AS
# # SELECT * EXCEPT(rk) FROM (
# #   SELECT h.*,
# #          ROW_NUMBER() OVER (
# #            PARTITION BY user_id, movie_id, event_ts, device_type
# #            ORDER BY progress_ratio DESC, minutes_watched DESC
# #          ) AS rk
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history` h
# # )
# # WHERE rk = 1;

In [43]:
# First, check the actual column names in watch_history
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

!bq show --schema --format=prettyjson {project_id}:netflix.watch_history

[
  {
    "mode": "NULLABLE",
    "name": "session_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "movie_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "watch_date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "device_type",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "watch_duration_minutes",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "progress_percentage",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "action",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "quality",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "location_country",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "is_download",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "user_rating",
    "type": "INTEGER"
  }
]


In [44]:
# Cell 1: Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
# This identifies rows that have identical combinations of these four columns
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT user_id,
       movie_id,
       watch_date,
       device_type,
       COUNT(*) AS duplicate_count
FROM \\`{project_id}.netflix.watch_history\\`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC
LIMIT 20
"""

!bq query --nouse_legacy_sql "{sql_query}"

+------------+------------+------------+-------------+-----------------+
|  user_id   |  movie_id  | watch_date | device_type | duplicate_count |
+------------+------------+------------+-------------+-----------------+
| user_03310 | movie_0640 | 2024-09-08 | Smart TV    |              20 |
| user_00391 | movie_0893 | 2024-08-26 | Laptop      |              20 |
| user_03898 | movie_0500 | 2025-07-29 | Desktop     |              15 |
| user_04513 | movie_0564 | 2024-06-11 | Mobile      |              15 |
| user_03043 | movie_0465 | 2024-02-03 | Laptop      |              15 |
| user_08157 | movie_0729 | 2025-10-26 | Laptop      |              15 |
| user_06554 | movie_0505 | 2025-10-02 | Laptop      |              15 |
| user_01580 | movie_0984 | 2025-06-25 | Mobile      |              15 |
| user_02976 | movie_0987 | 2024-09-19 | Desktop     |              15 |
| user_03408 | movie_0146 | 2025-06-02 | Desktop     |              15 |
| user_05629 | movie_0697 | 2025-01-23 | Desktop   

In [45]:
# Cell 2: Create deduplicated watch_history table
# Strategy: Keep one row per (user_id, movie_id, watch_date, device_type) group
# Preference: Higher progress_percentage first, then higher watch_duration_minutes
# This removes duplicates while preserving the most complete viewing record
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
CREATE OR REPLACE TABLE \\`{project_id}.netflix.watch_history_dedup\\` AS
SELECT * EXCEPT(row_num)
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS row_num
  FROM \\`{project_id}.netflix.watch_history\\`
)
WHERE row_num = 1
"""

!bq query --nouse_legacy_sql "{sql_query}"

Waiting on bqjob_r49f7a0312d565bfc_00000199cab4f41d_1 ... (1s) Current status: DONE   
Created boxwood-veld-471119-r6.netflix.watch_history_dedup



### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [47]:
# Verification: Compare row counts before and after deduplication
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  'Original' AS table_name,
  COUNT(*) AS row_count
FROM \\`{project_id}.netflix.watch_history\\`

UNION ALL

SELECT
  'Deduplicated' AS table_name,
  COUNT(*) AS row_count
FROM \\`{project_id}.netflix.watch_history_dedup\\`

UNION ALL

SELECT
  'Duplicates Removed' AS table_name,
  (SELECT COUNT(*) FROM \\`{project_id}.netflix.watch_history\\`) -
  (SELECT COUNT(*) FROM \\`{project_id}.netflix.watch_history_dedup\\`) AS row_count
"""

!bq query --nouse_legacy_sql "{sql_query}"

+--------------------+-----------+
|     table_name     | row_count |
+--------------------+-----------+
| Original           |    525000 |
| Duplicates Removed |    425000 |
| Deduplicated       |    100000 |
+--------------------+-----------+


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

**Natural duplicates** occur when users legitimately engage with content multiple times. For example, watching the same movie in different sessions throughout the day, switching between devices mid-viewing, or multiple household members using a shared account. **System-generated duplicate** on the other hand stem from technical problems like ETL pipeline failures that reload data multiple times, application bugs that log events redundantly, network retry logic creating duplicate API calls, or improper data synchronization across multiple sources without idempotency checks.

Duplicates distort labels and KPIS because it inflates engagement statistics.If one person watches a movie but it gets recorded three times, that's 300% fake engagement. This breaks recommendation algorithms because they think certain movies are more popular than they actually are, leading to bad suggestions.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
# # EXAMPLE (from LLM) — IQR outlier rate (commented)
# # WITH dist AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(1)] AS q1,
# #     APPROX_QUANTILES(minutes_watched, 4)[OFFSET(3)] AS q3
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # bounds AS (
# #   SELECT q1, q3, (q3-q1) AS iqr,
# #          q1 - 1.5*(q3-q1) AS lo,
# #          q3 + 1.5*(q3-q1) AS hi
# #   FROM dist
# # )
# # SELECT
# #   COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi) AS outliers,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(h.minutes_watched < b.lo OR h.minutes_watched > b.hi)/COUNT(*),2) AS pct_outliers
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h
# # CROSS JOIN bounds b;

In [None]:
# # EXAMPLE (from LLM) — Winsorize + quantiles (commented)
# # CREATE OR REPLACE TABLE `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust` AS
# # WITH q AS (
# #   SELECT
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(1)]  AS p01,
# #     APPROX_QUANTILES(minutes_watched, 100)[OFFSET(98)] AS p99
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # )
# # SELECT
# #   h.*,
# #   GREATEST(q.p01, LEAST(q.p99, h.minutes_watched)) AS minutes_watched_capped
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup` h, q;
# #
# # -- Quantiles before vs after
# # WITH before AS (
# #   SELECT 'before' AS which, APPROX_QUANTILES(minutes_watched, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_dedup`
# # ),
# # after AS (
# #   SELECT 'after' AS which, APPROX_QUANTILES(minutes_watched_capped, 5) AS q
# #   FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`
# # )
# # SELECT * FROM before UNION ALL SELECT * FROM after;

In [50]:
# Compute IQR bounds and report % outliers for watch_duration_minutes
import os

sql_query = f"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\` h
CROSS JOIN bounds b
"""

!bq query --nouse_legacy_sql "{sql_query}"

+----------+--------+--------------+
| outliers | total  | pct_outliers |
+----------+--------+--------------+
|     3433 | 100000 |         3.43 |
+----------+--------+--------------+


In [52]:
# Create watch_history_robust with watch_duration_minutes_capped at P01/P99
# Return quantile summaries before/after capping
import os

sql_create_table = f"""
CREATE OR REPLACE TABLE \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\\` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\` h, q
"""

!bq query --nouse_legacy_sql "{sql_create_table}"

print("\nQuantiles before vs after capping:")

sql_quantiles = f"""
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\\`
)
SELECT * FROM before UNION ALL SELECT * FROM after
"""

!bq query --nouse_legacy_sql "{sql_quantiles}"

Waiting on bqjob_r5221cdbad57c2fff_00000199cac04207_1 ... (2s) Current status: DONE   
Created boxwood-veld-471119-r6.netflix.watch_history_robust


Quantiles before vs after capping:
+--------+---------------------------------------------+
| which  |                      q                      |
+--------+---------------------------------------------+
| before | ["0.2","24.9","41.7","61.4","91.7","799.3"] |
| after  | ["4.4","24.6","41.5","61.5","92.0","366.0"] |
+--------+---------------------------------------------+


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [53]:
# Verification: Show min/median/max before vs after capping
import os

sql_query = f"""
SELECT
  'Before (Original)' AS version,
  MIN(watch_duration_minutes) AS min_val,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
  MAX(watch_duration_minutes) AS max_val
FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_dedup\\`

UNION ALL

SELECT
  'After (Capped)' AS version,
  MIN(watch_duration_minutes_capped) AS min_val,
  APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_val,
  MAX(watch_duration_minutes_capped) AS max_val
FROM \\`{os.environ['GOOGLE_CLOUD_PROJECT']}.netflix.watch_history_robust\\`

ORDER BY version DESC
"""

!bq query --nouse_legacy_sql "{sql_query}"

+-------------------+---------+------------+---------+
|      version      | min_val | median_val | max_val |
+-------------------+---------+------------+---------+
| Before (Original) |     0.2 |       51.2 |   799.3 |
| After (Capped)    |     4.4 |       51.4 |   366.0 |
+-------------------+---------+------------+---------+


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping might be harmfull if the outliers are actually meaningful and represent significant events. If the data naturally follows a distribution with extreme values, then capping might also misrepresent the true nature of the data and lead to biased conclusions

Decision trees and random forests are less sensetive to outliers as they work by recursively partitioning the data based on feature values. When splitting a node, they look for a threshold that best separates the data, so outliers might end up in their own small leaf nodes or split, but won't influence overall structure of the tree.


### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
# # EXAMPLE (from LLM) — flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [None]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented)
# # SELECT
# #   COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #           CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) < 10 OR
# #                     CAST(REGEXP_EXTRACT(age_band, r'\d+') AS INT64) > 100)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;

In [None]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [54]:
# Cell 1: Create and summarize flag_binge for sessions > 8 hours (480 minutes)
# Identifies potential binge-watching behavior in watch_history_robust
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 480) AS binge_sessions,
  COUNT(*) AS total_sessions,
  ROUND(100 * COUNTIF(watch_duration_minutes_capped > 480) / COUNT(*), 2) AS pct_binge,
  AVG(CASE WHEN watch_duration_minutes_capped > 480 THEN watch_duration_minutes_capped END) AS avg_binge_duration,
  MAX(watch_duration_minutes_capped) AS max_binge_duration
FROM \\`{project_id}.netflix.watch_history_robust\\`
"""

!bq query --nouse_legacy_sql "{sql_query}"

+----------------+----------------+-----------+--------------------+--------------------+
| binge_sessions | total_sessions | pct_binge | avg_binge_duration | max_binge_duration |
+----------------+----------------+-----------+--------------------+--------------------+
|              0 |         100000 |       0.0 |               NULL |              366.0 |
+----------------+----------------+-----------+--------------------+--------------------+


In [56]:
# Cell 2: Create and summarize flag_age_extreme for users with age <10 or >100
# Flag extreme/suspicious ages that may indicate data quality issues
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_users,
  COUNT(*) AS total_users_with_age,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_extreme_age,
  COUNTIF(age < 10) AS users_under_10,
  COUNTIF(age > 100) AS users_over_100,
  MIN(age) AS min_age,
  MAX(age) AS max_age
FROM \\`{project_id}.netflix.users\\`
WHERE age IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"

+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+
| extreme_age_users | total_users_with_age | pct_extreme_age | users_under_10 | users_over_100 | min_age | max_age |
+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+
|               895 |                45355 |            1.97 |            840 |             55 |    -7.0 |   109.0 |
+-------------------+----------------------+-----------------+----------------+----------------+---------+---------+


In [60]:
# Cell 3: Compute and summarize flag_duration_anomaly for movies with durations < 15 or > 480 minutes
# Flag movies with potentially anomalous durations (very short or very long)
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS duration_anomalies,
  COUNT(*) AS total_movies,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2)
    AS pct_duration_anomalies,
  COUNTIF(duration_minutes < 15)  AS movies_under_15_min,
  COUNTIF(duration_minutes > 480) AS movies_over_480_min
FROM \\`{project_id}.netflix.movies\\`
WHERE duration_minutes IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"

+--------------------+--------------+------------------------+---------------------+---------------------+
| duration_anomalies | total_movies | pct_duration_anomalies | movies_under_15_min | movies_over_480_min |
+--------------------+--------------+------------------------+---------------------+---------------------+
|                115 |         5200 |                   2.21 |                  60 |                  55 |
+--------------------+--------------+------------------------+---------------------+---------------------+


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [63]:
# Compact summary of all data quality and behavioral flags (binge, age, duration)
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_query = f"""
SELECT 'flag_binge' AS flag_name,
       ROUND(100 * COUNTIF(watch_duration_minutes_capped > 480) / COUNT(*), 2) AS pct_of_rows
FROM \\`{project_id}.netflix.watch_history_robust\\`

UNION ALL

SELECT 'flag_age_extreme' AS flag_name,
       ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_of_rows
FROM \\`{project_id}.netflix.users\\`
WHERE age IS NOT NULL

UNION ALL

SELECT 'flag_duration_anomaly' AS flag_name,
       ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM \\`{project_id}.netflix.movies\\`
WHERE duration_minutes IS NOT NULL
"""

!bq query --nouse_legacy_sql "{sql_query}"



+-----------------------+-------------+
|       flag_name       | pct_of_rows |
+-----------------------+-------------+
| flag_binge            |         0.0 |
| flag_age_extreme      |        1.97 |
| flag_duration_anomaly |        2.21 |
+-----------------------+-------------+


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

`flag_duration_anomaly` is the most common anomaly flag with 2.21% of movies having durations outside the typical range. I would keep this flag as a data quality feature since it reflects unusual content characteristics that can affect recommendation accuracy. However, I would also keep `flag_binge` as a feature because it captures a highly relevant user behavior which that can help predict engagement, churn, or subscription upgrades. Even though it currently shows 0.0%, it remains valuable for future datasets where binge sessions are more common.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
