<a href="https://colab.research.google.com/github/SiriBatchu/Apache-Beam/blob/main/Apache_Beam_Features.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Do

In [9]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Define a basic pipeline
def run_basic_pipeline():
    """Build and run a minimal pipeline that multiplies 1..5 by 5 and prints each result.

    ``PipelineOptions()`` is created with no arguments, so no runner is
    requested explicitly and Beam's default local runner is used. The
    pipeline is executed when the ``with`` block exits.
    """
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        # The chain is applied only for its printing side effect, so the
        # resulting PCollection is deliberately not bound to a name.
        (pipeline
         | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5])
         | 'Multiply by 5' >> beam.Map(lambda x: x * 5)
         | 'Print Output' >> beam.Map(print))

# Run the basic pipeline
run_basic_pipeline()




5
10
15
20
25


In [10]:
# Composite transform to multiply numbers by 5 and add 5
class MultiplyAndAdd(beam.PTransform):
    """Composite transform that multiplies every element by 5 and then adds 5."""

    def expand(self, pcoll):
        """Apply the two ``Map`` steps to ``pcoll`` and return the resulting collection."""
        scaled = pcoll | 'Multiply by 5' >> beam.Map(lambda value: value * 5)
        shifted = scaled | 'Add 5' >> beam.Map(lambda value: value + 5)
        return shifted

def run_composite_transform_pipeline():
    """Run a pipeline that sends 1..5 through ``MultiplyAndAdd`` and prints each result."""
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        source = p | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5])
        transformed = source | 'Apply Composite Transform' >> MultiplyAndAdd()
        # Terminal step: applied purely for the printing side effect.
        transformed | 'Print Output' >> beam.Map(print)

run_composite_transform_pipeline()




10
15
20
25
30


In [11]:
def run_pipeline_io():
    """Read ``input.txt``, upper-case every line, and write the lines under the prefix ``output``.

    The sink is pinned to a single shard so the result always lands in
    ``output-00000-of-00001``. Without that, the runner chooses the shard
    count and code that reads that exact file name could miss the output.
    """
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (pipeline
         | 'Read from File' >> beam.io.ReadFromText('input.txt')
         | 'Convert to Uppercase' >> beam.Map(str.upper)
         # num_shards=1 makes the output file name deterministic.
         | 'Write to File' >> beam.io.WriteToText('output', num_shards=1))

# Create a small two-line input file ("hello" / "world") for the pipeline to read.
with open('input.txt', 'w') as f:
    f.write("hello\nworld")

# Execute the read -> upper-case -> write pipeline.
run_pipeline_io()

# The result is expected in the single shard file 'output-00000-of-00001';
# print its contents with a shell command.
!cat output-00000-of-00001



HELLO
WORLD


In [12]:
class MultiplyDoFn(beam.DoFn):
    """DoFn that emits each input element multiplied by a constant factor.

    Args:
        factor: Multiplier applied to every element. Defaults to 5, so
            ``MultiplyDoFn()`` behaves exactly as before.
    """

    def __init__(self, factor=5):
        super().__init__()
        self._factor = factor

    def process(self, element):
        """Yield ``element * factor`` as the single output for ``element``."""
        yield element * self._factor

def run_pardo_pipeline():
    """Run a pipeline that applies ``MultiplyDoFn`` via ``ParDo`` to 1..5 and prints the outputs."""
    opts = PipelineOptions()
    with beam.Pipeline(options=opts) as p:
        numbers = p | 'Create Numbers' >> beam.Create([1, 2, 3, 4, 5])
        multiplied = numbers | 'Multiply using ParDo' >> beam.ParDo(MultiplyDoFn())
        # Terminal step: applied purely for the printing side effect.
        multiplied | 'Print Output' >> beam.Map(print)

run_pardo_pipeline()




5
10
15
20
25


In [13]:
import time
import random

def run_windowing_pipeline():
    """Sum random values per key within fixed windows of size 10 and print each (key, sum).

    Ten (key, value) pairs are created with keys 0..9 and a random value in
    [1, 50]. Every element is stamped with the wall-clock time at which the
    'Add Timestamps' step processes it.
    """
    with beam.Pipeline(options=PipelineOptions()) as p:
        # Built once, at pipeline-construction time.
        keyed_values = [(key, random.randint(1, 50)) for key in range(10)]

        created = p | 'Create Numbers with Timestamps' >> beam.Create(keyed_values)
        stamped = created | 'Add Timestamps' >> beam.Map(
            lambda pair: beam.window.TimestampedValue(pair, time.time()))
        windowed = stamped | 'Window into Fixed Intervals' >> beam.WindowInto(
            beam.window.FixedWindows(10))
        summed = windowed | 'Sum per Window' >> beam.CombinePerKey(sum)
        summed | 'Print Output' >> beam.Map(print)

run_windowing_pipeline()



(0, 14)
(1, 18)
(2, 42)
(3, 11)
(4, 9)
(5, 7)
(6, 37)
(7, 10)
(8, 43)
(9, 35)


In [14]:
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import time
import random
from apache_beam import window

def run_trigger_pipeline():
    """Sum random values per key in fixed windows of size 5 using an ``AfterWatermark`` trigger.

    Ten (key, value) pairs with keys 0..9 and a random value in [1, 50] are
    stamped with the current wall-clock time, windowed with discarding
    accumulation, summed per key, and printed.
    """
    with beam.Pipeline(options=PipelineOptions()) as p:
        # Built once, at pipeline-construction time.
        keyed_values = [(key, random.randint(1, 50)) for key in range(10)]

        # Windowing strategy: fixed windows, watermark trigger, discarding panes.
        windowing = beam.WindowInto(
            beam.window.FixedWindows(5),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.DISCARDING)

        created = p | 'Create Stream' >> beam.Create(keyed_values)
        stamped = created | 'Add Timestamps' >> beam.Map(
            lambda pair: beam.window.TimestampedValue(pair, time.time()))
        windowed = stamped | 'Window into Fixed Intervals' >> windowing
        summed = windowed | 'Sum per Window with Trigger' >> beam.CombinePerKey(sum)
        summed | 'Print Output' >> beam.Map(print)

run_trigger_pipeline()




(0, 44)
(1, 11)
(2, 14)
(3, 47)
(4, 31)
(5, 25)
(6, 1)
(7, 20)
(8, 40)
(9, 23)


In [15]:
import time

def run_streaming_pipeline(topic='projects/my-project/topics/my-topic'):
    """Read messages from a Pub/Sub topic, add 5 to each parsed value, and print it.

    Args:
        topic: Pub/Sub topic path passed to ``ReadFromPubSub``. The default
            is a placeholder and must be replaced with a real topic before
            the pipeline can run.

    The pipeline is created with ``streaming=True``. Each message is passed
    to ``int()``, so this assumes every payload parses as an integer.
    """
    with beam.Pipeline(options=PipelineOptions(streaming=True)) as pipeline:
        (pipeline
         | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic=topic)
         | 'Add 5 to each element' >> beam.Map(lambda x: int(x) + 5)
         | 'Print Output' >> beam.Map(print))

# Note: The streaming example requires a real Pub/Sub source for true streaming.