In [28]:
import os
import re
import subprocess
from google.colab import auth

def _build_bucket_name(project_id, base_bucket_name):
    """Combine the project ID and a user-chosen base name into a GCS bucket name.

    GCS bucket names may only use lowercase letters, digits, '-', '_' and '.',
    must start and end with a letter or digit, and (for the common case
    enforced here) be 3-63 characters long. Domain-scoped project IDs such as
    'example.com:my-proj' contain '.' and ':', so both become '-'. Input is
    lowercased because uppercase names are always rejected by GCS.

    Raises:
        ValueError: if the combined name still violates those rules.
    """
    safe_project_id = re.sub(r"[.:]", "-", project_id.lower())
    bucket_name = f"{safe_project_id}-{base_bucket_name.lower()}"
    if not re.fullmatch(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]", bucket_name):
        raise ValueError(
            f"ERROR: '{bucket_name}' is not a valid GCS bucket name "
            "(3-63 chars of a-z, 0-9, '-', '_', '.'; must start and end with a letter or digit)."
        )
    return bucket_name


def setup_environment():
    """Authenticate, resolve the GCP project and bucket name, and export them.

    Side effects: runs Colab user authentication, may prompt via input(),
    sets the PROJECT_ID / BUCKET_NAME environment variables, and sets the
    gcloud default project for this runtime.

    Returns:
        tuple[str, str]: (project_id, bucket_name)

    Raises:
        ValueError: if no project ID or bucket base name is supplied, or the
            derived bucket name is not a valid GCS bucket name.
    """
    print("Authenticating your Google account...")
    auth.authenticate_user()
    print("‚úÖ Authentication successful.")

    project_id = ""
    try:
        project_id_process = subprocess.run(
            ["gcloud", "config", "get-value", "project"],
            capture_output=True, text=True, check=True
        )
        project_id = project_id_process.stdout.strip()
    except (OSError, subprocess.SubprocessError):
        # Best effort only (gcloud missing or exiting non-zero): fall back to
        # prompting the user below instead of failing here.
        pass

    if not project_id:
        print("‚ö†Ô∏è Could not automatically determine GCP Project ID.")
        project_id = input("Please enter your GCP Project ID: ").strip()

    if not project_id:
        raise ValueError("üî¥ ERROR: Project ID is required to continue.")

    print(f"‚úÖ Using GCP Project: {project_id}")

    base_bucket_name = input("Please enter a base name for your GCS bucket (e.g., 'bitcoin-data-final-proj'): ").strip()
    if not base_bucket_name:
        raise ValueError("üî¥ ERROR: A base name for the bucket is required to continue.")

    bucket_name = _build_bucket_name(project_id, base_bucket_name)
    print(f"‚úÖ Bucket will be named: {bucket_name}")

    # Later cells read these back via os.environ.
    os.environ["PROJECT_ID"] = project_id
    os.environ["BUCKET_NAME"] = bucket_name

    # Also set gcloud project for this runtime
    subprocess.run(["gcloud", "config", "set", "project", project_id], check=False)

    return project_id, bucket_name

# Run the interactive setup once; later cells re-read these values via os.environ.
PROJECT_ID, BUCKET_NAME = setup_environment()
for label, value in (("PROJECT_ID:", PROJECT_ID), ("BUCKET_NAME:", BUCKET_NAME)):
    print(label, value)



Authenticating your Google account...
‚úÖ Authentication successful.
‚úÖ Using GCP Project: mgmt467-project1
Please enter a base name for your GCS bucket (e.g., 'bitcoin-data-final-proj'): mgmt479-bitcoin-final-lab
‚úÖ Bucket will be named: mgmt467-project1-mgmt479-bitcoin-final-lab
PROJECT_ID: mgmt467-project1
BUCKET_NAME: mgmt467-project1-mgmt479-bitcoin-final-lab


In [29]:
import os
import json
import time
import subprocess
from datetime import datetime, timezone

import requests
from google.colab import auth

auth.authenticate_user()
print("‚úÖ Authenticated")

