# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

In [7]:
from google.colab import files
import os

# Prompt the user to upload their kaggle.json file.
# This file contains your Kaggle API credentials.
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Create the .kaggle directory if it doesn't exist.
# This is where Kaggle expects to find the credentials file.
os.makedirs('/root/.kaggle', exist_ok=True)

# Save the uploaded file to the correct location.
# Using the first uploaded file as we expect only one (kaggle.json).
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])

# Set file permissions to 0600 (owner read/write only).
# This is crucial for security to prevent other users from accessing your API key.
os.chmod('/root/.kaggle/kaggle.json', 0o600)

# Verify the Kaggle installation by printing the version.
# This confirms the CLI is installed and can access the credentials.
!kaggle --version

# Done: Kaggle setup

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [None]:
#EXAMPLE (from LLM) — Auth + Project/Region (commented; write your own cell using the prompt)
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "US"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt-471819-i5
Project: mgmt-471819-i5 | Region: US
Updated property [core/project].
mgmt-471819-i5


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

Consitency- It guarantees that resources are created and aceesed in the intended locations
Cost Managment- Different regions have different prices
Latency: Choosing a geographically close region helps reduce latency

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
# Query BigQuery's INFORMATION_SCHEMA to check the dataset region
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
    schema_name,
    location
FROM
    `{project_id}`.INFORMATION_SCHEMA.SCHEMATA
WHERE
    schema_name = 'netflix';
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('netflix', 'US'), {'schema_name': 0, 'location': 1})


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [None]:
# Verification step: Check active project and region
!gcloud config get-value project
import os
print("Project:", PROJECT_ID, "| Region:", REGION)

mgmt-471819-i5
Project: mgmt-471819-i5 | Region: US


\**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Strict `0600` permissions on API tokens are essential for security, granting read/write access only to the file owner. This prevents unauthorized access and usage by other users or processes, mitigating risks of credential theft and a compromised security posture. Enforcing these permissions protects your accounts and data by ensuring the token remains a confidential secret.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# Create the directory to store raw data
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI to /content/data
# The -d flag specifies the dataset, and -p specifies the download path
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag overwrites files if they exist
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a neat table
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 467MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [None]:
# Verification step: Assert the number of CSV files and print their names
import glob
import os

csv_files = glob.glob('/content/data/raw/*.csv')
num_csv_files = len(csv_files)

# Assert that there are exactly six CSV files
assert num_csv_files == 6, f"Expected 6 CSV files, but found {num_csv_files}"

print(f"Found {num_csv_files} CSV files:")
for csv_file in csv_files:
    print(os.path.basename(csv_file))

Found 6 CSV files:
movies.csv
reviews.csv
users.csv
recommendation_logs.csv
search_logs.csv
watch_history.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

Keeping a clean file inventory with names and sizes is crucial for downstream processes. It enables auditing and reproducibility by clearly documenting input files, aids troubleshooting by providing a reference for expected data assets, assists in data validation by allowing checks against expected file properties, simplifies automation by providing consistent file information, and serves as concise documentation of the raw data.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
# Create a unique bucket in the specified region
import uuid
import os

bucket_name = f"mgmt-471819-i5-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Use the REGION variable directly in the gcloud command
# Changed location to 'US' to match BigQuery dataset location
print(f"Creating bucket: gs://{bucket_name} in region: US")
!gcloud storage buckets create gs://$BUCKET_NAME --location=US

