<a href="https://colab.research.google.com/github/bigDataNCloud/classResources/blob/main/Lab2_custom_dataflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Lab 2: Building a Custom Dataflow Pipeline with Apache Beam**

Objective: This notebook provides a complete, end-to-end guide for building and launching a custom data pipeline using the Apache Beam SDK. The pipeline will read a CSV file of historical stock data from Google Cloud Storage (GCS), perform necessary transformations (like reformatting dates and cleaning data), and load the processed data into a new table in Google BigQuery. This lab simulates a real-world data engineering task and teaches the fundamentals of creating robust, serverless ETL (Extract, Transform, Load) jobs on Google Cloud Platform.

# Cell 1: Install Required Libraries and Resolve Dependencies

Purpose: Before we can build our pipeline, we need to install the necessary Python libraries. We will install apache-beam with the [gcp] extra, which includes all the components needed to run our pipeline on the Google Cloud Dataflow service.

Important Note on Dependencies: Google Colab comes with many pre-installed packages. Sometimes, these can conflict with the specific versions required by new libraries. In this case, apache-beam requires a newer version of a package called dill. To solve this, we will install both apache-beam and the required version of dill in a single command, which helps the package manager resolve the conflict correctly.

Action Required:

Run this cell to install the libraries.

After the installation completes, you must restart the Colab runtime. This ensures that the newly installed package versions are loaded correctly. Go to the menu and select Runtime > Restart runtime.

In [2]:
# --- Install Apache Beam and its dependencies in a single command ---
# This command installs the Apache Beam SDK with Google Cloud Platform support
# and simultaneously ensures that the 'dill' package is at version 0.3.7 or newer,
# which resolves a common dependency conflict in the Colab environment.

!pip install --upgrade pip
!pip install --upgrade dill
!pip install --upgrade google-cloud-bigquery
!pip install --quiet apache-beam[gcp]

Collecting dill
  Using cached dill-0.4.0-py3-none-any.whl.metadata (10 kB)
Using cached dill-0.4.0-py3-none-any.whl (119 kB)
Installing collected packages: dill
  Attempting uninstall: dill
    Found existing installation: dill 0.3.1.1
    Uninstalling dill-0.3.1.1:
      Successfully uninstalled dill-0.3.1.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
