# Apache Beam on the Video Game Sales Dataset

## Setting Up and Running a Simple Apache Beam Pipeline

In [None]:

# Install Apache Beam if not already installed
!pip install -q apache-beam

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Small hand-picked sample of the video game sales dataset. This is a plain,
# bounded in-memory list (fed to the pipelines via beam.Create), standing in
# for real input; it is not an actual stream. Each record is a dict keyed by
# column name, with keys in the order listed in _SAMPLE_FIELDS.
_SAMPLE_FIELDS = ('Name', 'Platform', 'Year', 'Genre', 'Global_Sales')
sample_data = [
    dict(zip(_SAMPLE_FIELDS, row))
    for row in (
        ('Wii Sports', 'Wii', 2006, 'Sports', 82.74),
        ('Super Mario Bros.', 'NES', 1985, 'Platform', 40.24),
        ('Mario Kart Wii', 'Wii', 2008, 'Racing', 35.82),
        ('Wii Sports Resort', 'Wii', 2009, 'Sports', 33.00),
        ('Pokemon Red/Pokemon Blue', 'GB', 1996, 'Role-Playing', 31.37),
    )
]

# Reusable building blocks for the pipelines below: a DoFn (applied with ParDo) and a composite PTransform.
class ParseGameData(beam.DoFn):
    """DoFn that forwards each game record unchanged.

    Despite its name, this is currently a pass-through placeholder: no
    parsing, type coercion or validation happens, and every input element
    is emitted exactly once, as-is.
    """

    def process(self, element):
        # Beam accepts any iterable from process(); a one-element list is
        # equivalent to yielding the element once.
        return [element]

class FilterBySales(beam.PTransform):
    """Composite transform that keeps only high-selling games.

    An element passes when its ``'Global_Sales'`` value is strictly greater
    than ``min_sales``. An element without a ``'Global_Sales'`` key raises
    ``KeyError`` inside the filter.

    Args:
        label: Optional transform label, forwarded to ``beam.PTransform``.
            It stays the first positional argument, as in the base class, so
            ``FilterBySales('MyLabel')`` keeps working.
        min_sales: Exclusive lower bound on ``'Global_Sales'`` (keyword-only,
            defaults to 30).
    """

    def __init__(self, label=None, *, min_sales=30):
        super().__init__(label)
        self.min_sales = min_sales

    def expand(self, pcoll):
        # Bind the threshold to a local so the serialized lambda closes over
        # a plain number instead of over the whole transform instance.
        threshold = self.min_sales
        return pcoll | 'FilterHighSales' >> beam.Filter(
            lambda game: game['Global_Sales'] > threshold)

def run_pipeline():
    """Run the batch demo that keeps high-selling games from ``sample_data``.

    The pipeline has these stages:

    * It builds a PCollection from the module-level ``sample_data`` list.
    * It passes each record through ``ParseGameData``.
    * It keeps the records selected by ``FilterBySales``.
    * It writes them with ``beam.io.WriteToText`` using ``'filtered_games.txt'``
      as the output path.

    The pipeline runs on the local DirectRunner and finishes when the
    ``with`` block exits.
    """
    pipeline_options = PipelineOptions(flags=[], runner='DirectRunner')
    with beam.Pipeline(options=pipeline_options) as pipeline:
        created = pipeline | 'CreateSampleData' >> beam.Create(sample_data)
        parsed = created | 'ParseGameData' >> beam.ParDo(ParseGameData())

        # Composite transform: keep only the best sellers.
        high_sellers = parsed | 'ApplyFilter' >> FilterBySales()

        # NOTE(review): WriteToText treats its argument as a file prefix and,
        # by default, appends a shard suffix. The file on disk is therefore
        # likely 'filtered_games.txt-00000-of-00001'. Confirm this before any
        # later cell reads it back.
        _ = high_sellers | 'WriteOutput' >> beam.io.WriteToText('filtered_games.txt')


run_pipeline()


## Applying Windowing and Triggers in Apache Beam

In [None]:

from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode

def run_windowed_pipeline(window_size=60, trigger_delay=30,
                          output_path='windowed_output.txt'):
    """Run the windowing/trigger demo over ``sample_data`` and write the records.

    Args:
        window_size: Size of each fixed window, as passed to ``FixedWindows``
            (seconds).
        trigger_delay: Processing-time delay passed to ``AfterProcessingTime``
            (seconds).
        output_path: Output path/prefix passed to ``beam.io.WriteToText``.

    The processing-time trigger is wrapped in ``Repeatedly``. A bare
    ``AfterProcessingTime`` fires only once, and after that the window's
    trigger is finished, so data arriving later in the same window would be
    dropped.
    """
    # Local import keeps this cell self-contained (the cell's own import line
    # does not bring in Repeatedly).
    from apache_beam.transforms.trigger import Repeatedly

    options = PipelineOptions(flags=[], runner='DirectRunner')
    with beam.Pipeline(options=options) as p:
        # NOTE(review): beam.Create does not attach per-record event-time
        # timestamps, so all records presumably fall into a single fixed window.
        # Also, there is no GroupByKey/Combine between WindowInto and the write,
        # so the window and trigger likely have no observable effect on the
        # written output. Add an aggregation (e.g. sales per genre) to
        # exercise them.
        windowed_data = (
            p
            | 'CreateSampleData' >> beam.Create(sample_data)
            | 'WindowIntoFixedWindows' >> beam.WindowInto(
                FixedWindows(window_size),
                trigger=Repeatedly(AfterProcessingTime(trigger_delay)),
                accumulation_mode=AccumulationMode.DISCARDING)
            | 'ParseGameData' >> beam.ParDo(ParseGameData()))

        windowed_data | 'WriteWindowedOutput' >> beam.io.WriteToText(output_path)


run_windowed_pipeline()


## Streaming Simulation Example

In [None]:
# TODO: add a streaming simulation example that runs the DirectRunner in streaming mode over an unbounded PCollection.