<a href="https://colab.research.google.com/github/bjrodarmel/mgmt467-analytics-portfolio/blob/main/Unit2_Lab1_Finished.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [None]:
# prompt: You are my cloud TA. Generate a single Colab code cell that:
# Authenticates to Google Cloud in Colab,
# Prompts for PROJECT_ID via input() and sets REGION="us-central1" (editable),
# Exports GOOGLE_CLOUD_PROJECT,
# Runs gcloud config set project $GOOGLE_CLOUD_PROJECT,
# Prints both values. Add 2–3 comments explaining what/why. End with a comment: # Done: Auth + Project/Region set.

from google.colab import auth
auth.authenticate_user()

# Prompt for project ID and set region
PROJECT_ID = input("Enter your Google Cloud Project ID: ")
REGION = "us-central1"  # Editable

# Export project ID as an environment variable
import os
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

# Configure gcloud with the project ID
!gcloud config set project $GOOGLE_CLOUD_PROJECT

# Print the configured project ID and region
print(f"Project ID: {PROJECT_ID}")
print(f"Region: {REGION}")

# Done: Auth + Project/Region set


Enter your Google Cloud Project ID: mgmt-467
Updated property [core/project].
Project ID: mgmt-467
Region: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [None]:
# prompt: Generate a short cell that prints the active project using gcloud config get-value project and echoes the REGION you set.

print(f"Active Project: {get_ipython().getoutput('gcloud config get-value project')[0]}")
print(f"Region: {REGION}")

Active Project: mgmt-467
Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

Setting `PROJECT_ID` and `REGION` at the top of the notebook is crucial for several reasons:

1.  **Consistency and Reproducibility:** It ensures that all subsequent operations (like creating GCS buckets, running BigQuery jobs, or deploying resources) are performed within the same project and region. This makes the notebook's execution predictable and reproducible. If different parts of the notebook were to implicitly or explicitly use different project IDs or regions, it could lead to confusion, data being placed in unexpected locations, or errors due to resource availability or permissions.

2.  **Cost Management:** Google Cloud resources are billed based on usage within a specific project. By defining the `PROJECT_ID` upfront, you ensure all costs are attributed to the intended project. Similarly, `REGION` selection can impact costs (e.g., data transfer costs between regions) and latency. Setting it once prevents accidental cross-region operations that might incur higher costs or performance penalties.

3.  **Resource Scoping and Permissions:** Google Cloud IAM (Identity and Access Management) policies are often scoped to projects and resources within specific regions. By setting these values, you ensure that the authenticated user or service account has the necessary permissions to interact with resources in that project and region. If not set, the system might default to a project or region that the user doesn't have access to, leading to permission denied errors.

4.  **Clarity and Readability:** It makes the notebook's intent clear to anyone reading it. It's immediately obvious which project and region the notebook is configured to work with, reducing the cognitive load for collaborators or for your future self.

**What can go wrong if we don’t set `PROJECT_ID` and `REGION`?**

*   **Incorrect Project/Region Usage:** Operations might default to a project or region that is not intended, leading to data being stored or processed in the wrong place. This can cause data silos, compliance issues, or make it difficult to find your data later.
*   **Permission Errors:** If the default project or region is one where your credentials lack sufficient permissions, many operations will fail with authorization errors.
*   **Unexpected Costs:** Operations might inadvertently run in a more expensive region or incur data egress charges if not explicitly controlled.
*   **Data Locality and Latency Issues:** If data is processed in a region far from where it's stored or where users are accessing it, it can lead to significant latency.
*   **Confusion and Debugging Difficulties:** When errors occur, it becomes much harder to debug if you're unsure which project or region the failing operation was targeting.
*   **Resources Created in the Wrong Location:** GCS buckets and BigQuery datasets are each created in a specific location (a region, dual-region, or multi-region). Without an explicit project and region, creation can fail or fall back to a default location (typically the `US` multi-region) rather than the one you intended.
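
The fail-fast idea behind these points can be sketched in a few lines of Python. This is a minimal illustration, not part of the lab's required cells: the helper name `set_cloud_config` is hypothetical, and it only checks that the values are non-empty before exporting the project ID.

```python
import os


def set_cloud_config(project_id, region="us-central1"):
    """Validate the project/region once and export the project ID.

    Hypothetical helper: it refuses empty values so later cells never
    silently fall back to whatever default project gcloud has configured.
    """
    project_id = project_id.strip()
    region = region.strip()
    if not project_id:
        raise ValueError("PROJECT_ID is empty; set it explicitly.")
    if not region:
        raise ValueError("REGION is empty; set it explicitly (e.g. 'us-central1').")
    # Same environment variable the setup cell exports.
    os.environ["GOOGLE_CLOUD_PROJECT"] = project_id
    return {"project_id": project_id, "region": region}


# Stray whitespace from input() is stripped before the value is used.
config = set_cloud_config("  mgmt-467 ")
print(config)
```

Returning a single dictionary gives later cells one place to read both values from, instead of relying on scattered globals.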

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [None]:
# prompt: Generate a single Colab code cell that:
# Prompts me to upload kaggle.json,
# Saves to ~/.kaggle/kaggle.json with 0600 permissions,
# Prints kaggle --version. Add comments about security and reproducibility.

from google.colab import files
import os

# Prompt the user to upload their Kaggle API credentials file.
# This file contains sensitive information and should be kept secure.
print("Please upload your kaggle.json file")
uploaded = files.upload()

# Create the .kaggle directory if it doesn't exist.
# The Kaggle CLI looks for credentials in this specific folder.
os.makedirs('/root/.kaggle', exist_ok=True)

# Move the uploaded kaggle.json to the .kaggle directory.
!mv kaggle.json /root/.kaggle/

# Set the permissions of the file to 600 (read/write for owner only).
# This is a security measure to protect your API key.
!chmod 600 /root/.kaggle/kaggle.json

# Print the Kaggle version to verify that the CLI is installed and configured correctly.
# This confirms that the authentication setup was successful.
!kaggle --version

Please upload your kaggle.json file


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [None]:
# prompt: Generate a one-liner that runs kaggle --help | head -n 20 to show the CLI is ready.

!kaggle --help | head -n 20


usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

Requiring strict `0600` permissions on API tokens (like `kaggle.json`) is a critical security measure.

**Why `0600` permissions?**
The `0600` permission mode in Unix-like systems means:
- The owner of the file can read and write it (the `6`, i.e. read `4` + write `2`).
- No one else (group or others) has any permissions (the two trailing `0`s).

**Risks we are avoiding:**

1.  **Unauthorized Access and Data Breaches:** If the API token file is readable by other users on the system (e.g., `0644` or `0666`), any user with access to the same machine or environment could read the token. The token grants access to your Kaggle account, allowing them to download your data, impersonate you, or perform actions on your behalf.

2.  **Account Compromise:** An API token is a form of authentication. If it falls into the wrong hands, attackers can use it to gain unauthorized access to your Kaggle account, potentially exposing personal information or private datasets, or changing account settings.

3.  **Data Leakage:** If the token is leaked, any data that can be downloaded with it should be treated as at risk.

4.  **Malicious Use:** An attacker could use your API token to perform actions on Kaggle that violate its terms of service, putting your account at risk of suspension.

In essence, `0600` permissions ensure that only the owner of the file (which should be you, the user running the notebook) can access the credentials. This minimizes the attack surface and protects your account and data from unauthorized access and misuse.
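
As a rough illustration of the permission check, the sketch below applies `0600` to a file and then verifies that no group/other bits remain. It runs against a throwaway temp file with a fake key; the helper name `secure_token_file` is an assumption made for this example, and the mode bits behave this way on POSIX systems such as the Colab VM.

```python
import os
import stat
import tempfile


def secure_token_file(path):
    """Restrict a credentials file to owner read/write and verify the result."""
    os.chmod(path, 0o600)
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # 0o077 masks every group/other permission bit; any set bit is a leak.
    if mode & 0o077:
        raise PermissionError(f"{path} is still accessible to group/others: {oct(mode)}")
    return oct(mode)


with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "kaggle.json")
    with open(demo_path, "w") as fh:
        fh.write('{"username": "demo", "key": "not-a-real-key"}')
    demo_mode = secure_token_file(demo_path)

print(demo_mode)  # prints 0o600 on POSIX systems
```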

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [None]:
# prompt: Generate a Colab code cell that:
# Creates /content/data/raw,
# Downloads the dataset to /content/data with Kaggle CLI,
# Unzips into /content/data/raw (overwrite OK),
# Lists all CSVs with sizes in a neat table. Include comments describing each step.

# Create the directory to store raw data
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI to /content/data
# The -d flag specifies the dataset, and -p specifies the download path
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag overwrites files if they exist
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a neat table
!ls -lh /content/data/raw/*.csv


Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 575MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [None]:
# prompt: Generate a snippet that asserts there are exactly six CSV files and prints their names.

import glob

csv_files = glob.glob('/content/data/raw/*.csv')
assert len(csv_files) == 6, f"Expected 6 CSV files, but found {len(csv_files)}"

print("Found CSV files:")
for f in csv_files:
    print(f)


Found CSV files:
/content/data/raw/watch_history.csv
/content/data/raw/recommendation_logs.csv
/content/data/raw/search_logs.csv
/content/data/raw/users.csv
/content/data/raw/reviews.csv
/content/data/raw/movies.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

Keeping a clean file inventory, including file names and their sizes, is crucial for several downstream tasks in a data pipeline:

1.  **Data Validation and Integrity Checks:**
    *   **Completeness:** Knowing the expected number of files and their names allows you to quickly verify if all expected data has been downloaded or generated. If a file is missing, it's an immediate red flag.
    *   **Size Verification:** Comparing the downloaded file sizes against expected sizes (or previous runs) can help detect corruption or incomplete downloads. A file that is significantly smaller than expected might indicate an interrupted download or a truncated file.

2.  **Pipeline Automation and Reproducibility:**
    *   **Automated Processing:** Scripts can be written to automatically process all files matching a certain pattern (e.g., all `.csv` files in a directory). A consistent inventory ensures these scripts run reliably.
    *   **Reproducibility:** For reproducible research or analysis, it's essential to know exactly which files were used. Documenting the file inventory at each stage of the pipeline allows others (or your future self) to recreate the exact dataset used.

3.  **Resource Management and Monitoring:**
    *   **Storage Planning:** Knowing the total size of the data helps in planning storage requirements and managing costs, especially in cloud environments.
    *   **Performance Monitoring:** Large files can impact processing times. Understanding file sizes helps in estimating processing durations and identifying potential bottlenecks.

4.  **Debugging and Troubleshooting:**
    *   **Error Localization:** If a processing step fails, having a clear inventory of the input files helps pinpoint which file might be causing the issue (e.g., if it's malformed, empty, or unexpectedly large).
    *   **Auditing:** In case of discrepancies or unexpected results, the file inventory serves as an audit trail, showing the state of the data at different points in time.

5.  **Data Understanding and Exploration:**
    *   **Initial Assessment:** A quick glance at file names and sizes can provide initial insights into the structure and scale of the dataset, helping to guide the next steps of exploration and analysis.

In summary, a clean file inventory acts as a foundational element for robust, automated, and understandable data pipelines. It transforms a collection of files into a manageable and verifiable dataset.
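
One way to make such an inventory concrete is to record each file's name, size, and a content hash. The sketch below is an illustration only: `build_inventory` is a hypothetical helper, the demo files are tiny stand-ins created in a temp directory, and adding a SHA-256 digest goes slightly beyond the names and sizes discussed above.

```python
import hashlib
import tempfile
from pathlib import Path


def build_inventory(directory, pattern="*.csv"):
    """Return one record per matching file: name, size in bytes, SHA-256 digest."""
    inventory = []
    for path in sorted(Path(directory).glob(pattern)):
        payload = path.read_bytes()  # fine for small files; stream large ones instead
        inventory.append({
            "name": path.name,
            "bytes": len(payload),
            "sha256": hashlib.sha256(payload).hexdigest(),
        })
    return inventory


# Demo on throwaway files (not the real dataset).
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "users.csv").write_bytes(b"user_id,age\n1,30\n")
    Path(tmp, "movies.csv").write_bytes(b"movie_id,title\n")
    Path(tmp, "README.md").write_bytes(b"not a CSV")
    demo_inventory = build_inventory(tmp)

for entry in demo_inventory:
    print(f"{entry['name']:<12} {entry['bytes']:>6} bytes  sha256={entry['sha256'][:12]}")
```

Comparing the digests between runs detects a changed file even when its size happens to stay the same.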


## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [None]:
import uuid
import os

# Generate a unique bucket name. Bucket names must be globally unique.
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# The REGION variable was set in a previous cell. We use it to create the bucket
# in the correct location for lower latency and cost.
print(f"Creating bucket: gs://{bucket_name} in region: {REGION}")
!gcloud storage buckets create gs://{bucket_name} --location={REGION}

# Upload all CSVs from the local directory to the GCS bucket.
# Staging data in GCS makes it a reliable and scalable source for BigQuery.
print(f"\nUploading CSV files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://{bucket_name}/netflix/

# Print the bucket name and explain staging benefits
print(f"\nSuccessfully created bucket: {bucket_name} and uploaded files.")
print("\nBenefits of staging data in GCS:")
print("- **Consistent Source:** GCS provides a stable and versionable location for your data.")
print("- **Scalability:** GCS is highly scalable, handling large datasets easily.")
print("- **Integration:** GCS integrates seamlessly with other Google Cloud services like BigQuery.")
print("- **Cost-Effective:** GCS can be a cost-effective storage solution.")

# Verify that the files have been uploaded to the bucket.
print(f"\nVerifying contents of gs://{bucket_name}/netflix/")
!gcloud storage ls gs://{bucket_name}/netflix/

Creating bucket: gs://mgmt467-netflix-379f8b03 in region: us-central1
Creating gs://mgmt467-netflix-379f8b03/...

Uploading CSV files to gs://mgmt467-netflix-379f8b03/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-379f8b03/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-379f8b03/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-379f8b03/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-379f8b03/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-379f8b03/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-379f8b03/netflix/watch_history.csv

Average throughput: 62.5MiB/s

Successfully created bucket: mgmt467-netflix-379f8b03 and uploaded files.

Benefits of staging data in GCS:
- **Consistent Source:** GCS provides a stable and version

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [None]:
# prompt: Generate a snippet that lists the netflix/ prefix and shows object sizes.

# Note: a plain `ls` prints object names only; pass --long (-l) to include object sizes.
!gcloud storage ls --recursive gs://{os.environ['BUCKET_NAME']}/netflix/


gs://mgmt467-netflix-379f8b03/netflix/:
gs://mgmt467-netflix-379f8b03/netflix/movies.csv
gs://mgmt467-netflix-379f8b03/netflix/recommendation_logs.csv
gs://mgmt467-netflix-379f8b03/netflix/reviews.csv
gs://mgmt467-netflix-379f8b03/netflix/search_logs.csv
gs://mgmt467-netflix-379f8b03/netflix/users.csv
gs://mgmt467-netflix-379f8b03/netflix/watch_history.csv


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

1.  **Scalability and Durability:** Google Cloud Storage (GCS) is designed for massive scalability and high durability. Unlike the temporary storage available in a Colab environment, GCS can reliably store petabytes of data and is built to withstand hardware failures. This ensures that your data is safe and accessible, even for very large datasets that might exceed Colab's local storage limits or be lost if the Colab session ends unexpectedly.

2.  **Decoupling and Access Control:** Staging data in GCS decouples the storage from the compute environment (Colab). This means that once data is in GCS, it can be accessed by various Google Cloud services (like BigQuery, Dataflow, or Vertex AI) independently of your Colab session. This allows for more robust and flexible data pipelines. Furthermore, GCS provides fine-grained access control, allowing you to manage who can read or write to your data, which is crucial for security and collaboration, something that is less manageable with local Colab files.


## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [None]:
# prompt: Cell A: Create (idempotently) dataset netflix in US multi-region; if it exists, print a friendly message.
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

Dataset 'mgmt-467:netflix' successfully created.


In [None]:
# prompt: Load six CSVs (users, movies, watch_history, recommendation_logs, search_logs, reviews) from GCS
# with autodetect and header skip.
# Run row-count queries to confirm loads. Use BigQuery to help achieve this properly

from google.cloud import bigquery

# Initialize BigQuery client
client = bigquery.Client()

# Define the dataset and table names
dataset_id = "netflix"
tables = [
    "users",
    "movies",
    "watch_history",
    "recommendation_logs",
    "search_logs",
    "reviews",
]

# Construct the GCS URI for the CSV files
gcs_uri_prefix = f"gs://{os.environ['BUCKET_NAME']}/netflix/"

# Load each CSV file into BigQuery
for table_name in tables:
    table_id = f"{dataset_id}.{table_name}"
    source_file = f"{gcs_uri_prefix}{table_name}.csv" # Assuming filenames match table names

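    job_config = bigquery.LoadJobConfig(
        skip_leading_rows=1,  # Skip the header row
        autodetect=True,      # Autodetect schema and types
        source_format=bigquery.SourceFormat.CSV,
        # Replace the table on every run so re-running this cell is idempotent.
        # The default for load jobs is WRITE_APPEND, which adds the same rows
        # again on each rerun and inflates the verification counts.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )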

    print(f"Loading {source_file} into {table_id}...")
    load_job = client.load_table_from_uri(
        source_file, table_id, job_config=job_config
    )
    load_job.result()  # Wait for the job to complete

    print(f"Loaded {load_job.output_rows} rows into {table_id}.")

# Query row counts for each table to verify
print("\nVerifying row counts:")
for table_name in tables:
    table_id = f"{dataset_id}.{table_name}"
    query = f"SELECT COUNT(*) FROM `{table_id}`"
    query_job = client.query(query)
    rows = query_job.result()
    for row in rows:
        print(f"Table '{table_id}': {row[0]} rows")


Loading gs://mgmt467-netflix-379f8b03/netflix/users.csv into netflix.users...
Loaded 10300 rows into netflix.users.
Loading gs://mgmt467-netflix-379f8b03/netflix/movies.csv into netflix.movies...
Loaded 1040 rows into netflix.movies.
Loading gs://mgmt467-netflix-379f8b03/netflix/watch_history.csv into netflix.watch_history...
Loaded 105000 rows into netflix.watch_history.
Loading gs://mgmt467-netflix-379f8b03/netflix/recommendation_logs.csv into netflix.recommendation_logs...
Loaded 52000 rows into netflix.recommendation_logs.
Loading gs://mgmt467-netflix-379f8b03/netflix/search_logs.csv into netflix.search_logs...
Loaded 26500 rows into netflix.search_logs.
Loading gs://mgmt467-netflix-379f8b03/netflix/reviews.csv into netflix.reviews...
Loaded 15450 rows into netflix.reviews.

Verifying row counts:
Table 'netflix.users': 20600 rows
Table 'netflix.movies': 2080 rows
Table 'netflix.watch_history': 210000 rows
Table 'netflix.recommendation_logs': 104000 rows
Table 'netflix.search_logs':

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [None]:
print("\nVerifying row counts:")
for table_name in tables:
    table_id = f"{dataset_id}.{table_name}"
    query = f"SELECT COUNT(*) FROM `{table_id}`"
    query_job = client.query(query)
    rows = query_job.result()
    for row in rows:
        print(f"Table '{table_id}': {row[0]} rows")


Verifying row counts:
Table 'netflix.users': 20600 rows
Table 'netflix.movies': 2080 rows
Table 'netflix.watch_history': 210000 rows
Table 'netflix.recommendation_logs': 104000 rows
Table 'netflix.search_logs': 53000 rows
Table 'netflix.reviews': 30900 rows
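
The verification prompt asks for a single query, whereas the cell above issues one query per table. A possible single-query form is sketched below: it builds one `UNION ALL` statement returning `table_name, row_count`. The helper name `row_count_query` is hypothetical, and the project ID is the one shown in the earlier output; the resulting string would be passed to `client.query(...)`.

```python
def row_count_query(project, dataset, tables):
    """Build one UNION ALL query that returns (table_name, row_count) per table."""
    selects = [
        f"SELECT '{name}' AS table_name, COUNT(*) AS row_count "
        f"FROM `{project}.{dataset}.{name}`"
        for name in tables
    ]
    # A trailing ORDER BY applies to the combined result of the UNION ALL.
    return "\nUNION ALL\n".join(selects) + "\nORDER BY table_name"


tables = [
    "users",
    "movies",
    "watch_history",
    "recommendation_logs",
    "search_logs",
    "reviews",
]
query = row_count_query("mgmt-467", "netflix", tables)
print(query)
```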


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

`autodetect` is acceptable for:
1.  **Exploratory Data Analysis (EDA):** When you're quickly exploring a new dataset and don't yet know its structure or data types. It allows for rapid ingestion and initial analysis.
2.  **Small, Trusted Datasets:** For small, well-understood datasets where you are confident in the data's consistency and format, `autodetect` can save time.
3.  **Prototyping:** During the initial stages of building a pipeline, `autodetect` can be useful to get data into BigQuery quickly for testing.

You should enforce explicit schemas when:
1.  **Production Pipelines:** In production environments, explicit schemas are crucial for data quality, consistency, and reliability. `autodetect` can be brittle and might infer incorrect data types (e.g., treating a numeric ID as an integer when it should be a string, or misinterpreting dates).
2.  **Data Governance and Compliance:** Many organizations have strict data governance policies that require defined schemas for data integrity, security, and compliance (e.g., GDPR, HIPAA).
3.  **Performance Optimization:** Explicitly defining data types and partitioning/clustering can significantly improve query performance and reduce costs in BigQuery. `autodetect` might not choose the optimal configurations.
4.  **Preventing Data Corruption:** If a CSV file contains unexpected values or formatting, `autodetect` might fail the load job or, worse, infer incorrect types that lead to data corruption or incorrect analysis downstream. An explicit schema provides a clear contract for the data.
5.  **Schema Evolution Management:** When schemas need to change, having an explicit schema allows for controlled evolution (e.g., adding new nullable columns) rather than relying on `autodetect` to guess the changes, which could lead to unexpected behavior.

**Why enforce explicit schemas?**
Enforcing explicit schemas provides:
-   **Data Quality Assurance:** Ensures data conforms to expected types and formats, preventing errors and inconsistencies.
-   **Predictability:** Guarantees that data is loaded as intended, making downstream processes more reliable.
-   **Performance:** Allows for optimization through correct data type selection, partitioning, and clustering.
-   **Cost Control:** Prevents unexpected costs due to inefficient data types or large amounts of scanned data.
-   **Security and Compliance:** Helps meet regulatory requirements by ensuring data is handled appropriately.
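
To show what "an explicit schema as a contract" can look like, here is a minimal sketch that writes a schema in the JSON layout BigQuery uses for schema files (a list of objects with `name`, `type`, and `mode`). The column names `country`, `subscription_plan`, and `age` come from the queries in this notebook; `user_id`, every type, and every mode below are assumptions that would need checking against the real CSV.

```python
import json

# Assumed schema for the users table. `user_id` is a hypothetical column,
# and all types/modes are guesses to be confirmed against the actual data.
USERS_SCHEMA = [
    {"name": "user_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "country", "type": "STRING", "mode": "NULLABLE"},
    {"name": "subscription_plan", "type": "STRING", "mode": "NULLABLE"},
    {"name": "age", "type": "FLOAT", "mode": "NULLABLE"},
]


def validate_schema(schema):
    """Reject duplicate column names and unknown modes before using the schema."""
    allowed_modes = {"NULLABLE", "REQUIRED", "REPEATED"}
    names = [field["name"] for field in schema]
    if len(names) != len(set(names)):
        raise ValueError("Schema contains duplicate column names.")
    for field in schema:
        if field["mode"] not in allowed_modes:
            raise ValueError(f"Unknown mode for {field['name']}: {field['mode']}")
    return schema


schema_json = json.dumps(validate_schema(USERS_SCHEMA), indent=2)
print(schema_json)
```

Keeping the schema as data (rather than inline in a load call) makes it easy to review in version control and to reuse across load jobs.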

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.
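
The IQR rule from the outlier bullet can be sketched in plain Python. This is a toy illustration with made-up numbers, not values from the dataset; the multiplier `k=1.5` is the conventional default, and `iqr_flags` is a hypothetical helper name. It flags values rather than dropping them, in line with "always flag and explain".

```python
import statistics


def iqr_flags(values, k=1.5):
    """Return a True/False flag per value: True if outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [not (lower <= v <= upper) for v in values]


# Toy watch durations in minutes (invented for the example).
durations = [10, 12, 11, 13, 12, 11, 300]
flags = iqr_flags(durations)
print(list(zip(durations, flags)))
```

Here only the value 300 is flagged; the flag column can then be kept alongside the data so a later step decides whether to cap, exclude, or model around it.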


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [None]:
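# Cell 1: Missingness in users table
# Calculate total rows and percentage of missing values for specified columns.
# This helps identify columns with significant missing data.
# The prompt's `region`, `plan_tier`, and `age_band` are queried here as this
# dataset's `country`, `subscription_plan`, and `age` columns.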
print("Calculating missingness in users table...")

from google.cloud import bigquery
client = bigquery.Client()

# Define the SQL query as a Python multi-line string
missingness_query = f"""
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_country,
    SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_subscription_plan,
    SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_age
FROM
    `{PROJECT_ID}.netflix.users`
"""

# Execute the query using the BigQuery client library
query_job = client.query(missingness_query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Total Rows: {row.total_rows}")
    print(f"Percent Missing Country: {row.percent_missing_country:.2f}%")
    print(f"Percent Missing Subscription Plan: {row.percent_missing_subscription_plan:.2f}%")
    print(f"Percent Missing Age: {row.percent_missing_age:.2f}%")

Calculating missingness in users table...
Total Rows: 20600
Percent Missing Country: 0.00%
Percent Missing Subscription Plan: 0.00%
Percent Missing Age: 11.93%


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [None]:
# prompt: Generate a query that prints the three missingness percentages from (1), rounded to two decimals.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Users: % missing per column
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Percent Missing Country: {row.pct_missing_country}%")
    print(f"Percent Missing Subscription Plan: {row.pct_missing_subscription_plan}%")
    print(f"Percent Missing Age: {row.pct_missing_age}%")

Percent Missing Country: 0.0%
Percent Missing Subscription Plan: 0.0%
Percent Missing Age: 11.93%


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Based on the previous output, `age` is the only column with missing values (11.93%); `country` and `subscription_plan` are both 0.00% missing.

Here's a hypothesis for each column:

*   **`country`**: No values are missing in this load, so this hypothesis only applies if gaps appear later. If the missingness of `country` were independent of every other variable (e.g., users who don't provide their country are otherwise similar to those who do), it would be **MCAR (Missing Completely At Random)**. More plausibly, users in certain regions are less inclined to share their country, or data collection fails for specific regions. That would be **MAR (Missing At Random)** if an observed variable explains the gaps, or **MNAR (Missing Not At Random)** if the gaps depend on the unrecorded country itself.

*   **`subscription_plan`**: No values are missing in this load either. If the missingness of `subscription_plan` were related to other observed variables (e.g., users with lower engagement or specific demographics are less likely to have a plan recorded), it would be **MAR**. For instance, if users who are on a free trial or have recently churned are less likely to have their `subscription_plan` recorded, and these factors are observable elsewhere, it's MAR. If the gap is tied to the user's status in a way that no other variable captures (e.g., they are simply not a paying customer and this status isn't explicitly recorded), it could be **MNAR**.

*   **`age`**: 11.93% of users have no `age`. This could be **MCAR** if users skip the field for reasons unrelated to anything else, but **MAR** or **MNAR** is more likely. If the missingness is related to other observed user attributes (like subscription plan or viewing habits), it's MAR. If it depends on the missing value itself (for example, younger users being more hesitant to share their age) or on something no other column captures, it's MNAR.

Without analyzing how missingness correlates with other variables, the mechanism can't be classified definitively. `age` is the only column with gaps in this load, and it is a reasonable candidate for **MAR**, since its missingness is likely related to other observable user characteristics or behaviors.
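
One way to probe the MAR hypothesis is to compare the missing rate of `age` across the levels of an observed column. The sketch below does this in plain Python on a handful of hypothetical rows (they are not taken from the Netflix dataset), and the helper name `missing_rate_by_group` is an assumption made for illustration. A large gap between groups would point toward MAR; similar rates are consistent with MCAR but do not prove it, and MNAR cannot be confirmed from observed data alone.

```python
from collections import defaultdict


def missing_rate_by_group(rows, group_col, target_col):
    """Return {group: share of rows in that group where target_col is None}."""
    totals = defaultdict(int)
    missing = defaultdict(int)
    for row in rows:
        group = row[group_col]
        totals[group] += 1
        if row[target_col] is None:
            missing[group] += 1
    return {group: missing[group] / totals[group] for group in totals}


# Hypothetical toy rows for illustration only.
toy_users = [
    {"subscription_plan": "Basic", "age": None},
    {"subscription_plan": "Basic", "age": 31},
    {"subscription_plan": "Basic", "age": None},
    {"subscription_plan": "Basic", "age": 45},
    {"subscription_plan": "Premium", "age": 28},
    {"subscription_plan": "Premium", "age": 52},
    {"subscription_plan": "Premium", "age": None},
    {"subscription_plan": "Premium", "age": 39},
]

rates = missing_rate_by_group(toy_users, "subscription_plan", "age")
for plan, rate in sorted(rates.items()):
    print(f"{plan}: {rate:.0%} of ages missing")
```

The same comparison can be run in BigQuery by grouping on `subscription_plan` and applying `COUNTIF(age IS NULL) / COUNT(*)` per group.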


### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [None]:
# prompt: Report duplicate groups on (user_id, movie_id, event_ts, device_type) with counts (top 20).
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 8), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_03043', 'movie_0465', datetime.date(2024, 2, 3), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00965', 'movie_0991', datetime.date(2024, 2, 14), 'Desktop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_08681', 'movie_0332', datetime.date(2024, 6, 13), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_03021', 'movie_0602', datetime.date(2025, 2, 23), 'Laptop', 6), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_01469', 'movie_0237', datetime.date(2025, 1, 1

In [None]:
# prompt: Create table watch_history_dedup that keeps one row per group (prefer higher progress_ratio, then minutes_watched). Add comments.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
results = query_job.result()

print("Deduplicated table watch_history_dedup created successfully.")

Deduplicated table watch_history_dedup created successfully.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [None]:
# prompt: Generate a before/after count query comparing raw vs watch_history_dedup.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  (SELECT COUNT(*) FROM `{project_id}.netflix.watch_history`) AS raw_count,
  (SELECT COUNT(*) FROM `{project_id}.netflix.watch_history_dedup`) AS dedup_count;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Raw Count: {row.raw_count}, Deduplicated Count: {row.dedup_count}")

Raw Count: 210000, Deduplicated Count: 100000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

 **Why duplicates arise:**

 *   **Natural Duplicates:** These come from user behavior. For example, a user might accidentally click "play" twice, or resume the same title in a second session on the same day. In some systems, a single logical event (like watching a movie) is recorded as several distinct events because it spans sessions or involves multiple micro-interactions (e.g., buffering events, seeking events).

 *   **System-Generated Duplicates:** These are often a result of technical issues or design flaws in the data ingestion or processing pipeline:
     *   **Retries:** If a data ingestion service fails to receive an acknowledgment for a record, it might retry sending the same record, leading to duplicates if the initial send was actually successful.
     *   **Batch Processing Errors:** When data is processed in batches, errors in the batch job logic or failures during the commit phase can result in partial or duplicate writes.
     *   **Distributed Systems:** In distributed systems, coordinating writes across multiple nodes can be complex. Race conditions or inconsistencies in distributed transactions can lead to duplicate entries.
     *   **Data Merging:** When combining data from different sources, if the sources have overlapping records and the merging process isn't designed to handle duplicates, they can be introduced.

 **How duplicates corrupt labels and KPIs:**

 Duplicates can significantly distort metrics and labels, leading to incorrect insights and flawed decision-making.

 *   **Corrupted Labels:**
     *   **Engagement Metrics:** If the system records one viewing twice, metrics like "total watch time" or "number of movies watched" are inflated, and any engagement label derived from them is assigned to the wrong users.
     *   **Conversion Rates:** If a duplicate interaction is counted as a separate event, it can artificially inflate conversion rates or other funnel metrics. For example, if a "sign-up" event is duplicated, the number of sign-ups would appear higher than it actually is.
     *   **User Segmentation:** If duplicate interactions are used to define user segments (e.g., "highly engaged users"), these segments might be based on inflated activity, leading to inaccurate profiling.

 *   **Corrupted KPIs (Key Performance Indicators):**
     *   **Revenue and Monetization:** KPIs related to revenue (e.g., average revenue per user, subscription renewals) can be skewed if duplicate transactions or engagement events are counted.
     *   **User Retention:** If duplicate "active user" events are counted, retention rates might appear higher than they are, masking potential churn issues.
     *   **Feature Usage:** KPIs measuring the usage of specific features can be inflated, leading to incorrect assessments of feature adoption and success.
     *   **Performance Benchmarking:** When comparing performance over time or against benchmarks, duplicates can make current performance look better or worse than it truly is, hindering accurate performance evaluation.

 In essence, duplicates introduce noise into the data, making it unreliable for analysis. This can lead to over- or under-estimation of user behavior, incorrect resource allocation, and flawed strategic decisions based on misleading metrics. Therefore, identifying and handling duplicates is a critical step in data quality assurance.
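
To make the inflation concrete, here is a minimal in-memory sketch of the same keep-one-best policy used for `watch_history_dedup`: group on the key columns, then keep the row with the highest `progress_percentage`, breaking ties on `watch_duration_minutes`. The helper `dedup_keep_best` and the toy rows are hypothetical. Missing ranking values sort lowest, and when two rows tie on every ranking column the first one seen wins, which is an assumption of this sketch rather than something the SQL above guarantees.

```python
def dedup_keep_best(rows, key_cols, rank_cols):
    """Keep one row per key; higher rank_cols values win, None ranks lowest."""

    def rank(row):
        # (is_present, value) pairs make None sort below any real value.
        return tuple(
            (row[col] is not None, row[col] if row[col] is not None else 0)
            for col in rank_cols
        )

    best = {}
    for row in rows:
        key = tuple(row[col] for col in key_cols)
        # Strict ">" means the first row seen wins an exact tie.
        if key not in best or rank(row) > rank(best[key]):
            best[key] = row
    return list(best.values())


KEY = ["user_id", "movie_id", "watch_date", "device_type"]
RANK = ["progress_percentage", "watch_duration_minutes"]

# Hypothetical toy rows: three copies of one interaction plus one unique row.
toy_history = [
    {"user_id": "u1", "movie_id": "m1", "watch_date": "2024-09-08",
     "device_type": "Smart TV", "progress_percentage": 40.0, "watch_duration_minutes": 30.0},
    {"user_id": "u1", "movie_id": "m1", "watch_date": "2024-09-08",
     "device_type": "Smart TV", "progress_percentage": 95.0, "watch_duration_minutes": 88.0},
    {"user_id": "u1", "movie_id": "m1", "watch_date": "2024-09-08",
     "device_type": "Smart TV", "progress_percentage": None, "watch_duration_minutes": 120.0},
    {"user_id": "u2", "movie_id": "m7", "watch_date": "2024-02-03",
     "device_type": "Laptop", "progress_percentage": 10.0, "watch_duration_minutes": 5.0},
]

deduped = dedup_keep_best(toy_history, KEY, RANK)
raw_minutes = sum(r["watch_duration_minutes"] for r in toy_history)
dedup_minutes = sum(r["watch_duration_minutes"] for r in deduped)
print(f"rows: {len(toy_history)} -> {len(deduped)}")
print(f"total watch minutes: {raw_minutes} -> {dedup_minutes}")
```

On these toy rows the total watch time drops from 243.0 to 93.0 minutes, which is the kind of KPI inflation described above.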

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [None]:
# prompt: Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
WITH
  quantiles AS (
    SELECT
      APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
      APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
    FROM
      `{project_id}.netflix.watch_history_dedup`
  ),
  bounds AS (
    SELECT
      q1,
      q3,
      q1 - 1.5 * (q3 - q1) AS lower_bound,
      q3 + 1.5 * (q3 - q1) AS upper_bound
    FROM
      quantiles
  )
SELECT
  COUNT(*) AS total_rows,
  SUM(CASE WHEN watch_duration_minutes < b.lower_bound OR watch_duration_minutes > b.upper_bound THEN 1 ELSE 0 END) AS outlier_count,
  SUM(CASE WHEN watch_duration_minutes < b.lower_bound OR watch_duration_minutes > b.upper_bound THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS outlier_percentage
FROM
  `{project_id}.netflix.watch_history_dedup`, bounds b;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Total Rows: {row.total_rows}")
    print(f"Outlier Count: {row.outlier_count}")
    print(f"Outlier Percentage: {row.outlier_percentage:.2f}%")

Total Rows: 100000
Outlier Count: 3508
Outlier Percentage: 3.51%
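
For intuition, the IQR rule used in the query can be reproduced on a small list with the standard library. This is a sketch on hypothetical values, not on the Netflix data, and `iqr_bounds` is an illustrative helper name. Note that `statistics.quantiles` computes exact sample quantiles, whereas `APPROX_QUANTILES` in BigQuery is an approximate aggregate, so the two are not guaranteed to agree to the last digit on real data.

```python
from statistics import quantiles


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) fences at Q1 - k*IQR and Q3 + k*IQR."""
    q1, _median, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Hypothetical watch durations in minutes, with one extreme session.
toy_minutes = [10, 20, 30, 40, 50, 60, 70, 80, 500]

lower, upper = iqr_bounds(toy_minutes)
outliers = [v for v in toy_minutes if v < lower or v > upper]
pct_outliers = 100 * len(outliers) / len(toy_minutes)

print(f"bounds: ({lower}, {upper})")
print(f"outliers: {outliers} ({pct_outliers:.2f}% of rows)")
```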


In [None]:
# prompt: Create watch_history_robust with minutes_watched_capped capped at P01/P99; return quantile summaries before/after.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

# Get the P01 and P99 values from the original deduped table first
quantile_values_query = f"""
SELECT
  APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
  APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
FROM
  `{project_id}.netflix.watch_history_dedup`
"""
query_job_quantiles = client.query(quantile_values_query)
quantile_results = query_job_quantiles.result()

p01_val = None
p99_val = None
for row in quantile_results:
    p01_val = row.p01
    p99_val = row.p99
    print(f"Quantile P1 (before capping): {p01_val}")
    print(f"Quantile P99 (before capping): {p99_val}")

# Now, create the robust table with capped minutes_watched
print("\nCreating watch_history_robust table with capped minutes_watched...")
query_create_robust = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
SELECT
  *, -- Select all original columns
  -- Cap watch_duration_minutes at P01 and P99 values obtained above
  CASE
    WHEN watch_duration_minutes < {p01_val} THEN {p01_val}
    WHEN watch_duration_minutes > {p99_val} THEN {p99_val}
    ELSE watch_duration_minutes
  END AS watch_duration_minutes_capped
FROM
  `{project_id}.netflix.watch_history_dedup`;
"""

query_job_create = client.query(query_create_robust)
query_job_create.result() # Wait for the job to complete
print("Table watch_history_robust created successfully.")

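# Finally, get the quantile summaries after capping.
# APPROX_QUANTILES is an approximate aggregate, so these values can differ slightly from the caps applied above.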
print("\nQuantile summaries AFTER capping minutes_watched:")
query_after = f"""
SELECT
  APPROX_QUANTILES(watch_duration_minutes_capped, 100)[OFFSET(1)] AS p1_capped,
  APPROX_QUANTILES(watch_duration_minutes_capped, 100)[OFFSET(99)] AS p99_capped
FROM
  `{project_id}.netflix.watch_history_robust`
"""
query_job_after = client.query(query_after)
results_after = query_job_after.result()
for row in results_after:
    print(f"P1 Capped (after capping): {row.p1_capped}")
    print(f"P99 Capped (after capping): {row.p99_capped}")


Quantile P1 (before capping): 4.4
Quantile P99 (before capping): 366.0

Creating watch_history_robust table with capped minutes_watched...
Table watch_history_robust created successfully.

Quantile summaries AFTER capping minutes_watched:
P1 Capped (after capping): 4.5
P99 Capped (after capping): 358.1
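
The section goal also asks to flag extremes while capping them, which the table above does not do. A minimal sketch of that idea in plain Python follows, using the P01/P99 values printed by this run (4.4 and 366.0) as the caps. The function name `winsorize_with_flag` and the `flag_extreme` field are assumptions for illustration; in BigQuery the equivalent would be an extra boolean column next to `watch_duration_minutes_capped`.

```python
def winsorize_with_flag(values, lower, upper):
    """Cap each value to [lower, upper] and record whether it was changed."""
    result = []
    for value in values:
        capped = min(max(value, lower), upper)
        result.append({"raw": value, "capped": capped, "flag_extreme": capped != value})
    return result


P01, P99 = 4.4, 366.0  # caps printed by the run above

# Illustrative durations: one below P01, one typical, one above P99.
sample = [0.2, 51.2, 799.3]

for row in winsorize_with_flag(sample, P01, P99):
    print(row)
```

Keeping the flag preserves the information that a session was extreme even after its magnitude has been capped.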


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [None]:
# prompt: Generate a query that shows min/median/max before vs after capping.

import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  'Before Capping' AS stage,
  MIN(watch_duration_minutes) AS min_minutes,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_minutes,
  MAX(watch_duration_minutes) AS max_minutes
FROM
  `{project_id}.netflix.watch_history_dedup`

UNION ALL

SELECT
  'After Capping' AS stage,
  MIN(watch_duration_minutes_capped) AS min_minutes,
  APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_minutes,
  MAX(watch_duration_minutes_capped) AS max_minutes
FROM
  `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Stage: {row.stage}, Min Minutes: {row.min_minutes}, Median Minutes: {row.median_minutes}, Max Minutes: {row.max_minutes}")

Stage: Before Capping, Min Minutes: 0.2, Median Minutes: 51.2, Max Minutes: 799.3
Stage: After Capping, Min Minutes: 4.4, Median Minutes: 51.4, Max Minutes: 366.0


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

**When capping might be harmful:**

 Capping (winsorizing) can be harmful if the extreme values, while rare, contain genuinely important information or represent a distinct, valid phenomenon that you don't want to obscure. For example:

 1.  **Fraud Detection:** In fraud detection, extreme values (e.g., unusually large transaction amounts) are often the very signals you are trying to detect. Capping these values would remove the evidence of fraud, making the model less effective at identifying fraudulent activities.
 2.  **Rare but Significant Events:** If extreme values represent rare but critical events (e.g., a sudden surge in demand for a product, a major system failure), capping them might mask these important signals, leading to a failure to recognize or respond to these events.
 3.  **Misinterpretation of Data Distribution:** If the extreme values are not truly outliers but represent the upper or lower bounds of a legitimate, albeit skewed, distribution, capping them can distort the true nature of the data and lead to incorrect conclusions about the population.
 4.  **Loss of Information for Specific Analyses:** If the goal is to understand the full range of observed behavior, including the extremes, capping removes this information. For instance, if you're analyzing the impact of very long viewing sessions on user satisfaction, capping these sessions would prevent you from studying that specific relationship.

 **Model type less sensitive to outliers and why:**

 **Tree-based models**, such as Decision Trees, Random Forests, and Gradient Boosting Machines (like XGBoost or LightGBM), are generally less sensitive to outliers in the input features than models that rely on distance calculations or assume a specific data distribution (like linear regression or SVMs).

 **Why they are less sensitive:**

 *   **Splitting Mechanism:** These models work by recursively partitioning the data based on feature values. A split is determined by finding a threshold that best separates the data according to the target variable. Outliers, being extreme values, typically fall into a single partition. While they might influence the exact position of a split, they don't disproportionately affect the overall structure of the tree as much as they would in models that use means, variances, or distances.
 *   **No Assumption of Linearity or Distribution:** Unlike linear models that assume a linear relationship between features and the target and are heavily influenced by the mean and variance, tree-based models make no such assumptions. They can capture complex, non-linear relationships and are robust to variations in data distribution.
 *   **Focus on Relative Order:** The splits in a tree are based on the relative order of data points, not their absolute magnitude. An outlier might be far from the other data points, but its position relative to other points within its partition doesn't drastically alter the splitting criteria for other parts of the tree.

 Tree-based models are not immune: extreme values in the target can still pull leaf predictions, particularly under squared-error loss. Compared to models like linear regression, though, they are significantly less affected.
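
The relative-order argument can be checked with a toy decision stump. The sketch below is a hand-written single split that minimizes squared error, not a library implementation, and all names and data are hypothetical. It shows that replacing an extreme feature value with a capped one leaves the chosen partition unchanged, while the feature mean moves substantially.

```python
from statistics import mean


def sse(indices, ys):
    """Sum of squared errors of the selected targets around their own mean."""
    center = mean(ys[i] for i in indices)
    return sum((ys[i] - center) ** 2 for i in indices)


def best_split(xs, ys):
    """Return the set of row indices sent left by the SSE-minimizing threshold."""
    order = sorted(range(len(xs)), key=lambda i: xs[i])
    best_score, best_left = None, None
    for cut in range(1, len(order)):
        left, right = order[:cut], order[cut:]
        score = sse(left, ys) + sse(right, ys)
        if best_score is None or score < best_score:
            best_score, best_left = score, frozenset(left)
    return best_left


# Hypothetical data: the last feature value is an extreme outlier.
y = [0, 0, 0, 1, 1, 1]
x_raw = [1, 2, 3, 4, 5, 1000]
x_capped = [min(v, 6) for v in x_raw]

print("left partition (raw):   ", sorted(best_split(x_raw, y)))
print("left partition (capped):", sorted(best_split(x_capped, y)))
print("feature mean raw vs capped:", mean(x_raw), "vs", mean(x_capped))
```

Because the stump only looks at the sort order of `x`, both versions send rows 0, 1, and 2 to the left. This illustrates robustness to outliers in a feature only; it says nothing about outliers in `y`.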

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [None]:
# prompt: In watch_history_robust, compute and summarize flag_binge for sessions > 8 hours.

# Note: the users table has no 'age_band' column, so the age check further down
# reads the numeric 'age' column directly.
# This cell runs all three flag summaries; the next two cells rerun the age and
# duration checks on their own.

import os
from google.cloud import bigquery

client = bigquery.Client()
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

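print("Summarizing flag_binge for watch_history_robust...")
# Caveat: watch_duration_minutes_capped was capped at P99 (366.0 minutes in this run),
# so it can never exceed 8 * 60 = 480 and this flag is always 0. To flag binge sessions
# on actual viewing time, compare the uncapped watch_duration_minutes column instead.
query_binge = f"""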
SELECT
  COUNT(*) AS total_sessions,
  SUM(CASE WHEN watch_duration_minutes_capped > (8 * 60) THEN 1 ELSE 0 END) AS binge_sessions_count,
  SUM(CASE WHEN watch_duration_minutes_capped > (8 * 60) THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS binge_sessions_percentage
FROM
  `{project_id}.netflix.watch_history_robust`
"""

query_job_binge = client.query(query_binge)
results_binge = query_job_binge.result()

for row in results_binge:
    print(f"Total Sessions: {row.total_sessions}")
    print(f"Binge Sessions Count (>8 hours): {row.binge_sessions_count}")
    print(f"Binge Sessions Percentage: {row.binge_sessions_percentage:.2f}%")

print("\nSummarizing flag_age_extreme for users...")
# Compute and summarize flag_age_extreme if age is <10 or >100
query_age_extreme = f"""
SELECT
  COUNT(*) AS total_users,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) AS extreme_age_count,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS extreme_age_percentage
FROM
  `{project_id}.netflix.users`
WHERE age IS NOT NULL
"""

query_job_age_extreme = client.query(query_age_extreme)
results_age_extreme = query_job_age_extreme.result()

for row in results_age_extreme:
    print(f"Total Users (with age data): {row.total_users}")
    print(f"Extreme Age Count (<10 or >100): {row.extreme_age_count}")
    print(f"Extreme Age Percentage: {row.extreme_age_percentage:.2f}%")

print("\nSummarizing flag_duration_anomaly for movies...")
# Compute and summarize flag_duration_anomaly where duration_min < 15 or > 480
query_duration_anomaly = f"""
SELECT
  COUNT(*) AS total_movies,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) AS duration_anomaly_count,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS duration_anomaly_percentage
FROM
  `{project_id}.netflix.movies`
WHERE duration_minutes IS NOT NULL
"""

query_job_duration_anomaly = client.query(query_duration_anomaly)
results_duration_anomaly = query_job_duration_anomaly.result()

for row in results_duration_anomaly:
    print(f"Total Movies (with duration data): {row.total_movies}")
    print(f"Duration Anomaly Count (<15 or >480 minutes): {row.duration_anomaly_count}")
    print(f"Duration Anomaly Percentage: {row.duration_anomaly_percentage:.2f}%")


Summarizing flag_binge for watch_history_robust...
Total Sessions: 100000
Binge Sessions Count (>8 hours): 0
Binge Sessions Percentage: 0.00%

Summarizing flag_age_extreme for users...
Total Users (with age data): 18142
Extreme Age Count (<10 or >100): 358
Extreme Age Percentage: 1.97%

Summarizing flag_duration_anomaly for movies...
Total Movies (with duration data): 2080
Duration Anomaly Count (<15 or >480 minutes): 46
Duration Anomaly Percentage: 2.21%


In [None]:
# prompt: In users, compute and summarize flag_age_extreme if age can be parsed from age_band (<10 or >100).

import os
from google.cloud import bigquery

client = bigquery.Client()
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

print("Summarizing flag_age_extreme for users...")
# Compute and summarize flag_age_extreme if age is <10 or >100
query_age_extreme = f"""
SELECT
  COUNT(*) AS total_users,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) AS extreme_age_count,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS extreme_age_percentage
FROM
  `{project_id}.netflix.users`
WHERE age IS NOT NULL
"""

query_job_age_extreme = client.query(query_age_extreme)
results_age_extreme = query_job_age_extreme.result()

for row in results_age_extreme:
    print(f"Total Users (with age data): {row.total_users}")
    print(f"Extreme Age Count (<10 or >100): {row.extreme_age_count}")
    print(f"Extreme Age Percentage: {row.extreme_age_percentage:.2f}%")


Summarizing flag_age_extreme for users...
Total Users (with age data): 18142
Extreme Age Count (<10 or >100): 358
Extreme Age Percentage: 1.97%


In [None]:
# prompt: In movies, compute and summarize flag_duration_anomaly where duration_min < 15 or > 480 (if exists). Each cell should output count and percentage and include 1–2 comments.

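import os
from google.cloud import bigquery

client = bigquery.Client()
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

print("\nSummarizing flag_duration_anomaly for movies...")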
# Compute and summarize flag_duration_anomaly where duration_min < 15 or > 480
query_duration_anomaly = f"""
SELECT
  COUNT(*) AS total_movies,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) AS duration_anomaly_count,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS duration_anomaly_percentage
FROM
  `{project_id}.netflix.movies`
WHERE duration_minutes IS NOT NULL
"""

query_job_duration_anomaly = client.query(query_duration_anomaly)
results_duration_anomaly = query_job_duration_anomaly.result()

for row in results_duration_anomaly:
    print(f"Total Movies (with duration data): {row.total_movies}")
    print(f"Duration Anomaly Count (<15 or >480 minutes): {row.duration_anomaly_count}")
    print(f"Duration Anomaly Percentage: {row.duration_anomaly_percentage:.2f}%")



Summarizing flag_duration_anomaly for movies...
Total Movies (with duration data): 2080
Duration Anomaly Count (<15 or >480 minutes): 46
Duration Anomaly Percentage: 2.21%


### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [None]:
# prompt: Generate a single compact summary query that returns two columns per flag: flag_name, pct_of_rows.

import os
from google.cloud import bigquery

client = bigquery.Client()
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

query = f"""
WITH
  binge_flags AS (
    SELECT
      'flag_binge' AS flag_name,
      SUM(CASE WHEN watch_duration_minutes_capped > (8 * 60) THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.watch_history_robust`
  ),
  age_flags AS (
    SELECT
      'flag_age_extreme' AS flag_name,
      SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.users`
    WHERE age IS NOT NULL
  ),
  duration_flags AS (
    SELECT
      'flag_duration_anomaly' AS flag_name,
      SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.movies`
    WHERE duration_minutes IS NOT NULL
  )
SELECT * FROM binge_flags
UNION ALL
SELECT * FROM age_flags
UNION ALL
SELECT * FROM duration_flags;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(f"Flag Name: {row.flag_name}, Percentage of Rows: {row.pct_of_rows:.2f}%")


Flag Name: flag_binge, Percentage of Rows: 0.00%
Flag Name: flag_age_extreme, Percentage of Rows: 1.97%
Flag Name: flag_duration_anomaly, Percentage of Rows: 2.21%


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

 The verification output above gives the share of rows raised by each flag:

 *   `flag_binge`: 0.00%
 *   `flag_age_extreme`: 1.97%
 *   `flag_duration_anomaly`: 2.21%

 `flag_duration_anomaly` is the most common, followed closely by `flag_age_extreme`; `flag_binge` never fires in this run.

 **Which flag to keep as a feature and why:**

 The choice of which flag to keep as a feature depends heavily on the downstream modeling task and the business objective. However, here's a rationale for each:

 1.  **`flag_binge`**:
     *   **Why keep:** This flag indicates users who exhibit a specific, potentially high-engagement behavior (watching for over 8 hours). This could be a strong indicator of user loyalty, addiction to content, or a specific viewing pattern that might be predictive of subscription renewal, churn (if binge-watching leads to burnout), or interest in specific types of long-form content.
     *   **Use case:** Useful for predicting subscription retention, recommending binge-worthy content, or identifying power users.

 2.  **`flag_age_extreme`**:
     *   **Why keep:** This flag primarily serves as a data quality indicator. Extreme ages (e.g., <10 or >100) are often data entry errors or represent a very small, potentially unrepresentative, segment of the user base. While it might be useful to flag these for data cleaning or to understand data quality issues, it's less likely to be a direct predictor of user behavior unless there's a specific hypothesis that these extreme (and likely erroneous) ages correlate with certain behaviors.
     *   **Use case:** Primarily for data cleaning and outlier detection in user demographics. Could be used as a feature if the hypothesis is that users who enter nonsensical ages have different engagement patterns (e.g., bot accounts, or users intentionally trying to bypass age restrictions).

 3.  **`flag_duration_anomaly`**:
     *   **Why keep:** This flag identifies movies with unusually short (less than 15 minutes) or long (more than 480 minutes) durations.
         *   **Short durations:** Might indicate trailers, short films, or incorrectly recorded data.
         *   **Long durations:** Might indicate documentaries, special features, or very long films.
     *   **Use case:** Useful for understanding content characteristics. For recommendation systems, knowing if a movie is a short film versus a feature-length epic can be crucial for user satisfaction. It can also help identify content that might require different user engagement strategies or marketing approaches.

 **Recommendation:**

 *   **`flag_binge`** is likely the most valuable feature for predicting user behavior and engagement, as it directly captures a significant interaction pattern. As computed here it is 0.00% for every row, because the 480-minute threshold is above the P99 cap of 366.0 minutes on `watch_duration_minutes_capped`, so it carries no signal until it is derived from uncapped watch time.
 *   **`flag_duration_anomaly`** is also valuable, especially for content-based recommendations or understanding content types.
 *   **`flag_age_extreme`** is more of a data quality flag. It might be kept if there's a strong hypothesis that these data errors correlate with specific user behaviors, but it's often better addressed through data cleaning first.

 For a general-purpose model predicting user engagement or churn, `flag_binge` would be a strong candidate to include as a feature once it is computed on uncapped watch time.
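
The `flag_binge` result of 0.00% is worth a sanity check, since the flag was computed on `watch_duration_minutes_capped` and the P99 cap from section 5.3 (366.0 minutes) sits below the 8-hour threshold. The sketch below uses hypothetical session lengths to show how the order of capping and flagging changes the outcome. The names are illustrative, and only the 366.0 cap and the 799.3 maximum come from the outputs above.

```python
BINGE_THRESHOLD_MIN = 8 * 60  # 480 minutes, as in the Build Prompt
P99_CAP = 366.0               # upper cap printed in section 5.3


def flag_binge(minutes):
    """True when a session is longer than the binge threshold."""
    return minutes > BINGE_THRESHOLD_MIN


# Hypothetical raw sessions; 799.3 matches the maximum reported before capping.
raw_sessions = [51.2, 366.0, 500.0, 799.3]
capped_sessions = [min(m, P99_CAP) for m in raw_sessions]

flags_on_raw = [flag_binge(m) for m in raw_sessions]
flags_on_capped = [flag_binge(m) for m in capped_sessions]

print("flag on raw values:   ", flags_on_raw)
print("flag on capped values:", flags_on_capped)
```

Flagging before capping (or flagging on the uncapped column) keeps the binge signal; flagging after capping cannot fire whenever the cap is below the threshold.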

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


In [None]:
# Export all Data Quality SQL queries to a .sql file for reproducibility.
import os

project_id = os.environ['GOOGLE_CLOUD_PROJECT']

sql_content = f"""
-- Data Quality (DQ) Queries for Netflix Dataset
-- Project ID: {project_id}

-- 5.1 Missingness (users) - Query 1
-- Calculate total rows and percentage of missing values for country, subscription_plan, and age.
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_country,
    SUM(CASE WHEN subscription_plan IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_subscription_plan,
    SUM(CASE WHEN age IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS percent_missing_age
FROM
    `{project_id}.netflix.users`;

-- 5.1 Missingness (users) - Verification Query (Missingness percentages rounded)
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;

-- 5.2 Duplicates (watch_history) - Query 1 (Report duplicate groups)
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;

-- 5.2 Duplicates (watch_history) - Query 2 (Create deduplicated table)
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;

-- 5.2 Duplicates (watch_history) - Verification Query (Before/after count)
SELECT
  (SELECT COUNT(*) FROM `{project_id}.netflix.watch_history`) AS raw_count,
  (SELECT COUNT(*) FROM `{project_id}.netflix.watch_history_dedup`) AS dedup_count;

-- 5.3 Outliers (minutes_watched) - Query 1 (Compute IQR bounds and % outliers)
WITH
  quantiles AS (
    SELECT
      APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
      APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
    FROM
      `{project_id}.netflix.watch_history_dedup`
  ),
  bounds AS (
    SELECT
      q1,
      q3,
      q1 - 1.5 * (q3 - q1) AS lower_bound,
      q3 + 1.5 * (q3 - q1) AS upper_bound
    FROM
      quantiles
  )
SELECT
  COUNT(*) AS total_rows,
  SUM(CASE WHEN watch_duration_minutes < b.lower_bound OR watch_duration_minutes > b.upper_bound THEN 1 ELSE 0 END) AS outlier_count,
  SUM(CASE WHEN watch_duration_minutes < b.lower_bound OR watch_duration_minutes > b.upper_bound THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS outlier_percentage
FROM
  `{project_id}.netflix.watch_history_dedup`, bounds b;

-- 5.3 Outliers (minutes_watched) - Query 2 (Create robust table with capped values)
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
WITH caps AS (
  -- Compute the P01 and P99 caps in-query so a rerun does not depend on values copied from an earlier execution
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)] AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(99)] AS p99
  FROM
    `{project_id}.netflix.watch_history_dedup`
)
SELECT
  h.*, -- Select all original columns
  -- Cap watch_duration_minutes at the P01 and P99 values
  CASE
    WHEN h.watch_duration_minutes < c.p01 THEN c.p01
    WHEN h.watch_duration_minutes > c.p99 THEN c.p99
    ELSE h.watch_duration_minutes
  END AS watch_duration_minutes_capped
FROM
  `{project_id}.netflix.watch_history_dedup` h
CROSS JOIN caps c;

-- 5.3 Outliers (minutes_watched) - Verification Query (Min/Median/Max before vs after capping)
SELECT
  'Before Capping' AS stage,
  MIN(watch_duration_minutes) AS min_minutes,
  APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_minutes,
  MAX(watch_duration_minutes) AS max_minutes
FROM
  `{project_id}.netflix.watch_history_dedup`

UNION ALL

SELECT
  'After Capping' AS stage,
  MIN(watch_duration_minutes_capped) AS min_minutes,
  APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_minutes,
  MAX(watch_duration_minutes_capped) AS max_minutes
FROM
  `{project_id}.netflix.watch_history_robust`;

-- 5.4 Business anomaly flags - Query 1 (Summarize flag_binge)
SELECT
  COUNT(*) AS total_sessions,
  -- Flag on the raw duration: the capped column is clipped at P99 (366.0 minutes in the earlier run), so it can never exceed 8 hours
  SUM(CASE WHEN watch_duration_minutes > (8 * 60) THEN 1 ELSE 0 END) AS binge_sessions_count,
  SUM(CASE WHEN watch_duration_minutes > (8 * 60) THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS binge_sessions_percentage
FROM
  `{project_id}.netflix.watch_history_robust`;

-- 5.4 Business anomaly flags - Query 2 (Summarize flag_age_extreme)
SELECT
  COUNT(*) AS total_users,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) AS extreme_age_count,
  SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS extreme_age_percentage
FROM
  `{project_id}.netflix.users`
WHERE age IS NOT NULL;

-- 5.4 Business anomaly flags - Query 3 (Summarize flag_duration_anomaly)
SELECT
  COUNT(*) AS total_movies,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) AS duration_anomaly_count,
  SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS duration_anomaly_percentage
FROM
  `{project_id}.netflix.movies`
WHERE duration_minutes IS NOT NULL;

-- 5.4 Business anomaly flags - Verification Query (Compact summary)
WITH
  binge_flags AS (
    SELECT
      'flag_binge' AS flag_name,
      -- Same raw-duration rule as Query 1, so the summary and the detailed count agree
      SUM(CASE WHEN watch_duration_minutes > (8 * 60) THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.watch_history_robust`
  ),
  age_flags AS (
    SELECT
      'flag_age_extreme' AS flag_name,
      SUM(CASE WHEN age < 10 OR age > 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.users`
    WHERE age IS NOT NULL
  ),
  duration_flags AS (
    SELECT
      'flag_duration_anomaly' AS flag_name,
      SUM(CASE WHEN duration_minutes < 15 OR duration_minutes > 480 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS pct_of_rows
    FROM
      `{project_id}.netflix.movies`
    WHERE duration_minutes IS NOT NULL
  )
SELECT * FROM binge_flags
UNION ALL
SELECT * FROM age_flags
UNION ALL
SELECT * FROM duration_flags;
"""

# Write the SQL content to a file
with open('dq_queries.sql', 'w') as f:
    f.write(sql_content)

print("All DQ queries have been exported to dq_queries.sql")

All DQ queries have been exported to dq_queries.sql
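Before committing the export, it can help to confirm how many statements the file contains and which of them rewrite tables. The sketch below is one rough way to do that; `split_statements` and `summarize` are hypothetical helpers, the sample SQL uses a placeholder project `p`, and the split assumes no semicolons appear inside comments or string literals (true for the queries exported above, but not for SQL in general).

```python
import re


def split_statements(sql_text):
    """Split SQL text on semicolons and drop empty or comment-only chunks."""
    statements = []
    for chunk in sql_text.split(";"):
        # Drop blank lines and full-line `--` comments so each statement
        # starts with its first SQL keyword.
        body = "\n".join(
            line for line in chunk.splitlines()
            if line.strip() and not line.strip().startswith("--")
        )
        if body.strip():
            statements.append(body.strip())
    return statements


def summarize(sql_text):
    """Count statements, separating table-writing ones from read-only ones."""
    statements = split_statements(sql_text)
    writes = [s for s in statements if re.match(r"create\s", s, re.IGNORECASE)]
    return {
        "total": len(statements),
        "writes": len(writes),
        "reads": len(statements) - len(writes),
    }


# Small placeholder sample; `p.netflix.*` is not a real project.
sample = """
-- Read-only profile
SELECT COUNT(*) FROM `p.netflix.users`;

-- Rebuilds a table
CREATE OR REPLACE TABLE `p.netflix.t` AS
SELECT 1 AS x;
"""
summary = summarize(sample)
print(summary)
```

To check the real export, pass the file contents instead: `summarize(open('dq_queries.sql').read())`. The export cell above ends 12 statements with a semicolon, 2 of which are `CREATE OR REPLACE TABLE`, so a different count suggests a query was dropped or truncated.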


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)
