<a href="https://colab.research.google.com/github/aniket-alt/apache-beam/blob/main/Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1: Setup
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.69.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (21 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 3.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.12.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (5.8 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Downloading fasteners-0.20-py3-none-any.whl.metadata (4.8 kB)
Collecting grpcio!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0,<2,>=1.33.1 (from apache-beam)
  Downloading grpcio-1.65.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.3 kB)
Collecting hdfs<3.0.0,>=2.1.0 (from apache-beam)
  Downloading hdfs-2.7.3.tar.gz (43 kB)
[2K 

In [1]:
# 2: Imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import time
import re # We'll use this for parsing
from apache_beam.transforms.window import FixedWindows

In [2]:
# 3: Sample Data and Pipeline
# Fake "current time" to base our timestamps on; in a real stream these
# would be generated live.
# FixedWindows(10) (cell 6) cuts event time into [k*10, (k+1)*10)-second
# buckets counted from the Unix epoch, so START_TIME is rounded down to a
# multiple of 10. Without this, which events share a window -- and therefore
# the per-window scores -- depend on the wall-clock second the cell ran at.
WINDOW_ALIGNMENT_SECONDS = 10  # keep equal to the FixedWindows size in cell 6
START_TIME = int(time.time()) // WINDOW_ALIGNMENT_SECONDS * WINDOW_ALIGNMENT_SECONDS

# Our sample data: (timestamp, event_type, player_id, damage)
# as simple CSV strings. With START_TIME aligned, offsets 0-9 fall in the
# first window and offsets 10-19 in the second.
RAW_DATA = [
    f"{START_TIME + 1},pvp,player_1,15",   # window 1
    f"{START_TIME + 2},pvm,player_2,50",   # window 1
    f"{START_TIME + 4},pvp,player_3,75",   # window 1
    f"{START_TIME + 8},pvm,player_1,25",   # window 1
    f"{START_TIME + 11},pvp,player_2,150", # window 2
    f"{START_TIME + 13},pvm,player_3,10",  # window 2
    f"{START_TIME + 15},invalid_log,error,999", # unknown event type; removed by FilterInvalid
    f"{START_TIME + 17},pvp,player_1,30",  # window 2
]

# Set up the pipeline.
# flags=[] stops PipelineOptions from parsing sys.argv, which inside a notebook
# holds the kernel launcher's arguments rather than pipeline flags. The options
# are therefore the same no matter how the kernel was started.
options = PipelineOptions(flags=[])
# The Pipeline only records the transform graph built in the cells below;
# nothing executes until pipeline.run() is called.
pipeline = beam.Pipeline(options=options)

In [3]:
# 4: Our Composite Transform
class ParseLogFn(beam.DoFn):
    """Parse one raw CSV log line into an event-time-stamped event dict.

    Expected input: ``"<unix_seconds>,<event_type>,<player_id>,<damage>"``
    (the format of RAW_DATA in this notebook). A well-formed line yields one
    ``TimestampedValue`` wrapping
    ``{"event_type": str, "player_id": str, "damage": int}`` stamped with the
    line's own timestamp. That timestamp is what event-time windowing uses
    downstream.

    Malformed lines are dropped: wrong field count, a non-integer timestamp or
    damage, or a non-string element. Yielding nothing is how a ParDo filters.
    Each drop is counted in the ``parse_errors`` Beam metric so bad input is
    not lost silently.
    """

    def __init__(self):
        super().__init__()
        # Local import keeps this cell self-contained.
        from apache_beam.metrics import Metrics
        self.parse_errors = Metrics.counter(self.__class__, 'parse_errors')

    def process(self, element, *args, **kwargs):
        try:
            timestamp_str, event_type, player_id, damage_str = element.split(',')
            event_time = int(timestamp_str)
            damage = int(damage_str)
        except (AttributeError, TypeError, ValueError):
            # AttributeError/TypeError: element is not a str.
            # ValueError: wrong number of fields, or a non-integer timestamp/damage.
            # Any other exception is a real bug and is allowed to surface.
            self.parse_errors.inc()
            return

        # The yield sits outside the try so only parsing failures count as bad input.
        yield beam.transforms.window.TimestampedValue(
            {
                "event_type": event_type,
                "player_id": player_id,
                "damage": damage,
            },
            event_time,  # the event's "event time", used for windowing
        )

class ParseAndFilterLogs(beam.PTransform):
    """Composite transform: raw CSV log lines -> parsed, game-only event dicts.

    Two stages:
      * "Parse" runs ParseLogFn, which emits one timestamped dict per line it can
        parse and nothing for lines it cannot.
      * "FilterInvalid" keeps only events whose type is 'pvp' or 'pvm', which
        removes well-formed but non-game events.
    """

    def expand(self, pcoll):
        def is_game_event(log):
            return log['event_type'] in ('pvp', 'pvm')

        return (
            pcoll
            | "Parse" >> beam.ParDo(ParseLogFn())
            | "FilterInvalid" >> beam.Filter(is_game_event)
        )

In [4]:
# Cell 5: Apply the Transforms in the Pipeline
# 1. Feed the in-memory sample lines (beam.Create is this pipeline's input
#    "IO") through the ParseAndFilterLogs composite transform.
clean_logs = (pipeline
              | "CreateData" >> beam.Create(RAW_DATA)
              | "ParseAndFilter" >> ParseAndFilterLogs())

# 2. Apply Partition
def partition_fn(log, num_partitions):
    """Choose the output partition index for one parsed log.

    Returns 0 for 'pvp' events and 1 for every other event type. After
    ParseAndFilterLogs, 'pvm' is the only other type left. ``num_partitions``
    is passed in by beam.Partition and is not used here.
    """
    return 0 if log['event_type'] == 'pvp' else 1

# Split clean_logs into two PCollections using partition_fn:
# index 0 holds the PvP events, index 1 the PvM events.
partitioned_logs = clean_logs | "PartitionEvents" >> beam.Partition(partition_fn, 2)
pvp_logs, pvm_logs = partitioned_logs[0], partitioned_logs[1]





In [5]:
# Cell 6: Windowing and Aggregation
# Key each event by player, bucket the keyed events into fixed 10-second
# event-time windows, then total the damage per (player, window).
player_scores = (
    clean_logs
    | "ExtractPlayerDamage" >> beam.Map(lambda event: (event['player_id'], event['damage']))
    | "ApplyWindowing" >> beam.WindowInto(FixedWindows(size=10))
    | "SumPerPlayerPerWindow" >> beam.CombinePerKey(sum)
)

In [6]:
# Cell 7: Add our "sinks" (outputs) to print results
# Each print carries a label so the interleaved output is easy to attribute.
_ = pvp_logs | "PrintPvP" >> beam.Map(lambda x: print(f"PVP_LOG: {x}"))
_ = pvm_logs | "PrintPvM" >> beam.Map(lambda x: print(f"PVM_LOG: {x}"))
# A player can have one score per window. Print the window too; without it,
# rows such as ('player_1', 40) and ('player_1', 30) cannot be told apart.
_ = player_scores | "PrintScores" >> beam.Map(
    lambda x, window=beam.DoFn.WindowParam: print(f"SCORES_PER_WINDOW: {x} window={window}")
)

# Run the pipeline and block until it completes.
print("--- Running Pipeline ---")
result = pipeline.run()
result.wait_until_finish()
print("--- Pipeline Finished ---")



--- Running Pipeline ---
PVP_LOG: {'event_type': 'pvp', 'player_id': 'player_1', 'damage': 15}
PVM_LOG: {'event_type': 'pvm', 'player_id': 'player_2', 'damage': 50}
PVP_LOG: {'event_type': 'pvp', 'player_id': 'player_3', 'damage': 75}
PVM_LOG: {'event_type': 'pvm', 'player_id': 'player_1', 'damage': 25}
PVP_LOG: {'event_type': 'pvp', 'player_id': 'player_2', 'damage': 150}
PVM_LOG: {'event_type': 'pvm', 'player_id': 'player_3', 'damage': 10}
PVP_LOG: {'event_type': 'pvp', 'player_id': 'player_1', 'damage': 30}
SCORES_PER_WINDOW: ('player_1', 40)
SCORES_PER_WINDOW: ('player_1', 30)
SCORES_PER_WINDOW: ('player_2', 50)
SCORES_PER_WINDOW: ('player_2', 150)
SCORES_PER_WINDOW: ('player_3', 75)
SCORES_PER_WINDOW: ('player_3', 10)
--- Pipeline Finished ---
