# 定義環境

In [7]:
!pip install apache-beam[gcp,dataframe] --quiet

In [8]:
import argparse
import csv
import json
import os
import torch
from typing import Tuple

import apache_beam as beam
import numpy
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from apache_beam.options.pipeline_options import PipelineOptions

import warnings
warnings.filterwarnings('ignore')

In [9]:
import os
# 定義常數（Constants）
project = "tibame-gad253-14-dataflow"
bucket = "tibame-gad253-14-bucket-tw"
# 儲存模型的路徑
save_model_dir_multiply_five = 'five_times_table_torch.pt'
save_model_dir_multiply_ten = 'ten_times_table_torch.pt'
# 設定專案 ID。
os.environ['GOOGLE_CLOUD_PROJECT'] = project

In [10]:
from google.colab import auth
auth.authenticate_user()

In [11]:
# 測試 列出值區
!gcloud storage buckets list --project tibame-gad253-14-dataflow

---
creation_time: 2025-09-01T02:09:52+0000
default_storage_class: STANDARD
generation: 1756692591978069247
location: ASIA-EAST1
location_type: region
metageneration: 1
name: tibame-gad253-14-bucket-tw
public_access_prevention: enforced
soft_delete_policy:
  effectiveTime: '2025-09-01T02:09:52.564000+00:00'
  retentionDurationSeconds: '604800'
storage_url: gs://tibame-gad253-14-bucket-tw/
uniform_bucket_level_access: true
update_time: 2025-09-01T02:09:52+0000


# 建立模型

In [12]:
class LinearRegression(torch.nn.Module):
    def __init__(self, input_dim=1, output_dim=1):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, output_dim)
    def forward(self, x):
        out = self.linear(x)
        return out

In [13]:
x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)
y = (x * 5).reshape(-1, 1)
value_to_predict = numpy.array([20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)

## 訓練五倍模型

In [14]:
five_times_model = LinearRegression()
optimizer = torch.optim.Adam(five_times_model.parameters())
loss_fn = torch.nn.L1Loss()

"""
Train the five_times_model
"""
epochs = 10000
tensor_x = torch.from_numpy(x)
tensor_y = torch.from_numpy(y)
for epoch in range(epochs):
    y_pred = five_times_model(tensor_x)
    loss = loss_fn(y_pred, tensor_y)
    five_times_model.zero_grad()
    loss.backward()
    optimizer.step()

## 儲存五倍模型

In [15]:
torch.save(five_times_model.state_dict(), save_model_dir_multiply_five)
print(os.path.exists(save_model_dir_multiply_five)) # Verify that the model is saved.

True


## 建立十倍模型

In [16]:
x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)
y = (x * 10).reshape(-1, 1)

## 訓練十倍模型

In [17]:
ten_times_model = LinearRegression()
optimizer = torch.optim.Adam(ten_times_model.parameters())
loss_fn = torch.nn.L1Loss()

epochs = 10000
tensor_x = torch.from_numpy(x)
tensor_y = torch.from_numpy(y)
for epoch in range(epochs):
    y_pred = ten_times_model(tensor_x)
    loss = loss_fn(y_pred, tensor_y)
    ten_times_model.zero_grad()
    loss.backward()
    optimizer.step()


## 儲存十倍模型

In [18]:
torch.save(ten_times_model.state_dict(), save_model_dir_multiply_ten)
print(os.path.exists(save_model_dir_multiply_ten)) # verify if the model is saved

True


# 建立管道

In [19]:
torch_five_times_model_handler = PytorchModelHandlerTensor(
    state_dict_path=save_model_dir_multiply_five,
    model_class=LinearRegression,
    model_params={'input_dim': 1,
                  'output_dim': 1}
                  )
pipeline = beam.Pipeline()

with pipeline as p:
      (
      p
      | "讀取輸入資料" >> beam.Create(value_to_predict)
      | "將 Numpy 轉換為 Tensor" >> beam.Map(torch.Tensor)
      | "執行 Torch 推論" >> RunInference(torch_five_times_model_handler)
      | beam.Map(print)
      )



