## Assignment Part 3 - Apache Beam

### Apache beam features
The data used in this section will be the same data used in the Israel-Palestine_EDA notebook, as there are some data preprocessing steps that were taken in that notebook that could be automated with beam.

Demonstrating Apache beam features by creating a data ingestion, validation, and preprocessing pipeline (i.e composite transform, pipeline io, triggers, windowing , pardo)


In [1]:
import apache_beam as beam
import pandas as pd
import statistics
import os

In [2]:
israel_v_palestine = pd.read_csv('Israel-Palestine.csv')

In [3]:
israel_v_palestine.isna().sum()

Year                                                  0
Country                                               0
GDP (in USD)                                          0
Population                                            0
GDP Growth Rate (%)                                   0
Fertility Rate                                        0
Infant Mortality Rate (per 1,000 live births)         0
Maternal Mortality Rate (per 100,000 live births)     0
Agricultural Output (in USD)                          0
Active Military Personnel                            22
Reserve Military Personnel                           22
Literacy Rate (%)                                    13
IT Output (in USD)                                   13
Number of Tanks                                      22
Number of Submarines                                 22
Number of Armoured Vehicles                          22
dtype: int64

### Data Ingestion

This is a composite transform to fill in missing values with the mean of the columns in the data that have missing values (columns 9-16) excluding column 12

In [4]:
# Custom Pardo to fill missing values with the mean of the column
class FillMissingWithMean(beam.DoFn):
    def __init__(self):
        self.column_means = None

    def setup(self):
        # Initialize a dictionary to store column means for all columns
        self.column_means = {}
        for i in range(9, 17):  # Columns 9 through 16
            self.column_means[i] = 0.0

    def process(self, element):
        # Calculate column means for the first element
        if not self.column_means:
            for i in range(len(element)):
                if i in self.column_means:
                    column_data = [float(x[i]) for x in element if x[i] is not None]
                    if column_data:
                        self.column_means[i] = sum(column_data) / len(column_data)
                    else:
                        self.column_means[i] = None
        # Fill missing values with column mean for columns 9-11 and 13-16 (0-based index), excluding column 12
        for i in range(9, 16):  # Columns 9 through 15
            if i != 12:  # Exclude column 12
                if element[i] is None:
                    element[i] = str(self.column_means[i])
        yield element


In [5]:
# Define a function to convert a list to a CSV string to save data to a file
def list_to_csv_string(row):
    return ','.join(row)

In [6]:
pipe = beam.Pipeline()

In [7]:
with beam.Pipeline() as p:
    # Read data from a text file, skip header lines
    data = p | beam.io.ReadFromText('Israel-Palestine.csv', skip_header_lines=True)

    # Apply transformations to split each line into a list of values
    transformed_data = data | beam.Map(lambda x: x.split(','))
    filled_data = transformed_data | beam.ParDo(FillMissingWithMean())
    filled_data = filled_data | beam.Map(list_to_csv_string)
    filled_data | beam.io.WriteToText('./', file_name_suffix='.txt')
os.rename('-00000-of-00001.txt', 'filled_missing.csv')



In [8]:
israel_in_pipe = { pipe
                      |beam.io.ReadFromText('filled_missing.csv', skip_header_lines=True)
                      |beam.Map(lambda x: x.split(','))
                      |beam.Filter(lambda x: x[1] == 'Israel') # x[1] is Country column
                      |beam.Map(print)
                      }

In [9]:
palestine_in_pipe = { pipe
                 |beam.io.ReadFromText('filled_missing.csv', skip_header_lines=True)
                 |beam.Map(lambda x: x.split(','))
                 |beam.Filter(lambda x: x[1] == 'Palestine') # x[1] is Country column
                 |beam.Map(print)
}

In [10]:
pipe.run()

['2001', 'Israel', '131.02 billion', '"6', '165', '000"', '-0.47', '2.89', '4.8', '6.7', '5.17 billion', '"166', '500"', '"445', '000"', '97.5', '6.89 billion', '"2', '791"', '3', '"10', '814"']
['2002', 'Israel', '126.64 billion', '"6', '263', '000"', '-0.35', '2.98', '4.6', '5.9', '4.76 billion', '"163', '000"', '"375', '000"', '97.9', '7.02 billion', '"2', '821"', '3', '"10', '885"']
['2003', 'Israel', '129.56 billion', '"6', '684', '000"', '2.8', '2.81', '4.3', '6.1', '5.10 billion', '"160', '000"', '"445', '000"', '97.9', '7.15 billion', '"2', '819"', '3', '"10', '885"']
['2004', 'Israel', '143.53 billion', '"6', '830', '000"', '4.5', '2.84', '4.1', '6.8', '5.33 billion', '"159', '000"', '"428', '000"', '97.9', '7.36 billion', '"2', '798"', '3', '"10', '886"']
['2005', 'Israel', '155.05 billion', '"6', '925', '000"', '4.97', '2.87', '3.9', '6.4', '5.38 billion', '"156', '000"', '"411', '000"', '98.4', '7.69 billion', '"2', '771"', '3', '"10', '876"']
['2006', 'Israel', '179.02 bil

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fddd1360e80>