# **Apache Beam**

In [7]:
# Required installations for Google Colab
!pip install apache-beam

import apache_beam as beam

# Define custom filter transformation
class FilterHouses(beam.DoFn):
    """Keep only records whose ``TotalValue`` is at least ``threshold``.

    Records that are empty, have no ``TotalValue`` entry, have an empty
    value, or carry a value that cannot be parsed as a number are dropped
    rather than failing the whole pipeline.
    """

    def process(self, element, threshold=100000):
        """Yield ``element`` when its ``TotalValue`` meets the threshold.

        Args:
            element: A mapping expected to hold a ``TotalValue`` entry.
            threshold: Inclusive minimum value a record needs to be kept.

        Yields:
            The unchanged ``element`` if it passes the filter.
        """
        if not element or 'TotalValue' not in element:
            return
        raw_value = element['TotalValue']
        if not raw_value:
            return
        try:
            value = float(raw_value)
        except (TypeError, ValueError):
            # A malformed amount is treated like a missing one: skip the row
            # instead of aborting the pipeline.
            return
        if value >= threshold:
            yield element

# Define composite transform
class AverageTotalValueByCity(beam.PTransform):
    """Compute the mean ``TotalValue`` for every ``City``.

    Input elements are dicts with ``City`` and ``TotalValue`` entries; the
    output is a PCollection of ``(city, average_total_value)`` tuples.
    """

    def expand(self, pcoll):
        """Key each record by city and average the values per key."""
        return (
            pcoll
            # beam.GroupBy('City') resolves the field via attribute access,
            # which plain dicts do not support, so build the key explicitly.
            | 'KeyByCity' >> beam.Map(
                lambda record: (record['City'], float(record['TotalValue'])))
            # Mean.PerKey avoids materialising each group just to call len().
            | 'ComputeAverageTotalValue' >> beam.combiners.Mean.PerKey()
        )

# Define the pipeline with debugging steps
def run_pipeline():
    """Filter house sales by value and write the per-city average to a file.

    Steps: read the CSV (skipping its header), parse each line into a dict,
    keep records accepted by ``FilterHouses``, average ``TotalValue`` per
    city, and write ``city,average`` lines. Two debugging steps print the
    records as they flow through.
    """

    def _peek(element):
        """Print an element and pass it on unchanged."""
        # beam.Map(print) would emit print()'s return value (None) for every
        # element and so discard the data; return the element explicitly.
        print(element)
        return element

    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'ReadCSV' >> beam.io.ReadFromText('/content/sample_data/real-estate-sales-730-days-1.csv', skip_header_lines=1)
            # NOTE(review): only the first two CSV columns are kept and they
            # are assumed to be City and TotalValue -- confirm against the
            # file's actual column order.
            | 'ParseCSV' >> beam.Map(lambda line: dict(zip(['City', 'TotalValue'], line.split(','))))
            | 'PrintParsedData' >> beam.Map(_peek)  # Debugging step
            | 'FilterByTotalValue' >> beam.ParDo(FilterHouses())
            | 'PrintFilteredData' >> beam.Map(_peek)  # Debugging step
            | 'AverageByCity' >> AverageTotalValueByCity()
            | 'FormatOutput' >> beam.Map(lambda element: f"{element[0]},{element[1]}")
            | 'WriteToCSV' >> beam.io.WriteToText('/content/output.csv')
        )

run_pipeline()