# Upload all CSVs to the bucket
print(f"\nUploading files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/

# Print the bucket name and explain staging benefits
print(f"\nSuccessfully created bucket: {bucket_name} and uploaded files.")
print("\nBenefits of staging data in GCS:")
print("- **Consistent Source:** GCS provides a stable and versionable location for your data.")
print("- **Scalability:** GCS is highly scalable, handling large datasets easily.")
print("- **Integration:** GCS integrates seamlessly with other Google Cloud services like BigQuery.")
print("- **Cost-Effective:** GCS can be a cost-effective storage solution.")

# Verify contents (optional but recommended)
print(f"\nVerifying contents of gs://{bucket_name}/netflix/:")
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating bucket: gs://mgmt-471819-i5-netflix-956ce240 in region: US
Creating gs://mgmt-471819-i5-netflix-956ce240/...

Uploading files to gs://mgmt-471819-i5-netflix-956ce240/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/movies.csv
Copying file:///content/data/raw/README.md to gs://mgmt-471819-i5-netflix-956ce240/netflix/README.md
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt-471819-i5-netflix-956ce240/netflix/watch_history.csv

Average throughput: 39.1MiB/s

Successfully cr

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [None]:
# Verification step: List objects in the netflix/ prefix with sizes
import os

bucket_name = os.environ["BUCKET_NAME"]
print(f"Listing contents of gs://{bucket_name}/netflix/ with sizes:")
!gcloud storage ls -l gs://$BUCKET_NAME/netflix/

Listing contents of gs://mgmt-471819-i5-netflix-956ce240/netflix/ with sizes:
      8002  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/README.md
    115942  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/movies.csv
   4695557  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/recommendation_logs.csv
   1861942  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/reviews.csv
   2250902  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/search_logs.csv
   1606820  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/users.csv
   9269425  2025-10-20T17:59:13Z  gs://mgmt-471819-i5-netflix-956ce240/netflix/watch_history.csv
TOTAL: 7 objects, 19808590 bytes (18.89MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

Two key benefits of staging data in GCS versus loading directly from local Colab are scalability and reproducibility. GCS offers highly scalable storage that can handle large datasets efficiently, which is essential for big data processing. Staging in GCS also creates a stable, versionable, and accessible source for data loads, making pipelines more reproducible and easier to manage compared to relying on temporary local Colab storage.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

In [None]:
# Create the BigQuery dataset (idempotent)
DATASET = "netflix"
# Attempt to create; ignore if exists
print(f"Attempting to create dataset: {DATASET} in location: US")
!bq --location=US mk -d --description "MGMT467 Netflix dataset" {DATASET} || echo "Dataset may already exist or another error occurred."

Attempting to create dataset: netflix in location: US
BigQuery error in mk operation: Dataset 'mgmt-471819-i5:netflix' already exists.
Dataset may already exist or another error occurred.


### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# EXAMPLE (from LLM) — BigQuery dataset (commented)
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

BigQuery error in mk operation: Dataset 'mgmt-471819-i5:netflix' already exists.
Dataset may already exist.


In [None]:
# Load tables from GCS using BigQuery Python Client Library
from google.cloud import bigquery
import os

client = bigquery.Client()

tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

# Hardcoded dataset ID
DATASET_ID = "mgmt-471819-i5.netflix"
# Hardcoded bucket name from previous successful run - REPLACE WITH YOUR ACTUAL BUCKET NAME if needed
BUCKET_NAME = "mgmt-471819-i5-netflix-e31f9c91"


for tbl, fname in tables.items():
  uri = f"gs://{BUCKET_NAME}/netflix/{fname}"
  table_id = f"{DATASET_ID}.{tbl}"

  job_config = bigquery.LoadJobConfig(
      autodetect=True,
      skip_leading_rows=1,
      source_format=bigquery.SourceFormat.CSV,
  )

  print(f"Loading {tbl} from {uri} into {table_id}")
  # Use the client library to load the data
  load_job = client.load_table_from_uri(
      uri, table_id, job_config=job_config
  )  # Make an API request.

  load_job.result()  # Waits for the job to complete.

  print(f"Load job for {tbl} completed.")

# Row counts (Verification Prompt)
print("\nVerifying row counts:")
for tbl in tables.keys():
    table_id_full = f"{DATASET_ID}.{tbl}"
    query = f"SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{table_id_full}`"
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
        print(row)

Loading users from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/users.csv into mgmt-471819-i5.netflix.users
Load job for users completed.
Loading movies from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/movies.csv into mgmt-471819-i5.netflix.movies
Load job for movies completed.
Loading watch_history from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/watch_history.csv into mgmt-471819-i5.netflix.watch_history
Load job for watch_history completed.
Loading recommendation_logs from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/recommendation_logs.csv into mgmt-471819-i5.netflix.recommendation_logs
Load job for recommendation_logs completed.
Loading search_logs from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/search_logs.csv into mgmt-471819-i5.netflix.search_logs
Load job for search_logs completed.
Loading reviews from gs://mgmt-471819-i5-netflix-e31f9c91/netflix/reviews.csv into mgmt-471819-i5.netflix.reviews
Load job for reviews completed.

Verifying row counts:
Row(('users', 20600), {'t

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [None]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.reviews`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('movies', 2080), {'table_name': 0, 'row_count': 1})
Row(('watch_history', 210000), {'table_name': 0, 'row_count': 1})
Row(('search_logs', 53000), {'table_name': 0, 'row_count': 1})
Row(('recommendation_logs', 104000), {'table_name': 0, 'row_count': 1})
Row(('reviews', 30900), {'table_name': 0, 'row_count': 1})
Row(('users', 20600), {'table_name': 0, 'row_count': 1})


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

`autodetect` is acceptable for initial data exploration or when the schema is simple and consistent. However, for production pipelines or when schema changes are anticipated or complex, enforcing explicit schemas is crucial. Explicit schemas provide better control over data types, prevent unexpected errors during loading, ensure data quality, and make pipelines more robust and maintainable.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

In [None]:
# Measure % missing subscription_plan by country (Checking for MAR)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT country,
       COUNT(*) AS n,
       ROUND(100*COUNTIF(subscription_plan IS NULL)/COUNT(*),2) AS pct_missing_subscription_plan
FROM `{project_id}.netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('Canada', 6192, 0.0), {'country': 0, 'n': 1, 'pct_missing_subscription_plan': 2})
Row(('USA', 14408, 0.0), {'country': 0, 'n': 1, 'pct_missing_subscription_plan': 2})


### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
# EXAMPLE (from LLM) — Missingness profile (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Users: % missing per column
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((20600, 0.0, 0.0, 11.93), {'n': 0, 'pct_missing_country': 1, 'pct_missing_subscription_plan': 2, 'pct_missing_age': 3})


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [None]:
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

from google.cloud import bigquery

client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Print the three missingness percentages
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0.0, 0.0, 11.93), {'pct_missing_country': 0, 'pct_missing_subscription_plan': 1, 'pct_missing_age': 2})


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Based on the output of the previous cell, the `age` column is the most missing with 11.93% of values being null, while `country` and `subscription_plan` have no missing values in this dataset. It's difficult to definitively determine the missing data mechanism (MCAR/MAR/MNAR) without more information about how the data was collected. However, one hypothesis could be that `age` is Missing At Random (MAR) if the likelihood of `age` being missing depends on another variable in the dataset, such as the user's country or subscription plan, but not on the age itself. For example, certain countries or subscription plans might have different data collection methods or requirements that lead to age information being less frequently captured.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00472', 'movie_0719', datetime.date(2024, 12, 4), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00965', 'movie_0991', datetime.date(2024, 2, 14), 'Desktop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_07981', 'movie_0094', datetime.date(2025, 11, 8), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_06746', 'movie_0858', datetime.date(2024, 1, 23), 'Smart TV', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_01303', 'movie_0858', datetime.date(2025, 9

In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Create table watch_history_dedup keeping one row per group
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
query_job.result()

print(f"Table `{project_id}.netflix.watch_history_dedup` created successfully.")

Table `mgmt-471819-i5.netflix.watch_history_dedup` created successfully.


In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Before/after count query comparing raw vs watch_history_dedup
SELECT 'watch_history_raw' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'watch_history_dedup' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('watch_history_raw', 210000), {'table_name': 0, 'row_count': 1})
Row(('watch_history_dedup', 100000), {'table_name': 0, 'row_count': 1})


### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00928', 'movie_0913', datetime.date(2024, 1, 18), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_05874', 'movie_0294', datetime.date(2025, 11, 26), 'Mobile', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_01870', 'movie_0844', datetime.date(2024, 6, 2), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_06087', 'movie_0638', datetime.date(2025, 11, 14), 'Mobile', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_01580', 'movie_0984', datetime.date(2025, 6, 

In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Create table watch_history_dedup keeping one row per group
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
query_job.result()

print(f"Table `{project_id}.netflix.watch_history_dedup` created successfully.")

Table `mgmt-471819-i5.netflix.watch_history_dedup` created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [None]:
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

from google.cloud import bigquery

client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Before/after count query comparing raw vs watch_history_dedup
SELECT 'watch_history_raw' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'watch_history_dedup' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('watch_history_dedup', 100000), {'table_name': 0, 'row_count': 1})
Row(('watch_history_raw', 210000), {'table_name': 0, 'row_count': 1})


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise from both natural processes (e.g., a user accidentally submitting the same form twice) or system-generated issues (e.g., errors in data collection, ETL processes, or data merging). Regardless of the source, duplicates corrupt labels and KPIs by artificially inflating counts and distorting aggregations. For instance, duplicate watch history records would overcount viewing time, leading to inaccurate engagement metrics. In machine learning, duplicates can bias models by giving undue weight to the duplicated observations, potentially leading to poor generalization on clean data.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)
query = f"""
-- Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{project_id}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
"""
query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((3482, 100000, 3.48), {'outliers': 0, 'total': 1, 'pct_outliers': 2})


