# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [2]:
from google.colab import auth
import os

# Authenticate to Google Cloud.
# This allows your Colab notebook to access Google Cloud services.
auth.authenticate_user()

# Prompt for the Project ID and set the region.
# Using input() makes the notebook reusable without hardcoding credentials.
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # You can change this to your preferred region.

# Set the project ID as an environment variable.
# This is a standard way for tools like the gcloud CLI to find the active project.
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

# Configure the gcloud command-line tool to use your project.
!gcloud config set project $GOOGLE_CLOUD_PROJECT

print(f"Project ID: {PROJECT_ID}")
print(f"Region: {REGION}")

# Done: Auth + Project/Region set

Enter your GCP Project ID: noble-broker-471012-q6
Updated property [core/project].
Project ID: noble-broker-471012-q6
Region: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [3]:
# Verification step: Check active project and region
!gcloud config get-value project
import os
print("Project:", PROJECT_ID, "| Region:", REGION)

noble-broker-471012-q6
Project: noble-broker-471012-q6 | Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [4]:
# EXAMPLE (from LLM) — Kaggle setup (commented)
from google.colab import files
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

import os
os.makedirs('/root/.kaggle', exist_ok=True)
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])
os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only

!kaggle --version

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

In [5]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

Strict `0600` permissions are required for API tokens and other credentials as a critical security measure. This permission setting ensures that only the owner of the file (in this case, your user account in the Colab environment) can read and write the file.

**Risks we are avoiding:**
1.  **Unauthorized Access:** If other users or processes on the system had read access, they could steal your API token.
2.  **Impersonation:** A stolen token could be used to act on your behalf, potentially downloading datasets you didn't intend or participating in competitions under your name.
3.  **Resource Misuse:** An attacker could use your credentials to exhaust API rate limits or perform other malicious actions tied to your account.

By enforcing `0600` permissions, the Kaggle CLI client helps prevent accidental or malicious exposure of your secret credentials.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [6]:
# Create the directory to store raw data
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI to /content/data
# The -d flag specifies the dataset, and -p specifies the download path
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag overwrites files if they exist
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a neat table
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 519MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [7]:
import glob

# Find all CSV files in the raw data directory
csv_files = glob.glob('/content/data/raw/*.csv')

# Assert that there are exactly six CSV files
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"

# Print the names of the files
print("Found 6 CSV files as expected:")
for f in sorted(csv_files):
    print(f.split('/')[-1])

Found 6 CSV files as expected:
movies.csv
recommendation_logs.csv
reviews.csv
search_logs.csv
users.csv
watch_history.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

Keeping a clean file inventory (names, sizes) is useful for several reasons:

*   **Auditing and Verification:** It allows you to quickly verify that the download and unzip processes completed successfully and that you have the expected number of files with reasonable sizes. Any significant deviation can signal a problem early.
*   **Debugging:** If a downstream process fails (e.g., a BigQuery load job), the file inventory is the first place to check. You can confirm the file exists, its name is correct, and its size isn't zero.
*   **Resource Planning:** Knowing file sizes helps in estimating the resources required for subsequent steps, such as GCS storage costs, BigQuery ingest costs, and the memory needed to process the data.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [8]:
import uuid
import os

# The REGION variable was defined in a previous cell.
# We will generate a unique bucket name to avoid conflicts.
BUCKET_NAME = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = BUCKET_NAME

