<a href="https://colab.research.google.com/github/chandini2595/Exploratory_Data_Analysis/blob/main/Apache_Beam_Features.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Apache Beam

In [None]:
!pip install apache-beam

Collecting apache-beam[gcp]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[gcp])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m621.9 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro<2,>=0.

# Basic Apache Beam Pipeline for Transforming and Printing Data

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options constructed without explicit arguments.
options = PipelineOptions()

# A minimal pipeline: upper-case a fixed list of words and print each result.
# Leaving the `with` block runs the pipeline.
with beam.Pipeline(options=options) as p:
    words = p | 'Read Input' >> beam.Create(['Hello', 'Apache', 'Beam', 'Streaming'])
    upper_words = words | 'Convert to Uppercase' >> beam.Map(lambda word: word.upper())
    upper_words | 'Print Results' >> beam.Map(print)





HELLO
APACHE
BEAM
STREAMING


# Composite Transform

In [None]:
class ProcessText(beam.PTransform):
    """Composite transform: upper-case each element, then append " - processed"."""

    def expand(self, pcoll):
        def to_upper(text):
            return text.upper()

        def add_suffix(text):
            return f"{text} - processed"

        uppercased = pcoll | 'Convert to Uppercase' >> beam.Map(to_upper)
        return uppercased | 'Add Suffix' >> beam.Map(add_suffix)

# Run the composite ProcessText transform over a few fruit names and print the results.
fruits = ['apple', 'banana', 'cherry']

with beam.Pipeline(options=options) as p:
    processed = (
        p
        | 'Read Input' >> beam.Create(fruits)
        | 'Process Text' >> ProcessText()
    )
    processed | 'Print Results' >> beam.Map(print)



APPLE - processed
BANANA - processed
CHERRY - processed


# Pipeline IO

In [None]:
import os

INPUT_PATH = '/content/sample_data/input.txt'
OUTPUT_PATH = '/content/sample_data/output.txt'

# ReadFromText raises an error when no file matches its pattern, so on a fresh
# runtime without input.txt the cell would fail. Seed a small sample input if it
# is missing; an existing file is left untouched.
if not os.path.exists(INPUT_PATH):
    os.makedirs(os.path.dirname(INPUT_PATH), exist_ok=True)
    with open(INPUT_PATH, 'w', encoding='utf-8') as sample_file:
        sample_file.write('hello\napache\nbeam\nstreaming\n')

# Read each line of the input file, upper-case it, and write the lines back out.
with beam.Pipeline(options=options) as p:
    (p
     | 'Read File' >> beam.io.ReadFromText(INPUT_PATH)
     | 'Process Text' >> beam.Map(lambda x: x.upper())
     # WriteToText treats its path as a file *prefix* and appends a shard suffix
     # (e.g. "-00000-of-00001"); an empty shard_name_template writes exactly one
     # file named OUTPUT_PATH.
     | 'Write Results' >> beam.io.WriteToText(OUTPUT_PATH, shard_name_template='')
    )

# Show what the pipeline wrote.
with open(OUTPUT_PATH, encoding='utf-8') as result_file:
    print(result_file.read())



# Triggers

In [None]:
import time
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options
options = PipelineOptions()


# Simulated stream of event names. beam.Create copies its input into a tuple
# when the transform is constructed, so a time.sleep() in here would only delay
# building the pipeline; it would not space the events out in time. Event
# times are attached explicitly in the pipeline instead.
def generate_streaming_data():
    """Yield the event names ``event_0`` .. ``event_9``."""
    for i in range(10):
        yield f"event_{i}"


class PrintWithWindow(beam.DoFn):
    """Print each element together with the bounds of the window it was assigned to."""

    def process(self, element, window=beam.DoFn.WindowParam):
        print(f"{element} -> window [{float(window.start):g}s, {float(window.end):g}s)")


# Apache Beam pipeline
with beam.Pipeline(options=options) as p:
    (p
     | 'Create Stream' >> beam.Create(enumerate(generate_streaming_data()))
     # Event i happens at t = i seconds (one event per second). Create does not
     # give its elements distinct event times, so without this step every
     # event would fall into the same fixed window.
     | 'Attach Event Times' >> beam.MapTuple(
            lambda i, event: beam.window.TimestampedValue(event, i))
     | 'Window' >> beam.WindowInto(
            FixedWindows(5),
            # A trigger controls when a grouping step (GroupByKey / Combine)
            # emits a window's results. Element-wise steps such as the ParDo
            # below are not affected by it, so this cell only shows the window
            # assignment; add a grouping step to observe trigger firings.
            trigger=AfterProcessingTime(3),
            accumulation_mode=AccumulationMode.DISCARDING  # Each firing emits only elements not emitted before
        )
     | 'Print Results' >> beam.ParDo(PrintWithWindow())
    )



event_0
event_1
event_2
event_3
event_4
event_5
event_6
event_7
event_8
event_9


# Windowing

In [None]:
import time
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options
options = PipelineOptions()


# Simulated stream of event names. beam.Create copies its input into a tuple
# when the transform is constructed, so a time.sleep() in here would only delay
# building the pipeline; event times are attached explicitly in the pipeline.
def generate_streaming_data():
    """Yield the event names ``event_0`` .. ``event_9``."""
    for i in range(10):
        yield f"event_{i}"


class PrintWindow(beam.DoFn):
    """Print the events collected for one window together with the window's bounds."""

    def process(self, events, window=beam.DoFn.WindowParam):
        print(f"window [{float(window.start):g}s, {float(window.end):g}s): {sorted(events)}")


# Apache Beam pipeline with Windowing
with beam.Pipeline(options=options) as p:
    (p
     | 'Create Stream' >> beam.Create(enumerate(generate_streaming_data()))  # Simulating streaming data
     # Event i happens at t = i seconds (one event per second); without explicit
     # timestamps all events would land in a single window.
     | 'Attach Event Times' >> beam.MapTuple(
            lambda i, event: beam.window.TimestampedValue(event, i))
     | 'Window into Fixed Intervals' >> beam.WindowInto(FixedWindows(5))  # 5-second windows by event time
     # Windows only take effect at grouping/aggregation steps, so collect each
     # window's events into one list to make the 5-second buckets visible.
     # without_defaults() is required for CombineGlobally outside the global window.
     | 'Group per Window' >> beam.CombineGlobally(
            beam.combiners.ToListCombineFn()).without_defaults()
     | 'Print Windowed Results' >> beam.ParDo(PrintWindow())  # Print results for each window
    )



event_0
event_1
event_2
event_3
event_4
event_5
event_6
event_7
event_8
event_9


# ParDo

In [None]:
class ProcessElement(beam.DoFn):
    """DoFn that appends " processed" to each incoming element."""

    def process(self, element):
        # ParDo accepts any iterable of outputs; emit exactly one per input.
        return [element + " processed"]

# Apply the ProcessElement DoFn to a few sample strings and print the results.
inputs = ['element1', 'element2', 'element3']

with beam.Pipeline(options=options) as p:
    transformed = p | 'Read Input' >> beam.Create(inputs) | 'Apply ParDo' >> beam.ParDo(ProcessElement())
    transformed | 'Print Results' >> beam.Map(print)



element1 processed
element2 processed
element3 processed