In [None]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query_create_table = f"""
-- Create watch_history_robust with watch_duration_minutes_capped capped at P01/P99
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{project_id}.netflix.watch_history_dedup` h, q;
"""

query_job_create_table = client.query(query_create_table)
query_job_create_table.result()

print(f"Table `{project_id}.netflix.watch_history_robust` created successfully.")

# Quantiles before vs after
query_quantiles = f"""
-- Quantiles before vs after capping
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job_quantiles = client.query(query_quantiles)
# Store the result iterator in a variable
results_iterator = query_job_quantiles.result()

# Print the results
print("\nQuantiles before vs after capping:")
for row in results_iterator:
    print(row)

Table `mgmt-471819-i5.netflix.watch_history_robust` created successfully.

Quantiles before vs after capping:
Row(('after', [4.4, 24.6, 41.5, 61.5, 92.0, 366.0]), {'which': 0, 'q': 1})
Row(('before', [0.2, 24.8, 41.7, 61.2, 91.8, 799.3]), {'which': 0, 'q': 1})


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Min/median/max before vs after capping
WITH before AS (
  SELECT 'before' AS which,
         MIN(watch_duration_minutes) AS min_val,
         APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
         MAX(watch_duration_minutes) AS max_val
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which,
         MIN(watch_duration_minutes_capped) AS min_val,
         APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_val,
         MAX(watch_duration_minutes_capped) AS max_val
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('after', 4.4, 51.4, 366.0), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})
Row(('before', 0.2, 51.1, 799.3), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping, while useful for handling extreme values, might be harmful when the outliers represent genuine, important data points that hold valuable information, or when the underlying distribution is naturally heavy-tailed. Removing or altering these values can lead to a loss of information and potentially biased analysis or model training. Tree-based models, such as decision trees and random forests, are generally less sensitive to outliers because they make decisions based on splitting data at certain thresholds rather than relying on the magnitude of individual data points as linear models do.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- In watch_history_robust, compute and summarize flag_binge for sessions > 8 hours
SELECT
  COUNTIF(watch_duration_minutes_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0, 100000, 0.0), {'sessions_over_8h': 0, 'total': 1, 'pct': 2})


In [None]:

query = f"""
-- In users, compute and summarize flag_age_extreme if age is <10 or >100
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.users`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((358, 20600, 1.74), {'extreme_age_rows': 0, 'total': 1, 'pct': 2})


In [None]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
# # SELECT
# #   COUNTIF(duration_min < 15) AS titles_under_15m,
# #   COUNTIF(duration_min > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

In [None]:

query = f"""
-- In movies, compute and summarize flag_duration_anomaly where duration_minutes < 15 or > 480
SELECT
  COUNTIF(duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes > 480) AS titles_over_480m,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((24, 22, 2080, 2.21), {'titles_under_15m': 0, 'titles_over_480m': 1, 'total': 2, 'pct_duration_anomaly': 3})


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [None]:
query = f"""
-- Verification: Compact summary query for all flags
WITH
  binge_summary AS (
    SELECT 'flag_binge' AS flag_name, ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct_of_rows
    FROM `{project_id}.netflix.watch_history_robust`
  ),
  age_summary AS (
    SELECT 'flag_age_extreme' AS flag_name, ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_of_rows
    FROM `{project_id}.netflix.users`
  ),
  duration_summary AS (
    SELECT 'flag_duration_anomaly' AS flag_name, ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_of_rows
    FROM `{project_id}.netflix.movies`
  )
SELECT * FROM binge_summary
UNION ALL SELECT * FROM age_summary
UNION ALL SELECT * FROM duration_summary;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('flag_binge', 0.0), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_age_extreme', 1.74), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_duration_anomaly', 2.21), {'flag_name': 0, 'pct_of_rows': 1})


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Based on the verification query results, the `flag_duration_anomaly` is the most common at 2.21% of rows, followed by `flag_age_extreme` at 1.74%, while `flag_binge` is 0.0%. I would keep all of them as features, especially `flag_duration_anomaly` and `flag_age_extreme` due to their non-zero occurrence. These flags represent potentially interesting behaviors or data issues that could be valuable for downstream analysis or machine learning models. For instance, `flag_duration_anomaly` might indicate issues with movie metadata or unusual viewing patterns, while `flag_age_extreme` could highlight data entry errors or a small segment of the user base with potentially unique behavior. Even if the percentage is low, these flags can help identify edge cases or specific user segments that might require different handling or analysis.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


In [11]:
import re
import os
import json

# Define the path for the SQL file
sql_file_path = 'dq_queries.sql'
# Define the path to the current notebook file
notebook_path = '/content/drive/MyDrive/MGMT467/Unit2_Lab1_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQ(Final).ipynb' # **IMPORTANT: Replace with the actual path to your notebook**

# Initialize an empty string to store SQL queries
all_sql_queries = ""

try:
    # Read the notebook content from the file
    with open(notebook_path, 'r') as f:
        notebook_content = json.load(f)

    # Iterate through cells and extract SQL queries from code cells
    for cell in notebook_content.get('cells', []):
        if cell.get('cell_type') == 'code':
            code_content = "".join(cell.get('source', [])) # Join list of lines into a single string
            # Look for lines that contain BigQuery query patterns (e.g., SELECT, WITH, --, CREATE, etc.)
            # This regex looks for lines starting with SQL keywords or comments within code cells
            sql_lines = re.findall(r"^\s*(SELECT|WITH|--|CREATE|ALTER|DROP|INSERT|UPDATE|DELETE).*$", code_content, re.MULTILINE | re.IGNORECASE)
            if sql_lines:
                 # A more reliable way to extract the full query might be to look for multi-line strings
                 # often used for SQL queries in Python code.
                 query_match = re.search(r"query\s*=\s*f?\"\"\"(.*?)\"\"\"", code_content, re.DOTALL)
                 if query_match:
                    all_sql_queries += f"-- Query from cell (potentially): {cell.get('metadata', {}).get('id', 'unknown')}\n"
                    all_sql_queries += query_match.group(1).strip() + ";\n\n"


    # Write the extracted queries to the SQL file
    with open(sql_file_path, 'w') as f:
        f.write(all_sql_queries)

    print(f"SQL queries exported to {sql_file_path}")

    # You would then use git commands in a shell cell to add and commit this file to your repo
    # Example:
    # !git add dq_queries.sql
    # !git commit -m "Export DQ queries"
    # !git push

except FileNotFoundError:
    print(f"Error: Notebook file not found at {notebook_path}. Please update the 'notebook_path' variable with the correct path.")
except Exception as e:
    print(f"An error occurred: {e}")

SQL queries exported to dq_queries.sql


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
