# Initial Setup

In [1]:
# Install Apache Beam
!pip install apache-beam[interactive]

# Import required libraries
import apache_beam as beam

# Define a simple pipeline to verify the setup
def run_simple_pipeline(input_data=(1, 2, 3, 4, 5)):
    """Smoke-test the Beam install: double each number and print it.

    Args:
        input_data: Iterable of numbers to feed the pipeline. The default
            prints the values 2, 4, 6, 8 and 10 (Beam does not guarantee
            the print order).
    """
    with beam.Pipeline() as p:
        # The pipeline runs when the `with` block exits. The final PCollection
        # is not used afterwards, so it is not bound to a name.
        (
            p
            | 'Create Data' >> beam.Create(list(input_data))
            | 'Multiply by 2' >> beam.Map(lambda x: x * 2)
            | 'Print results' >> beam.Map(print)
        )

# Run the pipeline
run_simple_pipeline()


Collecting apache-beam[interactive]
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[interactive])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[interactive])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m930.8 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[interactive])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25h

2
4
6
8
10


In [2]:
# Define a composite transform
class MultiplyAndAdd(beam.PTransform):
    """Composite transform mapping each element ``x`` to ``x * factor + addend``.

    Wrapping the two ``Map`` steps in a ``PTransform`` lets a pipeline apply
    them as one named stage. The defaults (``factor=2``, ``addend=3``) keep the
    original behaviour and step labels, so ``MultiplyAndAdd()`` maps 1 -> 5.

    Args:
        factor: Multiplier applied first.
        addend: Value added to the product.
        label: Optional Beam label, forwarded to ``beam.PTransform``.
    """

    def __init__(self, factor=2, addend=3, label=None):
        super().__init__(label)
        self.factor = factor
        self.addend = addend

    def expand(self, pcoll):
        # Copy into locals so the lambdas close over plain numbers rather than
        # over the transform object when Beam serializes them.
        factor, addend = self.factor, self.addend
        return (
            pcoll
            | f'Multiply by {factor}' >> beam.Map(lambda x: x * factor)
            | f'Add {addend}' >> beam.Map(lambda x: x + addend)
        )

# Define a pipeline to use the composite transform
def run_composite_transform(input_data=(1, 2, 3, 4, 5)):
    """Apply the MultiplyAndAdd composite transform to numbers and print them.

    Args:
        input_data: Iterable of numbers. The default prints the values
            5, 7, 9, 11 and 13.
    """
    with beam.Pipeline() as p:
        # The terminal PCollection is not used, so it is not bound to a name;
        # the pipeline executes when the `with` block exits.
        (
            p
            | 'Create Data' >> beam.Create(list(input_data))
            | 'Apply Composite Transform' >> MultiplyAndAdd()
            | 'Print results' >> beam.Map(print)
        )

# Run the pipeline
run_composite_transform()


5
7
9
11
13


In [3]:
# Sample customer reviews (simulated data)
customer_reviews = [
    "Great service!",
    "Average experience.",
    "Will not recommend.",
    "Loved it, highly recommended!",
    "It was okay."
]

# Save the reviews to a file (as input), one review per line.
# Write UTF-8 explicitly: `beam.io.ReadFromText` decodes UTF-8 by default,
# whereas `open()` without `encoding` uses the platform locale (e.g. cp1252 on
# Windows), which would garble any non-ASCII review on the way back in.
with open('customer_reviews.txt', 'w', encoding='utf-8') as f:
    f.writelines(f"{review}\n" for review in customer_reviews)

# Now, we'll create a Beam pipeline to read this file, modify the reviews, and write the output to a new file
def run_pipeline_io(input_path='customer_reviews.txt', output_path='transformed_reviews.txt'):
    """Read reviews line by line, upper-case each one, and write them to a file.

    Args:
        input_path: Text file to read; each line becomes one element.
        output_path: Exact name of the file to write.

    By default ``WriteToText`` treats its path as a *prefix* and writes sharded
    files such as ``transformed_reviews.txt-00000-of-00001``. Passing
    ``shard_name_template=''`` makes it write a single file named exactly
    ``output_path``.
    """
    with beam.Pipeline() as p:
        (
            p
            | 'Read Reviews' >> beam.io.ReadFromText(input_path)
            | 'Transform Reviews' >> beam.Map(lambda review: review.upper())  # Convert to uppercase
            | 'Write Transformed Reviews' >> beam.io.WriteToText(
                output_path, shard_name_template='')
        )

# Run the pipeline
run_pipeline_io()


In [7]:
import time
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterCount, AccumulationMode

# Simulated real-time reviews (a static list; the pipelines below read it in one batch)
reviews_stream = [
    "Great service!",
    "Average experience.",
    "Loved it, highly recommended!",
    "Will not recommend.",
    "It was okay.",
    "Amazing staff!",
    "Decent pricing."
]

