## Installing Apache Beam



In [1]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.61.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.4 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Do

In [3]:
# Download the movie-ratings dataset archive from Kaggle into the working directory.
# NOTE(review): presumably needs Kaggle API credentials configured beforehand — confirm.
!kaggle datasets download raidevesh05/movie-ratings-dataset

Dataset URL: https://www.kaggle.com/datasets/raidevesh05/movie-ratings-dataset
License(s): ODbL-1.0
Downloading movie-ratings-dataset.zip to /content
  0% 0.00/20.2k [00:00<?, ?B/s]
100% 20.2k/20.2k [00:00<00:00, 29.7MB/s]


In [4]:
!unzip movie-ratings-dataset.zip

Archive:  movie-ratings-dataset.zip
  inflating: movie_ratings.csv       


## Creating Apache Beam Pipeline

In [5]:
class ExtractAndCountWords(beam.PTransform):
    """Composite transform that splits each element into words and counts them.

    Every input element is split on whitespace; the resulting words are then
    counted per distinct value.
    """

    def expand(self, pcoll):
        """Apply the split step followed by the per-element count step."""
        words = pcoll | 'Extract Words' >> beam.FlatMap(lambda line: line.split())
        return words | 'Count Words' >> beam.combiners.Count.PerElement()


## Pipeline I/O

In [6]:
# Read the ratings CSV line by line, upper-case every line, and write the
# result out through Beam's text sink.
with beam.Pipeline() as p:
    raw_lines = p | 'Read File' >> beam.io.ReadFromText('movie_ratings.csv')
    upper_lines = raw_lines | 'Transform Data' >> beam.Map(lambda line: line.upper())
    upper_lines | 'Write to File' >> beam.io.WriteToText('output.txt')




## Triggers and Windowing

In [7]:
from apache_beam.transforms.window import FixedWindows

# Build a small keyed collection, assign it to fixed windows of size 10,
# sum the values of each key, and print every (key, total) pair.
with beam.Pipeline() as p:
    keyed_values = p | 'Create Data' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
    windowed = keyed_values | 'Window' >> beam.WindowInto(FixedWindows(10))
    totals = windowed | 'Sum per Key' >> beam.CombinePerKey(sum)
    totals | 'Print Results' >> beam.Map(print)

('a', 4)
('b', 2)


## ParDo

In [8]:
class CustomTransform(beam.DoFn):
    """DoFn that emits the upper-cased form of each incoming element."""

    def process(self, element):
        """Yield exactly one output per input: ``element.upper()``."""
        uppercased = element.upper()
        yield uppercased

# Run two sample strings through CustomTransform via ParDo and write the
# upper-cased results to text output.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read' >> beam.Create(['hello', 'world'])
        | 'Transform' >> beam.ParDo(CustomTransform())
        # Write under a dedicated prefix: 'output.txt' is already the output
        # prefix of the Pipeline I/O cell, and reusing it here would overwrite
        # that cell's results on every run.
        | 'Write' >> beam.io.WriteToText('pardo_output', file_name_suffix='.txt')
    )

## Streaming

In [9]:
def generate_data(count=10):
    """Yield sample records "Data 0", "Data 1", ... in order.

    Args:
        count: Number of records to produce. Defaults to 10, matching the
            original hard-coded size; a value of 0 or less yields nothing.

    Yields:
        str: The record text ``f"Data {i}"`` for each ``i`` in ``range(count)``.
    """
    for i in range(count):
        yield f"Data {i}"

# Feed the generated records through a pipeline and print each one.
# NOTE(review): beam.Create builds its collection from an in-memory iterable,
# so this presumably runs as a bounded batch job rather than a true unbounded
# stream — confirm.
with beam.Pipeline() as p:
    records = p | 'Read Stream' >> beam.Create(generate_data())
    records | 'Print Stream' >> beam.Map(print)

Data 0
Data 1
Data 2
Data 3
Data 4
Data 5
Data 6
Data 7
Data 8
Data 9
