# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [8]:
# Authenticate to Google Cloud in Colab
from google.colab import auth
auth.authenticate_user()

import os
# Prompt for Project ID and set Region
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # Keep consistent; change if instructed

# Export GOOGLE_CLOUD_PROJECT environment variable
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
# Export REGION environment variable
os.environ["REGION"] = REGION

# Set active project for gcloud and BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT

# Print the set values
print(f"Project: {PROJECT_ID} | Region: {REGION}")

# Done: Auth + Project/Region set

Enter your GCP Project ID: our-rock-471819-h7
Updated property [core/project].
Project: our-rock-471819-h7 | Region: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [2]:
# Verify active project and region
!gcloud config get-value project
import os
print("Region:", os.environ.get("REGION"))

our-rock-471819-h7
Region: None


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

We set PROJECT_ID and REGION at the top of the notebook for consistency and to avoid potential issues.

Setting these values explicitly ensures that all subsequent commands and API calls within the notebook operate within the specified project and region. If we don't set them, commands might default to incorrect or different projects/regions, leading to errors, unexpected costs, or resources being created in unintended locations. It also makes the notebook more reproducible and easier to share.

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [3]:
# Prompt for kaggle.json upload
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Save to ~/.kaggle/kaggle.json with secure permissions
import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # Set owner-only read/write permissions for security

# Verify Kaggle CLI installation and version
# This confirms the API is ready to use for downloading datasets reproducibly
!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle (1).json to kaggle (1).json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [4]:
# Verify Kaggle CLI is ready by showing the first 20 lines of help
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Requiring strict 0600 permissions on API tokens means that only the owner of the file can read and write to it. This is crucial for security because API tokens are essentially passwords that grant access to your accounts and data on platforms like Kaggle.

By setting these strict permissions, we are avoiding several risks:

Unauthorized Access: Prevents other users or processes on the system from reading your API token and potentially using it to access your Kaggle account, download data, or submit to competitions under your name.
Data Breaches: Reduces the risk of your API token being compromised in the event of a security vulnerability on the system where it is stored.
Reproducibility Issues: Ensures that your API calls are consistently made with your credentials, avoiding issues that could arise if the wrong credentials were used.
In essence, it's a fundamental security practice to protect sensitive credentials like API tokens from unauthorized access.



## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [5]:
# Create directory for raw data
!mkdir -p /content/data/raw

# Download the dataset from Kaggle to /content/data
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag is used to overwrite files without prompting
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes
# -l flag for long listing format, -h for human-readable sizes
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 513MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [6]:
import glob
import os

csv_files = glob.glob('/content/data/raw/*.csv')
# Assert there are exactly six CSV files
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"

print("Found exactly 6 CSV files:")
for csv_file in csv_files:
    print(os.path.basename(csv_file))

Found exactly 6 CSV files:
movies.csv
watch_history.csv
search_logs.csv
users.csv
recommendation_logs.csv
reviews.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

Keeping a clean file inventory with names and sizes is useful downstream for several reasons:

Auditing and Reproducibility: It provides a clear record of the raw data files used in the analysis. This is essential for auditing purposes and ensuring that the process can be reproduced exactly in the future.
Troubleshooting: If there are issues later in the pipeline (e.g., missing data, incorrect counts), you can refer back to the file inventory to quickly check if all expected files were downloaded and if their sizes are as anticipated.
Resource Management: Knowing the sizes of the files helps in estimating storage requirements in GCS and BigQuery, which is important for cost management and planning.
Scripting and Automation: Consistent file names and locations make it easier to write scripts and automate the data processing pipeline.
Collaboration: When working in a team, a clear file inventory helps everyone understand the data sources and their characteristics.


## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [9]:
import uuid
import os

