<a href="https://colab.research.google.com/github/akoripal/-mgmt467-analytics-portfolio/blob/main/MGMT467_PromptPlusExamples_nit2Lab4_AK(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [None]:
# 1) Authenticates to Google Cloud, setting up credentials for gcloud and client libraries.
from google.colab import auth
auth.authenticate_user()

# 2) Prompts the user for the GCP Project ID and sets a default region.
PROJECT_ID = input("Enter your Google Cloud Project ID: ")
REGION = "us-central1" # <--- EDIT THIS for a different region if needed.

# 3) Sets the GOOGLE_CLOUD_PROJECT environment variable for gcloud commands and APIs.
%env GOOGLE_CLOUD_PROJECT=$PROJECT_ID

# 4) Configures the gcloud CLI default project, making subsequent gcloud commands simpler.
!gcloud config set project $GOOGLE_CLOUD_PROJECT

# 5) Prints the configured values for confirmation.
print(f"\nSuccessfully set Project ID: {PROJECT_ID}")
print(f"Region set to: {REGION}")

# Done: Auth + Project/Region set

Enter your Google Cloud Project ID: mgmt467project
env: GOOGLE_CLOUD_PROJECT=mgmt467project
Updated property [core/project].

Successfully set Project ID: mgmt467project
Region set to: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [None]:
# Print the currently active gcloud project configuration
!gcloud config get-value project

# Echo REGION. It is a Python variable defined in the setup cell, and Python
# variables persist across cells for as long as the runtime is not restarted.
# It was never exported as a shell environment variable.
# The fallback below only matters on a fresh runtime where the setup cell
# has not been run yet.
REGION = globals().get("REGION", "us-central1")
print(f"Region (from previous logic): {REGION}")

mgmt467project
Region (from previous logic): us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

My Answer: We set PROJECT_ID and REGION at the top so that every later command targets the same project and location. Without a project set, commands can fail outright, or run against the wrong project and cause unexpected billing or unintended modifications. Defining REGION once keeps all regional resources in the same location, which avoids extra data transfer costs and higher latency between services.
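
The failure mode described above can be caught early with a fail-fast check. The following is a minimal sketch, not part of the lab's required code; `require_config` is a hypothetical helper name, and the values passed in are the ones shown in the setup cell's output.

```python
def require_config(project_id: str, region: str) -> dict:
    """Return a validated config dict, or raise if either value is blank."""
    project_id = project_id.strip()
    region = region.strip()
    if not project_id:
        raise ValueError("PROJECT_ID is empty; set it before running gcloud/bq commands.")
    if not region:
        raise ValueError("REGION is empty; choose one region and reuse it everywhere.")
    return {"project_id": project_id, "region": region}


# Values taken from the setup cell's output above.
config = require_config("mgmt467project", "us-central1")
print(config)
```

Calling this right after `input()` would stop the notebook before any cloud command runs with an empty project ID.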

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
# --- Kaggle API Setup for Colab ---

# NOTE on Security: The 'kaggle.json' file contains sensitive credentials.
# The `chmod 600` command below restricts access to the file to the current user only.
import os
from google.colab import files

# 1. Prompts the user to upload kaggle.json file
print("Please upload your kaggle.json API key file.")
uploaded = files.upload()

if uploaded:
    # The file name is typically 'kaggle.json'
    # We must extract the file name from the dictionary keys.
    file_name = list(uploaded.keys())[0]

    # 2. Create the necessary directory and move the file
    !mkdir -p ~/.kaggle
    # IPython expands {file_name} from the Python variable; the quotes keep names with spaces intact.
    !mv "{file_name}" ~/.kaggle/kaggle.json

    # 3. Set file permissions (0600 = read/write only for owner)
    # This is essential for security as it protects the API key.
    !chmod 600 ~/.kaggle/kaggle.json
    print("\nKaggle API key uploaded and secured.")

    # 4. Verify installation and authentication
    print("\nKaggle CLI version check:")
    !kaggle --version
else:
    print("No file uploaded. Kaggle setup aborted.")


Please upload your kaggle.json API key file.


Saving kaggle (2).json to kaggle (2).json

Kaggle API key uploaded and secured.

Kaggle CLI version check:
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [None]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

My Answer: We require strict 0600 permissions so that only the file's owner can read and write it. `0600` is an octal mode: the owner gets read and write access, while the group and all other users get none. This avoids exposing the API token to other accounts or processes on the same machine, which could otherwise use it to act on Kaggle as us.
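
The same permission policy can be applied and checked from Python. This is a sketch assuming a POSIX system (such as the Colab VM); `write_secret` and `mode_of` are hypothetical helper names, and the token content is a placeholder, not a real key.

```python
import os
import stat
import tempfile


def write_secret(path: str, content: str) -> None:
    """Create `path` with owner-only read/write permissions, then write `content`."""
    # Passing mode 0o600 at creation avoids a window where the file is more widely readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(content)
    # Enforce the mode even if the file already existed with looser permissions.
    os.chmod(path, 0o600)


def mode_of(path: str) -> str:
    """Return the permission bits as an octal string, e.g. '0o600'."""
    return oct(stat.S_IMODE(os.stat(path).st_mode))


# Demo in a throwaway directory; the lab itself uses ~/.kaggle/kaggle.json.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "kaggle.json")
write_secret(demo_path, '{"username": "demo", "key": "not-a-real-key"}')
print(mode_of(demo_path))
```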

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# --- 2) Download and Unzip Dataset ---

# Define a standard base path for data storage for auditing and clarity.
BASE_DIR = "/content/data"
RAW_DIR = f"{BASE_DIR}/raw"
DATASET_SLUG = "sayeeduddin/netflix-2025user-behavior-dataset-210k-records"

# 1. Create the target directory for raw, extracted files.
# The '-p' flag ensures that parent directories (like /content/data) are also created
# and prevents an error if the directory already exists.
print(f"Creating raw data directory: {RAW_DIR}")
!mkdir -p {RAW_DIR}

# 2. Download the dataset using the Kaggle CLI.
# '-d' specifies the dataset slug.
# '-p' specifies the download path for the ZIP file (will be saved to BASE_DIR).
print(f"Downloading dataset: {DATASET_SLUG}")
!kaggle datasets download -d {DATASET_SLUG} -p {BASE_DIR}

# 3. Unzip the downloaded file into the designated raw directory.
# '-o' flag ensures files are overwritten without prompting if they already exist,
# which aids in script reproducibility. The ZIP file is assumed to be the only one
# in the BASE_DIR.
print(f"Unzipping files into: {RAW_DIR}")
!unzip -o {BASE_DIR}/*.zip -d {RAW_DIR}

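# 4. List the extracted CSV files and their human-readable sizes (du -h).
# 'column -t' formats the output into a neat, readable table.
# Note: a literal {} on a `!` line (as in `find ... -exec du -h {} \;`) stops IPython
# from expanding {RAW_DIR}, which yields "find: '{RAW_DIR}': No such file or directory".
# A shell glob avoids the problem.
print("\n--- Extracted CSV Files and Sizes ---")
!du -h {RAW_DIR}/*.csv | column -t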

# Done: Dataset downloaded, unzipped, and verified.

Creating raw data directory: /content/data/raw
Downloading dataset: sayeeduddin/netflix-2025user-behavior-dataset-210k-records
Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 742MB/s]
Unzipping files into: /content/data/raw
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  

--- Extracted CSV Files and Sizes ---
find: ‘{RAW_DIR}’: No such file or directory


### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [None]:
import glob
import os

# Define RAW_DIR based on the previous data preparation cell
RAW_DIR = "/content/data/raw"

# Use glob to find all CSV files in the raw data directory
CSV_FILES = glob.glob(os.path.join(RAW_DIR, "*.csv"))

print("--- Verifying CSV File Count and Listing ---")

# Assert that the number of found files is exactly six.
# If the condition fails, it raises an AssertionError with a descriptive message.
assert len(CSV_FILES) == 6, f"Expected 6 CSV files, but found {len(CSV_FILES)}."

print(f"SUCCESS: Found exactly {len(CSV_FILES)} CSV files.")
print("Files found:")

# Print the name of each file
for file_path in sorted(CSV_FILES):
    print(f"- {os.path.basename(file_path)}")

print("Verification complete.")

--- Verifying CSV File Count and Listing ---
SUCCESS: Found exactly 6 CSV files.
Files found:
- movies.csv
- recommendation_logs.csv
- reviews.csv
- search_logs.csv
- users.csv
- watch_history.csv
Verification complete.


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

My answer: A clean file inventory is a quick quality check. If a file is missing, truncated, or corrupted, comparing names and sizes against the inventory exposes the problem immediately. It also supports reproducibility: others can confirm they are working from the same files before rerunning the analysis or sharing results.
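
One way to build such an inventory is sketched below. It records name, size, data-row count, and a SHA-256 checksum per CSV. `build_inventory` is a hypothetical helper, and the demo file is made up for illustration; in the lab the directory would be `/content/data/raw`.

```python
import csv
import hashlib
import os
import tempfile


def build_inventory(directory: str) -> list[dict]:
    """Return one record per CSV file: name, size in bytes, data rows, SHA-256."""
    records = []
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".csv"):
            continue
        path = os.path.join(directory, name)
        with open(path, "rb") as f:
            digest = hashlib.sha256(f.read()).hexdigest()
        with open(path, newline="") as f:
            # Subtract 1 for the header row; never report a negative count.
            rows = max(sum(1 for _ in csv.reader(f)) - 1, 0)
        records.append(
            {"name": name, "bytes": os.path.getsize(path), "rows": rows, "sha256": digest}
        )
    return records


# Demo on a throwaway directory with one small, made-up file.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "users.csv"), "w", newline="") as f:
    f.write("user_id,age\nuser_00001,34\nuser_00002,\n")

inventory = build_inventory(demo_dir)
for rec in inventory:
    print(rec["name"], rec["bytes"], rec["rows"], rec["sha256"][:12])
```

Saving this inventory next to the raw files gives a baseline: a later run whose sizes or checksums differ signals that the source data changed.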

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
# Create a unique GCS bucket, upload CSVs, and explain staging benefits
import os, random, string, subprocess

# Step 1: Generate a unique bucket name using random suffix
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "")
REGION = os.environ.get("REGION", "us-central1")
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
BUCKET_NAME = f"{PROJECT_ID}-data-bucket-{suffix}"

# Step 2: Create the bucket in the specified region
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION --quiet

# Step 3: Export bucket name to environment variable
os.environ["BUCKET_NAME"] = BUCKET_NAME

# Step 4: Upload all CSV files from /content/data/raw to a folder "netflix" in the bucket
!gsutil -m cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Step 5: Print confirmation and explain
print(f"✅ Bucket created: {BUCKET_NAME}")
print(f"📂 Files uploaded to: gs://{BUCKET_NAME}/netflix/")
print("\n⚙️  Staging benefits:")
print("- Centralizes data in Google Cloud Storage for scalable processing with BigQuery or Vertex AI.")
print("- Enables consistent, versioned data access for collaboration.")
print("- Simplifies pipeline reproducibility and automation across environments.")

# Comments:
# 1) Adds a random suffix to avoid naming conflicts across users/projects.
# 2) Uploads CSVs to a GCS bucket for scalable cloud-based data workflows.
# 3) Staging in GCS improves reproducibility, sharing, and integration with downstream GCP tools.
# Done: Bucket created, data staged to GCS.


Creating gs://mgmt467project-data-bucket-cdo7id/...
Copying file:///content/data/raw/movies.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/recommendation_logs.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/search_logs.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/users.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/watch_history.csv [Content-Type=text/csv]...
Copying file:///content/data/raw/reviews.csv [Content-Type=text/csv]...
| [6/6 files][ 18.9 MiB/ 18.9 MiB] 100% Done                                    
Operation completed over 6 objects/18.9 MiB.                                     
✅ Bucket created: mgmt467project-data-bucket-cdo7id
📂 Files uploaded to: gs://mgmt467project-data-bucket-cdo7id/netflix/

⚙️  Staging benefits:
- Centralizes data in Google Cloud Storage for scalable processing with BigQuery or Vertex AI.
- Enables consistent, versioned data access for collaboration.
- Simplifies pipeline reproducibility and automation across environments.

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [None]:
# --- GCS Upload Verification ---

# The BUCKET_NAME environment variable was set in the previous cell.

print("Listing objects and sizes in GCS bucket:")

# gsutil ls -lh:
# ls - lists objects (files).
# -l - prints long listing format (includes size, creation time).
# -h - prints sizes in human-readable format (e.g., 1.5M, 2.7G).
!gsutil ls -lh gs://$BUCKET_NAME/netflix/

Listing objects and sizes in GCS bucket:
113.22 KiB  2025-10-26T20:34:43Z  gs://mgmt467project-data-bucket-cdo7id/netflix/movies.csv
  4.48 MiB  2025-10-26T20:34:44Z  gs://mgmt467project-data-bucket-cdo7id/netflix/recommendation_logs.csv
  1.78 MiB  2025-10-26T20:34:45Z  gs://mgmt467project-data-bucket-cdo7id/netflix/reviews.csv
  2.15 MiB  2025-10-26T20:34:44Z  gs://mgmt467project-data-bucket-cdo7id/netflix/search_logs.csv
  1.53 MiB  2025-10-26T20:34:44Z  gs://mgmt467project-data-bucket-cdo7id/netflix/users.csv
  8.84 MiB  2025-10-26T20:34:44Z  gs://mgmt467project-data-bucket-cdo7id/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88 MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

My Answer: Staging data in GCS is better than keeping it only in Colab because it is more durable and scalable. Files saved in Colab exist only for that session, and if the runtime disconnects, everything in /content/ disappears. Data uploaded to GCS, on the other hand, persists independently of the notebook. GCS can also hold much larger datasets than Colab's limited disk space.

Another big advantage is that GCS makes it easier to collaborate and automate your projects. When working on a group project, teammates can access the same files from GCS instead of having to upload their own copies. Plus, GCS connects directly to other Google Cloud tools like BigQuery, Vertex AI, and Dataflow, which makes it simple to build end-to-end pipelines for data analysis or model training.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# Cell A: create the BigQuery dataset in the US multi-region
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

BigQuery error in mk operation: Dataset 'mgmt467project:netflix' already exists.
Dataset may already exist.


In [None]:
# 🎬 Cell B: Load CSVs from GCS into BigQuery tables and verify row counts
tables = [
    "users", "movies", "watch_history",
    "recommendation_logs", "search_logs", "reviews"
]

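for table in tables:
    print(f"⏳ Loading {table}.csv into BigQuery...")
    # --replace overwrites the table on each run. Without it, `bq load` appends,
    # so re-running this cell would duplicate every row.
    !bq load \
        --replace \
        --autodetect \
        --source_format=CSV \
        --skip_leading_rows=1 \
        netflix.{table} \
        gs://$BUCKET_NAME/netflix/{table}.csv
    print(f"✅ Finished loading {table}\n")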

# Verify row counts for each table
print("📊 Row counts per table:")
for table in tables:
    print(f"▶️ {table}:")
    !bq query --nouse_legacy_sql "SELECT COUNT(*) AS row_count FROM netflix.{table}"
    print()

# Comments:
# - Cell A ensures the BigQuery dataset 'netflix' exists, avoiding recreation errors.
# - Cell B loads all CSV files from the GCS staging bucket, automatically inferring schema.
# - Final queries confirm successful ingestion by showing row counts per table.


⏳ Loading users.csv into BigQuery...
Waiting on bqjob_r59208cc22d0d4771_0000019a223b2c4c_1 ... (1s) Current status: DONE   
✅ Finished loading users

⏳ Loading movies.csv into BigQuery...
Waiting on bqjob_r2500823fc6e629f9_0000019a223b42cc_1 ... (1s) Current status: DONE   
✅ Finished loading movies

⏳ Loading watch_history.csv into BigQuery...
Waiting on bqjob_r6cb14bbd38c733ec_0000019a223b583b_1 ... (2s) Current status: DONE   
✅ Finished loading watch_history

⏳ Loading recommendation_logs.csv into BigQuery...
Waiting on bqjob_r52518a1a1330d0c4_0000019a223b735c_1 ... (2s) Current status: DONE   
✅ Finished loading recommendation_logs

⏳ Loading search_logs.csv into BigQuery...
Waiting on bqjob_r515d143f7dacba33_0000019a223b8e05_1 ... (1s) Current status: DONE   
✅ Finished loading search_logs

⏳ Loading reviews.csv into BigQuery...
Waiting on bqjob_r37953a71d46e7717_0000019a223ba402_1 ... (1s) Current status: DONE   
✅ Finished loading reviews

📊 Row counts per table:
▶️ users:
+---

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [None]:
-- ✅ Row counts for all six tables in the 'netflix' dataset
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `mgmt467project.netflix.users`
UNION ALL
SELECT 'movies', COUNT(*) FROM `mgmt467project.netflix.movies`
UNION ALL
SELECT 'watch_history', COUNT(*) FROM `mgmt467project.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs', COUNT(*) FROM `mgmt467project.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs', COUNT(*) FROM `mgmt467project.netflix.search_logs`
UNION ALL
SELECT 'reviews', COUNT(*) FROM `mgmt467project.netflix.reviews`;

SyntaxError: invalid character '✅' (U+2705) (ipython-input-4112722615.py, line 1)

In [None]:
# ✅ Run row-count verification query for all Netflix tables
from google.cloud import bigquery
import pandas as pd
import os

# Replace with your real project ID
PROJECT_ID = "mgmt467project"  # 👈 use your actual project name exactly as in BigQuery

# Initialize BigQuery client
client = bigquery.Client(project=PROJECT_ID)

# SQL query (no ${} syntax — use f-string to insert project ID)
query = f"""
-- Row counts for all six tables in the 'netflix' dataset
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.users`
UNION ALL
SELECT 'movies', COUNT(*) FROM `{PROJECT_ID}.netflix.movies`
UNION ALL
SELECT 'watch_history', COUNT(*) FROM `{PROJECT_ID}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs', COUNT(*) FROM `{PROJECT_ID}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs', COUNT(*) FROM `{PROJECT_ID}.netflix.search_logs`
UNION ALL
SELECT 'reviews', COUNT(*) FROM `{PROJECT_ID}.netflix.reviews`;
"""

# Run query and store results in a DataFrame
df = client.query(query).to_dataframe()

# Display nicely
print("✅ Row counts for each table in the 'netflix' dataset:")
display(df)


✅ Row counts for each table in the 'netflix' dataset:


| | table_name | row_count |
|---|---|---|
| 0 | users | 20600 |
| 1 | search_logs | 53000 |
| 2 | movies | 2080 |
| 3 | recommendation_logs | 104000 |
| 4 | reviews | 30900 |
| 5 | watch_history | 210000 |


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

My explanation: Autodetect is a quick and easy way to load data into BigQuery when you're exploring or testing something out. It works well when the data is clean and consistent, and it suits small projects where you just need to see what is in the data. For a bigger or longer-lived project, you should define an explicit schema, which tells BigQuery exactly what type each column should be. This matters because autodetect infers types from the values it sees, so messy or inconsistent data can lead to a wrong guess that causes problems later.
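
The risk of a wrong guess can be illustrated with a deliberately naive type-inference function. This is only a sketch of the general idea; it is not BigQuery's actual autodetect algorithm, and `infer_type` and the ZIP-code values are made up for illustration.

```python
def infer_type(values: list[str]) -> str:
    """Naive inference: INTEGER if every non-empty value parses as int,
    else FLOAT if every one parses as float, else STRING."""
    non_empty = [v for v in values if v != ""]
    for type_name, caster in (("INTEGER", int), ("FLOAT", float)):
        try:
            for v in non_empty:
                caster(v)
            return type_name
        except ValueError:
            continue
    return "STRING"


# ZIP codes look numeric but are identifiers; casting drops the leading zero.
zip_codes = ["02139", "47907", "10001"]
guessed = infer_type(zip_codes)
as_integers = [int(z) for z in zip_codes]
print(guessed)
print(as_integers)

# An explicit schema states the intent instead of relying on a guess.
explicit_schema = {"zip_code": "STRING"}
print(explicit_schema)
```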

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
query1 = f"""
SELECT
  COUNT(*) AS total_rows,
  ROUND(SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_country_missing,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_subscription_plan_missing,
  ROUND(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_age_missing
FROM `mgmt467project.netflix.users`
"""
df1 = client.query(query1).to_dataframe()
display(df1)


| | total_rows | pct_country_missing | pct_subscription_plan_missing | pct_age_missing |
|---|---|---|---|---|
| 0 | 20600 | 0.0 | 0.0 | 11.93 |


In [None]:
query2 = f"""
SELECT
  country,
  COUNT(*) AS total_rows,
  SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) AS missing_plan_tier_count,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_subscription_plan_missing
FROM `mgmt467project.netflix.users`
GROUP BY country
ORDER BY pct_subscription_plan_missing DESC
"""
df2 = client.query(query2).to_dataframe()
display(df2)


| | country | total_rows | missing_plan_tier_count | pct_subscription_plan_missing |
|---|---|---|---|---|
| 0 | Canada | 6192 | 0 | 0.0 |
| 1 | USA | 14408 | 0 | 0.0 |


In [None]:
query_verify = f"""
-- Step 5.1 Verification: Show % missing for key user attributes
SELECT
  ROUND(SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_country_missing,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_subscription_plan_missing,
  ROUND(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_age_missing
FROM `mgmt467project.netflix.users`
"""
df_verify = client.query(query_verify).to_dataframe()
display(df_verify)


| | pct_country_missing | pct_subscription_plan_missing | pct_age_missing |
|---|---|---|---|
| 0 | 0.0 | 0.0 | 11.93 |


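Reflection: Neither country nor subscription_plan has any missing values, so there is no plan-tier missingness to explain by region. The only field with gaps is age (11.93% missing). The queries above do not test whether age missingness varies with any other column, so treating it as missing completely at random (MCAR) rather than missing at random (MAR) is a working assumption, not a demonstrated result.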

### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

The column with the highest missingness is age (11.93%), while country and subscription_plan have no missing values. Missing age data is likely MCAR (Missing Completely at Random) or MAR (Missing at Random). It would be MCAR if users simply skipped entering their age with no pattern, so that missingness is unrelated to any other variable. It would be MAR if the missing rate differs across an observed column such as country or subscription_plan, which a grouped missingness query on age would reveal.
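
The grouped check that separates MCAR from MAR can be sketched in plain Python. The rows below are made up for illustration and are not taken from the lab's dataset; `pct_missing_by_group` is a hypothetical helper. Similar rates across groups are consistent with MCAR, while clearly different rates point toward MAR.

```python
from collections import defaultdict


def pct_missing_by_group(rows, group_key, target_key):
    """Return {group: percent of rows in that group where target_key is None}."""
    totals, missing = defaultdict(int), defaultdict(int)
    for row in rows:
        group = row[group_key]
        totals[group] += 1
        if row[target_key] is None:
            missing[group] += 1
    return {g: round(100 * missing[g] / totals[g], 2) for g in totals}


# Made-up rows for illustration only.
users = [
    {"country": "USA", "age": 34},
    {"country": "USA", "age": None},
    {"country": "USA", "age": 51},
    {"country": "USA", "age": 27},
    {"country": "Canada", "age": None},
    {"country": "Canada", "age": 45},
]

rates = pct_missing_by_group(users, "country", "age")
print(rates)

# Keep an indicator so a model can still learn from the fact that age was missing.
for user in users:
    user["is_missing_age"] = user["age"] is None
print(sum(u["is_missing_age"] for u in users))
```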

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
query1 = f"""
-- Step 5.2.1: Report duplicate groups and their counts
SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS duplicate_count
FROM `mgmt467project.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC
LIMIT 20
"""
df_dup_groups = client.query(query1).to_dataframe()
display(df_dup_groups)


| | user_id | movie_id | watch_date | device_type | duplicate_count |
|---|---|---|---|---|---|
| 0 | user_03310 | movie_0640 | 2024-09-08 | Smart TV | 8 |
| 1 | user_00391 | movie_0893 | 2024-08-26 | Laptop | 8 |
| 2 | user_01807 | movie_0921 | 2025-01-30 | Laptop | 6 |
| 3 | user_04050 | movie_0898 | 2025-07-05 | Mobile | 6 |
| 4 | user_07529 | movie_0686 | 2025-07-07 | Laptop | 6 |
| 5 | user_02549 | movie_0428 | 2025-04-15 | Mobile | 6 |
| 6 | user_09454 | movie_0116 | 2025-10-19 | Laptop | 6 |
| 7 | user_09045 | movie_0427 | 2025-09-26 | Mobile | 6 |
| 8 | user_07738 | movie_0793 | 2025-07-28 | Desktop | 6 |
| 9 | user_02822 | movie_0009 | 2025-08-30 | Desktop | 6 |


In [None]:
query2 = f"""
-- Step 5.2.2: Create deduplicated watch_history table
CREATE OR REPLACE TABLE `mgmt467project.netflix.watch_history_dedup` AS
SELECT
  *
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) AS row_rank
  FROM `mgmt467project.netflix.watch_history`
)
WHERE row_rank = 1
"""
job = client.query(query2)
job.result()  # Wait for the table to be created before any later cell queries it
job


QueryJob<project=mgmt467project, location=US, id=f0c45dd0-e9f5-4452-b3d9-5e91b69b25de>

### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [None]:
query_verify = f"""
-- Step 5.2 Verification: Compare before vs after deduplication
SELECT
  'Before Deduplication' AS stage,
  COUNT(*) AS total_rows
FROM `mgmt467project.netflix.watch_history`

UNION ALL

SELECT
  'After Deduplication' AS stage,
  COUNT(*) AS total_rows
FROM `mgmt467project.netflix.watch_history_dedup`
"""
df_verify = client.query(query_verify).to_dataframe()
display(df_verify)


| | stage | total_rows |
|---|---|---|
| 0 | Before Deduplication | 210000 |
| 1 | After Deduplication | 100000 |


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

My answer: Natural duplicates happen when the same user performs the same action multiple times, like rewatching a movie or clicking “play” twice. System-generated duplicates occur when technical issues, such as lag, retries, or logging bugs, record the same event more than once. These duplicates can seriously distort analytics because they inflate engagement metrics and corrupt KPIs; for example, they can make it look like users watched twice as many movies or spent more time on the platform than they really did.
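
The "keep one best row per group" policy can be sketched in plain Python to show what makes it deterministic. When rows tie on every `ORDER BY` column, `ROW_NUMBER()` may pick any of them, so this sketch adds an explicit tie-breaker. `dedup_keep_best` is a hypothetical helper, `session_id` is an assumed tie-breaker column, and the rows are made up; the sketch also assumes the preferred columns are non-null numbers.

```python
def dedup_keep_best(rows, key_cols, prefer_cols, tiebreak_col):
    """Keep one row per key: highest prefer_cols values, then smallest tiebreak value."""
    best = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        # Negate preferred values so that "smaller rank" means "better row".
        rank = tuple(-row[c] for c in prefer_cols) + (row[tiebreak_col],)
        if key not in best or rank < best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]


def make_row(session_id, user_id, progress, minutes):
    """Build a made-up watch event; only user_id varies across keys here."""
    return {
        "session_id": session_id, "user_id": user_id, "movie_id": "m1",
        "watch_date": "2025-01-01", "device_type": "Laptop",
        "progress_percentage": progress, "watch_duration_minutes": minutes,
    }


rows = [
    make_row("s1", "u1", 40.0, 30.0),
    make_row("s2", "u1", 90.0, 70.0),
    make_row("s3", "u1", 90.0, 70.0),  # exact tie with s2 on both preferred columns
    make_row("s4", "u2", 10.0, 5.0),
]

deduped = dedup_keep_best(
    rows,
    key_cols=["user_id", "movie_id", "watch_date", "device_type"],
    prefer_cols=["progress_percentage", "watch_duration_minutes"],
    tiebreak_col="session_id",
)
print([r["session_id"] for r in deduped])
```

In SQL, the equivalent change would be appending a unique column to the window's `ORDER BY` so that reruns always keep the same row.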

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
query1 = f"""
-- Step 5.3.1: Compute IQR bounds and % outliers for minutes_watched
WITH stats AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS Q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS Q3
  FROM `mgmt467project.netflix.watch_history_dedup`
),
bounds AS (
  SELECT
    Q1,
    Q3,
    (Q3 - Q1) AS IQR,
    Q1 - 1.5 * (Q3 - Q1) AS lower_bound,
    Q3 + 1.5 * (Q3 - Q1) AS upper_bound
  FROM stats
),
outlier_counts AS (
  SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN watch_duration_minutes < (SELECT lower_bound FROM bounds)
             OR watch_duration_minutes > (SELECT upper_bound FROM bounds)
             THEN 1 ELSE 0 END) AS outlier_count
  FROM `mgmt467project.netflix.watch_history_dedup`
)
SELECT
  (SELECT Q1 FROM bounds) AS Q1,
  (SELECT Q3 FROM bounds) AS Q3,
  (SELECT IQR FROM bounds) AS IQR,
  (SELECT lower_bound FROM bounds) AS lower_bound,
  (SELECT upper_bound FROM bounds) AS upper_bound,
  total_rows,
  outlier_count,
  ROUND(outlier_count / total_rows * 100, 2) AS pct_outliers
FROM outlier_counts
"""
df_iqr = client.query(query1).to_dataframe()
display(df_iqr)



| | Q1 | Q3 | IQR | lower_bound | upper_bound | total_rows | outlier_count | pct_outliers |
|---|---|---|---|---|---|---|---|---|
| 0 | 28.8 | 82.7 | 53.9 | -52.05 | 163.55 | 100000 | 3433 | 3.43 |


In [None]:
query2 = f"""
-- Step 5.3.2 : Create robust table with winsorized watch_duration_minutes (P01 → P99)
CREATE OR REPLACE TABLE `mgmt467project.netflix.watch_history_robust` AS
WITH pct_bounds AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `mgmt467project.netflix.watch_history_dedup`
)
SELECT
  w.*,
  CASE
    WHEN w.watch_duration_minutes < p.p01 THEN p.p01
    WHEN w.watch_duration_minutes > p.p99 THEN p.p99
    ELSE w.watch_duration_minutes
  END AS minutes_watched_capped
FROM `mgmt467project.netflix.watch_history_dedup` w
CROSS JOIN pct_bounds p
"""
job = client.query(query2)
job.result()  # Wait for the table to be created before querying it
job

QueryJob<project=mgmt467project, location=US, id=ae9ff718-5d77-40cd-aa9a-4ce0176b1fe0>

### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [None]:
query_verify = f"""
-- Step 5.3 Verification: Compare min, median, and max before vs after winsorization
SELECT
  'Before Winsorization' AS stage,
  MIN(watch_duration_minutes) AS min_value,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_value,
  MAX(watch_duration_minutes) AS max_value
FROM `mgmt467project.netflix.watch_history_dedup`

UNION ALL

SELECT
  'After Winsorization' AS stage,
  MIN(minutes_watched_capped) AS min_value,
  APPROX_QUANTILES(minutes_watched_capped, 2)[OFFSET(1)] AS median_value,
  MAX(minutes_watched_capped) AS max_value
FROM `mgmt467project.netflix.watch_history_robust`
"""
df_verify = client.query(query_verify).to_dataframe()
display(df_verify)


Unnamed: 0,stage,min_value,median_value,max_value
0,Before Winsorization,0.2,50.7,799.3
1,After Winsorization,4.4,51.4,368.5
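
The before/after rows above are consistent with a simple clamp. The sketch below mirrors the SQL `CASE` expression in plain Python; `winsorize` is a hypothetical helper, and the bounds are assumed to be the after-capping min and max (which equal P01 and P99 only if at least one row fell outside each bound).

```python
def winsorize(values, lower, upper):
    """Clamp each value into [lower, upper]; values inside the range are unchanged."""
    return [min(max(v, lower), upper) for v in values]

# Before-capping min / median / max from the verification output above.
before = [0.2, 50.7, 799.3]

# Assumed bounds: the after-capping min and max from the same output.
p01, p99 = 4.4, 368.5

after = winsorize(before, p01, p99)
print(after)  # [4.4, 50.7, 368.5]
```

Clamping the tails cannot move the exact median, so the small shift in `median_value` (50.7 vs 51.4) most likely comes from `APPROX_QUANTILES` returning approximate results rather than from the capping itself.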


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

My Answer: Capping can be harmful when extreme values carry real signal, such as rare fraud or peak-demand events, because clamping them erases exactly the cases of interest. Tree-based models (e.g., random forests, XGBoost) are less sensitive to outliers in input features because each split only compares a value to a threshold, so how far an extreme value lies beyond that threshold does not matter.
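
To make the threshold argument concrete, here is a small sketch with made-up session lengths and a hypothetical split point. It assumes the split threshold lies below the cap, which is the typical case when the cap sits at P99.

```python
from statistics import mean

raw = [12.0, 45.0, 50.7, 88.0, 799.3]             # illustrative session lengths
capped = [min(max(v, 4.4), 368.5) for v in raw]   # clamp to the 4.4 / 368.5 bounds

# A mean-based summary moves when the extreme value is capped...
print(mean(raw), mean(capped))

# ...but a threshold split (the building block of a decision tree) does not.
threshold = 100.0                                 # hypothetical split point
split_raw = [v > threshold for v in raw]
split_capped = [v > threshold for v in capped]
print(split_raw == split_capped)  # True
```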

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
query1 = f"""
-- Step 5.4.1: Flag binge sessions (> 8 hours)
SELECT
  COUNT(*) AS total_sessions,
  SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) AS binge_count,
  ROUND(SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_binge
FROM `mgmt467project.netflix.watch_history_robust`
"""
df_binge = client.query(query1).to_dataframe()
display(df_binge)


# Flags unusually long sessions (> 8 hours = 480 min), a possible indicator of binge-watching or logging errors.

Unnamed: 0,total_sessions,binge_count,pct_binge
0,100000,639,0.64
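
The query above summarizes the flag; as a model feature it would be computed per row. The sketch below shows one way to do that in Python and why the flag should be built on the raw `watch_duration_minutes` column rather than the capped one. `flag_binge` and `pct_flagged` are hypothetical helpers and the sample values are made up, apart from the 368.5 cap taken from the winsorization check.

```python
BINGE_MINUTES = 8 * 60  # 480, the threshold from the build prompt

def flag_binge(minutes):
    """Return True when a session is longer than 8 hours; None (missing) is not flagged."""
    return minutes is not None and minutes > BINGE_MINUTES

def pct_flagged(flags):
    """Percentage of True values, rounded to 2 decimals like the SQL output."""
    return round(sum(flags) / len(flags) * 100, 2)

raw_minutes = [35.0, 120.5, 481.0, 799.3, None]       # illustrative values
capped_minutes = [35.0, 120.5, 368.5, 368.5, None]    # same rows after a 368.5 cap

print(pct_flagged([flag_binge(m) for m in raw_minutes]))     # 40.0
print(pct_flagged([flag_binge(m) for m in capped_minutes]))  # 0.0
```

Since the capped column tops out below 480, a binge flag computed on `minutes_watched_capped` would never fire.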


In [None]:
query2 = f"""
-- Step 5.4.2: Flag unrealistic ages (< 10 or > 100)
SELECT
  COUNT(*) AS total_users,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) AS extreme_age_count,
  ROUND(SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_extreme_age
FROM `mgmt467project.netflix.users`
"""
df_age = client.query(query2).to_dataframe()
display(df_age)

# Detects impossible or suspiciously high/low ages that may come from data-entry errors or placeholder values.

Unnamed: 0,total_users,extreme_age_count,pct_extreme_age
0,20600,358,1.74


In [None]:
query3 = f"""
-- Step 5.4.3: Flag anomalous movie durations (< 15 min or > 480 min)
SELECT
  COUNT(*) AS total_movies,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) AS duration_anomaly_count,
  ROUND(SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_duration_anomaly
FROM `mgmt467project.netflix.movies`
"""
df_dur = client.query(query3).to_dataframe()
display(df_dur)


Unnamed: 0,total_movies,duration_anomaly_count,pct_duration_anomaly
0,2080,46,2.21


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [None]:
query_summary = f"""
-- Step 5.4 Summary: Show % of rows flagged for each anomaly
SELECT 'flag_binge' AS flag_name,
  ROUND(SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_of_rows
FROM `mgmt467project.netflix.watch_history_robust`

UNION ALL

SELECT 'flag_age_extreme' AS flag_name,
  ROUND(SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_of_rows
FROM `mgmt467project.netflix.users`

UNION ALL

SELECT 'flag_duration_anomaly' AS flag_name,
  ROUND(SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_of_rows
FROM `mgmt467project.netflix.movies`
"""
df_summary = client.query(query_summary).to_dataframe()
display(df_summary)


Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.64
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21
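
A short sketch for ranking the flags, using only the percentages from the summary output above. The variable names are illustrative.

```python
# Percentages copied from the summary output above.
pct_by_flag = {
    "flag_binge": 0.64,
    "flag_age_extreme": 1.74,
    "flag_duration_anomaly": 2.21,
}

# Sort from most to least common.
ranked = sorted(pct_by_flag.items(), key=lambda item: item[1], reverse=True)
for name, pct in ranked:
    print(f"{name}: {pct}%")

most_common = ranked[0][0]
```

Note that each percentage has a different denominator (sessions, users, and movies respectively), so the ranking compares rates within each table, not raw counts.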


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

My Answer: The most common anomaly flag is `flag_duration_anomaly` (2.21% of movies), followed by `flag_age_extreme` (1.74% of users) and `flag_binge` (0.64% of sessions). I would keep `flag_binge` as a feature because it captures real user behavior rather than a data error: binge-watching patterns can reveal engagement intensity, help predict churn, and improve recommendation models.

In [1]:
%%writefile dq_checks.sql
-- =====================================================
-- MGMT 467 Data Quality (DQ) Queries - Netflix Dataset
-- Author: Anurag Koripalli
-- =====================================================

-- Step 1: Verify row counts for all Netflix tables
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `mgmt467project.netflix.users`
UNION ALL
SELECT 'movies', COUNT(*) FROM `mgmt467project.netflix.movies`
UNION ALL
SELECT 'watch_history', COUNT(*) FROM `mgmt467project.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs', COUNT(*) FROM `mgmt467project.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs', COUNT(*) FROM `mgmt467project.netflix.search_logs`
UNION ALL
SELECT 'reviews', COUNT(*) FROM `mgmt467project.netflix.reviews`;

-- Step 2: Missingness percentages for key user fields
SELECT
  COUNT(*) AS total_rows,
  ROUND(SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_country_missing,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_subscription_plan_missing,
  ROUND(SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_age_missing
FROM `mgmt467project.netflix.users`;

-- Step 3: Missingness by country (for subscription_plan)
SELECT
  country,
  COUNT(*) AS total_rows,
  SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) AS missing_plan_tier_count,
  ROUND(SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_subscription_plan_missing
FROM `mgmt467project.netflix.users`
GROUP BY country
ORDER BY pct_subscription_plan_missing DESC;

-- Step 4: Duplicate groups in watch_history
SELECT
  user_id,
  movie_id,
  watch_date,
  device_type,
  COUNT(*) AS duplicate_count
FROM `mgmt467project.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC
LIMIT 20;

-- Step 5: Create deduplicated watch_history table
CREATE OR REPLACE TABLE `mgmt467project.netflix.watch_history_dedup` AS
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) AS row_rank
  FROM `mgmt467project.netflix.watch_history`
)
WHERE row_rank = 1;

-- Step 6: Verify before vs after deduplication
SELECT 'Before Deduplication' AS stage, COUNT(*) AS total_rows
FROM `mgmt467project.netflix.watch_history`
UNION ALL
SELECT 'After Deduplication' AS stage, COUNT(*) AS total_rows
FROM `mgmt467project.netflix.watch_history_dedup`;

-- Step 7: Compute IQR bounds and % outliers for watch_duration_minutes
WITH stats AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS Q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS Q3
  FROM `mgmt467project.netflix.watch_history_dedup`
),
bounds AS (
  SELECT
    Q1,
    Q3,
    (Q3 - Q1) AS IQR,
    Q1 - 1.5 * (Q3 - Q1) AS lower_bound,
    Q3 + 1.5 * (Q3 - Q1) AS upper_bound
  FROM stats
),
outlier_counts AS (
  SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN watch_duration_minutes < (SELECT lower_bound FROM bounds)
             OR watch_duration_minutes > (SELECT upper_bound FROM bounds)
        THEN 1 ELSE 0 END) AS outlier_count
  FROM `mgmt467project.netflix.watch_history_dedup`
)
SELECT
  (SELECT Q1 FROM bounds) AS Q1,
  (SELECT Q3 FROM bounds) AS Q3,
  (SELECT IQR FROM bounds) AS IQR,
  (SELECT lower_bound FROM bounds) AS lower_bound,
  (SELECT upper_bound FROM bounds) AS upper_bound,
  total_rows,
  outlier_count,
  ROUND(outlier_count / total_rows * 100, 2) AS pct_outliers
FROM outlier_counts;

-- Step 8: Create winsorized (robust) table
CREATE OR REPLACE TABLE `mgmt467project.netflix.watch_history_robust` AS
WITH pct_bounds AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `mgmt467project.netflix.watch_history_dedup`
)
SELECT
  w.*,
  CASE
    WHEN w.watch_duration_minutes < p.p01 THEN p.p01
    WHEN w.watch_duration_minutes > p.p99 THEN p.p99
    ELSE w.watch_duration_minutes
  END AS minutes_watched_capped
FROM `mgmt467project.netflix.watch_history_dedup` w
CROSS JOIN pct_bounds p;

-- Step 9: Compare min/median/max before vs after winsorization
SELECT
  'Before Winsorization' AS stage,
  MIN(watch_duration_minutes) AS min_value,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_value,
  MAX(watch_duration_minutes) AS max_value
FROM `mgmt467project.netflix.watch_history_dedup`
UNION ALL
SELECT
  'After Winsorization' AS stage,
  MIN(minutes_watched_capped) AS min_value,
  APPROX_QUANTILES(minutes_watched_capped, 2)[OFFSET(1)] AS median_value,
  MAX(minutes_watched_capped) AS max_value
FROM `mgmt467project.netflix.watch_history_robust`;

-- Step 10: Flag binge sessions (> 8 hours)
SELECT
  COUNT(*) AS total_sessions,
  SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) AS binge_count,
  ROUND(SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_binge
FROM `mgmt467project.netflix.watch_history_robust`;

-- Step 11: Flag unrealistic ages (< 10 or > 100)
SELECT
  COUNT(*) AS total_users,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) AS extreme_age_count,
  ROUND(SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_extreme_age
FROM `mgmt467project.netflix.users`;

-- Step 12: Flag anomalous movie durations (< 15 min or > 480 min)
SELECT
  COUNT(*) AS total_movies,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) AS duration_anomaly_count,
  ROUND(SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_duration_anomaly
FROM `mgmt467project.netflix.movies`;

-- Step 13: Summary of all anomaly flags
SELECT 'flag_binge' AS flag_name,
       ROUND(SUM(CASE WHEN watch_duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_of_rows
FROM `mgmt467project.netflix.watch_history_robust`
UNION ALL
SELECT 'flag_age_extreme',
       ROUND(SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2)
FROM `mgmt467project.netflix.users`
UNION ALL
SELECT 'flag_duration_anomaly',
       ROUND(SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) / COUNT(*) * 100, 2)
FROM `mgmt467project.netflix.movies`;


Writing dq_checks.sql
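
To rerun the exported file from a notebook, one option is to split it into statements and submit them in order. This is a rough sketch, not the only way to do it: `split_statements` and `run_all` are hypothetical helpers, the splitter is deliberately naive, and `client` is assumed to be the `bigquery.Client` already used in this notebook.

```python
def split_statements(sql_text):
    """Split a script on ';' and drop empty pieces.

    Naive on purpose: assumes no semicolons inside string literals or comments.
    """
    return [part.strip() for part in sql_text.split(";") if part.strip()]

def run_all(client, sql_text):
    """Run each statement in order, waiting for it to finish before the next one."""
    for statement in split_statements(sql_text):
        client.query(statement).result()

# Small stand-in for the contents of dq_checks.sql.
sample = """
-- Step A
SELECT 1;
-- Step B
SELECT 2;
"""
print(len(split_statements(sample)))  # 2
```

With the real file, the call would look like `run_all(client, open("dq_checks.sql").read())`. Waiting on each job matters because later steps read tables that earlier steps create.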


In [2]:
from google.colab import files
files.download("dq_checks.sql")



## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)


### Save & submit checklist:

- [ ] Save this notebook to the team Drive.
- [ ] Export a `.sql` file with your DQ queries and save to repo.
- [ ] Push notebook + SQL to the **team GitHub** with a descriptive commit.
- [ ] Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.
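
For the README item, a small sketch that assembles the text from values seen in this notebook. The project, dataset, and row counts come from the query outputs above; the region is the default suggested in section 0 and the bucket name is a placeholder, so both are assumptions to replace with your own values.

```python
# Values taken from this notebook where available; the bucket name is a placeholder.
config = {
    "PROJECT_ID": "mgmt467project",
    "REGION": "us-central1",           # default from section 0; change if yours differs
    "Bucket": "<your-bucket-name>",    # placeholder, not from the notebook
    "Dataset": "netflix",
}
row_counts = {"users": 20600, "movies": 2080, "watch_history_dedup": 100000}

lines = ["# MGMT 467 Netflix DQ pipeline", ""]
lines += [f"- {key}: `{value}`" for key, value in config.items()]
lines += ["", "## Row counts", ""]
lines += [f"- `{table}`: {count}" for table, count in row_counts.items()]

readme_text = "\n".join(lines) + "\n"
print(readme_text)
```

Saving `readme_text` to `README.md` next to the notebook and `dq_checks.sql` keeps the configuration and the counts auditable in one place.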