print(f"Creating bucket: gs://{BUCKET_NAME} in region: {REGION}")
# Use the REGION variable directly in the gcloud command.
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSVs from the raw data directory to a 'netflix' prefix in the bucket.
print(f"\nUploading files to gs://{BUCKET_NAME}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Staging data in GCS is a best practice for several reasons.
print(f"\nSuccessfully created bucket: {BUCKET_NAME} and uploaded files.")
print("\nBenefits of staging data in GCS:")
print("- **Decoupling & Reusability:** GCS acts as a stable, centralized source of truth. Multiple services (like BigQuery, Dataproc, or Vertex AI) can read from the same data without interfering with each other.")
print("- **Durability & Availability:** GCS is a highly durable and available service, ensuring your raw data is not lost if your Colab runtime disconnects.")
print("- **Efficient Loading:** Loading data into BigQuery from GCS is much faster and more reliable than loading from a local machine.")

# Verify the contents of the bucket to ensure the upload was successful.
print(f"\nVerifying contents of gs://{BUCKET_NAME}/netflix/")
!gcloud storage ls --recursive gs://$BUCKET_NAME

Creating bucket: gs://mgmt467-netflix-3fc7b4d1 in region: us-central1
Creating gs://mgmt467-netflix-3fc7b4d1/...

Uploading files to gs://mgmt467-netflix-3fc7b4d1/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-3fc7b4d1/netflix/watch_history.csv

Average throughput: 171.6MiB/s

Successfully created bucket: mgmt467-netflix-3fc7b4d1 and uploaded files.

Benefits of staging data in GCS:
- **Decoupling & Reusability:** GCS acts as a stable, central

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [9]:
# List the files in the GCS bucket prefix with human-readable sizes.
!gcloud storage ls --readable-sizes gs://$BUCKET_NAME/netflix/

gs://mgmt467-netflix-3fc7b4d1/netflix/movies.csv
gs://mgmt467-netflix-3fc7b4d1/netflix/recommendation_logs.csv
gs://mgmt467-netflix-3fc7b4d1/netflix/reviews.csv
gs://mgmt467-netflix-3fc7b4d1/netflix/search_logs.csv
gs://mgmt467-netflix-3fc7b4d1/netflix/users.csv
gs://mgmt467-netflix-3fc7b4d1/netflix/watch_history.csv


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

Staging data in Google Cloud Storage (GCS) before loading it into BigQuery offers two key advantages over loading directly from a local Colab environment:

1.  **Durability and Reliability:** GCS is a persistent and highly available storage service. If your Colab notebook disconnects or the runtime is recycled, your local data is lost. By staging the data in GCS, you create a stable, durable source of truth that is decoupled from your Colab session, allowing for reliable and repeatable data loading.

2.  **Performance and Speed:** Loading data into BigQuery from GCS is significantly faster than uploading from a local machine. The transfer occurs over Google's high-speed internal network, bypassing potential bottlenecks like your local internet connection. This makes the ingestion process much more efficient, especially for large datasets.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [10]:
# Cell A: Create (idempotently) dataset `netflix` in US multi-region
DATASET="netflix"
# The `|| echo` part makes this command idempotent.
# If `bq mk` fails (e.g., dataset exists), it will print a message instead of an error.
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset '$DATASET' may already exist."

BigQuery error in mk operation: Dataset 'noble-broker-471012-q6:netflix' already
exists.
Dataset '' may already exist.


In [11]:
# Cell B: Load tables from GCS and verify counts
import os
from google.cloud import bigquery

# A dictionary mapping the table names to their corresponding CSV files.
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

# Retrieve configuration variables set in previous cells.
DATASET = "netflix"
BUCKET_NAME = os.environ.get('BUCKET_NAME')
PROJECT_ID = os.environ.get('GOOGLE_CLOUD_PROJECT')

# Loop through the dictionary to load each CSV file into a BigQuery table.
for table_name, file_name in tables.items():
  source_uri = f"gs://{BUCKET_NAME}/netflix/{file_name}"
  table_ref = f"{PROJECT_ID}:{DATASET}.{table_name}"
  print(f"Loading {table_name} from {source_uri}...")
  # The bq load command ingests data from GCS.
  # --skip_leading_rows=1 tells BQ to ignore the header row in the CSV.
  # --autodetect lets BQ automatically infer the table schema from the data.
  !bq load --project_id={PROJECT_ID} --skip_leading_rows=1 --autodetect --source_format=CSV {table_ref} {source_uri}

print("\n--- Verifying Row Counts ---")

# Initialize the BigQuery client.
client = bigquery.Client(project=PROJECT_ID)

# After loading, loop through the tables again to query their row counts.
# This method uses the Python client library, which is more robust than shell commands for queries.
for table_name in tables.keys():
  table_ref_for_query = f"{PROJECT_ID}.{DATASET}.{table_name}"

  # Construct the query string.
  query = f"SELECT COUNT(*) AS row_count FROM `{table_ref_for_query}`"

  # Execute the query.
  query_job = client.query(query)

  # Fetch the result, which will be an iterator with one row.
  for row in query_job.result():
      print(f"Table: {table_name:<25} Row Count: {row.row_count}")

Loading users from gs://mgmt467-netflix-3fc7b4d1/netflix/users.csv...
Waiting on bqjob_raec275f360d3d_00000199f20c6f64_1 ... (1s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-3fc7b4d1/netflix/movies.csv...
Waiting on bqjob_r1a0113c8608847a_00000199f20c8221_1 ... (1s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-3fc7b4d1/netflix/watch_history.csv...
Waiting on bqjob_r5a61043e3ee42452_00000199f20c9497_1 ... (2s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-3fc7b4d1/netflix/recommendation_logs.csv...
Waiting on bqjob_r50e9313619a69b5c_00000199f20cab64_1 ... (3s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-3fc7b4d1/netflix/search_logs.csv...
Waiting on bqjob_r842a2efdc331f70_00000199f20cc667_1 ... (2s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-3fc7b4d1/netflix/reviews.csv...
Waiting on bqjob_r3ae339917a6dde67_00000199f20cde0e_1 ... (2s) Current status: DONE   



### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [12]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# This query uses the `__TABLES__` pseudo-view to get metadata about the tables.
# It's more efficient than querying each table individually.
query = f"""
SELECT
  table_id AS table_name,
  row_count
FROM
  `{project_id}.netflix.__TABLES__`
ORDER BY
  table_id;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results in a formatted table
print(f"Row counts for dataset: {project_id}.netflix")
print("-" * 40)
for row in results:
    print(f"{row.table_name:<25} | {row.row_count:>10,}")
print("-" * 40)

Row counts for dataset: noble-broker-471012-q6.netflix
----------------------------------------
movies                    |      4,160
recommendation_logs       |    208,000
reviews                   |     61,800
search_logs               |    106,000
users                     |     41,200
watch_history             |    420,000
watch_history_dedup       |    100,000
----------------------------------------


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

**When is `autodetect` acceptable? When should you enforce explicit schemas and why?**

`autodetect` is acceptable for initial exploration, ad-hoc analysis, or one-time loads of well-structured data, just like we did here. It's fast and convenient for getting data into BigQuery quickly to start running queries.

However, for production pipelines, you should **always enforce explicit schemas**. Here’s why:

1.  **Data Integrity & Consistency:** An explicit schema ensures that data being loaded conforms to the expected data types (e.g., `INTEGER`, `TIMESTAMP`, `STRING`). This prevents data corruption, such as a date field being misinterpreted as a string.
2.  **Error Prevention:** If the source data changes unexpectedly (e.g., a column is renamed or its data type changes), the load job will fail immediately. This is a good thing—it alerts you to an upstream problem, preventing bad data from silently entering your warehouse.
3.  **Performance & Cost:** Specifying a schema allows BigQuery to optimize storage and queries more effectively from the start. Autodetection has to scan the data to infer types, which can sometimes be less optimal.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [13]:
# EXAMPLE (from LLM) — Missingness profile (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Users: % missing per column
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((41200, 0.0, 0.0, 11.93), {'n': 0, 'pct_missing_country': 1, 'pct_missing_subscription_plan': 2, 'pct_missing_age': 3})


In [46]:
# prompt: % plan_tier missing by region ordered descending. Add comments on MAR.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Users: % missing plan_tier by region
-- This checks for Missing At Random (MAR) if missingness in plan_tier
-- depends on the region. If so, imputation or modeling needs to account for this.
SELECT
  country,
  ROUND(100 * SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_missing_plan_tier
FROM
  `{project_id}.netflix.users`
GROUP BY
  country
ORDER BY
  pct_missing_plan_tier DESC;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
print("Percentage of missing subscription_plan by country:")
print("-" * 50)
for row in results:
    print(f"Country: {row.country:<20} | Missing Plan Tier: {row.pct_missing_plan_tier:.2f}%")
print("-" * 50)

Percentage of missing subscription_plan by country:
--------------------------------------------------
Country: USA                  | Missing Plan Tier: 0.00%
Country: Canada               | Missing Plan Tier: 0.00%
--------------------------------------------------


In [40]:
%load_ext bigquery_magics

In [31]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"SELECT h.* FROM `{project_id}.netflix.watch_history` h LIMIT 1"
query_job = client.query(query)
results = query_job.result()

for row in results:
    print(row.keys())

dict_keys(['session_id', 'user_id', 'movie_id', 'watch_date', 'device_type', 'watch_duration_minutes', 'progress_percentage', 'action', 'quality', 'location_country', 'is_download', 'user_rating'])


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [16]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Print missingness percentages
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0.0, 0.0, 11.93), {'pct_missing_country': 0, 'pct_missing_subscription_plan': 1, 'pct_missing_age': 2})


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

The **`age`** column is the most (and only) missing column, with **11.93%** of its values missing.

Here is a hypothesis about the type of missingness:

*   **MCAR (Missing Completely at Random):** This would mean the reason for the missing age data is unrelated to any other user attribute or the age itself. This is unlikely for demographic data but could occur due to random system errors during data collection.

*   **MAR (Missing at Random):** This would mean the probability of age being missing is related to another observed variable. For example, users in a specific `country` or on a `Basic` `subscription_plan` might be less likely to provide their age due to differences in the sign-up form or regional privacy norms. This is a plausible scenario.

*   **MNAR (Missing Not at Random):** This is the most likely hypothesis. The missingness is related to the value of the `age` column itself. Users at the extreme ends of the age spectrum (e.g., very young or very old) might be more reluctant to disclose their age due to privacy concerns. To confirm this, we would need to find a proxy for age or analyze the data after it has been (if ever) collected.

Given the personal nature of age information, **MNAR is the strongest hypothesis**, as the act of withholding one's age is often directly related to the age itself.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [17]:
# # EXAMPLE (from LLM) — Detect duplicate groups (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 16), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 16), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_09815', 'movie_0827', datetime.date(2024, 5, 25), 'Laptop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_04899', 'movie_0142', datetime.date(2025, 1, 20), 'Desktop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_01469', 'movie_0237', datetime.date(2025, 1, 17), 'Laptop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_02652', 'movie_0352', datetime.date(2024, 10, 22), 'Desktop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_05952', 'movie_0893', datetime.date(2

In [18]:
# # EXAMPLE (from LLM) — Keep-one policy (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
results = query_job.result()

print("Deduplicated table watch_history_dedup created successfully.")

Deduplicated table watch_history_dedup created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?


Duplicates fundamentally arise from two main sources: **natural variations** and **system-generated errors**. Natural duplicates, also known as legitimate duplicates, occur when distinct entries genuinely refer to the same real-world entity due to valid, real-world variations, such as different spellings of a name (e.g., "John Smith" vs. "J. Smith"), the integration of data from multiple unharmonized source systems, or time-based updates that fail to consolidate old and new records (e.g., a customer changing their address). Conversely, system-generated, or artificial, duplicates are technical errors introduced by faulty processes, such as flawed Extract, Transform, Load (ETL) scripts that rerun and insert the same batch multiple times, system bugs that cause transactions to be logged repeatedly, or a failure to enforce unique primary keys. Both types corrupt data integrity, but while natural duplicates require complex matching algorithms to resolve, system-generated duplicates usually point to underlying process failures.

The presence of these duplicates severely corrupts both data **labels** and organizational **Key Performance Indicators (KPIs)**. Duplicates introduce significant bias and inaccuracy by creating a false representation of reality. At the level of individual **labels**, duplicates directly inflate counts—for instance, a single customer counted three times makes the total customer base appear artificially larger. This leads to misleading aggregations, where sums (like total revenue) are grossly overstated, and averages (like average order value) are skewed. In the context of machine learning, duplicate records act as synthetic oversampling, biasing training models and causing them to overfit the specific noise of the error. This corruption scales up to affect **KPIs**: metrics like total customer count, market penetration, and conversion rates are often falsely inflated, giving management an overoptimistic and inaccurate view of growth and operational efficiency. Ultimately, corrupted labels and KPIs lead to flawed analytical insights, poor resource allocation, and strategically unsound business decisions.

In [19]:
# Verification: Before/after count query comparing raw vs watch_history_dedup
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT 'raw' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'deduplicated' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job = client.query(query)
results = query_job.result()

print("Row counts before and after deduplication:")
for row in results:
    print(f"Table: {row.table_name:<15} | Rows: {row.row_count}")

Row counts before and after deduplication:
Table: deduplicated    | Rows: 100000
Table: raw             | Rows: 420000


### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [32]:
# Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{project_id}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((3433, 100000, 3.43), {'outliers': 0, 'total': 1, 'pct_outliers': 2})


In [21]:
# # EXAMPLE (from LLM) — Winsorize + quantiles (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{project_id}.netflix.watch_history_dedup` h, q;

-- Quantiles before vs after
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('after', [4.4, 24.6, 41.5, 61.5, 92.0, 203.6]), {'which': 0, 'q': 1})
Row(('before', [0.2, 24.8, 41.9, 61.4, 91.8, 799.3]), {'which': 0, 'q': 1})


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [22]:
# Verification: Show min/median/max before vs after capping
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
WITH before AS (
  SELECT
    'before' AS which,
    MIN(watch_duration_minutes) AS min_val,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
    MAX(watch_duration_minutes) AS max_val
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after' AS which,
    MIN(watch_duration_minutes_capped) AS min_val,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_val,
    MAX(watch_duration_minutes_capped) AS max_val
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('after', 4.4, 51.4, 203.6), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})
Row(('before', 0.2, 51.0, 799.3), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.


Data capping, or Winsorizing, is a powerful technique for handling outliers, but it becomes actively **harmful** when the extreme values being modified are **genuine, critical data points** rather than simple errors or noise, leading to a serious loss of real business insight by underestimating the impact of rare, high-value events (like a major customer purchase or a sudden system failure) and artificially compressing the data's true variance, which in turn leads to faulty statistical inferences and models that cannot accurately predict worst-case scenarios or peak performance because the high end of the distribution has been manually truncated. The model type that is notably less sensitive to outliers is the **Random Forest** (a type of tree-based ensemble model) because its foundation, the **Decision Tree**, operates using simple splitting rules (e.g., "Is the value greater than X?") rather than distance-based metrics, meaning the exact, extreme magnitude of an outlier doesn't disproportionately affect the splitting logic, and furthermore, the final prediction in a Random Forest is often based on the **median** (or a voting process) of all individual trees, making it inherently more **robust** than models like standard Linear Regression, which use the highly outlier-sensitive mean and minimize the squared error, thereby giving squared-up influence to extreme values.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [23]:
# # EXAMPLE (from LLM) — flag_binge (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0, 100000, 0.0), {'sessions_over_8h': 0, 'total': 1, 'pct': 2})


In [24]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.users`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((716, 41200, 1.74), {'extreme_age_rows': 0, 'total': 1, 'pct': 2})


In [25]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes > 480) AS titles_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((48, 44, 4160, 2.21), {'titles_under_15m': 0, 'titles_over_8h': 1, 'total': 2, 'pct_duration_anomaly': 3})


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

flag_duration_anamoly is the most common. However, when building a machine learning model to predict user behavior (like what they'll watch next or if they will cancel their subscription), the goal is to understand the user. Therefore, features that directly describe the user are often the most powerful.

Here’s a comparison:

flag_age_extreme: This is a direct attribute of the user. It tells us that this person belongs to a specific demographic group (very young or elderly). This is a strong signal because different age groups have vastly different viewing habits, content preferences, and churn risks. A model can learn patterns like "users with flag_age_extreme = true are highly likely to watch animated content."

flag_duration_anomaly: This flag describes the movie, not the user. It tells us a movie is unusually short or long. While a user chooses to watch it, the flag itself is a property of the content. It's less predictive of the user's overall behavior. Knowing a user watched one weirdly long movie doesn't tell us as much about them as knowing their age.

In short, we chose flag_age_extreme because it helps create a clearer profile of the user, which is exactly what a user-centric prediction model needs.

In [26]:
# Verification: Single compact summary query for anomaly flags
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'flag_binge' AS flag_name,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.watch_history_robust`
UNION ALL
SELECT
  'flag_age_extreme' AS flag_name,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.users`
UNION ALL
SELECT
  'flag_duration_anomaly' AS flag_name,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('flag_binge', 0.0), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_age_extreme', 1.74), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_duration_anomaly', 2.21), {'flag_name': 0, 'pct_of_rows': 1})


In [27]:
# In movies, compute and summarize flag_duration_anomaly where duration_minutes < 15 or > 480.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes > 480) AS titles_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((48, 44, 4160, 2.21), {'titles_under_15m': 0, 'titles_over_8h': 1, 'total': 2, 'pct_duration_anomaly': 3})


## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


### Save & submit checklist:

- [ ] Save this notebook to the team Drive.
- [ ] Export a `.sql` file with your DQ queries and save to repo.
- [ ] Push notebook + SQL to the **team GitHub** with a descriptive commit.
- [ ] Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.

## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)


# Task
Create a project summary markdown cell listing the project ID, region, bucket name, and BigQuery dataset. Then, generate a Python cell that exports all Data Quality (DQ) queries for missingness, duplicates, outliers, and anomaly flags into a single SQL file named "dq_queries.sql".

## Create Project Summary Cell

### Subtask:
Generate a new markdown cell that lists the current PROJECT_ID, REGION, BUCKET_NAME, and the BigQuery dataset name for documentation purposes.


**Reasoning**:
To dynamically create the project summary with live values from the notebook's environment, I will generate a Python code cell. This cell will retrieve the `PROJECT_ID`, `REGION`, and `BUCKET_NAME` variables and then use `IPython.display.Markdown` to render them in a clean, formatted list as requested. This approach correctly uses f-strings as specified and ensures the summary is accurate and reproducible.



In [43]:
import os
from IPython.display import display, Markdown

# Retrieve the project details from environment variables and Python variables
project_id = os.environ.get('GOOGLE_CLOUD_PROJECT', 'PROJECT_ID_NOT_FOUND')
bucket_name = os.environ.get('BUCKET_NAME', 'BUCKET_NAME_NOT_FOUND')
try:
    region = REGION
except NameError:
    region = 'us-central1' # Fallback to the value set in the notebook
dataset = 'netflix'

# Create the markdown content
summary_markdown = f"""
## Project Summary

*   **Project ID:** `{project_id}`
*   **Region:** `{region}`
*   **GCS Bucket:** `{bucket_name}`
*   **BigQuery Dataset:** `{dataset}`
"""

# Display the content as a rendered markdown cell
display(Markdown(summary_markdown))


## Project Summary

*   **Project ID:** `noble-broker-471012-q6`
*   **Region:** `us-central1`
*   **GCS Bucket:** `mgmt467-netflix-3fc7b4d1`
*   **BigQuery Dataset:** `netflix`


## Export DQ Queries to SQL File

### Subtask:
Generate a Python code cell that gathers all the Data Quality (DQ) queries from the notebook (for missingness, duplicates, outliers, and anomaly flags) and saves them into a single file named `dq_queries.sql`.


**Reasoning**:
I will now generate the Python code to collect all the previously used Data Quality queries into a single string, write that string to a `dq_queries.sql` file, and then print the file's contents to verify its creation, as per the detailed instructions.



**Reasoning**:
I will re-generate the Python code cell that combines all the specified Data Quality (DQ) queries into a single f-string, writes this string to a file named `dq_queries.sql`, and then verifies the file's contents by printing them. This directly follows the user's detailed instructions.



In [45]:
import os

# Get project ID from environment variables to make queries runnable
project_id = os.environ.get('GOOGLE_CLOUD_PROJECT', 'your-gcp-project-id')
dataset = 'netflix'

# Combine all Data Quality (DQ) queries into a single f-string
all_dq_queries = f"""
-- ====================================================================
-- MGMT 467: Data Quality (DQ) Queries for Netflix Dataset
-- ====================================================================

-- 5.1 Missingness: Profile missing values in the users table
-- This query calculates the total count and percentage of missing values
-- for key demographic and subscription columns.
WITH base_missingness AS (
  SELECT
    COUNT(*) AS n,
    COUNTIF(country IS NULL) AS miss_country,
    COUNTIF(subscription_plan IS NULL) AS miss_plan,
    COUNTIF(age IS NULL) AS miss_age
  FROM `{project_id}.{dataset}.users`
)
SELECT
  n AS total_rows,
  ROUND(100 * miss_country / n, 2) AS pct_missing_country,
  ROUND(100 * miss_plan / n, 2) AS pct_missing_subscription_plan,
  ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM base_missingness;

-- 5.2 Duplicates: Detect and remove duplicate watch history records
-- This query identifies duplicate records based on a composite key.
SELECT
  user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.{dataset}.watch_history`
GROUP BY 1, 2, 3, 4
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;

-- This query creates a new table with duplicates removed, keeping the most complete record.
CREATE OR REPLACE TABLE `{project_id}.{dataset}.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT
    h.*,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, movie_id, watch_date, device_type
      ORDER BY progress_percentage DESC, watch_duration_minutes DESC
    ) AS rk
  FROM `{project_id}.{dataset}.watch_history` h
)
WHERE rk = 1;

-- 5.3 Outliers: Identify and cap extreme values in watch duration
-- This query calculates outliers based on the 1.5*IQR rule.
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{project_id}.{dataset}.watch_history_dedup`
),
bounds AS (
  SELECT
    (q3 - q1) AS iqr,
    q1 - 1.5 * (q3 - q1) AS lo,
    q3 + 1.5 * (q3 - q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  ROUND(100 * COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) / COUNT(*), 2) AS pct_outliers
FROM `{project_id}.{dataset}.watch_history_dedup` h CROSS JOIN bounds b;

-- This query creates a robust table by capping (Winsorizing) outliers at the 1st and 99th percentiles.
CREATE OR REPLACE TABLE `{project_id}.{dataset}.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM `{project_id}.{dataset}.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{project_id}.{dataset}.watch_history_dedup` h, q;

-- 5.4 Business Anomaly Flags
-- Flag 1: Binge watching sessions (> 8 hours)
SELECT
  COUNTIF(watch_duration_minutes_capped > 8 * 60) AS sessions_over_8h,
  ROUND(100 * COUNTIF(watch_duration_minutes_capped > 8 * 60) / COUNT(*), 2) AS pct
FROM `{project_id}.{dataset}.watch_history_robust`;

-- Flag 2: Extreme user ages (< 10 or > 100)
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  ROUND(100 * COUNTIF(age < 10 OR age > 100) / COUNT(*), 2) AS pct
FROM `{project_id}.{dataset}.users`;

-- Flag 3: Anomalous movie durations (< 15 min or > 8 hours)
SELECT
  COUNTIF(duration_minutes < 15 OR duration_minutes > 480) AS anomaly_duration_titles,
  ROUND(100 * COUNTIF(duration_minutes < 15 OR duration_minutes > 480) / COUNT(*), 2) AS pct_duration_anomaly
FROM `{project_id}.{dataset}.movies`;
"""

# Write the queries to a .sql file
file_name = 'dq_queries.sql'
with open(file_name, 'w') as f:
    f.write(all_dq_queries)

print(f"Successfully wrote all DQ queries to {file_name}.\n")

# Read the file back and print its contents for verification
print("--- Contents of dq_queries.sql ---")
with open(file_name, 'r') as f:
    print(f.read())

Successfully wrote all DQ queries to dq_queries.sql.

--- Contents of dq_queries.sql ---

-- MGMT 467: Data Quality (DQ) Queries for Netflix Dataset

-- 5.1 Missingness: Profile missing values in the users table
-- This query calculates the total count and percentage of missing values
-- for key demographic and subscription columns.
WITH base_missingness AS (
  SELECT
    COUNT(*) AS n,
    COUNTIF(country IS NULL) AS miss_country,
    COUNTIF(subscription_plan IS NULL) AS miss_plan,
    COUNTIF(age IS NULL) AS miss_age
  FROM `noble-broker-471012-q6.netflix.users`
)
SELECT
  n AS total_rows,
  ROUND(100 * miss_country / n, 2) AS pct_missing_country,
  ROUND(100 * miss_plan / n, 2) AS pct_missing_subscription_plan,
  ROUND(100 * miss_age / n, 2) AS pct_missing_age
FROM base_missingness;

-- 5.2 Duplicates: Detect and remove duplicate watch history records
-- This query identifies duplicate records based on a composite key.
SELECT
  user_id, movie_id, watch_date, device_type, COUNT(*) A