# Generate a unique bucket name
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket
# The --location flag is set to the REGION environment variable
print(f"Creating bucket: {bucket_name} in region: {os.environ.get('REGION')}")
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSV files to the bucket under the 'netflix/' prefix
print(f"Uploading files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name and explain staging benefits
print("\nBucket created and files uploaded successfully.")
print(f"Bucket Name: {bucket_name}")
print("""
Benefits of staging data in GCS:
- Centralized storage: Provides a single, accessible location for your data.
- Versioning: GCS can manage versions of your data, allowing for rollback if needed.
- Integration with GCP services: Seamlessly integrates with services like BigQuery, Dataproc, and AI Platform.
- Durability and availability: Data is stored redundantly across multiple locations.
- Cost-effective: Generally cheaper for long-term storage compared to keeping data in Colab.
""")

Creating bucket: mgmt467-netflix-05f85191 in region: us-central1
Creating gs://mgmt467-netflix-05f85191/...
Uploading files to gs://mgmt467-netflix-05f85191/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-05f85191/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-05f85191/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-05f85191/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-05f85191/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-05f85191/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-05f85191/netflix/watch_history.csv

Average throughput: 60.8MiB/s

Bucket created and files uploaded successfully.
Bucket Name: mgmt467-netflix-05f85191

Benefits of staging data in GCS:
- Centralized storage: Provides a single, accessible loc

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [10]:
# List the objects in the 'netflix/' prefix of the GCS bucket with sizes
import os
bucket_name = os.environ.get("BUCKET_NAME")
if bucket_name:
  print(f"Listing contents of gs://{bucket_name}/netflix/")
  !gcloud storage ls -l gs://$BUCKET_NAME/netflix/
else:
  print("BUCKET_NAME environment variable not set.")

Listing contents of gs://mgmt467-netflix-05f85191/netflix/
    115942  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/movies.csv
   4695557  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/recommendation_logs.csv
   1861942  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/reviews.csv
   2250902  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/search_logs.csv
   1606820  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/users.csv
   9269425  2025-10-24T22:21:45Z  gs://mgmt467-netflix-05f85191/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

Staging data in GCS before loading it into BigQuery offers several benefits compared to loading directly from local Colab:

Scalability and Performance: GCS is a managed, scalable storage service. Loading data from GCS to BigQuery is generally faster and more efficient, especially for large datasets, as it leverages Google Cloud's internal network and optimized data transfer paths. Loading directly from Colab can be slower and less reliable for large files due to potential network constraints and the ephemeral nature of Colab runtimes.
Reliability and Durability: Data stored in GCS is highly durable and available, with built-in redundancy. Loading directly from a local Colab environment means the data source is tied to that specific instance, which could be interrupted. GCS provides a persistent and reliable source for your data loads.

Accessibility and Collaboration: Data in GCS is easily accessible by other Google Cloud services and can be shared among team members. Loading from local Colab makes the data less accessible for other processes or collaborators.
Versioning and Lifecycle Management: GCS offers features like object versioning and lifecycle management, which are not easily replicated when loading directly from a local environment. These features are crucial for data governance and reproducibility.


## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [11]:
# Cell A: Create (idempotently) dataset netflix in US multi-region
DATASET = "netflix"
print(f"Attempting to create dataset: {DATASET}")
# Attempt to create; ignore if exists
# The --location flag is set to US for multi-region
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist or other error occurred."

Attempting to create dataset: netflix
BigQuery error in mk operation: Dataset 'our-rock-471819-h7:netflix' already
exists.
Dataset may already exist or other error occurred.


In [19]:
# Cell B: Load tables from gs://$BUCKET_NAME/netflix/
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

import os
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
bucket_name = os.environ.get("BUCKET_NAME")
dataset_name = "netflix" # Defined in Cell A

if not project_id:
    print("Error: GOOGLE_CLOUD_PROJECT environment variable not set.")
elif not bucket_name:
    print("Error: BUCKET_NAME environment variable not set.")
else:
    for tbl, fname in tables.items():
        src = f"gs://{bucket_name}/netflix/{fname}"
        print(f"Loading table: {dataset_name}.{tbl} from {src}")
        # Load data with autodetect, skipping header row
        !bq load --skip_leading_rows=1 --autodetect --source_format=CSV {dataset_name}.{tbl} {src}

    # Finish with row-count queries for each table
    print("\nVerifying row counts:")
    for tbl in tables.keys():
        print(f"Row count for {dataset_name}.{tbl}:")
        !bq query --nouse_legacy_sql f"SELECT COUNT(*) AS n FROM `{project_id}.{dataset_name}.{tbl}`"

Loading table: netflix.users from gs://mgmt467-netflix-05f85191/netflix/users.csv
Waiting on bqjob_r409a64e99576f0fc_0000019a185aa4a9_1 ... (1s) Current status: DONE   
Loading table: netflix.movies from gs://mgmt467-netflix-05f85191/netflix/movies.csv
Waiting on bqjob_r450dc9ac4182dbfc_0000019a185ac052_1 ... (1s) Current status: DONE   
Loading table: netflix.watch_history from gs://mgmt467-netflix-05f85191/netflix/watch_history.csv
Waiting on bqjob_r5084d17934bfd73d_0000019a185ad616_1 ... (2s) Current status: DONE   
Loading table: netflix.recommendation_logs from gs://mgmt467-netflix-05f85191/netflix/recommendation_logs.csv
Waiting on bqjob_r77d2c66d5a7b723_0000019a185af3c7_1 ... (1s) Current status: DONE   
Loading table: netflix.search_logs from gs://mgmt467-netflix-05f85191/netflix/search_logs.csv
Waiting on bqjob_r5ad1073293443352_0000019a185b0a41_1 ... (1s) Current status: DONE   
Loading table: netflix.reviews from gs://mgmt467-netflix-05f85191/netflix/reviews.csv
Waiting on b

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [23]:
%%bigquery
-- Generate a single query that returns table_name, row_count for all six tables
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `netflix.reviews`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,table_name,row_count
0,search_logs,185500
1,users,72100
2,movies,7280
3,reviews,108150
4,watch_history,735000
5,recommendation_logs,364000


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

autodetect in BigQuery is acceptable and convenient for initial data exploration, quick loads of relatively clean data, or when the schema is straightforward and unlikely to change. It saves time by inferring column names, data types, and modes (nullable, required, repeated).

However, you should enforce explicit schemas when:

Data Quality and Consistency are Critical: Explicit schemas provide strict type enforcement. If incoming data doesn't match the schema, the load will fail, immediately alerting you to data quality issues. This prevents incorrect data types from being silently ingested, which can cause errors or unexpected behavior in downstream analysis or applications.
Schema Stability is Required: For production pipelines or long-term data storage, an explicit schema ensures stability. If the source data schema changes unexpectedly, a load with autodetect might infer a different schema, breaking downstream queries or processes that expect the old structure. An explicit schema acts as a contract.
Performance Optimization: Explicit schemas can sometimes lead to better query performance, especially for complex data types or partitioning/clustering strategies that rely on specific column types.
Documentation and Governance: An explicit schema serves as clear documentation of the data structure, which is important for data governance and for other users who need to understand and query the data.
Handling Ambiguous Data: autodetect might make incorrect inferences for ambiguous data, such as strings that could be interpreted as dates or numbers. An explicit schema allows you to define the correct interpretation.
In summary, use autodetect for speed and convenience during exploration, but enforce explicit schemas for reliability, data quality, performance, and maintainability in production or critical workflows.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [33]:
import os
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
dataset_name = "netflix"
table_name = "users"

if project_id:
  print(f"Showing schema for {project_id}:{dataset_name}.{table_name}:")
  !bq show {project_id}:{dataset_name}.{table_name}
else:
  print("GOOGLE_CLOUD_PROJECT environment variable not set.")

Showing schema for our-rock-471819-h7:netflix.users:
Table our-rock-471819-h7:netflix.users

   Last modified                 Schema                Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Total Logical Bytes   Total Physical Bytes   Labels  
 ----------------- ---------------------------------- ------------ ------------- ------------ ------------------- ------------------ --------------------- ---------------------- -------- 
  24 Oct 22:33:07   |- user_id: string                 72100        10639972                                                          10639972              2243077                        
                    |- email: string                                                                                                                                                       
                    |- first_name: string                                                                                                                  

In [36]:
%%bigquery
-- Cell 1: Total rows and % missing in subscription_plan and age from users
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan,
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan,
  COUNTIF(age IS NULL) AS missing_age,
  ROUND(SAFE_DIVIDE(COUNTIF(age IS NULL), COUNT(*)) * 100, 2) AS pct_missing_age
FROM `netflix.users`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,total_rows,missing_subscription_plan,pct_missing_subscription_plan,missing_age,pct_missing_age
0,72100,0,0.0,8603,11.93


In [37]:
%%bigquery
-- Cell 2: % subscription_plan missing by country ordered descending.
-- Examining missingness by country can help identify if missingness is
-- related to the country (Missing At Random - MAR).
SELECT
  country,
  COUNT(*) AS total_in_country,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan_in_country,
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan
FROM `netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,country,total_in_country,missing_subscription_plan_in_country,pct_missing_subscription_plan
0,Canada,21672,0,0.0
1,USA,50428,0,0.0


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [38]:
%%bigquery
-- Query to print the three missingness percentages from the first analysis query
SELECT
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan,
  ROUND(SAFE_DIVIDE(COUNTIF(age IS NULL), COUNT(*)) * 100, 2) AS pct_missing_age
FROM `netflix.users`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,pct_missing_subscription_plan,pct_missing_age
0,0.0,11.93


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Based on the output of the missingness analysis:

age is the most missing column with 11.93% missing values.
subscription_plan has no missing values (0.0%).
For age, it's possible that the missingness is Missing At Random (MAR). This could be because users might be less likely to provide their age if they fall into certain age groups, or perhaps the data collection method had issues for specific demographics. Without more information or further analysis (like checking if missingness in age is correlated with other user characteristics like country or device type), it's difficult to definitively say if it's MAR or even MNAR (e.g., if older or younger users actively chose not to provide their age).

For subscription_plan, since there are no missing values, we don't need to hypothesize about MCAR/MAR/MNAR for this column

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [43]:
import os
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
dataset_name = "netflix"
table_name = "watch_history"

if project_id:
  print(f"Showing schema for {project_id}:{dataset_name}.{table_name}:")
  !bq show {project_id}:{dataset_name}.{table_name}
else:
  print("GOOGLE_CLOUD_PROJECT environment variable not set.")

Showing schema for our-rock-471819-h7:netflix.watch_history:
Table our-rock-471819-h7:netflix.watch_history

   Last modified                 Schema                Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Total Logical Bytes   Total Physical Bytes   Labels  
 ----------------- ---------------------------------- ------------ ------------- ------------ ------------------- ------------------ --------------------- ---------------------- -------- 
  24 Oct 22:33:20   |- session_id: string              735000       68409684                                                          68409684              8238391                        
                    |- user_id: string                                                                                                                                                     
                    |- movie_id: string                                                                                                    

In [44]:
%%bigquery
-- Cell 1: Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,user_id,movie_id,watch_date,device_type,dup_count
0,user_00391,movie_0893,2024-08-26,Laptop,28
1,user_03310,movie_0640,2024-09-08,Smart TV,28
2,user_04027,movie_0652,2024-01-02,Mobile,21
3,user_03563,movie_0291,2024-02-18,Smart TV,21
4,user_04698,movie_0482,2025-01-18,Mobile,21
5,user_09045,movie_0427,2025-09-26,Mobile,21
6,user_04050,movie_0898,2025-07-05,Mobile,21
7,user_09331,movie_0073,2024-03-23,Smart TV,21
8,user_09815,movie_0827,2024-05-25,Laptop,21
9,user_01807,movie_0921,2025-01-30,Laptop,21


In [45]:
%%bigquery
-- Cell 2: Create table watch_history_dedup that keeps one row per group
-- This query uses a window function (ROW_NUMBER) to assign a rank within each partition
-- defined by the grouping columns (user_id, movie_id, watch_date, device_type).
-- It orders by progress_percentage and then watch_duration_minutes to determine which row to keep.
-- Finally, it selects only the rows where the rank is 1, effectively keeping one "best" row per group.
CREATE OR REPLACE TABLE `netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `netflix.watch_history` h
)
WHERE rk = 1;