# Values exported by setup_environment(); fail fast if that cell was skipped.
PROJECT_ID, BUCKET_NAME = os.environ.get("PROJECT_ID"), os.environ.get("BUCKET_NAME")
if not (PROJECT_ID and BUCKET_NAME):
    raise ValueError("Missing PROJECT_ID or BUCKET_NAME. Run your Notebook 1 setup_environment() cell first.")

REGION = "us-central1"

# --- Batch objects: names MUST match Notebook 1 exactly ---
BQ_DATASET = "bitcoin_data_set"
BATCH_RAW_TABLE, BATCH_CLEAN_TABLE, BATCH_ANALYTICS_VIEW = (
    "bitcoin_full_dataset",
    "bitcoin_cleaned",
    "bitcoin_analytics_view",
)

# --- Streaming objects introduced in Notebook 2 ---
STREAMING_TABLE, DEADLETTER_TABLE = "bitcoin_streaming", "bitcoin_deadletter"

# --- Pub/Sub topic and Cloud Function deployment settings ---
PUBSUB_TOPIC = "bitcoin-price-topic"
FUNCTION_NAME = FUNCTION_ENTRYPOINT = "btc_stream_producer"
RUNTIME = "python311"

for label, value in (
    ("PROJECT_ID:", PROJECT_ID),
    ("BUCKET_NAME:", BUCKET_NAME),
    ("REGION:", REGION),
    ("BQ_DATASET:", BQ_DATASET),
):
    print(label, value)



‚úÖ Authenticated
PROJECT_ID: mgmt467-project1
BUCKET_NAME: mgmt467-project1-mgmt479-bitcoin-final-lab
REGION: us-central1
BQ_DATASET: bitcoin_data_set


In [30]:
!gcloud config set project $PROJECT_ID

!gcloud services enable \
  cloudfunctions.googleapis.com \
  run.googleapis.com \
  eventarc.googleapis.com \
  pubsub.googleapis.com \
  dataflow.googleapis.com \
  bigquery.googleapis.com \
  cloudscheduler.googleapis.com


Updated property [core/project].
Operation "operations/acat.p2-718972448119-f1f6036e-b791-4ba0-b560-ddcb614a0df8" finished successfully.


In [31]:
from google.cloud import bigquery

bq = bigquery.Client(project=PROJECT_ID)

# --- 1) REQUIRED: Curated batch table from Notebook 1 ---
# Names come from the config cell (BATCH_CLEAN_TABLE / BATCH_ANALYTICS_VIEW) instead
# of being re-typed or re-assigned here, so the two cannot drift apart.
batch_clean_table = f"{PROJECT_ID}.{BQ_DATASET}.{BATCH_CLEAN_TABLE}"
q1 = f"SELECT COUNT(*) AS n FROM `{batch_clean_table}`"

print("RUNNING:", q1)
res1 = list(bq.query(q1).result())
print("‚úÖ bitcoin_cleaned row count:", res1[0]["n"])

# --- 2) OPTIONAL BUT EXPECTED: Analytics view from Notebook 1 ---
analytics_view = f"{PROJECT_ID}.{BQ_DATASET}.{BATCH_ANALYTICS_VIEW}"

try:
    bq.get_table(analytics_view)
    print(f"‚úÖ Analytics view exists: {analytics_view}")

    q2 = f"SELECT * FROM `{analytics_view}` LIMIT 5"
    print("RUNNING:", q2)
    res2 = list(bq.query(q2).result())
    print("‚úÖ Sample row from analytics view:", dict(res2[0]) if res2 else "No rows")

except Exception as e:
    # Deliberately best-effort: the Pub/Sub -> BigQuery pipeline does not read this
    # view (only the comparison queries at the end do), so report and continue.
    print(f"‚ö†Ô∏è Analytics view not accessible: {analytics_view}")
    print("Reason:", str(e)[:300])
    print("Continuing ‚Äî analytics view is optional for streaming pipeline.")



RUNNING: SELECT COUNT(*) AS n FROM `mgmt467-project1.bitcoin_data_set.bitcoin_cleaned`
‚úÖ bitcoin_cleaned row count: 260878
‚úÖ Analytics view exists: mgmt467-project1.bitcoin_data_set.bitcoin_analytics_view
RUNNING: SELECT * FROM `mgmt467-project1.bitcoin_data_set.bitcoin_analytics_view` LIMIT 5
‚úÖ Sample row from analytics view: {'Timestamp': 1329602940.0, 'datetime': '2012-02-18 22:09:00', 'Open': 4.14, 'High': 4.14, 'Low': 4.14, 'Close': 4.14, 'Volume': 0.0, 'is_positive_minute_change': False}


