In [1]:
!pip3 install --quiet apache-beam

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/89.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.2/17.2 MB[0m [31m71.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m38.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m84.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m85.8 MB/s[0m eta [36m0:00:00[0m
[2K 

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_beam

# Initialize interactive Beam environment
interactive_beam.options.capture_control.capture_duration = 5
print("✅ Apache Beam installed and ready.")

✅ Apache Beam installed and ready.


In [3]:
sales_data = """order_id,customer,amount,timestamp
1,John,250,2025-10-28T10:01:05
2,Alice,900,2025-10-28T10:01:25
3,Bob,50,2025-10-28T10:02:10
4,Jane,600,2025-10-28T10:02:30
5,Steve,0,2025-10-28T10:03:05
6,Mary,120,2025-10-28T10:03:45
7,Tom,750,2025-10-28T10:04:10
"""

with open("sales.csv", "w") as f:
    f.write(sales_data)

# Check that the file was created successfully
!cat sales.csv

order_id,customer,amount,timestamp
1,John,250,2025-10-28T10:01:05
2,Alice,900,2025-10-28T10:01:25
3,Bob,50,2025-10-28T10:02:10
4,Jane,600,2025-10-28T10:02:30
5,Steve,0,2025-10-28T10:03:05
6,Mary,120,2025-10-28T10:03:45
7,Tom,750,2025-10-28T10:04:10


In [4]:
# STEP 3: Basic Beam pipeline - Read and print sales data

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

# Initialize a Beam pipeline
p = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

# Read data from the CSV file (skipping the header)
sales = (
    p
    | "ReadSalesData" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
)

# Print each line to verify the pipeline works
sales | "PrintLines" >> beam.Map(print)

# Execute the pipeline
result = p.run()
result.wait_until_finish()

print("✅ Pipeline executed successfully.")





1,John,250,2025-10-28T10:01:05
2,Alice,900,2025-10-28T10:01:25
3,Bob,50,2025-10-28T10:02:10
4,Jane,600,2025-10-28T10:02:30
5,Steve,0,2025-10-28T10:03:05
6,Mary,120,2025-10-28T10:03:45
7,Tom,750,2025-10-28T10:04:10
✅ Pipeline executed successfully.


In [5]:
# STEP 4: Parse, clean, and enrich sales data using ParDo and Composite Transform

class ParseAndCleanDoFn(beam.DoFn):
    def process(self, element):
        try:
            order_id, customer, amount, timestamp = element.split(',')
            amount = float(amount)
            if amount > 0:  # Filter invalid rows here
                yield {
                    'order_id': order_id.strip(),
                    'customer': customer.strip(),
                    'amount': amount,
                    'timestamp': timestamp.strip()
                }
        except Exception as e:
            # Skip malformed lines
            return

# Composite transform that combines parsing and enrichment
class CleanSalesData(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "ParseAndClean" >> beam.ParDo(ParseAndCleanDoFn())
            | "AddTax" >> beam.Map(lambda x: {**x, 'total_with_tax': round(x['amount'] * 1.05, 2)})
        )

# Build pipeline
p2 = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

parsed_sales = (
    p2
    | "ReadCSV" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
    | "CleanSalesData" >> CleanSalesData()
)

# Print a few cleaned results
parsed_sales | "PreviewCleanedData" >> beam.Map(print)

# Run pipeline
p2.run().wait_until_finish()

print("✅ Parsing and cleaning complete.")




{'order_id': '1', 'customer': 'John', 'amount': 250.0, 'timestamp': '2025-10-28T10:01:05', 'total_with_tax': 262.5}
{'order_id': '2', 'customer': 'Alice', 'amount': 900.0, 'timestamp': '2025-10-28T10:01:25', 'total_with_tax': 945.0}
{'order_id': '3', 'customer': 'Bob', 'amount': 50.0, 'timestamp': '2025-10-28T10:02:10', 'total_with_tax': 52.5}
{'order_id': '4', 'customer': 'Jane', 'amount': 600.0, 'timestamp': '2025-10-28T10:02:30', 'total_with_tax': 630.0}
{'order_id': '6', 'customer': 'Mary', 'amount': 120.0, 'timestamp': '2025-10-28T10:03:45', 'total_with_tax': 126.0}
{'order_id': '7', 'customer': 'Tom', 'amount': 750.0, 'timestamp': '2025-10-28T10:04:10', 'total_with_tax': 787.5}
✅ Parsing and cleaning complete.


In [6]:
# STEP 5: Apply Map and Filter transforms

def to_usd(sale):
    """Convert amount to USD using an exchange rate."""
    exchange_rate = 1.1  # Assume 1 local currency = 1.1 USD
    sale['amount_usd'] = round(sale['amount'] * exchange_rate, 2)
    return sale

def is_significant_sale(sale):
    """Keep only sales above a certain threshold."""
    return sale['amount'] > 100

# Build pipeline
p3 = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

processed_sales = (
    p3
    | "ReadCleanedData" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
    | "CleanSalesData" >> CleanSalesData()
    | "ConvertToUSD" >> beam.Map(to_usd)
    | "FilterSmallSales" >> beam.Filter(is_significant_sale)
)

# Print the filtered, converted sales
processed_sales | "PrintProcessed" >> beam.Map(print)

# Run
p3.run().wait_until_finish()

print("✅ Map and Filter transformations complete.")



{'order_id': '1', 'customer': 'John', 'amount': 250.0, 'timestamp': '2025-10-28T10:01:05', 'total_with_tax': 262.5, 'amount_usd': 275.0}
{'order_id': '2', 'customer': 'Alice', 'amount': 900.0, 'timestamp': '2025-10-28T10:01:25', 'total_with_tax': 945.0, 'amount_usd': 990.0}
{'order_id': '4', 'customer': 'Jane', 'amount': 600.0, 'timestamp': '2025-10-28T10:02:30', 'total_with_tax': 630.0, 'amount_usd': 660.0}
{'order_id': '6', 'customer': 'Mary', 'amount': 120.0, 'timestamp': '2025-10-28T10:03:45', 'total_with_tax': 126.0, 'amount_usd': 132.0}
{'order_id': '7', 'customer': 'Tom', 'amount': 750.0, 'timestamp': '2025-10-28T10:04:10', 'total_with_tax': 787.5, 'amount_usd': 825.0}
✅ Map and Filter transformations complete.


In [7]:
# STEP 6: Partition the data into high-value and regular sales

def partition_fn(sale, num_partitions):
    """Partition sales based on the amount."""
    return 0 if sale['amount'] > 500 else 1

p4 = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

high_value, regular_sales = (
    p4
    | "ReadSales" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
    | "CleanData" >> CleanSalesData()
    | "ToUSD" >> beam.Map(lambda x: {**x, 'amount_usd': round(x['amount'] * 1.1, 2)})
    | "PartitionSales" >> beam.Partition(partition_fn, 2)
)

# Print each partition separately
(high_value | "PrintHighValue" >> beam.Map(lambda x: print("High value:", x)))
(regular_sales | "PrintRegular" >> beam.Map(lambda x: print("Regular:", x)))

# Run
p4.run().wait_until_finish()

print("✅ Partitioning complete.")



Regular: {'order_id': '1', 'customer': 'John', 'amount': 250.0, 'timestamp': '2025-10-28T10:01:05', 'total_with_tax': 262.5, 'amount_usd': 275.0}
High value: {'order_id': '2', 'customer': 'Alice', 'amount': 900.0, 'timestamp': '2025-10-28T10:01:25', 'total_with_tax': 945.0, 'amount_usd': 990.0}
Regular: {'order_id': '3', 'customer': 'Bob', 'amount': 50.0, 'timestamp': '2025-10-28T10:02:10', 'total_with_tax': 52.5, 'amount_usd': 55.0}
High value: {'order_id': '4', 'customer': 'Jane', 'amount': 600.0, 'timestamp': '2025-10-28T10:02:30', 'total_with_tax': 630.0, 'amount_usd': 660.0}
Regular: {'order_id': '6', 'customer': 'Mary', 'amount': 120.0, 'timestamp': '2025-10-28T10:03:45', 'total_with_tax': 126.0, 'amount_usd': 132.0}
High value: {'order_id': '7', 'customer': 'Tom', 'amount': 750.0, 'timestamp': '2025-10-28T10:04:10', 'total_with_tax': 787.5, 'amount_usd': 825.0}
✅ Partitioning complete.


In [8]:
# STEP 7: Apply Windowing to group sales by 1-minute intervals

from apache_beam import window
import datetime

def parse_and_add_timestamp(line):
    """Parse a CSV line and attach an event timestamp."""
    order_id, customer, amount, timestamp = line.split(',')
    amount = float(amount)
    if amount <= 0:
        return
    # Convert string timestamp to Beam Timestamp
    ts = datetime.datetime.fromisoformat(timestamp.strip())
    yield beam.window.TimestampedValue(
        (customer, amount),
        ts.timestamp()
    )

p5 = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

windowed_sales = (
    p5
    | "ReadSalesForWindow" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
    | "ParseAndAddTimestamp" >> beam.FlatMap(parse_and_add_timestamp)
    | "ApplyFixedWindow" >> beam.WindowInto(window.FixedWindows(60))
    | "SumAmountsPerCustomer" >> beam.CombinePerKey(sum)
)

# Print windowed results
windowed_sales | "PrintWindowedResults" >> beam.Map(print)

p5.run().wait_until_finish()

print("✅ Windowing complete.")



('John', 250.0)
('Alice', 900.0)
('Bob', 50.0)
('Jane', 600.0)
('Mary', 120.0)
('Tom', 750.0)
✅ Windowing complete.


In [9]:
# STEP 8: Write high-value and regular sales to output files

def partition_fn(sale, num_partitions):
    return 0 if sale['amount'] > 500 else 1  # 0 -> high value, 1 -> regular

p6 = beam.Pipeline(InteractiveRunner(), options=PipelineOptions())

high_value, regular_sales = (
    p6
    | "ReadSalesFile" >> beam.io.ReadFromText("sales.csv", skip_header_lines=1)
    | "CleanSales" >> CleanSalesData()
    | "PartitionSales" >> beam.Partition(partition_fn, 2)
)

# Write each partition to separate text files
(
    high_value
    | "FormatHighValue" >> beam.Map(lambda x: f"{x['customer']},{x['amount']},{x['timestamp']}")
    | "WriteHighValue" >> beam.io.WriteToText("output/high_value_sales")
)

(
    regular_sales
    | "FormatRegular" >> beam.Map(lambda x: f"{x['customer']},{x['amount']},{x['timestamp']}")
    | "WriteRegular" >> beam.io.WriteToText("output/regular_sales")
)

p6.run().wait_until_finish()

print("✅ Outputs written to 'output/' folder.")
!ls output



✅ Outputs written to 'output/' folder.
high_value_sales-00000-of-00001  regular_sales-00000-of-00001
