In [None]:
from typing import Dict
from typing import Iterable
from typing import Tuple

import tensorflow as tf
import torch
from transformers import AutoTokenizer
from transformers import TFAutoModelForMaskedLM

import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor
from apache_beam.ml.inference.huggingface_inference import PipelineTask

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
model_handler = HuggingFacePipelineModelHandler(
    task=PipelineTask.Translation_XX_to_YY,
    model = "google/flan-t5-small",
    load_pipeline_args={'framework': 'pt'},
    inference_args={'max_length': 200}
)

HuggingFaceModelHandler specified a 'GPU' device, but GPUs are not available. Switching to CPU.
HuggingFaceModelHandler specified a 'GPU' device, but GPUs are not available. Switching to CPU.


In [3]:
text = ["translate English to Spanish: How are you doing?",
        "translate English to Spanish: This is the Apache Beam project."]

In [None]:
class FormatOutput(beam.DoFn):
  """
  Extract the results from PredictionResult and print the results.
  """
  def process(self, element):
    example = element.example
    translated_text = element.inference[0]['translation_text']
    print(f'Example: {example}')
    print(f'Translated text: {translated_text}')
    print('-' * 80)

In [6]:
with beam.Pipeline() as beam_pipeline:
  examples = (
      beam_pipeline
      | "CreateExamples" >> beam.Create(text)
  )
  inferences = (
      examples
      | "RunInference" >> RunInference(model_handler)
      | "Print" >> beam.ParDo(FormatOutput())
  )





{"timestamp":"2025-07-15T05:19:10.230435Z","level":"WARN","fields":{"message":"Reqwest(reqwest::Error { kind: Request, source: hyper_util::client::legacy::Error(Connect, ConnectError(\"dns error\", Custom { kind: Uncategorized, error: \"failed to lookup address information: nodename nor servname provided, or not known\" })) }). Retrying..."},"filename":"/Users/runner/work/xet-core/xet-core/cas_client/src/http_client.rs","line_number":242}
{"timestamp":"2025-07-15T05:19:10.230459Z","level":"WARN","fields":{"message":"Retry attempt #0. Sleeping 1.314958139s before the next attempt"},"filename":"/Users/runner/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/reqwest-retry-0.7.0/src/middleware.rs","line_number":171}




Example: translate English to Spanish: How are you doing?
Translated text: Cómo está acerca?
--------------------------------------------------------------------------------
Example: translate English to Spanish: This is the Apache Beam project.
Translated text: Esto es el proyecto Apache Beam.
--------------------------------------------------------------------------------


### Masked Language Modeling Task

In [7]:
model_handler = HuggingFaceModelHandlerKeyedTensor(
    model_uri="stevhliu/my_awesome_eli5_mlm_model",
    model_class=TFAutoModelForMaskedLM,
    framework='tf',
    load_model_args={'from_pt': True},
    max_batch_size=1
)

In [8]:
text = ['The capital of France is Paris .',
    'It is raining cats and dogs .',
    'He looked up and saw the sun and stars .',
    'Today is Monday and tomorrow is Tuesday .',
    'There are 5 coconuts on this palm tree .']

In [9]:
def add_mask_to_last_word(text: str) -> Tuple[str, str]:
  """Replace the last word of sentence with <mask> and return
  the original sentence and the masked sentence."""
  text_list = text.split()
  masked = ' '.join(text_list[:-2] + ['<mask>' + text_list[-1]])
  return text, masked

tokenizer = AutoTokenizer.from_pretrained("stevhliu/my_awesome_eli5_mlm_model")

def tokenize_sentence(
    text_and_mask: Tuple[str, str],
    tokenizer) -> Tuple[str, Dict[str, tf.Tensor]]:
  """Convert string examples to tensors."""
  text, masked_text = text_and_mask
  tokenized_sentence = tokenizer.encode_plus(
      masked_text, return_tensors="tf")

  # Workaround to manually remove batch dim until we have the feature to
  # add optional batching flag.
  # TODO(https://github.com/apache/beam/issues/21863): Remove when optional
  # batching flag added
  return text, {
      k: tf.squeeze(v)
      for k, v in dict(tokenized_sentence).items()
  }

In [10]:
class PostProcessor(beam.DoFn):
  """Processes the PredictionResult to get the predicted word.

  The logits are the output of the BERT Model. To get the word with the highest
  probability of being the masked word, take the argmax.
  """
  def __init__(self, tokenizer):
    super().__init__()
    self.tokenizer = tokenizer

  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
    text, prediction_result = element
    inputs = prediction_result.example
    logits = prediction_result.inference['logits']
    mask_token_index = tf.where(inputs["input_ids"] == self.tokenizer.mask_token_id)[0]
    predicted_token_id = tf.math.argmax(logits[mask_token_index[0]], axis=-1)
    decoded_word = self.tokenizer.decode(predicted_token_id)
    print(f"Actual Sentence: {text}\nPredicted last word: {decoded_word}")
    print('-' * 80)

In [12]:
with beam.Pipeline() as beam_pipeline:
  tokenized_examples = (
      beam_pipeline
      | "CreateExamples" >> beam.Create(text)
      | 'AddMask' >> beam.Map(add_mask_to_last_word)
      | 'TokenizeSentence' >>
      beam.Map(lambda x: tokenize_sentence(x, tokenizer)))

  result = (
      tokenized_examples
      | "RunInference" >> RunInference(KeyedModelHandler(model_handler))
      | "PostProcess" >> beam.ParDo(PostProcessor(tokenizer))
  )

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFRobertaForMaskedLM: ['roberta.embeddings.position_ids']
- This IS expected if you are initializing TFRobertaForMaskedLM from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaForMaskedLM from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFRobertaForMaskedLM were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForMaskedLM for predictions without further training.


Actual Sentence: The capital of France is Paris .
Predicted last word:  Paris
--------------------------------------------------------------------------------
Actual Sentence: It is raining cats and dogs .
Predicted last word:  dogs
--------------------------------------------------------------------------------
Actual Sentence: He looked up and saw the sun and stars .
Predicted last word:  moon
--------------------------------------------------------------------------------
Actual Sentence: Today is Monday and tomorrow is Tuesday .
Predicted last word:  Tuesday
--------------------------------------------------------------------------------
Actual Sentence: There are 5 coconuts on this palm tree .
Predicted last word:  tree
--------------------------------------------------------------------------------
