<a href="https://colab.research.google.com/github/SriVinayA/SJSU-CMPE255-DataMining/blob/main/DM_Assignment_4_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### 1. **Setup and Dependencies**
   Before building the pipeline, make sure Apache Beam is installed in your environment. If it isn't, install it with pip:

In [1]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.51.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 14.7/14.7 MB 39.2 MB/s eta 0:00:00
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 11.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting orjson<4,>=3.9.7 (from apache-beam)
  Downloading orjson-3.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (138 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 138.7/138.7 kB 15.2 MB/s eta 0:00:00
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 152.0/152.0 kB 17.8 MB/s eta 0:00:

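### 2. **Pipeline I/O: Reading Data**
   We'll define a helper that reads the CSV file. It is a plain Python generator rather than a Beam I/O connector: it skips the header row and yields each remaining row as a list of string fields, which `beam.Create` later turns into a PCollection.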

In [2]:
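import csv

import apache_beam as beam

def read_from_csv(file_path):
    """Yield each data row of a CSV file as a list of string fields."""
    # csv.reader handles quoted fields that contain commas, unlike str.split(',')
    with open(file_path, 'r', newline='') as file:
        reader = csv.reader(file)
        next(reader)  # skip the header row
        for row in reader:
            yield row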
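Why parse with the `csv` module instead of splitting on commas? The short check below uses hypothetical rows (the header and values are made up; only the column order follows the notebook's assumption that the value is the third field) to show that `str.split(',')` breaks a quoted field containing a comma, while `csv.reader` keeps it intact. The helper names `split_rows` and `csv_rows` are illustrative only.

```python
import csv
import io

# Hypothetical CSV text for illustration; only the column order (value third)
# mirrors the notebook's dataset.
SAMPLE_CSV = (
    "area,year,value,category\n"
    "Area A,2014,123.5,plain category\n"
    'Area B,2014,67.25,"category, with a comma"\n'
)


def split_rows(text):
    """Parse data lines by splitting on commas (ignores CSV quoting)."""
    lines = io.StringIO(text)
    next(lines)  # skip the header line
    return [line.strip().split(",") for line in lines]


def csv_rows(text):
    """Parse data lines with csv.reader (honours quoted fields)."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row
    return list(reader)


if __name__ == "__main__":
    print(split_rows(SAMPLE_CSV)[1])  # quoted field is broken into two pieces
    print(csv_rows(SAMPLE_CSV)[1])    # quoted field stays intact
```

In this sample the quoted comma comes after the value column, so index 2 is unaffected either way; a quoted comma in an earlier column would shift the fields and break `record[2]`.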

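### 3. **ParDo Transform: Extract Emissions Value**
   ParDo applies a user-defined `DoFn` to each element of a PCollection independently, which lets the runner process elements in parallel. The `DoFn`'s `process` method may yield zero, one, or many outputs per input element. We'll define a `DoFn` that extracts the value column from each record and converts it to a float.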

In [3]:
class ExtractValue(beam.DoFn):
    def process(self, record):
        yield float(record[2])  # Extract the 'value' column, which is the third column in the dataset
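`ExtractValue` raises an exception, which typically fails the run, on a row that is too short or whose third field is not numeric. Because `process` is a generator, a more defensive variant can simply yield nothing for such rows. The sketch below is a hypothetical helper, `parse_value`, not part of the original notebook, written as a plain generator function so it can be tested without Beam.

```python
def parse_value(record, column=2):
    """Yield the float in record[column], or nothing if it is missing or malformed.

    Mirrors DoFn.process semantics: yielding once emits one output element,
    yielding nothing drops the input element.
    """
    try:
        value = float(record[column])
    except (IndexError, ValueError):
        return  # skip rows that are too short or hold a non-numeric value
    yield value


if __name__ == "__main__":
    rows = [["A", "2014", "12.5", "x"], ["B", "2014", "n/a", "x"], [""]]
    print([v for row in rows for v in parse_value(row)])  # [12.5]
```

In the pipeline, a generator function like this can be passed straight to `beam.FlatMap`, e.g. `records | beam.FlatMap(parse_value)`, which behaves like a `DoFn` whose `process` method has the same body.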

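### 4. **Windowing and Triggers**
   We'll set up fixed windows and a trigger. `FixedWindows` takes its window size in seconds, and `AfterCount(n)` fires a pane once at least `n` elements have arrived in a window (wrap it in `Repeatedly` to keep firing). The values below are arbitrary; adjust them to your requirements. Keep in mind that windowing mainly affects steps that group or aggregate elements (such as `GroupByKey` or a `Combine`), and that it depends on element timestamps: elements created with `beam.Create` all share the same timestamp, so they fall into a single window.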

In [4]:
# Window size and trigger element count; adjust these to your requirements
WINDOW_DURATION = 60  # seconds
TRIGGER_COUNT = 10    # elements per pane before the trigger fires
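To make the window arithmetic concrete, here is a plain-Python sketch (not Beam code) of how a fixed window of `size` seconds with zero offset is chosen for an element with timestamp `t`: the window starts at `t - (t % size)` and ends `size` seconds later. The helper names and sample timestamps are hypothetical.

```python
def fixed_window(timestamp, size):
    """Return the (start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def group_by_window(timestamped_values, size):
    """Group (timestamp, value) pairs by the fixed window they fall into."""
    windows = {}
    for ts, value in timestamped_values:
        windows.setdefault(fixed_window(ts, size), []).append(value)
    return windows


if __name__ == "__main__":
    # Hypothetical (timestamp_in_seconds, value) pairs
    events = [(5, 1.0), (59, 2.0), (60, 3.0), (130, 4.0)]
    for window, vals in sorted(group_by_window(events, 60).items()):
        print(window, vals)
    # Elements that share one timestamp all fall into a single window
    print(len(group_by_window([(0, 1.0), (0, 2.0), (0, 3.0)], 60)))
```

In Beam, elements only get meaningful timestamps if you attach them, for example by yielding `beam.window.TimestampedValue(value, timestamp)` from a `DoFn`; otherwise, as the last line of the demo shows, identically timestamped elements all share one window.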

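### 5. **Composite Transform**
   We'll package two steps, extracting the emission value and computing the global mean, into a single reusable composite `PTransform`. Its `expand` method receives the input PCollection and returns the output of the chained transforms.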

In [5]:
class ExtractAndCalculateMean(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | "ExtractValue" >> beam.ParDo(ExtractValue())
                | "CalculateMean" >> beam.CombineGlobally(beam.combiners.MeanCombineFn()))
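`CombineGlobally` can combine chunks of data in parallel because a `CombineFn` builds partial accumulators that are merged afterwards. The plain-Python class below is a hypothetical illustration, not Beam's source for `MeanCombineFn`: it uses the same four method names as `beam.CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`) with a `(sum, count)` accumulator, and the hypothetical `combine_in_parts` simulates workers accumulating separate chunks before merging.

```python
class MeanFnSketch:
    """Hypothetical plain-Python mean combiner using beam.CombineFn's method names.

    In a real pipeline you would subclass beam.CombineFn instead. The accumulator
    is a (sum, count) pair, so partial results from different workers merge exactly.
    """

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        total, count = 0.0, 0
        for partial_total, partial_count in accumulators:
            total += partial_total
            count += partial_count
        return (total, count)

    def extract_output(self, accumulator):
        total, count = accumulator
        # Empty input has no mean; this sketch returns NaN in that case
        return total / count if count else float("nan")


def combine_in_parts(fn, parts):
    """Simulate each worker accumulating one part, then merge the partial results."""
    partials = []
    for part in parts:
        acc = fn.create_accumulator()
        for value in part:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


if __name__ == "__main__":
    fn = MeanFnSketch()
    print(combine_in_parts(fn, [[1.0, 2.0, 3.0], [10.0]]))  # 4.0
    # Naively averaging the two chunk means would give (2.0 + 10.0) / 2 = 6.0
```

Carrying a sum and a count, rather than averaging per-chunk averages, keeps the result correct even when chunks have different sizes.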

### 6. **Constructing and Running the Pipeline**
   Now, we'll piece everything together and construct the Apache Beam pipeline.
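Before running the pipeline, it can help to have an independent reference value. The hypothetical helper below computes the mean of the third CSV column in plain Python, so the number printed by `ExtractAndCalculateMean` can be checked against it; like `ExtractValue`, it assumes every data row has a numeric value at index 2.

```python
import csv
import statistics


def mean_of_column(lines, column=2):
    """Return the mean of a numeric CSV column, skipping the header row.

    `lines` can be an open file or any iterable of CSV text lines.
    column=2 matches the notebook's assumption that 'value' is the third column.
    """
    reader = csv.reader(lines)
    next(reader)  # skip the header row
    return statistics.fmean(float(row[column]) for row in reader)


if __name__ == "__main__":
    # Hypothetical in-memory lines; with the real file you would pass
    # open(path, newline="") instead, using the same path as the pipeline.
    sample = ["area,year,value,category", "A,2000,1.5,x", "A,2001,2.5,x"]
    print(mean_of_column(sample))  # 2.0
```

If the two results differ noticeably, check for skipped or malformed rows; differences only in the last few digits can come from floating-point summation order.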

In [7]:
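from apache_beam.options.pipeline_options import PipelineOptions

# Input file on Google Drive; in Colab, Drive must be mounted at /content/drive first
CSV_PATH = '/content/drive/MyDrive/SJSU/Sem1/CMPE255 - Data Mining/Assignment 4/greenhouse_gas_inventory_data_data.csv'

# Create the pipeline; it runs (and waits to finish) when the `with` block exits
with beam.Pipeline(options=PipelineOptions()) as p:
    # Read records from the CSV (beam.Create loads all rows into memory up front)
    records = p | "ReadFromCSV" >> beam.Create(read_from_csv(CSV_PATH))

    # Extract emission values
    values = records | "ExtractValues" >> beam.ParDo(ExtractValue())

    # Apply fixed windows and a count-based trigger. Windowing only affects
    # downstream grouping/combining steps; these values are printed directly,
    # so the windows do not change what is printed.
    windowed_values = (
        values
        | "Window" >> beam.WindowInto(beam.window.FixedWindows(WINDOW_DURATION),
                                      trigger=beam.transforms.trigger.AfterCount(TRIGGER_COUNT),
                                      accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
    )

    # Compute the overall mean from the unwindowed records (not from windowed_values)
    mean_values = records | "ExtractAndCalculateMean" >> ExtractAndCalculateMean()

    # Print each extracted value and the overall mean (print order is not guaranteed)
    windowed_values | "PrintWindowedValues" >> beam.Map(print)
    mean_values | "PrintMeanValues" >> beam.Map(print)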





Streaming output truncated to the last 5000 lines.
9914.6869693544
9613.58054963515
9368.02796676995
9422.41749132518
9135.85826323885
8752.62764727365
8546.57869520082
8102.16179231695
7943.21475839427
7346.29314585491
5972.05163961269
5161.16483144972
4837.33235475625
4134.19908899373
3392.97797253605
2606.0735993744
2683.0981788552
2492.8558365488
287.5644590521
45.9220062644
50.32
5758.12863877603
5650.21859764964
5061.77691494848
4661.65509784493
4388.66802144415
3964.12465526031
3710.3482172854
3245.14066079019
2722.45340589096
5077.45227954165
4927.91153024745
4733.35562321277
5090.06558842149
4781.3884482475
5261.83186356152
6721.1460300199
5767.51429514513
5166.48802785957
4820.16537150106
4157.37596992872
2712.11246316
2032.43915305
1149.072
1400.08
1182.816
1428.15989742829
1280.34083026629
1184.63413628766
1345.29450693186
1223.00892466918
1118.16207549988
1163.87453384613
1033.27019656279
948.667606663176
804.015080509621
704.416274222243
579.923813000475
447