PredictionResult(example=tensor([20.]), inference=tensor([100.4237]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([40.]), inference=tensor([200.2546]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([60.]), inference=tensor([300.0855]), model_id='five_times_table_torch.pt')
PredictionResult(example=tensor([90.]), inference=tensor([449.8319]), model_id='five_times_table_torch.pt')


# 自訂預測輸出結果

In [21]:
class PredictionProcessor(beam.DoFn):
  """
  用於格式化 RunInference 轉換輸出的處理器。
  """
  def process(
      self,
      element: PredictionResult):
    input_value = element.example
    output_value = element.inference
    model_id=element.model_id
    yield (f"輸入值為 {input_value.item()}，輸出值為 {output_value.item()}，模型為{model_id}")

pipeline = beam.Pipeline()

with pipeline as p:
    (
    p
    | "讀取輸入資料" >> beam.Create(value_to_predict)
    | "將 Numpy 轉換為 Tensor" >> beam.Map(torch.Tensor)
    | "執行 Torch 推論" >> RunInference(torch_five_times_model_handler)
    | "後處理預測結果" >> beam.ParDo(PredictionProcessor())
    | beam.Map(print)
    )

輸入值為 20.0，輸出值為 100.4236831665039，模型為five_times_table_torch.pt
輸入值為 40.0，輸出值為 200.2545928955078，模型為five_times_table_torch.pt
輸入值為 60.0，輸出值為 300.08551025390625，模型為five_times_table_torch.pt
輸入值為 90.0，輸出值為 449.8318786621094，模型為five_times_table_torch.pt


# 含key資料處理

## key資料格式化函式

In [22]:
class PredictionWithKeyProcessor(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(
          self,
          element: Tuple[str, PredictionResult]):
        key = element[0]
        input_value = element[1].example
        output_value = element[1].inference
        yield (f"鍵: {key}, 輸入值: {input_value.item()} 輸出值: {output_value.item()}" )

## 建立BigQuery來源

In [23]:
from google.cloud import bigquery

client = bigquery.Client(project=project)

# 確保 dataset_id 在專案中是唯一的。
dataset_id = '{project}.mathspy'.format(project=project)
dataset = bigquery.Dataset(dataset_id)

# 根據專案配置修改位置。
dataset.location = 'asia-east1'
dataset = client.create_dataset(dataset, exists_ok=True)

# BigQuery 資料集中的資料表名稱。
table_name = 'maths_problems_1'

query = """
    CREATE OR REPLACE TABLE
      {project}.maths.{table} ( key STRING OPTIONS(description="A unique key for the maths problem"),
    value FLOAT64 OPTIONS(description="Our maths problem" ) );
    INSERT INTO maths.{table}
    VALUES
      ("first_question", 105.00),
      ("second_question", 108.00),
      ("third_question", 1000.00),
      ("fourth_question", 1013.00)
""".format(project=project, table=table_name)

create_job = client.query(query)
create_job.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7d9323577d10>

## 從BigQuery抓取資料測試

In [24]:
pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp',
                                                      })
pipeline = beam.Pipeline(options=pipeline_options)

keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)

table_name = 'maths_problems_1'
table_spec = f'{project}:maths.{table_name}'

with pipeline as p:
      (
      p
      | "讀取BigQuery資料" >> beam.io.ReadFromBigQuery(table=table_spec)
      | "提取kay與輸入值" >> beam.Map(lambda x: (x['key'], x['value']))
      | "轉換Numpy格視為Tensor" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "進行Torch推論" >> RunInference(keyed_torch_five_times_model_handler)
      | "格式化輸出結果" >> beam.ParDo(PredictionWithKeyProcessor())
      | beam.Map(print)
      )



鍵: third_question, 輸入值: 1000.0 輸出值: 4992.138671875
鍵: second_question, 輸入值: 108.0 輸出值: 539.6796875
鍵: first_question, 輸入值: 105.0 輸出值: 524.705078125
鍵: fourth_question, 輸入值: 1013.0 輸出值: 5057.0283203125


## 準備CSV檔案

In [25]:
# 建立CSV範例資料
csv_values = [("first_question", 105.00),
      ("second_question", 108.00),
      ("third_question", 1000.00),
      ("fourth_question", 1013.00)]
input_csv_file = "./maths_problem.csv"

with open(input_csv_file, 'w') as f:
  writer = csv.writer(f)
  writer.writerow(['key', 'value'])
  for row in csv_values:
    writer.writerow(row)

assert os.path.exists(input_csv_file) == True

## 讀取CSV檔案

In [26]:
pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'gs://{bucket}/tmp',})
pipeline = beam.Pipeline(options=pipeline_options)

keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)

