<a href="https://colab.research.google.com/github/elebon26/mgmt467-analytics-portfolio/blob/main/Labs/Unit2/(Ethan_Lebon_Completed)_MGMT467_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [12]:
#EXAMPLE (from LLM) — Auth + Project/Region (commented; write your own cell using the prompt)
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt-467-1234
Project: mgmt-467-1234 | Region: us-central1
Updated property [core/project].
mgmt-467-1234


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [2]:
# Verification step: Check active project and region
!gcloud config get-value project
import os
print("Project:", PROJECT_ID, "| Region:", REGION)

mgmt-467-1234
Project: mgmt-467-1234 | Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [3]:
# EXAMPLE (from LLM) — Kaggle setup (commented)
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only

!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle (1).json to kaggle (1).json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [4]:
# Verification step: show the first 20 lines of kaggle --help
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Strict 0600 permissions on API tokens are used to ensure only the file owner can read and write to the file, which helps prevent unauthorized access. This lowers the risks of credential theft and protects things like api keys.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [6]:
!mkdir -p /content/data/raw

!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

!unzip -o /content/data/*.zip -d /content/data/raw

!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
netflix-2025user-behavior-dataset-210k-records.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root root 1.6M Aug  2 1

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [9]:
# Verification step: assert that there are exactly six CSV files and print their names
import glob
csv_files = glob.glob('/content/data/raw/*.csv')
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"
print("Found exactly 6 CSV files:")
for f in csv_files:
    print(f)

Found exactly 6 CSV files:
/content/data/raw/movies.csv
/content/data/raw/watch_history.csv
/content/data/raw/search_logs.csv
/content/data/raw/users.csv
/content/data/raw/recommendation_logs.csv
/content/data/raw/reviews.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

Keeping a clean file inventory helps ensure that all expected files are present and correctly named before processing. This prevents errors in downstream steps that rely on specific file paths and names, and aids in debugging and auditing the data pipeline.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [13]:
import uuid
import os

# Create a unique bucket name with a random suffix
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the bucket
print(f"Creating bucket: gs://{bucket_name} in region: {os.environ.get('REGION')}")
# Use the --location flag with the REGION environment variable
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSV files to the 'netflix/' prefix in the bucket
print(f"\nUploading files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/

print(f"\nSuccessfully created bucket: {bucket_name} and uploaded files.")
print("\nBenefits of staging data in GCS:")
print("- **Consistent Source:** GCS provides a stable and versionable location for your data.")
print("- **Scalability:** GCS is highly scalable, handling large datasets easily.")
print("- **Integration:** GCS integrates seamlessly with other Google Cloud services like BigQuery.")
print("- **Cost-Effective:** GCS can be a cost-effective storage solution.")

print(f"\nVerifying contents of gs://{bucket_name}/netflix/:")
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating bucket: gs://mgmt467-netflix-853acaec in region: us-central1
Creating gs://mgmt467-netflix-853acaec/...

Uploading files to gs://mgmt467-netflix-853acaec/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-853acaec/netflix/movies.csv
Copying file:///content/data/raw/README.md to gs://mgmt467-netflix-853acaec/netflix/README.md
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-853acaec/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-853acaec/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-853acaec/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-853acaec/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-853acaec/netflix/watch_history.csv

Average throughput: 27.8MiB/s

Successfully created bucket: mgmt467-netflix-853acaec and uploaded files.

B

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [15]:
# Verification step: list objects in the netflix/ prefix and show sizes
import os
bucket_name = os.environ.get("BUCKET_NAME")
if bucket_name:
  !gcloud storage ls -l gs://$BUCKET_NAME/netflix/
else:
  print("BUCKET_NAME environment variable not set.")

      8002  2025-10-26T18:49:26Z  gs://mgmt467-netflix-853acaec/netflix/README.md
    115942  2025-10-26T18:49:26Z  gs://mgmt467-netflix-853acaec/netflix/movies.csv
   4695557  2025-10-26T18:49:27Z  gs://mgmt467-netflix-853acaec/netflix/recommendation_logs.csv
   1861942  2025-10-26T18:49:27Z  gs://mgmt467-netflix-853acaec/netflix/reviews.csv
   2250902  2025-10-26T18:49:27Z  gs://mgmt467-netflix-853acaec/netflix/search_logs.csv
   1606820  2025-10-26T18:49:26Z  gs://mgmt467-netflix-853acaec/netflix/users.csv
   9269425  2025-10-26T18:49:27Z  gs://mgmt467-netflix-853acaec/netflix/watch_history.csv
TOTAL: 7 objects, 19808590 bytes (18.89MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

Two benefits of staging data in GCS versus loading directly from local Colab are scalability and integration with other Google Cloud services. GCS can handle much larger datasets than Colab's local storage and provides a central, accessible location for services like BigQuery to load data from efficiently.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [20]:
# Create the dataset (idempotent)
import os
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
dataset_id = "netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" {project_id}.{dataset_id} || echo "Dataset may already exist."

BigQuery error in mk operation: Cannot determine dataset described by
mgmt-467-1234.netflix
Dataset may already exist.


In [19]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
bucket_name = os.environ['BUCKET_NAME']
dataset_id = "netflix"

tables = ['users', 'movies', 'watch_history', 'recommendation_logs', 'search_logs', 'reviews']

client = bigquery.Client(project=project_id)

for table_id in tables:
    # Define the table reference
    table_ref = client.dataset(dataset_id).table(table_id)

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
    )

    # Define the source URI
    uri = f"gs://{bucket_name}/netflix/{table_id}.csv"

    # Start the load job
    load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)

    print(f"Starting load job for {table_id}...")
    load_job.result()  # Wait for the job to complete
    print(f"Load job for {table_id} completed.")

print("\nRow counts for loaded tables:")
for table_id in tables:
    query = f"SELECT COUNT(*) AS row_count FROM `{project_id}.{dataset_id}.{table_id}`"
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
        print(f"{table_id}: {row['row_count']} rows")

Starting load job for users...
Load job for users completed.
Starting load job for movies...
Load job for movies completed.
Starting load job for watch_history...
Load job for watch_history completed.
Starting load job for recommendation_logs...
Load job for recommendation_logs completed.
Starting load job for search_logs...
Load job for search_logs completed.
Starting load job for reviews...
Load job for reviews completed.

Row counts for loaded tables:
users: 41200 rows
movies: 4160 rows
watch_history: 420000 rows
recommendation_logs: 208000 rows
search_logs: 106000 rows
reviews: 61800 rows


### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [25]:
# Verification step: single query for all table row counts
import os
from google.cloud import bigquery
import pandas as pd

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  table_id AS table_name,
  row_count
FROM `{project_id}.netflix.__TABLES__`;
"""

query_job = client.query(query)
results_df = query_job.to_dataframe()

# Print the results as a DataFrame
display(results_df)

Unnamed: 0,table_name,row_count
0,movies,4160
1,recommendation_logs,208000
2,reviews,61800
3,search_logs,106000
4,users,41200
5,watch_history,420000
6,watch_history_dedup,100000
7,watch_history_robust,100000


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

Autodetect is acceptable for initial exploration or when the data structure is simple and consistent. You should enforce explicit schemas when data types are critical for analysis or when dealing with inconsistent data to prevent errors and ensure data integrity.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [29]:
# Query 1: Total rows and % missing in country, subscription_plan, age from users
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query_missing_percentages = f"""
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(country IS NULL) AS missing_country,
  ROUND(100 * COUNTIF(country IS NULL) / COUNT(*), 2) AS pct_missing_country,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan,
  COUNTIF(age IS NULL) AS missing_age,
  ROUND(100 * COUNTIF(age IS NULL) / COUNT(*), 2) AS pct_missing_age
FROM `{project_id}.netflix.users`;
"""

query_job_missing_percentages = client.query(query_missing_percentages)
results_missing_percentages = query_job_missing_percentages.result()

print("Missingness percentages:")
for row in results_missing_percentages:
    print(row)

Missingness percentages:
Row((41200, 0, 0.0, 0, 0.0, 4916, 11.93), {'total_rows': 0, 'missing_country': 1, 'pct_missing_country': 2, 'missing_subscription_plan': 3, 'pct_missing_subscription_plan': 4, 'missing_age': 5, 'pct_missing_age': 6})


In [30]:
# Query 2: % subscription_plan missing by country ordered descending (MAR example)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query_missing_by_country = f"""
SELECT
  country,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan,
  COUNT(*) AS total_in_country,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM `{project_id}.netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;
"""

query_job_missing_by_country = client.query(query_missing_by_country)
results_missing_by_country = query_job_missing_by_country.result()

print("\n% subscription_plan missing by country:")
for row in results_missing_by_country:
    print(row)

# Comment on MAR: If the percentage of missing subscription plans varies significantly by country,
# this could indicate Missing At Random (MAR), meaning the missingness is related to the country.


% subscription_plan missing by country:
Row(('Canada', 0, 12384, 0.0), {'country': 0, 'missing_subscription_plan': 1, 'total_in_country': 2, 'pct_missing_subscription_plan': 3})
Row(('USA', 0, 28816, 0.0), {'country': 0, 'missing_subscription_plan': 1, 'total_in_country': 2, 'pct_missing_subscription_plan': 3})


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [32]:
# Verification step: print the three missingness percentages
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query_verification = f"""
SELECT
  ROUND(100 * COUNTIF(country IS NULL) / COUNT(*), 2) AS pct_missing_country,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan,
  ROUND(100 * COUNTIF(age IS NULL) / COUNT(*), 2) AS pct_missing_age
FROM `{project_id}.netflix.users`;
"""

query_job_verification = client.query(query_verification)
results_verification = query_job_verification.result()

print("Verification of missingness percentages:")
for row in results_verification:
    print(row)

Verification of missingness percentages:
Row((0.0, 0.0, 11.93), {'pct_missing_country': 0, 'pct_missing_subscription_plan': 1, 'pct_missing_age': 2})


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

From the missingness analysis, the `age` column is the most missing at 11.93%. `country` and `subscription_plan` have no missing values. The missingness in `age` could be Missing At Random (MAR) if it's related to another variable like `country` (e.g., data collection practices vary by region), or Missing Not At Random (MNAR) if people are less likely to provide their age for privacy reasons.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [34]:
# Query 1: Report duplicate groups on (user_id, movie_id, event_ts, device_type) with counts (top 20)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  user_id,
  movie_id,
  watch_date, -- Assuming watch_date is the timestamp column
  device_type,
  COUNT(*) as duplicate_count
FROM `{project_id}.netflix.watch_history`
GROUP BY
  user_id,
  movie_id,
  watch_date,
  device_type
HAVING
  COUNT(*) > 1
ORDER BY
  duplicate_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

print("Top 20 Duplicate Groups in watch_history:")
for row in results:
    print(row)

Top 20 Duplicate Groups in watch_history:
Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 16), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'duplicate_count': 4})
Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 16), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'duplicate_count': 4})
Row(('user_02138', 'movie_0729', datetime.date(2025, 8, 28), 'Laptop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'duplicate_count': 4})
Row(('user_03094', 'movie_0114', datetime.date(2024, 5, 13), 'Smart TV', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'duplicate_count': 4})
Row(('user_04027', 'movie_0652', datetime.date(2024, 1, 2), 'Mobile', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'duplicate_count': 4})
Row(('user_08799', 'movie_0427', datetime.date(2024, 1, 27), 'Smart TV', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_typ

In [35]:
# Query 2: Create table watch_history_dedup that keeps one row per group (prefer higher progress_ratio, then minutes_watched)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(row_num) FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) as row_num
  FROM `{project_id}.netflix.watch_history`
)
WHERE row_num = 1;
"""

query_job = client.query(query)
results = query_job.result()

print("Deduplicated table watch_history_dedup created successfully.")

Deduplicated table watch_history_dedup created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [36]:
# Verification step: before/after count query comparing raw vs watch_history_dedup
import os
from google.cloud import bigquery
import pandas as pd

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'raw' AS table_name,
  COUNT(*) AS row_count
FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT
  'deduplicated' AS table_name,
  COUNT(*) AS row_count
FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job = client.query(query)
results_df = query_job.to_dataframe()

display(results_df)

Unnamed: 0,table_name,row_count
0,deduplicated,100000
1,raw,420000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise from natural user behavior (e.g., refreshing a page) or system errors (e.g., double logging). They corrupt labels and KPIs by artificially inflating counts (e.g., watch time, event triggers), leading to inaccurate analysis and potentially flawed business or model decisions.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [39]:
# Query 1: Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Calculate Q1, Q3, and IQR using PERCENTILE_CONT
query_iqr = f"""
WITH Quantiles AS (
  SELECT
    PERCENTILE_CONT(watch_duration_minutes, 0.25) OVER() AS q1,
    PERCENTILE_CONT(watch_duration_minutes, 0.75) OVER() AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  ANY_VALUE(q1) AS q1,
  ANY_VALUE(q3) AS q3,
  ANY_VALUE(q3) - ANY_VALUE(q1) AS iqr
FROM Quantiles
LIMIT 1;
"""

query_job_iqr = client.query(query_iqr)
results_iqr = query_job_iqr.result()
for row in results_iqr:
    q1 = row['q1']
    q3 = row['q3']
    iqr = row['iqr']
    print(f"Q1: {q1}, Q3: {q3}, IQR: {iqr}")

# Calculate outlier percentage using PERCENTILE_CONT
query_outliers = f"""
WITH Quantiles AS (
  SELECT
    PERCENTILE_CONT(watch_duration_minutes, 0.25) OVER() AS q1,
    PERCENTILE_CONT(watch_duration_minutes, 0.75) OVER() AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
),
Bounds AS (
  SELECT
    ANY_VALUE(q1) - 1.5 * (ANY_VALUE(q3) - ANY_VALUE(q1)) AS lower_bound,
    ANY_VALUE(q3) + 1.5 * (ANY_VALUE(q3) - ANY_VALUE(q1)) AS upper_bound
  FROM Quantiles
  LIMIT 1
)
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(t.watch_duration_minutes < b.lower_bound OR t.watch_duration_minutes > b.upper_bound) AS outlier_rows,
  ROUND(100 * COUNTIF(t.watch_duration_minutes < b.lower_bound OR t.watch_duration_minutes > b.upper_bound) / COUNT(*), 2) AS pct_outliers
FROM `{project_id}.netflix.watch_history_dedup` AS t, Bounds AS b
GROUP BY b.lower_bound, b.upper_bound;
"""

query_job_outliers = client.query(query_outliers)
results_outliers = query_job_outliers.result()

print("\nOutlier percentages for watch_duration_minutes:")
for row in results_outliers:
    print(row)

Q1: 29.1, Q3: 82.4, IQR: 53.300000000000004

Outlier percentages for watch_duration_minutes:
Row((100000, 3531, 3.53), {'total_rows': 0, 'outlier_rows': 1, 'pct_outliers': 2})


In [40]:
# Query 2: Create watch_history_robust with watch_duration_minutes_capped capped at P01/P99; return quantile summaries before/after.
import os
from google.cloud import bigquery
import pandas as pd

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Calculate P1 and P99 for capping
query_percentiles = f"""
WITH Percentiles AS (
  SELECT
    PERCENTILE_CONT(watch_duration_minutes, 0.01) OVER() AS p1,
    PERCENTILE_CONT(watch_duration_minutes, 0.99) OVER() AS p99
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  ANY_VALUE(p1) AS p1,
  ANY_VALUE(p99) AS p99
FROM Percentiles
LIMIT 1;
"""

query_job_percentiles = client.query(query_percentiles)
results_percentiles = query_job_percentiles.result()
for row in results_percentiles:
    p1 = row['p1']
    p99 = row['p99']
    print(f"P1: {p1}, P99: {p99}")

# Create the watch_history_robust table with capped watch_duration_minutes
query_create_table = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
SELECT
  *,
  CASE
    WHEN watch_duration_minutes < {p1} THEN {p1}
    WHEN watch_duration_minutes > {p99} THEN {p99}
    ELSE watch_duration_minutes
  END AS watch_duration_minutes_capped
FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job_create_table = client.query(query_create_table)
query_job_create_table.result()
print("\nTable watch_history_robust created with watch_duration_minutes_capped.")

# Quantile summaries before and after capping
query_quantiles_summary = f"""
SELECT
  'before_capping' AS source,
  MIN(watch_duration_minutes) AS min_duration,
  APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(2)] AS median_duration,
  MAX(watch_duration_minutes) AS max_duration
FROM `{project_id}.netflix.watch_history_dedup`
UNION ALL
SELECT
  'after_capping' AS source,
  MIN(watch_duration_minutes_capped) AS min_duration,
  APPROX_QUANTILES(watch_duration_minutes_capped, 4)[OFFSET(2)] AS median_duration,
  MAX(watch_duration_minutes_capped) AS max_duration
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job_quantiles_summary = client.query(query_quantiles_summary)
results_quantiles_summary_df = query_job_quantiles_summary.to_dataframe()

print("\nQuantile summaries before and after capping:")
display(results_quantiles_summary_df)

P1: 4.4, P99: 364.0359999999989

Table watch_history_robust created with watch_duration_minutes_capped.

Quantile summaries before and after capping:


Unnamed: 0,source,min_duration,median_duration,max_duration
0,before_capping,0.2,51.1,799.3
1,after_capping,4.4,51.1,364.036


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [41]:
# Verification step: show min/median/max before vs after capping
import os
from google.cloud import bigquery
import pandas as pd

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'before_capping' AS source,
  MIN(watch_duration_minutes) AS min_duration,
  APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(2)] AS median_duration,
  MAX(watch_duration_minutes) AS max_duration
FROM `{project_id}.netflix.watch_history_dedup`
UNION ALL
SELECT
  'after_capping' AS source,
  MIN(watch_duration_minutes_capped) AS min_duration,
  APPROX_QUANTILES(watch_duration_minutes_capped, 4)[OFFSET(2)] AS median_duration,
  MAX(watch_duration_minutes_capped) AS max_duration
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results_df = query_job.to_dataframe()

display(results_df)

Unnamed: 0,source,min_duration,median_duration,max_duration
0,before_capping,0.2,51.1,799.3
1,after_capping,4.4,51.1,364.036


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping might be harmful if the outliers represent genuine, important data points that hold valuable information, potentially distorting the true distribution or relationship with other variables. Tree-based models like Random Forests or Gradient Boosting Machines are generally less sensitive to outliers because they make decisions based on splitting data at certain values rather than relying on the magnitude of individual data points.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [43]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.result()

for row in results:
    print(row)

Row((0, 100000, 0.0), {'sessions_over_8h': 0, 'total': 1, 'pct': 2})


In [45]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.users`;
"""

query_job = client.query(query)
results = query_job.result()

for row in results:
    print(row)

Row((716, 41200, 1.74), {'extreme_age_rows': 0, 'total': 1, 'pct': 2})


In [46]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes > 480) AS titles_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()


for row in results:
    print(row)

Row((48, 44, 4160, 2.21), {'titles_under_15m': 0, 'titles_over_8h': 1, 'total': 2, 'pct_duration_anomaly': 3})


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [47]:
# Verification step: single compact summary query for anomaly flags
import os
from google.cloud import bigquery
import pandas as pd

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'flag_binge' AS flag_name,
  ROUND(100 * COUNTIF(watch_duration_minutes_capped > 8*60) / COUNT(*), 2) AS pct_of_rows
FROM `{project_id}.netflix.watch_history_robust`
UNION ALL
SELECT
  'flag_age_extreme' AS flag_name,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_of_rows
FROM `{project_id}.netflix.users`
UNION ALL
SELECT
  'flag_duration_anomaly' AS flag_name,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results_df = query_job.to_dataframe()

display(results_df)

Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.0
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Based on the verification query, the flag_duration_anomaly is the most common anomaly flag at 2.21%, followed by flag_age_extreme at 1.74%. The flag_binge flag is 0%. I would likely keep flag_duration_anomaly as a feature because it indicates potentially problematic data points that could affect analysis or models, and it's the most prevalent anomaly.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
