#Step 1: Set Up the Colab Environment

In [9]:
!pip install apache-beam[gcp] beam-nuggets --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m74.7/74.7 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.5/54.5 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.0/45.0 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.1/93.1 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.0/105.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following depen

In [2]:
!pip install transformers --quiet

# Step 2: Creating a Basic Pipeline

In [3]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

In [11]:
import apache_beam as beam

# Sample dataset of e-commerce transactions
sample_data = [
    {'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000},
    {'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500},
    {'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150},
    {'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30},
    {'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20}
]

# Function to calculate the total cost of each transaction
def calculate_total_cost(transaction):
    total_cost = transaction['quantity'] * transaction['price']
    transaction['total_cost'] = total_cost
    return transaction

# Define the pipeline
with beam.Pipeline() as pipeline:

    # Step 1: Create a PCollection from the sample data
    transactions = pipeline | 'Create transactions' >> beam.Create(sample_data)

    # Step 2: Apply a ParDo to calculate the total cost for each transaction
    total_costs = transactions | 'Calculate total cost' >> beam.Map(calculate_total_cost)

    # Step 3: Print the results (transactions with total cost)
    total_costs | 'Print results' >> beam.Map(print)


{'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'total_cost': 1000}
{'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'total_cost': 1000}
{'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'total_cost': 450}
{'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'total_cost': 150}
{'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'total_cost': 200}


# Step 3: Adding Windowing and Triggers
Windowing allows us to divide the data into logical chunks (windows) based on time. Triggers define when the results for each window should be emitted.

In this step, we’ll simulate streaming e-commerce transactions over time and use windowing to group transactions into 1-minute intervals.

In [12]:
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import time

# Simulated streaming data with timestamps
sample_streaming_data = [
    {'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'timestamp': 1},
    {'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'timestamp': 2},
    {'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'timestamp': 61},
    {'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'timestamp': 62},
    {'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'timestamp': 120}
]

# Function to calculate the total cost
def calculate_total_cost(transaction):
    total_cost = transaction['quantity'] * transaction['price']
    transaction['total_cost'] = total_cost
    return transaction

# Simulate event-time using Beam's timestamp
class AddTimestamp(beam.DoFn):
    def process(self, element):
        yield beam.window.TimestampedValue(element, element['timestamp'])

# Define the pipeline
with beam.Pipeline() as pipeline:

    # Step 1: Create a PCollection from the streaming data and add timestamps
    transactions = (pipeline
                    | 'Create streaming transactions' >> beam.Create(sample_streaming_data)
                    | 'Add timestamps' >> beam.ParDo(AddTimestamp()))

    # Step 2: Apply windowing to group transactions into 1-minute windows
    windowed_transactions = (transactions
                             | 'Apply windowing' >> beam.WindowInto(
                                 FixedWindows(60),
                                 trigger=AfterWatermark(),  # Trigger on watermark
                                 accumulation_mode=AccumulationMode.DISCARDING))

    # Step 3: Calculate total cost for each transaction
    total_costs = windowed_transactions | 'Calculate total cost' >> beam.Map(calculate_total_cost)

    # Step 4: Print the results for each window
    total_costs | 'Print results' >> beam.Map(print)


{'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'timestamp': 1, 'total_cost': 1000}
{'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'timestamp': 2, 'total_cost': 1000}
{'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'timestamp': 61, 'total_cost': 450}
{'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'timestamp': 62, 'total_cost': 150}
{'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'timestamp': 120, 'total_cost': 200}


#Step 4: Using ParDo for Complex Processing
ParDo is one of Apache Beam’s most powerful transforms. It allows you to perform more complex processing on each element in a PCollection, especially when you need to work with side outputs, handle multiple data fields, or perform multi-step processing.

In [13]:
import apache_beam as beam

# Function to calculate total cost and discounted price, and separate high/low value transactions
class ProcessTransaction(beam.DoFn):
    def process(self, transaction):
        # Calculate total cost
        transaction['total_cost'] = transaction['quantity'] * transaction['price']

        # Apply a discount of 10% if the total cost is greater than $500
        if transaction['total_cost'] > 500:
            transaction['discounted_price'] = transaction['total_cost'] * 0.9
        else:
            transaction['discounted_price'] = transaction['total_cost']

        # Categorize into high-value and low-value transactions
        if transaction['total_cost'] > 1000:
            yield beam.pvalue.TaggedOutput('high_value', transaction)  # Side output: high-value
        else:
            yield beam.pvalue.TaggedOutput('low_value', transaction)   # Side output: low-value

# Define the pipeline
with beam.Pipeline() as pipeline:

    # Step 1: Create a PCollection from the sample data
    sample_data = [
        {'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000},
        {'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500},
        {'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150},
        {'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30},
        {'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20}
    ]

    transactions = pipeline | 'Create transactions' >> beam.Create(sample_data)

    # Step 2: Apply ParDo to process transactions
    processed_transactions = (transactions
                              | 'Process Transactions' >> beam.ParDo(ProcessTransaction())
                              .with_outputs('high_value', 'low_value'))

    # Step 3: Print high-value transactions
    high_value_transactions = processed_transactions.high_value
    high_value_transactions | 'Print high-value transactions' >> beam.Map(print)

    # Step 4: Print low-value transactions
    low_value_transactions = processed_transactions.low_value
    low_value_transactions | 'Print low-value transactions' >> beam.Map(print)


{'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'total_cost': 1000, 'discounted_price': 900.0}
{'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'total_cost': 1000, 'discounted_price': 900.0}
{'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'total_cost': 450, 'discounted_price': 450}
{'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'total_cost': 150, 'discounted_price': 150}
{'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'total_cost': 200, 'discounted_price': 200}


# Step 5: Working with Streaming Data
Since we’re in Colab and can’t directly stream real-time data like a production environment, we will simulate streaming by:

Using a generator to produce new transactions at fixed intervals (mimicking a stream).
Processing the transactions as they arrive in real time.


In [19]:
import apache_beam as beam
import time

# Predefined transaction data
sample_data = [
    {'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000},
    {'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500},
    {'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150},
    {'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30},
    {'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20}
]

# Function to calculate total cost and discounted price
class ProcessTransaction(beam.DoFn):
    def process(self, transaction):
        transaction['total_cost'] = transaction['quantity'] * transaction['price']

        # Apply a discount of 10% if the total cost is greater than $500
        if transaction['total_cost'] > 500:
            transaction['discounted_price'] = transaction['total_cost'] * 0.9
        else:
            transaction['discounted_price'] = transaction['total_cost']

        # Categorize into high-value and low-value transactions
        if transaction['total_cost'] > 1000:
            yield beam.pvalue.TaggedOutput('high_value', transaction)
        else:
            yield beam.pvalue.TaggedOutput('low_value', transaction)

# Define the pipeline
def run_pipeline():
    with beam.Pipeline() as pipeline:

        # Step 1: Process each transaction one at a time with a delay to simulate streaming
        for idx, transaction in enumerate(sample_data):
            time.sleep(2)  # Simulate a delay between transaction arrivals

            # Unique label for each transaction
            transaction_label = f'CreateTransaction_{idx}'

            # Create a PCollection for the current transaction
            transactions = pipeline | transaction_label >> beam.Create([transaction])

            # Step 2: Apply ParDo with a unique label for processing each transaction
            processed_transactions = (transactions
                                      | f'ProcessTransaction_{idx}' >> beam.ParDo(ProcessTransaction())
                                      .with_outputs('high_value', 'low_value'))

            # Step 3: Print high-value transactions with a unique label
            high_value_label = f'PrintHighValue_{idx}'
            high_value_transactions = processed_transactions.high_value
            high_value_transactions | high_value_label >> beam.Map(print)

            # Step 4: Print low-value transactions with a unique label
            low_value_label = f'PrintLowValue_{idx}'
            low_value_transactions = processed_transactions.low_value
            low_value_transactions | low_value_label >> beam.Map(print)

# Run the simulated streaming pipeline
run_pipeline()


{'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'total_cost': 200, 'discounted_price': 200}
{'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'total_cost': 1000, 'discounted_price': 900.0}
{'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'total_cost': 1000, 'discounted_price': 900.0}
{'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'total_cost': 150, 'discounted_price': 150}
{'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'total_cost': 450, 'discounted_price': 450}


# Step 6: Integrating BeamML for Machine Learning Tasks

BeamML allows us to use machine learning models and libraries in Apache Beam pipelines. We can run inference, apply pre-trained models, or even train models. In this step, we will focus on using a pre-trained machine learning model to classify transactions in real-time based on the amount (whether it's high, medium, or low).

1. Choosing a Simple Pre-trained Model
For simplicity, let’s use TensorFlow and a pre-trained model to classify the transaction amounts as either high, medium, or low based on the total_cost of each transaction.

We'll use a pre-trained decision tree or logistic regression model stored in TensorFlow's SavedModel format. If you don't have a pre-trained model on hand, we can simulate the behavior with a custom function for classification.

2. Step-by-Step Implementation
Transaction Classification: We will classify the total_cost of each transaction into three categories: high, medium, and low.
BeamML Integration: We'll integrate the machine learning model within the Apache Beam pipeline to perform inference on each transaction.

In [21]:
import apache_beam as beam

# Function to classify total_cost into categories: 'low', 'medium', or 'high'
def classify_transaction(transaction):
    total_cost = transaction['total_cost']

    if total_cost > 1000:
        transaction['classification'] = 'high'
    elif 500 < total_cost <= 1000:
        transaction['classification'] = 'medium'
    else:
        transaction['classification'] = 'low'

    return transaction

# Function to calculate total cost and discounted price, and then classify the transaction
class ProcessAndClassifyTransaction(beam.DoFn):
    def process(self, transaction):
        # Calculate total cost
        transaction['total_cost'] = transaction['quantity'] * transaction['price']

        # Apply a discount of 10% if the total cost is greater than $500
        if transaction['total_cost'] > 500:
            transaction['discounted_price'] = transaction['total_cost'] * 0.9
        else:
            transaction['discounted_price'] = transaction['total_cost']

        # Classify transaction based on total cost
        classified_transaction = classify_transaction(transaction)

        # Yield the classified transaction
        yield classified_transaction

# Define the pipeline
def run_pipeline_with_classification():
    with beam.Pipeline() as pipeline:

        # Sample data: transactions
        sample_data = [
            {'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000},
            {'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500},
            {'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150},
            {'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30},
            {'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20}
        ]

        # Step 1: Create a PCollection from the sample data
        transactions = pipeline | 'Create transactions' >> beam.Create(sample_data)

        # Step 2: Apply ParDo to process transactions and classify them
        processed_and_classified_transactions = transactions | 'Process and Classify Transactions' >> beam.ParDo(ProcessAndClassifyTransaction())

        # Step 3: Print classified transactions
        processed_and_classified_transactions | 'Print Classified Transactions' >> beam.Map(print)

# Run the pipeline
run_pipeline_with_classification()

{'transaction_id': 'T1001', 'product': 'Laptop', 'quantity': 1, 'price': 1000, 'total_cost': 1000, 'discounted_price': 900.0, 'classification': 'medium'}
{'transaction_id': 'T1002', 'product': 'Phone', 'quantity': 2, 'price': 500, 'total_cost': 1000, 'discounted_price': 900.0, 'classification': 'medium'}
{'transaction_id': 'T1003', 'product': 'Monitor', 'quantity': 3, 'price': 150, 'total_cost': 450, 'discounted_price': 450, 'classification': 'low'}
{'transaction_id': 'T1004', 'product': 'Keyboard', 'quantity': 5, 'price': 30, 'total_cost': 150, 'discounted_price': 150, 'classification': 'low'}
{'transaction_id': 'T1005', 'product': 'Mouse', 'quantity': 10, 'price': 20, 'total_cost': 200, 'discounted_price': 200, 'classification': 'low'}
