In [1]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Do

In [2]:
import apache_beam as beam

# Minimal pipeline, built stage by stage: create five integers, double each,
# then print every doubled value.
with beam.Pipeline() as pipeline:
    created = pipeline | 'Create data' >> beam.Create([1, 2, 3, 4, 5])
    doubled = created | 'Multiply by 2' >> beam.Map(lambda value: value * 2)
    # `data` is the output of the print step, exactly as in the chained form.
    data = doubled | 'Print results' >> beam.Map(print)




2
4
6
8
10


In [3]:
# Custom composite transform: two element-wise steps packaged as one PTransform.
class MultiplyAndAddOne(beam.PTransform):
    """Composite transform that multiplies each element by 3, then adds 1."""

    def expand(self, pcoll):
        """Apply the two Map steps to ``pcoll`` and return the resulting PCollection."""
        tripled = pcoll | 'Multiply by 3' >> beam.Map(lambda x: x * 3)
        return tripled | 'Add 1' >> beam.Map(lambda x: x + 1)

# Run the composite transform over 1..5 and print each transformed element.
with beam.Pipeline() as pipeline:
    source_numbers = pipeline | 'Create numbers' >> beam.Create([1, 2, 3, 4, 5])
    composed = source_numbers | 'Apply Composite Transform' >> MultiplyAndAddOne()
    # `transformed_data` is the output of the print step, as in the chained form.
    transformed_data = composed | 'Print transformed results' >> beam.Map(print)


4
7
10
13
16


In [4]:
# Example: write a handful of numbers to text output.
# Note: WriteToText treats its argument as a file path *prefix*; Beam appends a
# shard suffix, so the file on disk is named like
# numbers_output.txt-00000-of-00001 rather than exactly numbers_output.txt.
with beam.Pipeline() as pipeline:
    values_to_write = pipeline | 'Create numbers' >> beam.Create([10, 20, 30, 40, 50])
    numbers = values_to_write | 'Write to file' >> beam.io.WriteToText('/content/numbers_output.txt')


In [5]:
from apache_beam.transforms.window import FixedWindows

# Sum the values 1..6 per 2-second event-time window.
with beam.Pipeline() as pipeline:
    values = pipeline | 'Generate data' >> beam.Create([1, 2, 3, 4, 5, 6])
    # Each value is used as its own event timestamp (in seconds).
    stamped = values | 'Assign timestamps' >> beam.Map(
        lambda value: beam.window.TimestampedValue(value, value)
    )
    # 2-second fixed windows: [0, 2), [2, 4), [4, 6), [6, 8).
    windowed = stamped | 'Window into fixed intervals' >> beam.WindowInto(FixedWindows(2))
    # One sum per window; the recorded run printed 1, 5, 9 and 6.
    sums = windowed | 'Sum within each window' >> beam.CombineGlobally(sum).without_defaults()
    windowed_data = sums | 'Print windowed results' >> beam.Map(print)


1
5
9
6


In [6]:
class MultiplyFn(beam.DoFn):
    """DoFn that emits each input element multiplied by 10."""

    def process(self, element):
        """Yield exactly one output per input: ``element * 10``."""
        scaled = element * 10
        yield scaled

# Apply MultiplyFn through ParDo to the numbers 1..5 and print each output.
with beam.Pipeline() as pipeline:
    inputs = pipeline | 'Create numbers' >> beam.Create([1, 2, 3, 4, 5])
    scaled = inputs | 'Multiply by 10 using ParDo' >> beam.ParDo(MultiplyFn())
    # `result` is the output of the print step, as in the chained form.
    result = scaled | 'Print results' >> beam.Map(print)


10
20
30
40
50


In [7]:
import time

def generate_data(pipeline):
    """Attach a small simulated data stream to ``pipeline`` and print it.

    Creates five float values spaced exactly one second apart, starting at the
    current wall-clock time, stamps each element with its own value as its
    event timestamp, assigns 2-second fixed windows, and prints every element.

    Relies on ``beam``, ``time`` and ``FixedWindows`` already being imported
    in the notebook.

    Args:
        pipeline: The ``beam.Pipeline`` the transforms are attached to.

    Returns:
        The PCollection produced by the final print step.
    """
    # Read the clock once so consecutive values differ by exactly one second.
    start = time.time()
    return (
        pipeline
        | 'Create data stream' >> beam.Create([start + offset for offset in range(5)])
        # Windowing groups by event timestamp, not by element value, so the
        # values must be attached as timestamps before WindowInto; otherwise
        # all five elements end up in the same window.
        | 'Assign timestamps' >> beam.Map(lambda ts: beam.window.TimestampedValue(ts, ts))
        | 'Window by seconds' >> beam.WindowInto(FixedWindows(2))
        | 'Print stream data' >> beam.Map(print)
    )

# Attach the simulated stream to a fresh pipeline; `stream` holds whatever
# generate_data returns. The recorded run printed the five generated values.
with beam.Pipeline() as pipeline:
    stream = generate_data(pipeline)


1731116742.748515
1731116743.748516
1731116744.7485166
1731116745.7485168
1731116746.748517