{'City': '0', 'TotalValue': '30'}
{'City': '1', 'TotalValue': '50'}
{'City': '2', 'TotalValue': '58'}
{'City': '3', 'TotalValue': '66'}
{'City': '4', 'TotalValue': '78'}
{'City': '5', 'TotalValue': '79'}
{'City': '6', 'TotalValue': '94'}
{'City': '7', 'TotalValue': '94'}
{'City': '8', 'TotalValue': '98'}
{'City': '9', 'TotalValue': '98'}
{'City': '10', 'TotalValue': '103'}
{'City': '11', 'TotalValue': '127'}
{'City': '12', 'TotalValue': '142'}
{'City': '13', 'TotalValue': '165'}
{'City': '14', 'TotalValue': '189'}
{'City': '15', 'TotalValue': '192'}
{'City': '16', 'TotalValue': '200'}
{'City': '17', 'TotalValue': '203'}
{'City': '18', 'TotalValue': '235'}
{'City': '19', 'TotalValue': '235'}
{'City': '20', 'TotalValue': '258'}
{'City': '21', 'TotalValue': '263'}
{'City': '22', 'TotalValue': '328'}
{'City': '23', 'TotalValue': '364'}
{'City': '24', 'TotalValue': '415'}
{'City': '25', 'TotalValue': '467'}
{'City': '26', 'TotalValue': '489'}
{'City': '27', 'TotalValue': '11'}
{'City': '28'

In [8]:
# Required installations for Google Colab
!pip install apache-beam

import apache_beam as beam

# Define the basic pipeline
def run_consolidated_pipeline():
    """Read the sales CSV, parse each line into a dict, and write one shard.

    The header line of the input is skipped; each remaining line is split on
    commas and paired with the ``City`` and ``TotalValue`` column names.
    """
    source_path = '/content/sample_data/real-estate-sales-730-days-1.csv'
    sink_path = '/content/sample_data/consolidated_output.csv'
    column_names = ['City', 'TotalValue']

    with beam.Pipeline() as pipeline:
        raw_lines = pipeline | 'ReadCSV' >> beam.io.ReadFromText(
            source_path, skip_header_lines=1)
        records = raw_lines | 'ParseCSV' >> beam.Map(
            lambda line: dict(zip(column_names, line.split(','))))
        # num_shards=1 asks for a single output shard.
        records | 'WriteToCSV' >> beam.io.WriteToText(sink_path, num_shards=1)

run_consolidated_pipeline()




1. **Setup**: We installed Apache Beam and imported the windowing and date tools we need to work with our data.
2. **Average House Value**: We made a tool (function) that can calculate the average value of houses in a city.
3. **Date Conversion**: We made another tool that can read the date from our data. If the date is not correct, we just skip that piece of data.
4. **Main Process**:
   - We opened and read our data from a file.
   - We turned the data into a format that's easier for us to work with.
   - We converted the date information for each house sale.
   - We then grouped our data in chunks of 30 days (like putting them into monthly buckets).
   - For each city, we made a list of house values.
   - Using our average tool, we calculated the average house value for each city in each 30-day chunk.
   - Lastly, we saved our results in a new file.

In simpler terms, we took the house sales data, grouped the sales by city within each 30-day window (roughly one month), and calculated the average sale value for each city in each window. We saved these averages in a new file.

In [16]:
# Required installations for Google Colab
!pip install apache-beam

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from datetime import datetime

# Custom transformation to compute the average TotalValue for each city
class ComputeAverage(beam.DoFn):
    """Emit the arithmetic mean of the values grouped under each key."""

    def process(self, element):
        """Yield ``(key, mean)`` for a ``(key, values)`` pair.

        Nothing is emitted when ``values`` is empty.
        """
        key, values = element
        if not values:
            return
        mean = sum(values) / len(values)
        yield (key, mean)

# Convert string to datetime
def to_datetime(element):
    """Attach an event timestamp parsed from ``element['Date']``.

    Intended for use with ``beam.FlatMap``, which iterates over the return
    value, so the result is always a list: one ``TimestampedValue`` for a
    parseable date, or an empty list so the row is skipped.

    Args:
        element: A mapping with a ``Date`` entry formatted as
            ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        ``[TimestampedValue(element, epoch_seconds)]`` on success, else ``[]``.
    """
    from datetime import timezone  # local import keeps the cell self-contained

    try:
        dt = datetime.strptime(element['Date'], '%Y-%m-%d %H:%M:%S')
    except (KeyError, ValueError, TypeError):
        return []  # Skip rows with missing or invalid dates
    # Beam timestamps are seconds since the Unix epoch. Passing the naive
    # datetime itself is rejected by Beam, and the old try/except swallowed
    # that error and dropped every row.
    # NOTE(review): the naive datetime is interpreted as UTC -- confirm the
    # timezone of the source data.
    epoch_seconds = dt.replace(tzinfo=timezone.utc).timestamp()
    return [beam.window.TimestampedValue(element, epoch_seconds)]

# Define the pipeline with windowing and ParDo
def run_advanced_pipeline():
    """Average ``TotalValue`` per city within fixed 30-day windows.

    Steps: read the CSV (skipping its header), parse each line into a dict,
    attach event timestamps via ``to_datetime`` (rows it rejects are
    dropped), assign 30-day fixed windows, group values by city, average
    them with ``ComputeAverage``, and write ``City,AverageTotalValue`` lines.
    """
    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'ReadCSV' >> beam.io.ReadFromText('/content/sample_data/real-estate-sales-730-days-1.csv', skip_header_lines=1)
            | 'ParseCSV' >> beam.Map(lambda line: dict(zip(['Date', 'City', 'TotalValue'], line.split(','))))
            | 'ToDatetime' >> beam.FlatMap(to_datetime)  # Convert string to datetime objects and filter out invalid entries
            | 'Window' >> beam.WindowInto(FixedWindows(30*24*60*60))  # 30 days in seconds
            | 'ToKV' >> beam.Map(lambda x: (x['City'], float(x['TotalValue'])))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'ComputeAverage' >> beam.ParDo(ComputeAverage())
            # Without this step the (city, average) tuples would be written
            # as Python tuple reprs, which does not match the CSV header.
            # NOTE(review): the window is not part of the output, so a city
            # appears once per window with no way to tell the rows apart.
            | 'FormatOutput' >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
            | 'WriteResults' >> beam.io.WriteToText('/content/sample_data/windowed_output.csv', num_shards=1, header='City,AverageTotalValue')
        )