In [32]:
topic_full = f"projects/{PROJECT_ID}/topics/{PUBSUB_TOPIC}"
existing_topics = subprocess.check_output(
    ["bash", "-lc", "gcloud pubsub topics list --format='value(name)'"]
).decode().splitlines()

if topic_full not in existing_topics:
    !gcloud pubsub topics create $PUBSUB_TOPIC
    print("‚úÖ Created topic:", PUBSUB_TOPIC)
else:
    print("‚úÖ Topic exists:", PUBSUB_TOPIC)


‚úÖ Topic exists: bitcoin-price-topic


In [33]:
from google.cloud import bigquery

stream_table_id = f"{PROJECT_ID}.{BQ_DATASET}.{STREAMING_TABLE}"
dead_table_id = f"{PROJECT_ID}.{BQ_DATASET}.{DEADLETTER_TABLE}"

# Streaming table schema; mirrors the JSON message published by the Cloud Function.
schema = [
    bigquery.SchemaField("event_time", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("ingestion_time", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("asset", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("price_usd", "FLOAT"),
    bigquery.SchemaField("change_percent_24h", "FLOAT"),
    bigquery.SchemaField("volume_usd_24h", "FLOAT"),
    bigquery.SchemaField("market_cap_usd", "FLOAT"),
    bigquery.SchemaField("raw_json", "STRING"),
]

# The classic PubSub_to_BigQuery Dataflow template writes failed messages using its
# own fixed dead-letter schema, NOT the streaming schema above. A dead-letter table
# pre-created with `schema` rejects those inserts, so failed messages would be lost.
deadletter_schema = [
    bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("payloadString", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("payloadBytes", "BYTES", mode="REQUIRED"),
    bigquery.SchemaField(
        "attributes", "RECORD", mode="REPEATED",
        fields=[
            bigquery.SchemaField("key", "STRING"),
            bigquery.SchemaField("value", "STRING"),
        ],
    ),
    bigquery.SchemaField("errorMessage", "STRING"),
    bigquery.SchemaField("stacktrace", "STRING"),
]

def ensure_table(table_id, table_schema=None, partition_field="ingestion_time"):
    """Create `table_id` if it is missing; warn if an existing table's columns differ.

    Args:
        table_id: fully-qualified `project.dataset.table` ID.
        table_schema: list of bigquery.SchemaField; defaults to the streaming `schema`.
        partition_field: TIMESTAMP column for daily partitioning, or None for an
            unpartitioned table.
    """
    table_schema = schema if table_schema is None else table_schema
    expected_cols = [field.name for field in table_schema]
    try:
        existing = bq.get_table(table_id)
    except Exception:
        # Treat any lookup failure as "missing"; create_table raises on its own if
        # the real problem is something else (e.g. permissions).
        t = bigquery.Table(table_id, schema=table_schema)
        if partition_field:
            t.time_partitioning = bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field=partition_field
            )
        bq.create_table(t, exists_ok=True)
        print("‚úÖ Created table:", table_id)
        return

    actual_cols = [field.name for field in existing.schema]
    if actual_cols != expected_cols:
        print(f"WARNING: {table_id} exists with columns {actual_cols}, expected {expected_cols}. "
              "Drop it and re-run this cell so writes match the expected schema.")
    else:
        print("‚úÖ Table exists:", table_id)

ensure_table(stream_table_id)
ensure_table(dead_table_id, deadletter_schema, partition_field="timestamp")


‚úÖ Table exists: mgmt467-project1.bitcoin_data_set.bitcoin_streaming
‚úÖ Table exists: mgmt467-project1.bitcoin_data_set.bitcoin_deadletter


In [34]:
# Ensure bucket exists (Notebook 1 created it, but we confirm).
# gsutil is called with argv lists, not shell strings: BUCKET_NAME is derived from
# user input, so it must never be interpolated into a shell command line.
bucket_uri = f"gs://{BUCKET_NAME}"
bucket_probe = subprocess.run(["gsutil", "ls", "-b", bucket_uri], capture_output=True, text=True)

if bucket_probe.returncode == 0:
    print("‚úÖ Bucket exists:", BUCKET_NAME)
else:
    print("Bucket not found; creating it...")
    bucket_create = subprocess.run(
        ["gsutil", "mb", "-p", PROJECT_ID, "-l", REGION, bucket_uri],
        capture_output=True, text=True,
    )
    # Surface creation failures instead of silently continuing without a bucket.
    if bucket_create.returncode != 0:
        raise RuntimeError(f"Failed to create {bucket_uri}: {bucket_create.stderr.strip()}")
    print(bucket_create.stderr.strip() or bucket_create.stdout.strip())

DATAFLOW_STAGING = f"gs://{BUCKET_NAME}/dataflow-staging"
print("DATAFLOW_STAGING:", DATAFLOW_STAGING)


‚úÖ Bucket exists: mgmt467-project1-mgmt479-bitcoin-final-lab
DATAFLOW_STAGING: gs://mgmt467-project1-mgmt479-bitcoin-final-lab/dataflow-staging


In [35]:
import pathlib
FUNCTION_DIR = "function_btc_stream"
function_dir = pathlib.Path(FUNCTION_DIR)
function_dir.mkdir(parents=True, exist_ok=True)

main_py = r'''
import json
import os
from datetime import datetime, timezone

import requests
from google.cloud import pubsub_v1

PROJECT_ID = os.environ["PROJECT_ID"]
TOPIC_ID = os.environ["TOPIC_ID"]

# Coinbase public endpoint (no auth)
COINBASE_URL = "https://api.coinbase.com/v2/prices/BTC-USD/spot"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

def btc_stream_producer(request):
    """
    HTTP Cloud Function (2nd gen):
    Coinbase API (no-auth) ‚Üí normalized JSON ‚Üí Pub/Sub
    Keeps the same schema as our BigQuery streaming table.
    """
    try:
        resp = requests.get(COINBASE_URL, timeout=10)
        resp.raise_for_status()
        payload = resp.json()

        # Coinbase response: {"data":{"base":"BTC","currency":"USD","amount":"####.##"}}
        price = float(payload["data"]["amount"])

        now = datetime.now(timezone.utc)

        msg = {
            "event_time": now.isoformat(),
            "ingestion_time": now.isoformat(),
            "asset": "bitcoin",
            "price_usd": price,
            "change_percent_24h": None,
            "volume_usd_24h": None,
            "market_cap_usd": None,
            "raw_json": json.dumps(payload)
        }

        publisher.publish(topic_path, json.dumps(msg).encode("utf-8")).result(timeout=10)
        return (json.dumps({"status":"ok","published":True,"msg":msg}), 200, {"Content-Type":"application/json"})

    except Exception as e:
        return (json.dumps({"status":"error","error":str(e)}), 500, {"Content-Type":"application/json"})
'''

reqs = "google-cloud-pubsub==2.27.1\nrequests==2.32.3\n"

# Write explicitly as UTF-8: main_py contains non-ASCII characters, and Python reads
# source files as UTF-8 by default, so a locale-dependent encoding could produce a
# main.py the deployed function cannot import.
(function_dir / "main.py").write_text(main_py, encoding="utf-8")
(function_dir / "requirements.txt").write_text(reqs, encoding="utf-8")

print("‚úÖ Function code written to:", FUNCTION_DIR)



‚úÖ Function code written to: function_btc_stream


In [36]:
!gcloud services enable cloudbuild.googleapis.com artifactregistry.googleapis.com


Operation "operations/acat.p2-718972448119-50914ad1-cb53-4e8e-9bc1-9b794210acef" finished successfully.


In [37]:
!gcloud functions deploy btc_stream_producer \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=function_btc_stream \
  --entry-point=btc_stream_producer \
  --trigger-http \
  --allow-unauthenticated \
  --set-env-vars PROJECT_ID=mgmt467-project1,TOPIC_ID=bitcoin-price-topic

  [INFO] A new revision will be deployed serving with 100% traffic.
You can view your function in the Cloud Console here: https://console.cloud.google.com/functions/details/us-central1/btc_stream_producer?project=mgmt467-project1

buildConfig:
  automaticUpdatePolicy: {}
  build: projects/718972448119/locations/us-central1/builds/115680a2-221e-4da9-aa40-10ca215776ce
  dockerRegistry: ARTIFACT_REGISTRY
  dockerRepository: projects/mgmt467-project1/locations/us-central1/repositories/gcf-artifacts
  entryPoint: btc_stream_producer
  runtime: python311
  serviceAccount: projects/mgmt467-project1/serviceAccounts/718972448119-compute@developer.gserviceaccount.com
  source:
    storageSource:
      bucket: gcf-v2-sources-718972448119-us-central1
      generation: '1765669698241210'
      object: btc_stream_producer/function-source.zip
  sourceProvenance:
    resolvedStorageSource:
      bucket: gcf-v2-sources-718972448119-us-central1
      generation: '1765669698241210'
      object: btc_stre

In [38]:
import subprocess, requests

# Look up the HTTPS endpoint (serviceConfig.uri) of the deployed gen2 function.
describe_cmd = f"gcloud functions describe {FUNCTION_NAME} --gen2 --region {REGION} --format='value(serviceConfig.uri)'"
describe_result = subprocess.run(["bash", "-lc", describe_cmd], stdout=subprocess.PIPE, check=True)
FUNCTION_URL = describe_result.stdout.decode().strip()

print("FUNCTION_URL:", FUNCTION_URL)


FUNCTION_URL: https://btc-stream-producer-7wf2mbd7nq-uc.a.run.app


In [39]:
# Call the function once by hand before wiring up the scheduler.
r = requests.get(FUNCTION_URL, timeout=30)
print("Status:", r.status_code)
print(r.text[:800])
# The function answers 500 with a JSON error body on failure; after showing it,
# stop Run-All here rather than scheduling a broken endpoint.
r.raise_for_status()

Status: 200
{"status": "ok", "published": true, "msg": {"event_time": "2025-12-13T23:49:16.336646+00:00", "ingestion_time": "2025-12-13T23:49:16.336646+00:00", "asset": "bitcoin", "price_usd": 90265.2, "change_percent_24h": null, "volume_usd_24h": null, "market_cap_usd": null, "raw_json": "{\"data\": {\"amount\": \"90265.2\", \"base\": \"BTC\", \"currency\": \"USD\"}}"}}


In [40]:
import subprocess

REGION = "us-central1"
SCHEDULER_JOB = "btc-stream-every-minute"

# Use the Cloud Run URL (serviceConfig.uri) for Scheduler
FUNCTION_URL = subprocess.check_output(
    ["bash", "-lc", f"gcloud functions describe {FUNCTION_NAME} --gen2 --region {REGION} --format='value(serviceConfig.uri)'"]
).decode().strip()

print("Scheduler will call:", FUNCTION_URL)

# Delete existing job if it exists
existing_jobs_names = [j.split('/')[-1] for j in subprocess.check_output(
    ["bash", "-lc", f"gcloud scheduler jobs list --location {REGION} --format='value(name)' || true"]
).decode().splitlines()]

if SCHEDULER_JOB in existing_jobs_names:
    !gcloud scheduler jobs delete $SCHEDULER_JOB --location=$REGION --quiet
    print("Deleted old scheduler job.")

# Create a job that calls the function every minute
!gcloud scheduler jobs create http $SCHEDULER_JOB \
  --location=$REGION \
  --schedule="* * * * *" \
  --uri=$FUNCTION_URL \
  --http-method=GET

print("‚úÖ Scheduler created:", SCHEDULER_JOB)


Scheduler will call: https://btc-stream-producer-7wf2mbd7nq-uc.a.run.app
Deleted job [btc-stream-every-minute].
Deleted old scheduler job.
attemptDeadline: 180s
httpTarget:
  headers:
    User-Agent: Google-Cloud-Scheduler
  httpMethod: GET
  uri: https://btc-stream-producer-7wf2mbd7nq-uc.a.run.app/
name: projects/mgmt467-project1/locations/us-central1/jobs/btc-stream-every-minute
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 5
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: '* * * * *'
scheduleTime: '2025-12-13T23:50:00Z'
state: ENABLED
status:
  code: -1
timeZone: Etc/UTC
userUpdateTime: '2025-12-13T23:49:22.259517Z'
‚úÖ Scheduler created: btc-stream-every-minute


In [41]:
import time

DATAFLOW_JOB = f"btc-pubsub-to-bq-{int(time.time())}"
REGION = "us-central1"

INPUT_TOPIC = f"projects/{PROJECT_ID}/topics/{PUBSUB_TOPIC}"
OUTPUT_TABLE = f"{PROJECT_ID}:{BQ_DATASET}.{STREAMING_TABLE}"
DEAD_TABLE   = f"{PROJECT_ID}:{BQ_DATASET}.{DEADLETTER_TABLE}"

PARAMS = (
    f"inputTopic={INPUT_TOPIC},"
    f"outputTableSpec={OUTPUT_TABLE},"
    f"outputDeadletterTable={DEAD_TABLE}"
)

print("INPUT_TOPIC:", INPUT_TOPIC)
print("OUTPUT_TABLE:", OUTPUT_TABLE)
print("DEAD_TABLE:", DEAD_TABLE)
print("PARAMS:", PARAMS)

!gcloud dataflow jobs run $DATAFLOW_JOB \
  --region=$REGION \
  --gcs-location=gs://dataflow-templates-$REGION/latest/PubSub_to_BigQuery \
  --staging-location=gs://$BUCKET_NAME/dataflow-staging \
  --parameters="$PARAMS" \
  --project=$PROJECT_ID

print("‚úÖ Requested Dataflow job start:", DATAFLOW_JOB)


INPUT_TOPIC: projects/mgmt467-project1/topics/bitcoin-price-topic
OUTPUT_TABLE: mgmt467-project1:bitcoin_data_set.bitcoin_streaming
DEAD_TABLE: mgmt467-project1:bitcoin_data_set.bitcoin_deadletter
PARAMS: inputTopic=projects/mgmt467-project1/topics/bitcoin-price-topic,outputTableSpec=mgmt467-project1:bitcoin_data_set.bitcoin_streaming,outputDeadletterTable=mgmt467-project1:bitcoin_data_set.bitcoin_deadletter
createTime: '2025-12-13T23:49:24.567094Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2025-12-13_15_49_24-15284570527626875413
location: us-central1
name: btc-pubsub-to-bq-1765669763
projectId: mgmt467-project1
startTime: '2025-12-13T23:49:24.567094Z'
type: JOB_TYPE_STREAMING
‚úÖ Requested Dataflow job start: btc-pubsub-to-bq-1765669763


In [42]:
!gcloud dataflow jobs list --region=us-central1 --status=active


JOB_ID                                    NAME                         TYPE       CREATION_TIME        STATE    REGION
2025-12-13_15_49_24-15284570527626875413  btc-pubsub-to-bq-1765669763  Streaming  2025-12-13 23:49:24  Pending  us-central1
2025-12-13_13_44_42-17487604794254388193  btc-pubsub-to-bq-1765662280  Streaming  2025-12-13 21:44:43  Running  us-central1


In [43]:
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT_ID)

# Ten most recent streamed rows, newest first. Dataset/table names come from the
# config cell rather than being hard-coded.
streaming_table_id = f"{PROJECT_ID}.{BQ_DATASET}.{STREAMING_TABLE}"
q = f"""
SELECT ingestion_time, asset, price_usd
FROM `{streaming_table_id}`
ORDER BY ingestion_time DESC
LIMIT 10
"""
for r in bq.query(q).result():
    print(dict(r))



{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 49, 16, 336646, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90265.2}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 49, 1, 675156, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90258.845}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 48, 1, 712327, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90274.235}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 47, 2, 288059, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90288.0}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 46, 2, 183489, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90268.005}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 45, 2, 319864, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 90248.79}
{'ingestion_time': datetime.datetime(2025, 12, 13, 23, 44, 1, 694731, tzinfo=datetime.timezone.utc), 'asset': 'bitcoin', 'price_usd': 

In [44]:
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT_ID)