with pipeline as p:
  df = p | beam.dataframe.io.read_csv(input_csv_file)
  pc = to_pcollection(df)
  (pc
    | "轉換Numpy為Tensor" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
    | "進行Torch" >> RunInference(keyed_torch_five_times_model_handler)
    | "格式化輸出結果" >> beam.ParDo(PredictionWithKeyProcessor())
    | beam.Map(print)
    )

鍵: first_question, 輸入值: 105.0 輸出值: 524.705078125
鍵: second_question, 輸入值: 108.0 輸出值: 539.6796875
鍵: third_question, 輸入值: 1000.0 輸出值: 4992.138671875
鍵: fourth_question, 輸入值: 1013.0 輸出值: 5057.0283203125


# 多模型預測

## 準備十倍模型

In [27]:
torch_ten_times_model_handler = PytorchModelHandlerTensor(state_dict_path=save_model_dir_multiply_ten,
                                        model_class=LinearRegression,
                                        model_params={'input_dim': 1,
                                                      'output_dim': 1}
                                        )
keyed_torch_ten_times_model_handler = KeyedModelHandler(torch_ten_times_model_handler)


## 多模型

In [28]:
pipeline_options = PipelineOptions().from_dictionary(
                                      {'temp_location':f'gs://{bucket}/tmp'})

pipeline = beam.Pipeline(options=pipeline_options)

read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)

with pipeline as p:
  multiply_five = (
      p
      |  read_from_bq
      | "CreateMultiplyFiveTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))
      | "ConvertNumpyToTensorFiveTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchFiveTuple" >> RunInference(keyed_torch_five_times_model_handler)
  )
  multiply_ten = (
      p
      | read_from_bq
      | "CreateMultiplyTenTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))
      | "ConvertNumpyToTensorTenTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchTenTuple" >> RunInference(keyed_torch_ten_times_model_handler)
  )

  inference_result = ((multiply_five, multiply_ten) | beam.Flatten()
                                 | beam.ParDo(PredictionWithKeyProcessor()))
  inference_result | beam.Map(print)



鍵: third_question * 10, 輸入值: 1000.0 輸出值: 9897.900390625
鍵: second_question * 10, 輸入值: 108.0 輸出值: 1075.8282470703125
鍵: first_question * 10, 輸入值: 105.0 輸出值: 1046.1575927734375
鍵: fourth_question * 10, 輸入值: 1013.0 輸出值: 10026.4736328125
鍵: third_question * 5, 輸入值: 1000.0 輸出值: 4992.138671875
鍵: second_question * 5, 輸入值: 108.0 輸出值: 539.6796875
鍵: first_question * 5, 輸入值: 105.0 輸出值: 524.705078125
鍵: fourth_question * 5, 輸入值: 1013.0 輸出值: 5057.0283203125


## 多模型依序預測

In [29]:
pipeline_options = PipelineOptions().from_dictionary(
                                      {'temp_location':f'gs://{bucket}/tmp'})

pipeline = beam.Pipeline(options=pipeline_options)

read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)

with pipeline as p:
  multiply_five = (
      p
      |  read_from_bq
      | "CreateMultiplyFiveTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))
      | "ConvertNumpyToTensorFiveTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchFiveTuple" >> RunInference(keyed_torch_five_times_model_handler)
  )
  multiply_ten = (
      p
      | read_from_bq
      | "CreateMultiplyTenTuple" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))
      | "ConvertNumpyToTensorTenTuple" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))
      | "RunInferenceTorchTenTuple" >> RunInference(keyed_torch_ten_times_model_handler)
  )

  inference_result = ((multiply_five, multiply_ten) | beam.Flatten()
                                 | beam.ParDo(PredictionWithKeyProcessor()))
  inference_result | beam.Map(print)




鍵: third_question * 5, 輸入值: 1000.0 輸出值: 4992.138671875
鍵: second_question * 5, 輸入值: 108.0 輸出值: 539.6796875
鍵: first_question * 5, 輸入值: 105.0 輸出值: 524.705078125
鍵: fourth_question * 5, 輸入值: 1013.0 輸出值: 5057.0283203125
鍵: third_question * 10, 輸入值: 1000.0 輸出值: 9897.900390625
鍵: second_question * 10, 輸入值: 108.0 輸出值: 1075.8282470703125
鍵: first_question * 10, 輸入值: 105.0 輸出值: 1046.1575927734375
鍵: fourth_question * 10, 輸入值: 1013.0 輸出值: 10026.4736328125