# Define the pipeline for streaming with windowing and triggers
def run_windowing_pipeline(reviews=None):
    """Window reviews into 1-minute fixed windows with a count trigger, then print them.

    Args:
        reviews: Iterable of review strings; defaults to ``reviews_stream``.

    NOTE: as written, this prints every element once, exactly as it would
    without windowing. ``WindowInto`` only assigns windows and a trigger to the
    elements. Window boundaries and triggers take effect at a downstream
    grouping step (``GroupByKey`` / ``Combine``), and this pipeline has none.
    Elements from ``beam.Create`` also share one default timestamp, so they
    would all land in a single window. To see ``AfterCount`` fire, attach
    timestamps with ``beam.window.TimestampedValue``, add an aggregation, and
    wrap the trigger in ``Repeatedly`` so that it fires more than once.
    """
    if reviews is None:
        reviews = reviews_stream
    with beam.Pipeline() as p:
        (
            p
            | 'Create Reviews Stream' >> beam.Create(list(reviews))
            | 'Window into Fixed Intervals' >> beam.WindowInto(
                FixedWindows(60),  # 1-minute windows
                trigger=AfterCount(3),  # Fire once 3 elements have arrived in a window
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Print Reviews' >> beam.Map(print)  # Simulate processing by printing results
        )

# Announce each review with a pause so the demo reads like a live feed.
# NOTE: this loop is cosmetic only. It does not feed the pipeline, which
# reads the whole `reviews_stream` list in one batch when it is called below.
SIMULATED_DELAY_S = 1  # seconds between announcements; set to 0 for a fast "Run All"
for review in reviews_stream:
    print(f"New review received: {review}")
    time.sleep(SIMULATED_DELAY_S)

# Run the pipeline
run_windowing_pipeline()


New review received: Great service!
New review received: Average experience.
New review received: Loved it, highly recommended!
New review received: Will not recommend.
New review received: It was okay.
New review received: Amazing staff!
New review received: Decent pricing.
Great service!
Average experience.
Loved it, highly recommended!
Will not recommend.
It was okay.
Amazing staff!
Decent pricing.


In [9]:
# Import additional necessary libraries
import apache_beam as beam

# Define the ParDo class to perform sentiment analysis
class SentimentAnalysis(beam.DoFn):
    """Label each review string as Positive, Negative or Neutral by keyword matching.

    Emits one string per input, formatted ``"<Label> Review: <original text>"``.
    Matching is a case-insensitive substring search, applied in this order:

    1. ``NEGATED_POSITIVES`` -> Negative. These phrases contain a positive
       keyword ("not recommended" contains "recommended"). They must be
       checked before ``POSITIVE_WORDS``, or they would be labelled Positive.
    2. ``POSITIVE_WORDS`` -> Positive.
    3. ``NEGATIVE_WORDS`` -> Negative.
    4. Anything else -> Neutral.
    """

    # Class-level constants: built once, not on every element.
    NEGATED_POSITIVES = ('not recommend', 'not great', 'not amazing', 'not loved')
    POSITIVE_WORDS = ('great', 'loved', 'amazing', 'recommended')
    NEGATIVE_WORDS = ('not recommend', 'average', 'okay', 'will not')

    def process(self, review):
        text = review.lower()  # lower-case once rather than once per keyword
        if any(phrase in text for phrase in self.NEGATED_POSITIVES):
            label = 'Negative'
        elif any(word in text for word in self.POSITIVE_WORDS):
            label = 'Positive'
        elif any(word in text for word in self.NEGATIVE_WORDS):
            label = 'Negative'
        else:
            label = 'Neutral'
        yield f"{label} Review: {review}"

# Define the pipeline for ParDo transform
def run_pardo_pipeline(reviews=None):
    """Label each review with the SentimentAnalysis DoFn and print the result.

    Args:
        reviews: Iterable of review strings. Defaults to the module-level
            ``reviews_stream`` list from the windowing cell, which must
            therefore have been run first.
    """
    if reviews is None:
        reviews = reviews_stream
    with beam.Pipeline() as p:
        (
            p
            | 'Create Reviews Stream' >> beam.Create(list(reviews))
            | 'Sentiment Analysis' >> beam.ParDo(SentimentAnalysis())  # Apply custom ParDo for sentiment analysis
            | 'Print Results' >> beam.Map(print)  # Output the results
        )

# Announce each review with a pause so the demo reads like a live feed.
# NOTE: this loop is cosmetic only. It does not feed the pipeline, which
# reads the whole `reviews_stream` list in one batch when it is called below.
SIMULATED_DELAY_S = 1  # seconds between announcements; set to 0 for a fast "Run All"
for review in reviews_stream:
    print(f"New review received: {review}")
    time.sleep(SIMULATED_DELAY_S)

# Run the ParDo pipeline
run_pardo_pipeline()


New review received: Great service!
New review received: Average experience.
New review received: Loved it, highly recommended!
New review received: Will not recommend.
New review received: It was okay.
New review received: Amazing staff!
New review received: Decent pricing.
Positive Review: Great service!
Negative Review: Average experience.
Positive Review: Loved it, highly recommended!
Negative Review: Will not recommend.
Negative Review: It was okay.
Positive Review: Amazing staff!
Neutral Review: Decent pricing.


In [13]:
import apache_beam as beam
import time
from apache_beam.options.pipeline_options import PipelineOptions

# Simulate streaming real-time reviews using a generator inside a DoFn
class GenerateRealTimeReviews(beam.DoFn):
    """Emit a fixed sequence of review strings, pausing between them.

    This only stands in for a streaming source. It is driven by a single dummy
    input element, and ``time.sleep`` merely slows the worker. The emitted
    reviews are therefore not timestamped or windowed like real streaming data.
    For a real unbounded test source, see
    ``apache_beam.testing.test_stream.TestStream`` or
    ``apache_beam.transforms.periodicsequence.PeriodicImpulse``.

    Args:
        reviews: Review strings to emit; defaults to ``DEFAULT_REVIEWS``.
        delay_s: Seconds to wait between consecutive reviews.
    """

    DEFAULT_REVIEWS = (
        "Great service!",
        "Average experience.",
        "Will not recommend.",
        "Loved it, highly recommended!",
        "It was okay.",
        "Amazing staff!",
        "Decent pricing."
    )

    def __init__(self, reviews=None, delay_s=2):
        super().__init__()
        # Store an immutable snapshot so later changes to the caller's list
        # cannot affect what this DoFn emits.
        self.reviews = tuple(self.DEFAULT_REVIEWS if reviews is None else reviews)
        self.delay_s = delay_s

    def process(self, element):
        # `element` is only the trigger (the dummy None from beam.Create); it is ignored.
        for index, review in enumerate(self.reviews):
            # Pause only *between* reviews; sleeping after the last one would
            # just delay pipeline completion.
            if index and self.delay_s:
                time.sleep(self.delay_s)
            yield review

# Example of a mock model-inference step (a plain DoFn with a dummy "model",
# not Beam's RunInference API)
class DummyMLInference(beam.DoFn):
    """Attach a mock "satisfaction score" to each incoming review string.

    The score is ``len(review) % 10``. It is a placeholder, not a real model.

    NOTE(review): in ``run_streaming_pipeline`` this DoFn receives strings that
    SentimentAnalysis has already prefixed (e.g. "Positive Review: ..."). The
    prefix therefore counts toward the length and appears in the output text.
    """

    def process(self, review):
        text_length = len(review)
        satisfaction_score = text_length % 10  # mock prediction derived from text length
        yield f"Review: {review}, Predicted Satisfaction Score: {satisfaction_score}"

# Define the streaming pipeline with a (mock) model-inference step
def run_streaming_pipeline():
    """Run generate -> sentiment label -> mock score -> print in streaming mode.

    ``SentimentAnalysis`` is defined in the earlier ParDo cell, so that cell
    must be executed before this one.
    """
    options = PipelineOptions(streaming=True)  # Enable streaming mode
    with beam.Pipeline(options=options) as p:
        # The terminal PCollection is not used, so it is not bound to a name.
        (
            p
            | 'Start' >> beam.Create([None])  # One dummy element to kick off the generator DoFn
            | 'Generate Reviews Stream' >> beam.ParDo(GenerateRealTimeReviews())  # Simulated source
            | 'Sentiment Analysis' >> beam.ParDo(SentimentAnalysis())  # Reuse the ParDo for sentiment analysis
            | 'ML Inference' >> beam.ParDo(DummyMLInference())  # Mock scoring (plain DoFn, not RunInference)
            | 'Print Results' >> beam.Map(print)  # Output the results
        )

# Run the streaming pipeline
run_streaming_pipeline()




Review: Positive Review: Great service!, Predicted Satisfaction Score: 1
Review: Negative Review: Average experience., Predicted Satisfaction Score: 6
Review: Negative Review: Will not recommend., Predicted Satisfaction Score: 6
Review: Positive Review: Loved it, highly recommended!, Predicted Satisfaction Score: 6
Review: Negative Review: It was okay., Predicted Satisfaction Score: 9
Review: Positive Review: Amazing staff!, Predicted Satisfaction Score: 1
Review: Neutral Review: Decent pricing., Predicted Satisfaction Score: 1
