<a href="https://colab.research.google.com/github/SanjayBhargavKudupudi/DATA_MINING_4/blob/main/Beam/Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-beam[interactive]


Collecting apache-beam[interactive]
  Downloading apache_beam-2.51.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.7/14.7 MB[0m [31m21.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting crcmod<2.0,>=1.7 (from apache-beam[interactive])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[interactive])
  Downloading orjson-3.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (138 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m138.7/138.7 kB[0m [31m14.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[interactive])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1

In [2]:
import apache_beam as beam

# Minimal pipeline: an in-memory list of strings is turned into a PCollection
# and every element is handed to print().
with beam.Pipeline() as pipeline:
    lines = pipeline | "Read lines" >> beam.Create(["Hello Beam", "Apache Beam is awesome"])
    lines | "Print lines" >> beam.Map(print)


Hello Beam
Apache Beam is awesome


In [3]:
#@title Composite Transform
class SplitAndLength(beam.PTransform):
    """Composite transform that turns lines into ``(word, length)`` pairs.

    Each input element is expected to be a string. It is split into words,
    and every word is emitted as a tuple of the word and ``len(word)``.
    """

    def expand(self, pcoll):
        """Apply the split and length steps to ``pcoll``.

        Args:
            pcoll: PCollection of strings (one line of text per element).

        Returns:
            PCollection of ``(word, len(word))`` tuples, one per word.
        """
        return (
            pcoll
            # str.split() with no separator splits on any run of whitespace and
            # never yields empty strings, so repeated, leading or trailing
            # spaces no longer produce bogus ('', 0) pairs.
            | "Split" >> beam.FlatMap(lambda line: line.split())
            | "Length" >> beam.Map(lambda word: (word, len(word)))
        )

# Run the composite transform over two sample lines and print each
# (word, length) pair it produces.
with beam.Pipeline() as pipeline:
    lines = pipeline | "Read lines" >> beam.Create(["Hello Beam", "Apache Beam is cool"])
    word_lengths = lines | "Split and length" >> SplitAndLength()
    word_lengths | "Print output" >> beam.Map(print)


('Hello', 5)
('Beam', 4)
('Apache', 6)
('Beam', 4)
('is', 2)
('cool', 4)


In [6]:
# @title Windowing
# Attach an explicit timestamp to each element, assign the elements to fixed
# windows of size 2, and print them. Only the element values are printed;
# the window each element landed in is not shown by print().
with beam.Pipeline() as pipeline:
    timestamped = [
        beam.window.TimestampedValue(element, event_time)
        for element, event_time in (
            ((1, 1), 1),
            ((2, 1), 2),
            ((3, 5), 5),
            ((4, 7), 7),
        )
    ]
    windowed = (
        pipeline
        | beam.Create(timestamped)
        | beam.WindowInto(beam.window.FixedWindows(2))
    )
    windowed | beam.Map(print)


(1, 1)
(2, 1)
(3, 5)
(4, 7)


In [8]:
#@title Triggers
# Same timestamped input as the windowing example, but the windowing step also
# configures a trigger (fire after 2 elements) and DISCARDING accumulation.
# NOTE(review): the printed output is identical to the un-triggered cell;
# presumably the trigger only matters once an aggregation (e.g. GroupByKey)
# follows the WindowInto -- confirm against the Beam trigger documentation.
with beam.Pipeline() as pipeline:
    trigger_lib = beam.transforms.trigger
    timestamped = [
        beam.window.TimestampedValue(element, event_time)
        for element, event_time in (
            ((1, 1), 1),
            ((2, 1), 2),
            ((3, 5), 5),
            ((4, 7), 7),
        )
    ]
    windowing = beam.WindowInto(
        beam.window.FixedWindows(2),
        trigger=trigger_lib.AfterCount(2),
        accumulation_mode=trigger_lib.AccumulationMode.DISCARDING,
    )
    triggered = pipeline | beam.Create(timestamped) | windowing
    triggered | beam.Map(print)


(1, 1)
(2, 1)
(3, 5)
(4, 7)


In [9]:
# @title Pardo
class ProcessWords(beam.DoFn):
    """DoFn that passes through only the elements containing ``"Beam"``."""

    def process(self, element):
        """Yield ``element`` unchanged if ``"Beam"`` occurs in it; otherwise emit nothing."""
        if "Beam" not in element:
            return
        yield element

# Feed two sample lines through the ProcessWords DoFn via ParDo and print
# whatever survives the filter.
with beam.Pipeline() as pipeline:
    lines = pipeline | "Read lines" >> beam.Create(["Hello Beam", "Apache is cool"])
    beam_lines = lines | "Filter Beam words" >> beam.ParDo(ProcessWords())
    beam_lines | "Print output" >> beam.Map(print)


Hello Beam