apache-beam 2.66.0 requires dill<0.3.2,>=0.3.1.1, but you have dill 0.4.0 which is incompatible.
datasets 2.14.4 requires dill<0.3.8,>=0.3.0, but you have dill 0.4.0 which is incompatible.[0m[31m
[0mSuccessfully installed dill-0.4.0
Collecting pip
  Downloading pip-25.1.1-py3-none-any.whl.metadata (3.6 kB)
Downloading pip-25.1.1-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m20.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected pack

# Cell 2: Authenticate and Configure Project Variables

Purpose: This cell handles authentication and sets up all the necessary configuration variables for our project.

Authentication: The auth.authenticate_user() command will prompt you to log in with your Google account. This grants your Colab notebook the necessary permissions to interact with your Google Cloud project's resources (like GCS, BigQuery, and Dataflow).

Configuration: We define all our project-specific names and settings in one place. This makes the code cleaner and easier to adapt for future projects. Each variable is explained in the comments.

In [1]:
from google.colab import auth
import os

# Authenticate your Google account
auth.authenticate_user()

# --- CONFIGURATION ---
PROJECT_ID = "prof-big-data"
REGION = "us-central1"

# Use the buckets and dataset you created previously
GCS_INPUT_BUCKET = "nvdia_2025"
GCS_FILE_NAME = "NVDA_HistoricalData.csv"
GCS_TEMP_BUCKET = f"{PROJECT_ID}-df-temp-storage" # The temp bucket from before
BIGQUERY_DATASET = "nvidia_test" # The dataset you created before
BIGQUERY_TABLE = "nvidia_prices_from_beam" # A new table name

# Define the full paths for our resources
input_file = f"gs://{GCS_INPUT_BUCKET}/{GCS_FILE_NAME}"
temp_location = f"gs://{GCS_TEMP_BUCKET}/temp"
table_spec = f"{PROJECT_ID}:{BIGQUERY_DATASET}.{BIGQUERY_TABLE}"

# Define the BigQuery table schema as a string
# This matches the header of your CSV file
table_schema = "Date:DATE,Close_Last:FLOAT,Volume:INTEGER,Open:FLOAT,High:FLOAT,Low:FLOAT"

print("Configuration is complete.")

Configuration is complete.


# Cell 3: Write the Custom Apache Beam Pipeline Script

Purpose: This is the core of our lab. The %%writefile command is a special Colab "magic" command that writes the entire content of the cell to a Python file named beam_pipeline.py. This script contains the logic for our data pipeline.

Key Concepts in the Script:

parse_csv function: This is our custom transformation logic. For each line read from the CSV, it performs several crucial steps:

Skips the Header: It checks for and ignores the header row.

Parses the Line: It splits the comma-separated line into individual fields.

Cleans and Formats Data: It removes dollar signs from financial data and, most importantly, reformats the date from MM/DD/YYYY to YYYY-MM-DD, which is the required format for BigQuery's DATE data type.

Error Handling: A try-except block gracefully handles any rows that are malformed, preventing a single bad row from crashing the entire pipeline.

Returns a Dictionary: It structures the cleaned data into a Python dictionary, where the keys match our BigQuery table's column names.

run() function: This function defines and executes the pipeline.

PipelineOptions: Configures the pipeline to run on the Dataflow service (DataflowRunner) and specifies our project, job name, region, and temporary storage locations.

beam.Pipeline: This is the main pipeline object. The | (pipe) operator is used to chain together different processing steps.

ReadFromText: Reads the source CSV file from GCS line by line.

FlatMap(parse_csv): Applies our custom parse_csv function to every line.

WriteToBigQuery: Writes the final, processed data to our specified BigQuery table. It also defines the table schema and sets rules for creating the table if it doesn't exist (CREATE_IF_NEEDED) and for overwriting existing data (WRITE_TRUNCATE).

In [4]:
%%writefile beam_pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
from datetime import datetime

# --- CORRECTED TRANSFORMATION FUNCTION ---
def parse_csv(line):
    header = "Date,Close/Last,Volume,Open,High,Low"
    if line.strip() == header:
        return [] # Skip the header row

    fields = re.split(r',(?=(?:[^"]*"[^"]*")*[^"]*$)', line)

    try:
        # --- FIX: Reformat the date string ---
        # Original date format is MM/DD/YYYY
        date_obj = datetime.strptime(fields[0], '%m/%d/%Y')
        # New format is YYYY-MM-DD, which BigQuery requires for DATE types
        formatted_date = date_obj.strftime('%Y-%m-%d')

        # Create a dictionary for each row
        return [{
            "Date": formatted_date, # Use the correctly formatted date
            "Close_Last": float(fields[1].replace('$', '')),
            "Volume": int(fields[2]),
            "Open": float(fields[3].replace('$', '')),
            "High": float(fields[4].replace('$', '')),
            "Low": float(fields[5].replace('$', ''))
        }]
    except (ValueError, IndexError) as e:
        # This will catch and log any rows that don't match our format
        print(f"Skipping malformed row: {line} | Error: {e}")
        return []

def run():
    # --- CORRECTED PIPELINE OPTIONS ---
    # We explicitly add the project ID to the options
    options = PipelineOptions(
        runner='DataflowRunner',
        project='prof-big-data', # --- FIX: Explicitly set the project
        job_name='gcs-to-bq-beam-pipeline-v2', # New job name
        staging_location='gs://prof-big-data-df-temp-storage/staging',
        temp_location='gs://prof-big-data-df-temp-storage/temp',
        region='us-central1'
    )

    # Define the pipeline
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'ReadFromGCS' >> beam.io.ReadFromText('gs://nvdia_2025/NVDA_HistoricalData.csv')
            | 'ParseAndFormat' >> beam.FlatMap(parse_csv) # Using the corrected function
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table='prof-big-data:nvidia_test.nvidia_prices_from_beam',
                schema='Date:DATE,Close_Last:FLOAT,Volume:INTEGER,Open:FLOAT,High:FLOAT,Low:FLOAT',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )

if __name__ == '__main__':
    run()

Overwriting beam_pipeline.py


# Cell 4: Launch the Dataflow Job

Purpose: This cell executes the beam_pipeline.py script that you just created. The ! symbol tells Colab to run the command in the underlying Linux shell. This command will package up your pipeline code and submit it to the Google Cloud Dataflow service, which will then provision workers and run the job.

In [5]:
print("Launching the Dataflow job... This may take a minute.")

# Run the beam_pipeline.py script
!python beam_pipeline.py

print("\n---")
print("Job has been submitted to Dataflow.")
print("You can monitor its progress in the GCP console:")
print(f"https://console.cloud.google.com/dataflow/jobs?project={PROJECT_ID}")

Launching the Dataflow job... This may take a minute.

---
Job has been submitted to Dataflow.
You can monitor its progress in the GCP console:
https://console.cloud.google.com/dataflow/jobs?project=prof-big-data


# Cell 5: Verify the Final Result in BigQuery

Purpose: After waiting a few minutes for the Dataflow job to show "Succeeded" in the GCP console, you can run this final cell. It connects to BigQuery, queries the new table created by your pipeline, and displays the results in a clean pandas DataFrame, confirming that your entire ETL process was successful.

In [6]:
from google.cloud import bigquery
import pandas as pd

# --- Verify the Final Result in BigQuery ---

# Use the BigQuery client we configured earlier
client = bigquery.Client(project=PROJECT_ID)

# The full ID of the table your Beam pipeline created
table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE}"

# SQL query to select the first 15 rows from your table
sql_query = f"""
SELECT
    *
FROM
    `{table_id}`
ORDER BY
    Date DESC
LIMIT 15
"""

try:
    # Execute the query and load the results into a pandas DataFrame
    df = client.query(sql_query).to_dataframe()

    print(f"--- Successfully queried data from '{table_id}' ---")
    # Display the DataFrame. Colab will format it nicely.
    display(df)

except Exception as e:
    print(f"An error occurred while querying the table: {e}")

--- Successfully queried data from 'prof-big-data.nvidia_test.nvidia_prices_from_beam' ---


Unnamed: 0,Date,Close_Last,Volume,Open,High,Low
0,2024-07-01,124.3,284885500,123.47,124.84,118.83
1,2024-06-28,123.54,315516700,124.58,127.71,122.75
2,2024-06-27,123.99,252571700,124.1,126.41,122.92
3,2024-06-26,126.4,362975900,126.13,128.12,122.6
4,2024-06-25,126.09,425787500,121.2,126.5,119.32
5,2024-06-24,118.11,476060900,123.24,124.46,118.04
6,2024-06-21,126.57,655484700,127.12,130.63,124.3
7,2024-06-20,130.78,517768400,139.8,140.76,129.52
8,2024-06-18,135.58,294335100,131.14,136.33,130.69
9,2024-06-17,130.98,288504400,132.99,133.73,129.58


# Linear Regression (Predicting a Stock's High Price)

Objective: Use Linear Regression to predict a continuous value. In this case, we will predict the High price of a stock for a given day based on its Open, Low, and Close_Last prices.

Action: Run the cells below to create, evaluate, and use the model.

In [7]:
# --- 1. Create the Linear Regression Model ---

# This SQL statement creates a model named 'nvidia_high_predictor'.
# It uses the LINEAR_REG model type to predict the 'High' column (our label).
# The FEATURES clause specifies which columns to use for the prediction.
create_linear_model_query = """
CREATE OR REPLACE MODEL `prof-big-data.nvidia_test.nvidia_high_predictor`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['High']) AS
SELECT
  Open,
  High,
  Low,
  Close_Last
FROM
  `prof-big-data.nvidia_test.nvidia_prices_from_beam`
"""

# Execute the query to create and train the model.
# This may take a few minutes.
linear_job = client.query(create_linear_model_query)
linear_job.result() # Wait for the job to complete

print("✅ Linear Regression model created successfully.")

# --- 2. Evaluate the Model ---

# This query uses ML.EVALUATE to see how well our model performed.
# For linear regression, look at metrics like 'r2_score' and 'mean_absolute_error'.
evaluate_linear_model_query = """
SELECT
  *
FROM
  ML.EVALUATE(MODEL `prof-big-data.nvidia_test.nvidia_high_predictor`)
"""
linear_eval_df = client.query(evaluate_linear_model_query).to_dataframe()
print("\n--- Model Evaluation ---")
display(linear_eval_df)


# --- 3. Make Predictions with the Model ---

# This query uses ML.PREDICT to predict the 'High' price for a few sample days.
# The model will output a 'predicted_High' column.
predict_linear_query = """
SELECT
  High AS actual_high,
  predicted_High,
  Open,
  Low,
  Close_Last
FROM
  ML.PREDICT(MODEL `prof-big-data.nvidia_test.nvidia_high_predictor`,
    (
    SELECT
      Open, High, Low, Close_Last
    FROM
      `prof-big-data.nvidia_test.nvidia_prices_from_beam`
    WHERE
      EXTRACT(YEAR FROM Date) = 2024
    LIMIT 5
    )
  )
"""
linear_predict_df = client.query(predict_linear_query).to_dataframe()
print("\n--- Sample Predictions ---")
display(linear_predict_df)

✅ Linear Regression model created successfully.

--- Model Evaluation ---


Unnamed: 0,mean_absolute_error,mean_squared_error,mean_squared_log_error,median_absolute_error,r2_score,explained_variance
0,0.178467,0.07997,9.2e-05,0.108855,0.999829,0.999838



--- Sample Predictions ---


Unnamed: 0,actual_high,predicted_High,Open,Low,Close_Last
0,48.1841,48.130857,47.485,47.32,47.569
1,48.5,48.538684,47.767,47.508,47.998
2,49.547,49.446074,48.462,48.306,49.097
3,49.295,49.757376,49.244,47.595,48.168
4,52.275,51.726606,49.512,49.479,52.253


# Logistic Regression (Predicting a High Volume Day)

Objective: Use Logistic Regression for binary classification. We will predict whether a given day was a "high volume" trading day. To do this, we first need to create our target label: is_high_volume. We'll define "high volume" as any day where the volume was greater than the average volume for the entire dataset.

Action: Run the cells below to create a classification model.

In [13]:
# --- 1. Pre-calculate the Average Volume ---
# This step remains the same. We need the average to create our label.
avg_volume_query = """
SELECT
    AVG(Volume) as avg_vol
FROM
    `prof-big-data.nvidia_test.nvidia_prices_from_beam`
"""
avg_volume = client.query(avg_volume_query).to_dataframe().iloc[0,0]
print(f"Calculated Average Volume: {avg_volume:,.0f}")


# --- 2. Create a Training View with the Label ---
# This is the new, crucial step. We create a VIEW that contains our final, clean data.
# This separates our data preparation logic from our model training logic.
create_view_query = f"""
CREATE OR REPLACE VIEW `prof-big-data.nvidia_test.nvidia_training_data` AS
SELECT
  Open,
  High,
  Low,
  Close_Last,
  -- This is where we create the label that our model will predict.
  CASE
    WHEN Volume > {avg_volume} THEN TRUE
    ELSE FALSE
  END AS is_high_volume
FROM
  `prof-big-data.nvidia_test.nvidia_prices_from_beam`
"""
view_job = client.query(create_view_query)
view_job.result() # Wait for the view to be created
print("\n✅ Training view `nvidia_training_data` created successfully.")


# --- 3. Create the Model from the Clean View ---
# This CREATE MODEL statement is now much simpler. It has NO TRANSFORM clause.
# It simply selects the features and the label from our new view.
create_logistic_model_query = """
CREATE OR REPLACE MODEL `prof-big-data.nvidia_test.nvidia_volume_classifier`
OPTIONS(model_type='LOGISTIC_REG', input_label_cols=['is_high_volume']) AS
SELECT
  *
FROM
  `prof-big-data.nvidia_test.nvidia_training_data`
"""
logistic_job = client.query(create_logistic_model_query)
logistic_job.result() # Wait for the job to complete
print("\n✅ Logistic Regression model created successfully.")


# --- 4. Evaluate and Predict (This part remains the same) ---
evaluate_logistic_model_query = """
SELECT
  *
FROM
  ML.EVALUATE(MODEL `prof-big-data.nvidia_test.nvidia_volume_classifier`)
"""
logistic_eval_df = client.query(evaluate_logistic_model_query).to_dataframe()
print("\n--- Model Evaluation ---")
display(logistic_eval_df)


predict_logistic_query = """
SELECT
  predicted_is_high_volume,
  predicted_is_high_volume_probs AS probability, -- FIX: Corrected the column name for probabilities
  * EXCEPT(predicted_is_high_volume, predicted_is_high_volume_probs) -- Exclude the original prediction columns
FROM
  ML.PREDICT(MODEL `prof-big-data.nvidia_test.nvidia_volume_classifier`,
    (
    -- We still use the original table for prediction, as the model knows how to transform it.
    SELECT
      Open, High, Low, Close_Last, Volume
    FROM
      `prof-big-data.nvidia_test.nvidia_prices_from_beam`
    ORDER BY Date DESC
    LIMIT 5
    )
  )
"""
logistic_predict_df = client.query(predict_logistic_query).to_dataframe()
print("\n--- Sample Predictions ---")
display(logistic_predict_df)

Calculated Average Volume: 455,353,972

✅ Training view `nvidia_training_data` created successfully.

✅ Logistic Regression model created successfully.

--- Model Evaluation ---


Unnamed: 0,precision,recall,accuracy,f1_score,log_loss,roc_auc
0,0.478261,0.08871,0.533582,0.14966,0.691303,0.542645



--- Sample Predictions ---


Unnamed: 0,predicted_is_high_volume,probability,Open,High,Low,Close_Last,Volume
0,True,"[{'label': True, 'prob': 0.5276665512107709}, ...",123.47,124.84,118.83,124.3,284885500
1,True,"[{'label': True, 'prob': 0.5283456788704558}, ...",124.58,127.71,122.75,123.54,315516700
2,True,"[{'label': True, 'prob': 0.5282097257935159}, ...",124.1,126.41,122.92,123.99,252571700
3,True,"[{'label': True, 'prob': 0.5288013981851751}, ...",126.13,128.12,122.6,126.4,362975900
4,True,"[{'label': True, 'prob': 0.5278314543825906}, ...",121.2,126.5,119.32,126.09,425787500


# K-Means Clustering (Grouping Similar Trading Days)

Objective: Use K-Means, an unsupervised learning algorithm, to group similar data points into clusters. We will group trading days into 4 clusters based on their price and volume characteristics. Since Volume is on a much larger scale than the price columns, we must standardize our features using ML.STANDARD_SCALER to ensure all features are weighted equally.

Action: Run the cells below to create the clustering model.

In [11]:
# --- 1. Create the K-Means Clustering Model ---

# This query creates a K-Means model with 4 clusters.
# The TRANSFORM clause is crucial here. It uses ML.STANDARD_SCALER to
# normalize all of our feature columns. This prevents features with large
# scales (like Volume) from dominating the distance calculations.
create_kmeans_model_query = """
CREATE OR REPLACE MODEL `prof-big-data.nvidia_test.nvidia_day_clusterer`
TRANSFORM(
    -- Standardize all features to have a mean of 0 and standard deviation of 1
    ML.STANDARD_SCALER(Open) OVER() AS std_Open,
    ML.STANDARD_SCALER(High) OVER() AS std_High,
    ML.STANDARD_SCALER(Low) OVER() AS std_Low,
    ML.STANDARD_SCALER(Close_Last) OVER() AS std_Close_Last,
    ML.STANDARD_SCALER(Volume) OVER() AS std_Volume
)
OPTIONS(model_type='KMEANS', num_clusters=4) AS
SELECT
  Open,
  High,
  Low,
  Close_Last,
  Volume
FROM
  `prof-big-data.nvidia_test.nvidia_prices_from_beam`
"""
kmeans_job = client.query(create_kmeans_model_query)
kmeans_job.result()
print("✅ K-Means Clustering model created successfully.")


# --- 2. See which Cluster Each Day Belongs To ---

# ML.PREDICT for a K-Means model will add a 'CENTROID_ID' column,
# which indicates the cluster number (from 1 to 4) each row belongs to.
predict_kmeans_query = """
SELECT
  CENTROID_ID,
  *
FROM
  ML.PREDICT(MODEL `prof-big-data.nvidia_test.nvidia_day_clusterer`,
    (
    SELECT
      Open, High, Low, Close_Last, Volume
    FROM
      `prof-big-data.nvidia_test.nvidia_prices_from_beam`
    ORDER BY Date DESC
    LIMIT 10
    )
  )
"""
kmeans_predict_df = client.query(predict_kmeans_query).to_dataframe()
print("\n--- Sample Cluster Assignments ---")
display(kmeans_predict_df)


# --- 3. Analyze the Cluster Centroids ---

# This is the most important step for understanding a K-Means model.
# ML.CENTROIDS shows us the average values for each feature within each cluster.
# By looking at these centroids, we can give each cluster a business meaning.
# For example, one cluster might be "low price, low volume days" while another
# might be "high price, high volatility, high volume days".
analyze_centroids_query = """
SELECT
  *
FROM
  ML.CENTROIDS(MODEL `prof-big-data.nvidia_test.nvidia_day_clusterer`)
"""
centroids_df = client.query(analyze_centroids_query).to_dataframe()
print("\n--- Cluster Centroid Analysis ---")
display(centroids_df)

✅ K-Means Clustering model created successfully.

--- Sample Cluster Assignments ---


Unnamed: 0,CENTROID_ID,CENTROID_ID_1,NEAREST_CENTROIDS_DISTANCE,Open,High,Low,Close_Last,Volume
0,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.103097846706...",123.24,124.46,118.04,118.11,476060900
1,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 3.284271169170...",139.8,140.76,129.52,130.78,517768400
2,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.495972716912...",123.47,124.84,118.83,124.3,284885500
3,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.578369406706...",126.13,128.12,122.6,126.4,362975900
4,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.766806932766...",127.12,130.63,124.3,126.57,655484700
5,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.312635363399...",121.2,126.5,119.32,126.09,425787500
6,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 3.285539780660...",131.14,136.33,130.69,135.58,294335100
7,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.565681209735...",124.58,127.71,122.75,123.54,315516700
8,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 3.168919202117...",132.99,133.73,129.58,130.98,288504400
9,2,2,"[{'CENTROID_ID': 2, 'DISTANCE': 2.683876430440...",124.1,126.41,122.92,123.99,252571700



--- Cluster Centroid Analysis ---


Unnamed: 0,centroid_id,feature,numerical_value,categorical_value
0,1,std_Open,-0.496242,[]
1,1,std_High,-0.499469,[]
2,1,std_Low,-0.494384,[]
3,1,std_Close_Last,-0.496882,[]
4,1,std_Volume,-0.782396,[]
5,2,std_Open,2.847676,[]
6,2,std_High,2.850559,[]
7,2,std_Low,2.843331,[]
8,2,std_Close_Last,2.84622,[]
9,2,std_Volume,0.150408,[]
