<a href="https://colab.research.google.com/github/c-damien/bayareacoders/blob/main/BayArea_Coders_Collective_Dataflow_ML_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################


####################################################################################
# Main script used to provision the different assets used in the following demo:
#               Bay Area Community event - Dataflow ML
#
# Author: Damien Contreras cdamien@google.com
####################################################################################

# How to use Whisper & Gemini in a Dataflow Pipeline

This notebook shows two ways to use models in a Dataflow pipeline:
1. Through Hugging Face (Whisper): download the model and run local inference with Dataflow ML (`RunInference`).
2. Through a Vertex AI endpoint (Gemini): run remote inference by calling the Vertex AI libraries directly from a Dataflow `DoFn`, without Dataflow ML.

This lab assumes you are using the following Kaggle dataset:
[Kaggle Speaker audio](https://www.kaggle.com/datasets/vjcalling/speaker-recognition-audio-dataset/data)

# What is needed:
- A GCS bucket containing the audio files you want to test with, with read permission granted to the user you authenticate as in this notebook.

### Pre-requisites

#### (Optional) Log in to Hugging Face with your access token

In [None]:
from huggingface_hub import login
login()


#### Set your GCP user
This authenticates the notebook so it can read the GCS bucket where your audio files are stored.

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
#@title notebook GCP variables { run: "auto", display-mode: "form" }
PROJECT_ID = "sfsc-srtt-shared" #@param {type:"string"}
GCS_DATA_PATH = "gs://srtt-audio/Speaker_0000" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}
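The pipeline matches input files by appending `/*` to `GCS_DATA_PATH`. If you want to sanity-check the form value before launching a run, here is a minimal stdlib sketch. `gcs_glob_pattern` is a hypothetical helper, not part of Beam or the Cloud Storage client, and it only checks the shape of the URI, not whether the bucket exists or is readable.

```python
def gcs_glob_pattern(gcs_path: str) -> str:
    """Build the glob that the pipeline passes to fileio.MatchAll.

    Hypothetical helper: checks for the gs:// scheme and a non-empty bucket
    name, then appends "/*" exactly once.
    """
    if not gcs_path.startswith("gs://"):
        raise ValueError(f"expected a gs:// URI, got {gcs_path!r}")
    bucket = gcs_path[len("gs://"):].split("/", 1)[0]
    if not bucket:
        raise ValueError(f"missing bucket name in {gcs_path!r}")
    # Strip trailing slashes so "gs://b/dir/" and "gs://b/dir" give the same pattern
    return gcs_path.rstrip("/") + "/*"


print(gcs_glob_pattern("gs://srtt-audio/Speaker_0000"))  # gs://srtt-audio/Speaker_0000/*
```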

#### Installing the necessary libraries

In [None]:
!pip install 'transformers[torch]'
!pip install datasets

!pip install torch --quiet
!pip install tensorflow --quiet
!pip install transformers==4.44.2 --quiet
# Quote the version specifier: unquoted, the shell treats ">" as output redirection
!pip install 'apache-beam[gcp]>=2.50' --quiet
!pip install --upgrade google-cloud-aiplatform

!pip install audio2numpy


#### Importing the necessary libraries

In [None]:
import os
import torchaudio

#Vertex
import vertexai
from vertexai.generative_models import (
    GenerationConfig,
    GenerativeModel,
    HarmBlockThreshold,
    HarmCategory,
    Part,
    SafetySetting
)

#dataflow
import apache_beam as beam
from apache_beam.io import fileio

#dataflow ML
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON
from apache_beam.ml.inference.huggingface_inference import PipelineTask

# **Demo**

#### Calling Vertex AI

In [None]:
class CallVertexAIGeminiModel(beam.DoFn):
    """Wrapper to call a Gemini model on Vertex AI.

    Args:
      element: PredictionResult from the Whisper RunInference step.

    Yields:
      Python dict with the results from Whisper and Gemini.
    """

    def setup(self):
        # Runs once per DoFn instance: initialize Vertex AI and build the
        # model and its settings here rather than once per element.
        vertexai.init(project=PROJECT_ID, location=REGION)
        self.model = GenerativeModel(
            "gemini-1.5-flash-002",
            system_instruction=[
                "You are a helpful annotator",
                "Your mission is to make summaries of audio file's transcripts",
            ],
        )

        # Set model parameters
        self.generation_config = GenerationConfig(
            temperature=0.4,
            top_p=1.0,
            top_k=10,
            candidate_count=1,
            max_output_tokens=8192,
        )

        # Set safety settings
        self.safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        }

    def process(self, element):
        transcript = element.inference[0]['text']

        # Prompt: the instruction followed by the Whisper transcript
        prompt = "Summarize the following transcript:\n" + transcript

        response = self.model.generate_content(
            [prompt],
            generation_config=self.generation_config,
            safety_settings=self.safety_settings,
        )

        summary = ""
        try:
            summary = response.text
        except ValueError as e:
            # response.text raises ValueError when no text is available,
            # e.g. when the response was blocked by the safety settings
            print(f"ValueError occurred: {e}")

        yield {"whisper_transcript": transcript, "gemini_summary": summary}
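`CallVertexAIGeminiModel` reads the transcript as `element.inference[0]['text']`, which assumes the Hugging Face pipeline returned a list of dicts for each element. If that shape differs in your setup (for example a single `{'text': ...}` dict), a defensive accessor can help. Below is a minimal sketch under that assumption; `extract_transcript`, `build_prompt`, and `FakeResult` are hypothetical names, and `FakeResult` only stands in for Beam's `PredictionResult` so the example runs without Beam.

```python
from collections import namedtuple

# Hypothetical stand-in for apache_beam.ml.inference.base.PredictionResult
FakeResult = namedtuple("FakeResult", ["example", "inference"])


def extract_transcript(inference) -> str:
    """Return the Whisper text whether inference is a dict or a list of dicts."""
    if isinstance(inference, dict):
        return inference.get("text", "")
    if isinstance(inference, (list, tuple)) and inference:
        first = inference[0]
        if isinstance(first, dict):
            return first.get("text", "")
    # Unknown or empty shape: return an empty transcript instead of raising
    return ""


def build_prompt(transcript: str) -> str:
    """Instruction followed by the transcript, as in the DoFn's prompt."""
    return "Summarize the following transcript:\n" + transcript


result = FakeResult(example=None, inference=[{"text": " Hello everyone."}])
print(build_prompt(extract_transcript(result.inference)))
```

Returning an empty string for unexpected shapes keeps the pipeline running, mirroring how the DoFn already falls back to an empty summary; raising instead would surface shape mismatches earlier.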


#### Pre-processing the audio file

In [None]:
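import io

import torch
import torchaudio


class PrepareAudio(beam.DoFn):
    """Audio file pre-processing function.

    Args:
      element: (key, ReadableFile) tuple produced by fileio.ReadMatches and WithKeys

    Yields:
      1-D numpy array with the mono waveform, resampled to 16 kHz
    """
    def process(self, element):
        _, readable_file = element
        # torchaudio.load expects a path or a file-like object, so wrap the raw bytes
        audio_bytes = readable_file.read(mime_type='application/octet-stream')
        waveform, sr = torchaudio.load(io.BytesIO(audio_bytes))
        if waveform.shape[0] > 1:
            # Average all channels and keep the result in one channel
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        if sr != 16000:
            # Whisper models expect 16 kHz input
            waveform = torchaudio.functional.resample(waveform, sr, 16000)
        # Drop the channel dimension: shape (1, N) -> (N,)
        yield waveform.squeeze(0).numpy()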

### model handler for a huggingface hosted model (Whisper from OpenAI)
model_handler_huggingface = HuggingFacePipelineModelHandler(
    task=PipelineTask.AutomaticSpeechRecognition, #in HuggingFace: automatic-speech-recognition,
    model = "openai/whisper-tiny.en",
)
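The channel averaging in `PrepareAudio` (`torch.mean(waveform, dim=0, keepdim=True)`) can be illustrated without torch. This stdlib sketch averages a `(channels, samples)` list of lists sample by sample; `downmix_to_mono` is a hypothetical helper and, unlike the torch call, it returns a flat list with no channel dimension.

```python
def downmix_to_mono(channels):
    """Average samples across channels, one output sample per time step."""
    if not channels:
        raise ValueError("need at least one channel")
    n = len(channels[0])
    if any(len(ch) != n for ch in channels):
        raise ValueError("all channels must have the same number of samples")
    # Sample i of the output is the mean of sample i across every channel
    return [sum(ch[i] for ch in channels) / len(channels) for i in range(n)]


stereo = [[0.0, 0.5, 1.0], [1.0, 0.5, 0.0]]
print(downmix_to_mono(stereo))  # [0.5, 0.5, 0.5]
```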

#### Main Dataflow pipeline

In [None]:
# beam.Pipeline() with no options runs locally on the default DirectRunner; pass
# PipelineOptions (e.g. runner="DataflowRunner", project, region, temp_location)
# to run the same pipeline on Dataflow.
with beam.Pipeline() as p:
  retrieve_audio = (
      p
      | "get files" >> beam.Create([GCS_DATA_PATH + "/*"])
      | fileio.MatchAll()
      | fileio.ReadMatches()
      # Key each file by the second "_"-separated token of its file name
      | "set keys" >> beam.WithKeys(lambda x: os.path.basename(x.metadata.path).split("_")[1])
  )
  ## Using Whisper with Dataflow ML (local inference)
  inferences_whisper = (
      retrieve_audio
      | "pre process" >> beam.ParDo(PrepareAudio())
      | "RunInference" >> RunInference(model_handler_huggingface)
  )

  ## Using Gemini without Dataflow ML (remote inference on Vertex AI)
  inferences_gemini = (
      inferences_whisper
      | "get summarization" >> beam.ParDo(CallVertexAIGeminiModel())
      | "Show content" >> beam.Map(print)
  )



{'whisper_transcript': " Hello everyone, this is Nien from Eduwaker and welcome to today's session on Art tutorial. So let's not waste any time and let's move forward and look at today's agenda. We'll begin this session by first understanding why do we need analytics and what exactly is business analytics and why do we prefer R over the other tools in the industry. After that we'll begin by deep dying into R by understanding the basic fundamental concepts like variable container, data operators, data types and flow control. Finally, towards the later half of the session, we'll try to understand how you can plot the various types of graphs in R. So, are we clear with today's agenda? Okay, so I have got a confirmation from Eric, Shubam, Dave, Roshni, Gaia 3, Rishma, Pankach. Okay, it's great to see that all of you guys are following. So, I hope you guys are excited for the session as I am. So, let's not waste any more time and let's move forward and look at this.", 'gemini_summary': 'Thi

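The `set keys` step keys each file by the second `_`-separated token of its file name. Note that `PrepareAudio` only uses the file and does not pass this key downstream, so the key does not appear in the printed results. The sketch below reproduces the key function in plain Python. The object name is hypothetical (adjust it to your files), and `safe_key_from_path` is an assumed variant that avoids the `IndexError` the original lambda raises for names without an underscore.

```python
import os


def key_from_path(path: str) -> str:
    """Same logic as the pipeline's WithKeys lambda."""
    return os.path.basename(path).split("_")[1]


def safe_key_from_path(path: str) -> str:
    """Hypothetical variant: fall back to the whole file name when it has no '_'."""
    name = os.path.basename(path)
    parts = name.split("_")
    return parts[1] if len(parts) > 1 else name


# Hypothetical object name under GCS_DATA_PATH
print(key_from_path("gs://srtt-audio/Speaker_0000/Speaker_0000_00001.wav"))  # 0000
```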