# Compare the 25 most recent streamed prices against the single most recent batch
# close. Object names come from the config cell rather than being hard-coded.
analytics_view_id = f"{PROJECT_ID}.{BQ_DATASET}.{BATCH_ANALYTICS_VIEW}"
streaming_table_id = f"{PROJECT_ID}.{BQ_DATASET}.{STREAMING_TABLE}"

q = f"""
WITH latest_batch AS (
  SELECT
    Close AS batch_close,
    TIMESTAMP(datetime) AS batch_ts
  FROM `{analytics_view_id}`
  ORDER BY TIMESTAMP(datetime) DESC
  LIMIT 1
),
latest_stream AS (
  SELECT
    ingestion_time,
    price_usd AS streaming_price_usd
  FROM `{streaming_table_id}`
  ORDER BY ingestion_time DESC
  LIMIT 25
)
SELECT
  s.ingestion_time,
  s.streaming_price_usd,
  b.batch_ts AS latest_batch_ts,
  b.batch_close,
  (s.streaming_price_usd - b.batch_close) AS price_diff
FROM latest_stream s
CROSS JOIN latest_batch b
ORDER BY s.ingestion_time DESC;
"""
df2 = bq.query(q).to_dataframe()
df2



Unnamed: 0,ingestion_time,streaming_price_usd,latest_batch_ts,batch_close,price_diff
0,2025-12-13 23:49:16.336646+00:00,90265.2,2025-12-12 23:57:00+00:00,90244.0,21.2
1,2025-12-13 23:49:01.675156+00:00,90258.845,2025-12-12 23:57:00+00:00,90244.0,14.845
2,2025-12-13 23:48:01.712327+00:00,90274.235,2025-12-12 23:57:00+00:00,90244.0,30.235
3,2025-12-13 23:47:02.288059+00:00,90288.0,2025-12-12 23:57:00+00:00,90244.0,44.0
4,2025-12-13 23:46:02.183489+00:00,90268.005,2025-12-12 23:57:00+00:00,90244.0,24.005
5,2025-12-13 23:45:02.319864+00:00,90248.79,2025-12-12 23:57:00+00:00,90244.0,4.79
6,2025-12-13 23:44:01.694731+00:00,90238.025,2025-12-12 23:57:00+00:00,90244.0,-5.975
7,2025-12-13 23:43:02.290970+00:00,90232.225,2025-12-12 23:57:00+00:00,90244.0,-11.775
8,2025-12-13 23:42:01.670099+00:00,90276.995,2025-12-12 23:57:00+00:00,90244.0,32.995
9,2025-12-13 23:41:02.287793+00:00,90255.525,2025-12-12 23:57:00+00:00,90244.0,11.525


