In [1]:
# Install uv (a faster pip-compatible installer), then use it to install Apache Beam.
# `--system` installs into the system interpreter, which the `!python3 ...` cells below
# presumably run under; this also assumes the notebook kernel uses that same interpreter
# (e.g. on Colab) — TODO confirm for other environments.
# uv is pinned to the version recorded in this cell's output; TODO pin apache-beam the
# same way once a known-good version is noted.
# NOTE: the video section also imports cv2 and uses its contrib `img_hash` module
# (opencv-contrib-python), which this cell does not install.
!pip install -q uv==0.4.16
!uv pip install apache-beam --system

Collecting uv
  Downloading uv-0.4.16-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Downloading uv-0.4.16-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: uv
Successfully installed uv-0.4.16
[2K[2mResolved [1m44 packages[0m [2min 3.18s[0m[0m
[2K[36m[1mBuilding[0m[39m crcmod[2m==1.7[0m
[2K[1A[36m[1mBuilding[0m[39m crcmod[2m==1.7[0m
[36m[1mBuilding[0m[39m dill[2m==0.3.1.1[0m
[2K[2A[36m[1mBuilding[0m[39m crcmod[2m==1.7[0m
[36m[1mBuilding[0m[39m dill[2m==0.3.1.1[0m
[36m[1mBuilding[0m[39m hdfs[2m==2.7.3[0m
[2K[3A[36m[1mBuilding[0m[39m crcmod[2m==1.7[0m
[36m[1mBuilding[0m[39m dill[2m==0.3.1.1[0m
[36m[1mBuilding[0m[39m hdfs[2m==2.7.3[0m
[36m[1mBuilding[0m[39m docopt[2m==0.6.2[0m
[2K[4A[36m[1mBuilding[0m[39m crcmod[2m

In [2]:
%%writefile random-string-source.py

import time
import random
import sys

# Pool of strings the generator samples from.
strings = ["Hello, World!", "Apache Beam is awesome!", "Data Science rocks!",
           "Python is fun!", "Keep streaming!", "Machine Learning!",
           "Random string!", "Let's keep coding!", "Hello from the console!",
           "Keep learning!"]


def print_random_strings(duration=9, delay=0.0001):
    """Print randomly chosen entries of ``strings`` to stdout, one per line.

    The notebook redirects this script's output to a file or pipes it into a
    pipeline's stdin, so it acts as a simple line-oriented data source.

    Args:
        duration: How long to keep printing, in seconds.
        delay: Pause between lines, in seconds. The tiny default produces a
            high-volume stream.
    """
    # monotonic() cannot jump with wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        # flush=True pushes each line out immediately, so a downstream reader
        # (e.g. a pipe into the streaming pipeline) sees it without waiting for
        # the stdout buffer to fill.
        print(random.choice(strings), flush=True)
        time.sleep(delay)


if __name__ == '__main__':
    print_random_strings()

Writing random-string-source.py


In [3]:
!python3 random-string-source.py > lines.txt

# Build long_lines.txt from N_COPIES byte-for-byte copies of lines.txt.
# The file is rewritten rather than appended to with `>>`, so re-running this
# cell replaces it instead of growing it by another N_COPIES copies each time.
from pathlib import Path

N_COPIES = 10
Path("long_lines.txt").write_bytes(Path("lines.txt").read_bytes() * N_COPIES);

**Pipeline IO**

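Batch example: read `long_lines.txt` from disk, pair every line with a random integer between 1 and 100, sum those integers per distinct line, then print the totals and write them to a text file on disk (`results-00000-of-00001.txt`).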

In [4]:
%%writefile beam-batching-from-file.py

import apache_beam as beam
from apache_beam import DoFn, PTransform, Pipeline
from apache_beam.io import iobase
import time
import random
import datetime
import sys

def my_sum(*args):
    """Combiner for CombinePerKey: add up the values collected for one key.

    All positional arguments are forwarded unchanged to the built-in ``sum``,
    so ``my_sum(values)`` equals ``sum(values)``.
    """
    summed = sum(*args)
    return summed

def add_random_int(element):
    """Pair ``element`` (used as the key) with a random integer in [1, 100]."""
    value = random.randint(1, 100)
    return element, value

def run(input_path="long_lines.txt", output_prefix="results"):
    """Batch job: total a random integer per distinct line of a text file.

    Each line of ``input_path`` is paired with a random int in [1, 100]
    (``add_random_int``). The ints are summed per distinct line (``my_sum``).
    The per-line totals are both printed and written to text shard(s) named
    like ``<output_prefix>-00000-of-00001.txt``.

    Args:
        input_path: File (or file pattern) read line by line by ``ReadFromText``.
        output_prefix: Path prefix passed to ``WriteToText``.
    """
    pipeline_options = beam.options.pipeline_options.PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        # One element per line of the input file, keyed by the line text.
        counts = (p
         | "Start" >> beam.io.ReadFromText(input_path)
         | "Add Random Int" >> beam.Map(add_random_int)
         # Despite the label, this sums the values per key rather than only
         # grouping them.
         | "Group By Key" >> beam.CombinePerKey(my_sum))

        # The same PCollection feeds two independent sinks.
        (counts
         | "Print Data" >> beam.Map(print))

        (counts
         | "Write Results To File" >> beam.io.WriteToText(output_prefix, file_name_suffix=".txt"))

if __name__ == '__main__':
    run()

Writing beam-batching-from-file.py


In [5]:
!python3 beam-batching-from-file.py

('Random string!', 2142944)
('Keep learning!', 2082164)
('Hello, World!', 2148437)
('Keep streaming!', 2095592)
('Apache Beam is awesome!', 2131295)
("Let's keep coding!", 2107088)
('Hello from the console!', 2093982)
('Data Science rocks!', 2076476)
('Python is fun!', 2093643)
('Machine Learning!', 2153147)


In [6]:
!cat results-00000-of-00001.txt

('Random string!', 2142944)
('Keep learning!', 2082164)
('Hello, World!', 2148437)
('Keep streaming!', 2095592)
('Apache Beam is awesome!', 2131295)
("Let's keep coding!", 2107088)
('Hello from the console!', 2093982)
('Data Science rocks!', 2076476)
('Python is fun!', 2093643)
('Machine Learning!', 2153147)


**Streaming, ParDo, Triggers, and Windowing**

Streaming example: the output of `random-string-source.py` (above) is piped into the pipeline's stdin and read by a `ParDo`. The elements are then grouped into 2-second fixed windows with a custom trigger (windowing/triggering) and summed per key.

In [20]:
%%writefile beam-streaming-from-stdin.py

import apache_beam as beam
from apache_beam import DoFn, PTransform, Pipeline
from apache_beam.io import iobase
from apache_beam.transforms.window import FixedWindows, TimestampedValue
import time
import random
import datetime
import sys

class GenerateStreamingData(DoFn):
    """Emit one timestamped ``(line, random_int)`` pair per non-blank stdin line.

    A single kick-off element makes ``process`` read stdin until end-of-file,
    so the pipeline consumes whatever is piped into the script.
    """

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # `element` is only the dummy kick-off value, and `timestamp` is unused;
        # both are kept so the signature stays unchanged.
        while True:
            raw = sys.stdin.readline()
            # readline() returns "" only at end-of-file; a blank line comes back
            # as "\n". Stop on EOF, but skip blank lines instead of treating the
            # first one as the end of the stream.
            if not raw:
                break
            line = raw.strip()
            if not line:
                continue
            # Stamp each element with the wall-clock time it was read, so
            # downstream windowing groups elements by when they arrived.
            yield TimestampedValue((line, random.randint(1, 100)), time.time())

class MyStreamingSource(PTransform):
    """Composite transform that wraps ``GenerateStreamingData`` as one source step."""

    def expand(self, pcoll):
        # Every element reaching this transform triggers a read of stdin.
        stdin_reader = GenerateStreamingData()
        return pcoll | beam.ParDo(stdin_reader)

def my_sum(*args):
    """Combiner for CombinePerKey: add up the values collected for one key.

    All positional arguments are forwarded unchanged to the built-in ``sum``.
    """
    summed = sum(*args)
    return summed

def run():
    """Streaming job: window stdin lines into 2 s fixed windows and sum per key."""
    options = beam.options.pipeline_options.PipelineOptions()
    options.view_as(beam.options.pipeline_options.StandardOptions).streaming = True

    trig = beam.transforms.trigger
    # Fire a pane after 5 elements or when the watermark passes the end of the
    # window, whichever comes first, and keep re-arming (Repeatedly). DISCARDING
    # means each pane only contains elements that arrived since the last firing.
    windowing = beam.WindowInto(
        FixedWindows(2),
        trigger=trig.Repeatedly(
            trig.AfterAny(
                trig.AfterCount(5),
                trig.AfterWatermark())),
        accumulation_mode=trig.AccumulationMode.DISCARDING)

    with beam.Pipeline(options=options) as p:
        (p
         | "Start" >> beam.Create([None])  # one dummy element kicks off the stdin read
         | "Stream Data" >> MyStreamingSource()
         | "Apply Fixed Window" >> windowing
         | "Group By Key" >> beam.CombinePerKey(my_sum)
         | "Print Data" >> beam.Map(print))

if __name__ == '__main__':
    run()

Overwriting beam-streaming-from-stdin.py


In [21]:
!python3 random-string-source.py | python3 beam-streaming-from-stdin.py

('Keep streaming!', 50529)
('Keep streaming!', 47961)
('Keep streaming!', 52676)
('Keep streaming!', 51043)
('Keep streaming!', 0)
('Keep streaming!', 0)
('Keep streaming!', 0)
('Keep streaming!', 0)
('Machine Learning!', 47381)
('Machine Learning!', 53096)
('Machine Learning!', 49892)
('Machine Learning!', 50237)
('Machine Learning!', 0)
('Machine Learning!', 0)
('Machine Learning!', 0)
('Machine Learning!', 0)
('Keep learning!', 46242)
('Keep learning!', 52855)
('Keep learning!', 52091)
('Keep learning!', 52062)
('Keep learning!', 0)
('Keep learning!', 0)
('Keep learning!', 0)
('Keep learning!', 0)
('Python is fun!', 48348)
('Python is fun!', 53153)
('Python is fun!', 49419)
('Python is fun!', 50698)
('Python is fun!', 0)
('Python is fun!', 0)
('Python is fun!', 0)
('Python is fun!', 0)
('Apache Beam is awesome!', 43931)
('Apache Beam is awesome!', 53218)
('Apache Beam is awesome!', 51499)
('Apache Beam is awesome!', 52302)
('Apache Beam is awesome!', 0)
('Apache Beam is awesome!', 0

In [14]:
# -nc (no-clobber): skip the download when the file already exists, instead of
# saving a duplicate copy (file_example_MP4_480_1_5MG.mp4.1, .2, ...) on each re-run.
!wget -nc https://file-examples.com/storage/fe3ab4a58d66f32059e89a1/2017/04/file_example_MP4_480_1_5MG.mp4

--2024-09-24 21:17:53--  https://file-examples.com/storage/fe3ab4a58d66f32059e89a1/2017/04/file_example_MP4_480_1_5MG.mp4
Resolving file-examples.com (file-examples.com)... 185.135.88.81
Connecting to file-examples.com (file-examples.com)|185.135.88.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1570024 (1.5M) [video/mp4]
Saving to: ‘file_example_MP4_480_1_5MG.mp4’


2024-09-24 21:17:54 (2.39 MB/s) - ‘file_example_MP4_480_1_5MG.mp4’ saved [1570024/1570024]



**Composite Transforms**

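Example of streaming a video as individual frames (i.e. images), processing them with a composite transform, and splitting the pipeline into two branches that handle the processed frames differently: one prints each frame's shape, the other its perceptual hash. Note that `cv2.img_hash` lives in OpenCV's contrib modules, so this cell needs `opencv-contrib-python` rather than plain `opencv-python`.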

In [22]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import cv2

class StreamVideoFrames(beam.DoFn):
    """Read a video source with OpenCV and emit its frames one at a time.

    Args:
        source: Anything ``cv2.VideoCapture`` accepts, e.g. a file path or an
            integer camera index.
    """

    def __init__(self, source):
        # Initialise DoFn base-class state before adding our own attributes.
        super().__init__()
        self.source = source
        # The capture is opened in setup() rather than here, so no live capture
        # handle has to be pickled along with the DoFn.
        self.cap = None

    def setup(self):
        # Open the video source once per DoFn instance, not once per element.
        self.cap = cv2.VideoCapture(self.source)
        if not self.cap.isOpened():
            # Fail loudly; otherwise a bad path silently yields zero frames.
            raise IOError(f"Could not open video source {self.source!r}")

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        """Yield every remaining frame of the capture, as returned by ``cap.read()``.

        ``element`` is only a kick-off value and ``timestamp`` is unused. The
        capture is shared across calls, so a second element would resume where
        the first stopped (i.e. find it exhausted).
        """
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                # End of the stream (or a read failure).
                break
            yield frame

    def teardown(self):
        # Release the capture; guarded in case setup() never got as far as
        # creating one.
        if self.cap is not None:
            self.cap.release()
            self.cap = None

class GrayscaleAndHashTransform(beam.PTransform):
    """Composite transform: grayscale each frame, draw a box on it, then hash it.

    Input: BGR frames (ndarrays). Output: ``(annotated_gray_frame, phash)``
    tuples, where ``phash`` comes from ``cv2.img_hash.pHash`` (OpenCV contrib).
    """

    def expand(self, pcoll):
        return (pcoll
         | "Grayscale Frame" >> beam.Map(lambda frame: cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
         # Draw on a copy: Beam requires transforms not to modify their input
         # elements. On a single-channel image, only the first colour component
         # (25) is used as the line intensity.
         | "Draw Bounding Box" >> beam.Map(lambda frame: cv2.rectangle(frame.copy(), (50, 50), (200, 200), (25, 0, 180), 2))
         | "Add Image Hash" >> beam.Map(lambda frame: (frame, cv2.img_hash.pHash(frame))))

def run_pipeline(source='file_example_MP4_480_1_5MG.mp4'):
    """Stream frames from ``source``, process them once, then fan out to two branches.

    Args:
        source: Anything ``cv2.VideoCapture`` accepts, such as a file path or an
            integer camera index (e.g. ``0`` for the default webcam). Defaults to
            the sample clip downloaded earlier in the notebook.
    """
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        # Run the composite transform once; both branches below reuse its output
        # instead of each recomputing grayscale + box + hash for every frame.
        processed = (p
         | "Start" >> beam.Create([None])  # Start the pipeline with an initial dummy element
         | "Stream Video Frames" >> beam.ParDo(StreamVideoFrames(source=source))
         | "Grayscale and Hash" >> GrayscaleAndHashTransform()
        )

        # Branch 1: print the shape of each processed (grayscale) frame.
        (processed
         | "Print Frame Shape" >> beam.Map(lambda element: print(element[0].shape))
        )

        # Branch 2: print each frame's perceptual hash.
        (processed
         | "Print Image Hash" >> beam.Map(lambda element: print(element[1]))
        )

if __name__ == '__main__':
    run_pipeline()



(270, 480)
[[243 187  60 204 142 227 252  51]]
(270, 480)
[[243 187  60 204 142 227 252  51]]
(270, 480)
[[243 187  60 204 142 227 252  51]]
(270, 480)
[[243 255  60 204 142 227 252  51]]
(270, 480)
[[243 255  60 204 142 243 236  51]]
(270, 480)
[[243 251  60 204 142 243 252  51]]
(270, 480)
[[243 251  60 204 142 243 252  51]]
(270, 480)
[[243 255  60 204 142 227 252  51]]
(270, 480)
[[243 255  60 204 142 227 252  51]]
(270, 480)
[[243 251  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 243 252 115]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 251 252  51]]
(270, 480)
[[243 187  60 204 142 251 252  51]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 243 252  51]]
(270, 480)
[[243 187  60 204 142 115 252  51]]
(270, 480)
[[243 187  60 204 142  99 252  51]]
(270, 480)
[[