In [34]:
pip install apache-beam



In [47]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.transforms.window import FixedWindows, TimestampedValue
from datetime import datetime
import csv

In [48]:
# Build the pipeline with default options. An explicit empty flag list is
# passed because PipelineOptions otherwise parses sys.argv, which inside a
# notebook kernel holds the kernel's own launch arguments rather than
# pipeline flags.
p = beam.Pipeline(options=PipelineOptions(flags=[]))

In [49]:
# Read the input file as a PCollection of raw text lines, skipping the one
# header line at the top of the file.
events = (
    p
    | 'Read CSV' >> ReadFromText(
        '/content/sample_dataset.csv',
        skip_header_lines=1,
    )
)




In [50]:
def parse_csv(line):
    """Parse one CSV line into a dictionary keyed by column name.

    Args:
        line: A single line of CSV text with the columns
            ``timestamp, user_id, item_id, price`` in that order.

    Returns:
        A dict mapping each column name to its string value. Following
        ``csv.DictReader`` semantics, missing trailing columns map to
        ``None`` and surplus columns are collected under the key ``None``.

    Raises:
        ValueError: If ``line`` holds no CSV record (for example an empty
            line), instead of leaking ``StopIteration`` to the caller.
    """
    fieldnames = ['timestamp', 'user_id', 'item_id', 'price']
    reader = csv.DictReader([line], fieldnames=fieldnames)
    # DictReader silently skips empty rows, so the iterator may be exhausted
    # without yielding anything; use a sentinel to detect that case.
    row = next(reader, None)
    if row is None:
        raise ValueError('Cannot parse empty CSV line: %r' % (line,))
    return row

# Turn every raw text line into a dict of named fields.
parsed_events = (
    events
    | 'Parse CSV Line' >> beam.Map(parse_csv)
)


In [57]:
def assign_timestamp(event):
    """Attach an event-time timestamp to a parsed event.

    Args:
        event: A mapping whose ``'timestamp'`` entry is a string in
            ``'%m/%d/%Y %H:%M'`` format (for example ``'01/31/2023 14:05'``).

    Returns:
        A ``TimestampedValue`` wrapping ``event`` unchanged, stamped with the
        parsed time expressed as seconds since the Unix epoch.

    Raises:
        ValueError: If the timestamp string does not match the expected format.
        KeyError: If ``event`` has no ``'timestamp'`` entry.
    """
    timestamp_str = event['timestamp']
    timestamp_obj = datetime.strptime(timestamp_str, '%m/%d/%Y %H:%M')
    # TimestampedValue expects seconds since the Unix epoch (or a Beam
    # Timestamp), not a datetime object. The parsed value is naive, so it is
    # interpreted as UTC to keep results independent of the machine's local
    # time zone.
    epoch_seconds = (timestamp_obj - datetime(1970, 1, 1)).total_seconds()
    return TimestampedValue(event, epoch_seconds)


# Stamp every parsed event with the event time taken from its own data.
timestamped_events = (
    parsed_events
    | 'Assign Timestamp' >> beam.Map(assign_timestamp)
)


In [58]:
# Group events into fixed windows of 60 * 60 seconds (one hour) each.
windowed_events = (
    timestamped_events
    | 'Window into Fixed Intervals' >> beam.WindowInto(FixedWindows(60 * 60))
)


In [59]:
def extract_price(event):
    """Return the ``'price'`` entry of ``event`` converted to a float."""
    raw_price = event['price']
    return float(raw_price)

# Reduce each window to a single number: the sum of all prices in it.
prices_per_window = windowed_events | 'Extract Price' >> beam.Map(extract_price)

# without_defaults() is kept on the combine because the input is windowed
# rather than in the single global window.
total_sales_per_window = (
    prices_per_window
    | 'Compute Total Sales' >> beam.CombineGlobally(sum).without_defaults()
)


In [60]:
# Write the per-window totals out as text.
# NOTE(review): '/path/to/output.txt' looks like a placeholder path - confirm
# the intended destination. WriteToText uses its argument as a file path
# prefix, so the files actually written carry a shard suffix.
output = (
    total_sales_per_window
    | 'Write to File' >> beam.io.WriteToText('/path/to/output.txt')
)


In [None]:
# Execute the pipeline graph built above and block until it has finished.
pipeline_result = p.run()
pipeline_result.wait_until_finish()