run_advanced_pipeline()




In [17]:
# Define a basic pipeline to read, parse, and write the CSV data
# Define a basic pipeline to read, parse, and write the CSV data
def basic_pipeline():
    """Read the sales CSV, parse each line, and write it back as CSV.

    Each input line (header skipped) is split on commas and paired with the
    ``Date``, ``City`` and ``TotalValue`` column names; the parsed record is
    then re-serialised as a comma-separated line under a matching header.
    """
    columns = ['Date', 'City', 'TotalValue']
    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | 'ReadCSV' >> beam.io.ReadFromText('/content/sample_data/real-estate-sales-730-days-1.csv', skip_header_lines=1)
            | 'ParseCSV' >> beam.Map(lambda line: dict(zip(columns, line.split(','))))
            # Writing the dicts directly would emit Python dict reprs under a
            # CSV header; serialise them as CSV lines instead. Columns absent
            # from a short row are written as empty fields.
            | 'FormatCSV' >> beam.Map(lambda row: ','.join(row.get(name, '') for name in columns))
            | 'WriteResults' >> beam.io.WriteToText('/content/sample_data/basic_output.csv', num_shards=1, header=','.join(columns))
        )

basic_pipeline()

For the first pipeline above (filter houses by value, then average by city), we followed the steps below:
1. **Setup**: We set things up by downloading the necessary tools.
2. **Filter Houses**: We made a function to pick houses whose value is above a certain amount.
3. **Average by City**: We made another function to find out the average house value in each city.
4. **The Main Process**:
   - We read the data from a file.
   - We turned each line in the file into a dictionary with city names and house values.
   - For debugging, we printed out the data to see if it looks right.
   - We used our filter to keep only the houses we're interested in.
   - Again, for debugging, we printed out the filtered data.
   - We then calculated the average house value for each city.
   - Finally, we saved our results to a new file.

In simpler terms, we took a list of houses, filtered out the cheaper ones, and then found out the average value of the remaining houses in each city. We saved these average values in a new file.