Query is running:   0%|          |

### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [46]:
%%bigquery
-- Before/after count query comparing raw vs watch_history_dedup
SELECT
  (SELECT COUNT(*) FROM `netflix.watch_history`) AS raw_row_count,
  (SELECT COUNT(*) FROM `netflix.watch_history_dedup`) AS dedup_row_count,
  (SELECT COUNT(*) FROM `netflix.watch_history`) - (SELECT COUNT(*) FROM `netflix.watch_history_dedup`) AS rows_removed_by_deduplication;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,raw_row_count,dedup_row_count,rows_removed_by_deduplication
0,735000,100000,635000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates in data can arise from both natural and system-generated causes:

Natural Causes:

User Error: A user might accidentally submit the same information multiple times (e.g., submitting a review twice).
Real-World Events: In some datasets, natural events might genuinely result in identical records within a certain timeframe.
System-Generated Causes:

Data Entry Issues: Errors during manual data entry can lead to accidental duplication.
Integration Problems: When combining data from multiple sources, inconsistencies in identifiers or timing can create duplicate records.
Faulty Data Pipelines: Errors in ETL (Extract, Transform, Load) processes can accidentally duplicate records during data movement or transformation.
Retries: Network issues or temporary errors can cause systems to retry operations, leading to duplicate entries if not handled idempotently.
Sensor or Device Malfunctions: Faulty sensors or devices might send duplicate readings.
How they corrupt labels and KPIs:

Duplicates can significantly corrupt labels and Key Performance Indicators (KPIs) by inflating counts and distorting distributions. For example:

Inflated Metrics: If you count the number of watch events or reviews, duplicates will artificially increase these numbers, leading to an overestimation of user activity or engagement.
Incorrect Aggregations: Calculations like average watch duration or total revenue will be skewed by duplicate records.
Biased Machine Learning Models: If duplicates are used to train ML models, the model might learn to overemphasize the patterns present in the duplicated data, leading to biased predictions. For instance, a recommendation model trained on duplicated watch history might unfairly favor content that appears more frequently due to duplicates.
Misleading KPIs: Business metrics like daily active users, conversion rates, or churn rates will be inaccurate, leading to poor business decisions. If a single user's activity is duplicated, it might appear as multiple active users or inflated engagement for that user.
In essence, duplicates create a false representation of the underlying reality, leading to inaccurate insights and potentially harmful decisions based on corrupted data.



### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [47]:
%%bigquery
-- Cell 1: Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers.
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `netflix.watch_history_dedup` h
CROSS JOIN bounds b;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,outliers,total,pct_outliers
0,3472,100000,3.47


In [48]:
%%bigquery
-- Cell 2: Create watch_history_robust with watch_duration_minutes_capped capped at P01/P99;
-- return quantile summaries before/after.
CREATE OR REPLACE TABLE `netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `netflix.watch_history_dedup` h, q;

-- Quantiles before vs after (using watch_duration_minutes and watch_duration_minutes_capped)
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,which,q
0,after,"[4.4, 24.6, 41.5, 61.5, 92.0, 203.6]"
1,before,"[0.2, 24.9, 41.7, 61.3, 91.7, 799.3]"


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [49]:
%%bigquery
-- Query to show min/median/max before vs after capping
WITH before AS (
  SELECT
    'before_capping' AS stage,
    MIN(watch_duration_minutes) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes) AS max_duration
  FROM `netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after_capping' AS stage,
    MIN(watch_duration_minutes_capped) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes_capped) AS max_duration
  FROM `netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,stage,min_duration,median_duration,max_duration
0,after_capping,4.4,51.4,203.6
1,before_capping,0.2,51.2,799.3


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

Capping might be harmful when:

Outliers represent valid, important data: If the extreme values are not due to errors but represent genuine events or characteristics (e.g., a user watching for an extremely long time during a special event), capping them would distort the true distribution and lead to loss of valuable information.
The underlying relationship is non-linear: Capping can artificially compress the range of a variable, which might obscure non-linear relationships or interactions with other features that are important for modeling.
The model is designed to handle outliers: Some models are naturally robust to outliers. Capping might not be necessary and could potentially harm performance if it removes meaningful variance.
A model type less sensitive to outliers is tree-based models, such as Decision Trees, Random Forests, and Gradient Boosting Machines (like LightGBM or XGBoost).

Why? Tree-based models make decisions based on splitting data at specific threshold values. The exact magnitude of an outlier beyond a certain threshold doesn't significantly influence the split point once it's determined. For example, if a split is at watch_duration_minutes > 100, both a session of 200 minutes and a session of 800 minutes will fall into the same branch (> 100), and their specific values won't change how the split is calculated or where other data points fall. This contrasts with models like linear regression, where outliers can disproportionately influence the slope and intercept.



### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [55]:
import os
project_id = os.environ.get("GOOGLE_CLOUD_PROJECT")
dataset_name = "netflix"
table_name = "movies"

if project_id:
  print(f"Showing schema for {project_id}:{dataset_name}.{table_name}:")
  !bq show {project_id}:{dataset_name}.{table_name}
else:
  print("GOOGLE_CLOUD_PROJECT environment variable not set.")

Showing schema for our-rock-471819-h7:netflix.movies:
Table our-rock-471819-h7:netflix.movies

   Last modified                Schema                Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Total Logical Bytes   Total Physical Bytes   Labels  
 ----------------- --------------------------------- ------------ ------------- ------------ ------------------- ------------------ --------------------- ---------------------- -------- 
  24 Oct 22:33:14   |- movie_id: string               7280         814604                                                            814604                60676                          
                    |- title: string                                                                                                                                                      
                    |- content_type: string                                                                                                                  

In [50]:
%%bigquery
-- Cell 1: Compute and summarize flag_binge for sessions > 8 hours in watch_history_robust
-- A session is considered a "binge" if the watch duration is over 8 hours (480 minutes).
SELECT
  COUNTIF(watch_duration_minutes > 480) AS binge_sessions,
  COUNT(*) AS total_sessions,
  ROUND(100*COUNTIF(watch_duration_minutes > 480)/COUNT(*),2) AS pct_binge_sessions
FROM `netflix.watch_history_robust`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,binge_sessions,total_sessions,pct_binge_sessions
0,639,100000,0.64


In [51]:
%%bigquery
-- Cell 2: Compute and summarize flag_age_extreme if age is <10 or >100 in users.
-- This flags users with potentially extreme or erroneous age values.
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_users,
  COUNT(*) AS total_users,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_extreme_age_users
FROM `netflix.users`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,extreme_age_users,total_users,pct_extreme_age_users
0,1253,72100,1.74


In [56]:
%%bigquery
-- Cell 3: Compute and summarize flag_duration_anomaly where duration_minutes < 15 or duration_minutes > 480 in movies.
-- This flags movies with unusually short or long durations.
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS anomalous_duration_movies,
  COUNT(*) AS total_movies,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_anomalous_duration_movies
FROM `netflix.movies`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,anomalous_duration_movies,total_movies,pct_anomalous_duration_movies
0,161,7280,2.21


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [57]:
%%bigquery
-- Generate a single compact summary query that returns two columns per flag: flag_name, pct_of_rows.
SELECT
  'flag_binge' AS flag_name,
  ROUND(100 * COUNTIF(watch_duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.watch_history_robust`
