This pipeline demonstrates several Apache Beam concepts:

1.  **Pipeline I/O (`beam.io.ReadFromText`)**: Reads data from the `sample_data.txt` file, creating an initial `PCollection` of strings.
2.  **Composite Transform (`ProcessData`)**: This is a custom transform that encapsulates a sequence of operations:
    *   `SplitData` (`beam.Map`): Splits each input string by commas into a list of strings.
    *   `FilterApples` (`beam.Filter`): Keeps only the elements where the third element (item name) is 'apple'.
    *   `ExtractTimestamp` (`beam.Map`): Despite its name, this step does not extract a timestamp. It converts each split list into a tuple `(id, timestamp_str, item)`. The timestamp is attached later, as a `TimestampedValue`, by the subsequent `ParDo`.
3.  **ParDo (`ExtractTimestampDoFn`)**: A custom `DoFn` processes each element. It parses the timestamp string (the second field) and emits the element wrapped in a `TimestampedValue`, which sets the element's event timestamp. Windowing relies on this timestamp. Elements whose timestamp cannot be parsed are logged and skipped.
4.  **Windowing (`beam.WindowInto(FixedWindows(30 * 60))`)**: Assigns each element to a fixed, non-overlapping 30-minute window based on its timestamp, e.g. [10:00, 10:30) or [11:00, 11:30). Any later grouping or aggregation, such as `GroupByKey` or `CombinePerKey`, is then computed separately for each window.
5.  **Map (`ExtractItemAndCount`)**: Pairs each windowed element with a count of 1, producing `(element, 1)`. Despite the step name, the item is not extracted on its own; the whole element is kept. This key/value shape is the usual input for an aggregation such as `CombinePerKey(sum)`, which this example does not perform.
6.  **Filter (`FilterCounts`)**: Keeps only the elements whose count is 1. Since every count is 1 here, nothing is removed; the step only demonstrates `beam.Filter`.
7.  **Partition (`beam.Partition`)**: Splits the `PCollection` into three `PCollection`s using the partitioning function `partition_by_item`, which returns 0 for apples, 1 for bananas, and 2 for everything else. Because `FilterApples` has already removed every non-apple row, only partition 0 receives elements; the banana and "other" partitions are empty, which is why the output below contains only `Apple Partition` lines.
8.  **Map (Printing Partitions)**: Separate `Map` transforms are applied to each partitioned `PCollection` to print the elements belonging to each partition.

This example illustrates how these different transforms can be chained together to build a data processing pipeline in Apache Beam.
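The window assignment in step 4 is simple arithmetic. The plain-Python sketch below (no Beam dependency) computes which 30-minute window a few sample timestamps fall into. It assumes the timestamps are UTC and uses the epoch-aligned rule `start = ts - (ts - offset) % size` for fixed windows; the helper names `to_epoch_utc`, `fixed_window`, and `hhmm` are illustrative and not part of Beam.

```python
from datetime import datetime, timezone

WINDOW_SIZE = 30 * 60  # seconds, same size as FixedWindows(30 * 60)


def to_epoch_utc(ts_str):
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC (an assumption) and return seconds since the Unix epoch."""
    dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    return dt.timestamp()


def fixed_window(ts, size=WINDOW_SIZE, offset=0):
    """Return the half-open bounds [start, end) of the fixed window that contains ts."""
    start = ts - (ts - offset) % size
    return start, start + size


def hhmm(ts):
    """Format an epoch timestamp as HH:MM in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%H:%M')


for ts_str in ['2023-01-01 10:00:00', '2023-01-01 10:20:00', '2023-01-01 11:05:00']:
    start, end = fixed_window(to_epoch_utc(ts_str))
    print(f'{ts_str} -> [{hhmm(start)}, {hhmm(end)})')
```

Running it maps the first two timestamps to [10:00, 10:30) and the third to [11:00, 11:30). Applied to the whole sample file, the 10:00 to 10:20 rows share one window and the 11:00 to 11:20 rows share the next.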

In [20]:
# Create a sample data file
data = ["1,2023-01-01 10:00:00,apple",
        "2,2023-01-01 10:05:00,banana",
        "3,2023-01-01 10:10:00,apple",
        "4,2023-01-01 10:15:00,orange",
        "5,2023-01-01 10:20:00,banana",
        "6,2023-01-01 11:00:00,apple",
        "7,2023-01-01 11:05:00,banana",
        "8,2023-01-01 11:10:00,apple",
        "9,2023-01-01 11:15:00,orange",
        "10,2023-01-01 11:20:00,banana"]

with open('sample_data.txt', 'w') as f:
    for line in data:
        f.write(line + '\n')

print("Sample data generated in sample_data.txt")

Sample data generated in sample_data.txt


In [21]:
import apache_beam as beam

# Define a simple composite transform
@beam.ptransform_fn
def ProcessData(pcollection):
    """
    A composite transform that processes the data.
    """
    return (
        pcollection
        | 'SplitData' >> beam.Map(lambda x: x.split(','))
        | 'FilterApples' >> beam.Filter(lambda x: x[2] == 'apple')
        | 'ExtractTimestamp' >> beam.Map(lambda x: (x[0], x[1], x[2]))  # Convert the list to an (id, timestamp_str, item) tuple; no timestamp is attached yet
    )

In [25]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from datetime import datetime, timezone

# Define a DoFn for ParDo
class ExtractTimestampDoFn(beam.DoFn):
    def process(self, element):
        try:
            # The timestamp is the second field (index 1) of the (id, timestamp_str, item) tuple
            timestamp_str = element[1]
            # Parse the string and treat it as UTC so the result doesn't depend on the machine's local timezone
            timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
            # Emit the element with its event timestamp (seconds since the Unix epoch)
            yield beam.window.TimestampedValue(element, timestamp.timestamp())
        except (ValueError, IndexError) as e:
            print(f"Could not process element: {element}, error: {e}")
            return # Skip elements that cannot be processed


# Define a partition function
def partition_by_item(element, num_partitions):
    """Partitions elements based on the item name."""
    # Element is now a tuple of (original_element, count)
    original_element = element[0]
    item = original_element[2] # Access the item name from the original element
    if item == 'apple':
        return 0  # Partition 0 for apples
    elif item == 'banana':
        return 1  # Partition 1 for bananas
    else:
        return 2  # Partition 2 for others

# Create and run the pipeline
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    # Pipeline I/O: Read data from the text file
    lines = pipeline | 'ReadData' >> beam.io.ReadFromText('sample_data.txt')

    # Apply Composite Transform to split and filter data
    split_data = lines | 'SplitAndFilter' >> ProcessData()

    # Apply ParDo for timestamp extraction on the split data
    processed_data = split_data | 'ExtractTimestamp' >> beam.ParDo(ExtractTimestampDoFn())

    # Windowing: Apply fixed windows of 30 minutes
    windowed_data = processed_data | 'FixedWindows' >> beam.WindowInto(FixedWindows(30 * 60))

    # Map: Pair each element with a count of 1
    # The whole (id, timestamp_str, item) tuple is kept as the first field
    item_counts = windowed_data | 'ExtractItemAndCount' >> beam.Map(lambda x: (x, 1))

    # Filter: Keep only items with count 1 (in this simple case)
    filtered_counts = item_counts | 'FilterCounts' >> beam.Filter(lambda x: x[1] == 1)

    # Partition: Partition data based on item name
    partitioned_data = filtered_counts | 'PartitionByItem' >> beam.Partition(partition_by_item, 3)

    # Process each partition (example: print elements in each partition)
    apple_partition = partitioned_data[0] | 'PrintApples' >> beam.Map(lambda x: print(f"Apple Partition: {x}"))
    banana_partition = partitioned_data[1] | 'PrintBananas' >> beam.Map(lambda x: print(f"Banana Partition: {x}"))
    other_partition = partitioned_data[2] | 'PrintOthers' >> beam.Map(lambda x: print(f"Other Partition: {x}"))



Apple Partition: (('1', '2023-01-01 10:00:00', 'apple'), 1)
Apple Partition: (('3', '2023-01-01 10:10:00', 'apple'), 1)
Apple Partition: (('6', '2023-01-01 11:00:00', 'apple'), 1)
Apple Partition: (('8', '2023-01-01 11:10:00', 'apple'), 1)
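Only apple lines are printed because `FilterApples` inside `ProcessData` has already dropped bananas and oranges, so the banana and "other" partitions exist but stay empty. The plain-Python sketch below (no Beam) counts how many rows each partition would receive with and without that filter. The partition function is copied from the cell above; `partition_sizes` and `sample_lines` are hypothetical names used only for this illustration.

```python
from collections import Counter

# Same ten rows that were written to sample_data.txt
sample_lines = [
    "1,2023-01-01 10:00:00,apple", "2,2023-01-01 10:05:00,banana",
    "3,2023-01-01 10:10:00,apple", "4,2023-01-01 10:15:00,orange",
    "5,2023-01-01 10:20:00,banana", "6,2023-01-01 11:00:00,apple",
    "7,2023-01-01 11:05:00,banana", "8,2023-01-01 11:10:00,apple",
    "9,2023-01-01 11:15:00,orange", "10,2023-01-01 11:20:00,banana",
]


def partition_by_item(element, num_partitions):
    """Same logic as the notebook: element is (original_element, count)."""
    item = element[0][2]
    if item == 'apple':
        return 0
    elif item == 'banana':
        return 1
    return 2


def partition_sizes(lines, keep_only_apples=True):
    """Return how many elements are routed to partitions 0, 1 and 2."""
    rows = [tuple(line.split(',')) for line in lines]
    if keep_only_apples:  # mimics the FilterApples step in ProcessData
        rows = [row for row in rows if row[2] == 'apple']
    counts = Counter(partition_by_item((row, 1), 3) for row in rows)
    return [counts[i] for i in range(3)]


print(partition_sizes(sample_lines))                          # [4, 0, 0]
print(partition_sizes(sample_lines, keep_only_apples=False))  # [4, 4, 2]
```

For the banana and "other" partitions to be populated in the actual pipeline, the `FilterApples` step would have to be removed from `ProcessData`.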


Let's trace how the data changes as it flows through the pipeline:

1.  **ReadData (`beam.io.ReadFromText`)**: The initial data is a `PCollection` of strings, where each string is a line read from the `sample_data.txt` file (e.g., `"1,2023-01-01 10:00:00,apple"`).
2.  **SplitAndFilter (`ProcessData` composite transform)**:
    *   `SplitData` (`beam.Map`): Each string is split by commas into a list of strings (e.g., `['1', '2023-01-01 10:00:00', 'apple']`).
    *   `FilterApples` (`beam.Filter`): Only lists where the third element is 'apple' are kept (e.g., `['1', '2023-01-01 10:00:00', 'apple']` is kept, but `['2', '2023-01-01 10:05:00', 'banana']` is filtered out).
    *   `ExtractTimestamp` (`beam.Map`): Each list is converted into a tuple of strings (e.g., `('1', '2023-01-01 10:00:00', 'apple')`).
3.  **ExtractTimestamp (`beam.ParDo(ExtractTimestampDoFn)`)**: Each tuple is emitted as a `TimestampedValue`, which attaches an event timestamp to it (e.g., `1672567200.0`, which is 2023-01-01 10:00:00 interpreted as UTC, for the first row). Beam unwraps the `TimestampedValue`, so downstream transforms still receive the plain tuple `('1', '2023-01-01 10:00:00', 'apple')`; the timestamp travels with it as metadata.
4.  **FixedWindows (`beam.WindowInto(FixedWindows(30 * 60))`)**: Each element is assigned to a 30-minute window based on its timestamp: rows 1 and 3 fall into [10:00, 10:30), rows 6 and 8 into [11:00, 11:30). The element values are unchanged; the window is also metadata and only affects results once a grouping or aggregation runs.
5.  **ExtractItemAndCount (`beam.Map`)**: Each tuple is paired with a count of 1. The `PCollection` now contains tuples of the form `(original_element, 1)` (e.g., `(('1', '2023-01-01 10:00:00', 'apple'), 1)`), which matches the printed output.
6.  **FilterCounts (`beam.Filter`)**: The structure remains a `PCollection` of tuples `(original_element, 1)`. Elements whose count is not 1 would be removed, but here every element is kept.
7.  **PartitionByItem (`beam.Partition`)**: The `PCollection` is split into a list of three `PCollection`s based on the item name (the third field of the original element inside each tuple): one for apples, one for bananas, and one for everything else. Since only apple rows survive `FilterApples`, the banana and "other" partitions are empty.
8.  **Print Partitions (`beam.Map`)**: The elements in each partitioned `PCollection` are processed (in this case, printed to the console). The structure within each partition is still tuples `(original_element, 1)`.