In [45]:
# Persist the real-time vs. batch comparison as a view (every streamed row vs. the
# latest batch close). Dataset/table names come from the config cell.
view_id = f"{PROJECT_ID}.{BQ_DATASET}.bitcoin_realtime_vs_batch_v"
analytics_view_id = f"{PROJECT_ID}.{BQ_DATASET}.{BATCH_ANALYTICS_VIEW}"
streaming_table_id = f"{PROJECT_ID}.{BQ_DATASET}.{STREAMING_TABLE}"

sql = f"""
CREATE OR REPLACE VIEW `{view_id}` AS
WITH latest_batch AS (
  SELECT
    Close AS batch_close,
    TIMESTAMP(datetime) AS batch_ts
  FROM `{analytics_view_id}`
  ORDER BY TIMESTAMP(datetime) DESC
  LIMIT 1
),
stream AS (
  SELECT
    ingestion_time,
    price_usd
  FROM `{streaming_table_id}`
)
SELECT
  stream.ingestion_time,
  stream.price_usd,
  batch.batch_ts AS latest_batch_ts,
  batch.batch_close,
  (stream.price_usd - batch.batch_close) AS price_diff
FROM stream
CROSS JOIN latest_batch batch;
"""
bq.query(sql).result()
print("‚úÖ Created view:", view_id)


‚úÖ Created view: mgmt467-project1.bitcoin_data_set.bitcoin_realtime_vs_batch_v