UNION ALL
SELECT
  'flag_age_extreme' AS flag_name,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.users`
UNION ALL
SELECT
  'flag_duration_anomaly' AS flag_name,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.movies`;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.64
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Based on the summary query output:

flag_duration_anomaly is the most common anomaly flag with 2.21% of movies having unusual durations.
flag_age_extreme is the next most common with 1.74% of users having extreme ages.
flag_binge is the least common with 0.64% of watch sessions flagged as binge sessions.
Which flag to keep as a feature depends on the specific business or ML problem. However, flag_binge could be a particularly valuable feature for several reasons:

Direct Behavioral Insight: Binge-watching is a significant user behavior pattern on streaming platforms. A flag for this directly captures this behavior.
Potential for Engagement Modeling: Binge behavior is likely correlated with high engagement and user satisfaction for specific content. This flag could be a strong predictor in models aiming to understand user engagement or predict churn.
Recommendation Systems: Identifying binge-watchers and the content they binge on could be very useful in recommendation systems, allowing for personalized content suggestions.
Targeted Marketing/Content Strategy: This flag could inform marketing campaigns or content acquisition strategies by highlighting the content that drives binge behavior.
While flag_age_extreme and flag_duration_anomaly are useful for data cleaning and understanding data quality issues, flag_binge is more likely to be a direct indicator of user interaction and preference, making it a potentially powerful feature for various downstream applications.



## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Final Submission Checklist

- [ ] Save this notebook to the team Drive.
- [ ] Export a `.sql` file with your DQ queries and save to repo.
- [ ] Push notebook + SQL to the **team GitHub** with a descriptive commit.
- [ ] Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.

## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)


In [None]:
%%bigquery
-- Data Quality Queries

-- Missingness Analysis (users)
-- Cell 1: Total rows and % missing in subscription_plan and age from users
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan,
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan,
  COUNTIF(age IS NULL) AS missing_age,
  ROUND(SAFE_DIVIDE(COUNTIF(age IS NULL), COUNT(*)) * 100, 2) AS pct_missing_age
FROM `netflix.users`;

-- Cell 2: % subscription_plan missing by country ordered descending.
-- Examining missingness by country can help identify if missingness is
-- related to the country (Missing At Random - MAR).
SELECT
  country,
  COUNT(*) AS total_in_country,
  COUNTIF(subscription_plan IS NULL) AS missing_subscription_plan_in_country,
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan
FROM `netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;

-- Missingness Verification
-- Query to print the three missingness percentages from the first analysis query
SELECT
  ROUND(SAFE_DIVIDE(COUNTIF(subscription_plan IS NULL), COUNT(*)) * 100, 2) AS pct_missing_subscription_plan,
  ROUND(SAFE_DIVIDE(COUNTIF(age IS NULL), COUNT(*)) * 100, 2) AS pct_missing_age
FROM `netflix.users`;

-- Duplicates Analysis (watch_history)
-- Cell 1: Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;

-- Cell 2: Create table watch_history_dedup that keeps one row per group
-- This query uses a window function (ROW_NUMBER) to assign a rank within each partition
-- defined by the grouping columns (user_id, movie_id, watch_date, device_type).
-- It orders by progress_percentage and then watch_duration_minutes to determine which row to keep.
-- Finally, it selects only the rows where the rank is 1, effectively keeping one "best" row per group.
CREATE OR REPLACE TABLE `netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `netflix.watch_history` h
)
WHERE rk = 1;

-- Duplicates Verification
-- Before/after count query comparing raw vs watch_history_dedup
SELECT
  (SELECT COUNT(*) FROM `netflix.watch_history`) AS raw_row_count,
  (SELECT COUNT(*) FROM `netflix.watch_history_dedup`) AS dedup_row_count,
  (SELECT COUNT(*) FROM `netflix.watch_history`) - (SELECT COUNT(*) FROM `netflix.watch_history_dedup`) AS rows_removed_by_deduplication;

-- Outliers Analysis (watch_duration_minutes)
-- Cell 1: Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers.
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `netflix.watch_history_dedup` h
CROSS JOIN bounds b;

-- Cell 2: Create watch_history_robust with watch_duration_minutes_capped capped at P01/P99;
-- return quantile summaries before/after.
CREATE OR REPLACE TABLE `netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `netflix.watch_history_dedup` h, q;

-- Outliers Verification
-- Query to show min/median/max before vs after capping
WITH before AS (
  SELECT
    'before_capping' AS stage,
    MIN(watch_duration_minutes) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes) AS max_duration
  FROM `netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after_capping' AS stage,
    MIN(watch_duration_minutes_capped) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes_capped) AS max_duration
  FROM `netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;

-- Business Anomaly Flags
-- Cell 1: Compute and summarize flag_binge for sessions > 8 hours in watch_history_robust
-- A session is considered a "binge" if the watch duration is over 8 hours (480 minutes).
SELECT
  COUNTIF(watch_duration_minutes > 480) AS binge_sessions,
  COUNT(*) AS total_sessions,
  ROUND(100*COUNTIF(watch_duration_minutes > 480)/COUNT(*),2) AS pct_binge_sessions
FROM `netflix.watch_history_robust`;

-- Cell 2: Compute and summarize flag_age_extreme if age is <10 or >100 in users.
-- This flags users with potentially extreme or erroneous age values.
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_users,
  COUNT(*) AS total_users,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_extreme_age_users
FROM `netflix.users`;

-- Cell 3: Compute and summarize flag_duration_anomaly where duration_minutes < 15 or duration_minutes > 480 in movies.
-- This flags movies with unusually short or long durations.
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS anomalous_duration_movies,
  COUNT(*) AS total_movies,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_anomalous_duration_movies
FROM `netflix.movies`;

-- Anomaly Flags Verification
-- Generate a single compact summary query that returns two columns per flag: flag_name, pct_of_rows.
SELECT
  'flag_binge' AS flag_name,
  ROUND(100 * COUNTIF(watch_duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.watch_history_robust`
UNION ALL
SELECT
  'flag_age_extreme' AS flag_name,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.users`
UNION ALL
SELECT
  'flag_duration_anomaly' AS flag_name,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_of_rows
FROM
  `netflix.movies`;