In [1]:
# One-time setup (run in its own cell): %pip install "apache-beam[gcp]" psutil

In [14]:
import apache_beam as beam
from apache_beam.metrics import Metrics, MetricsFilter
import time
import psutil

class ExtractAndProcess(beam.DoFn):
    """Parse sampled CSV lines into average-temperature records and record metrics.

    Each input element is an iterable of CSV line strings (in this notebook it
    is the single list emitted by ``Sample.FixedSizeGlobally``).  Every
    non-header line of the form ``date,max_temp,min_temp`` is turned into
    ``{'date': date, 'avg_temp': (max_temp + min_temp) / 2}``.

    Metrics recorded under this class's namespace:
        element_count   -- counter, one increment per emitted record.
        processing_time -- distribution, milliseconds spent per input element.
        cpu_usage       -- gauge, ``psutil.cpu_percent()`` after processing.
        memory_usage    -- gauge, ``psutil.virtual_memory().percent`` after processing.
    """

    # Prefix identifying the CSV header line, which must not be parsed as data.
    HEADER_PREFIX = 'date,max_temp,min_temp'

    def __init__(self):
        super().__init__()
        self.element_count = Metrics.counter(self.__class__, 'element_count')
        self.processing_time = Metrics.distribution(self.__class__, 'processing_time')
        self.cpu_usage = Metrics.gauge(self.__class__, 'cpu_usage')
        self.memory_usage = Metrics.gauge(self.__class__, 'memory_usage')

    def process(self, element):
        """Yield one ``{'date', 'avg_temp'}`` dict per data line in ``element``.

        Args:
            element: iterable of CSV line strings; may be empty and may contain
                the header line at any position.

        Yields:
            dict with keys ``'date'`` (str) and ``'avg_temp'`` (float).

        Raises:
            ValueError: if a data line does not have exactly three
                comma-separated fields or a temperature is not numeric.
        """
        # Start measuring processing time
        start_time = time.time()

        # Baseline call: a non-blocking cpu_percent() reports utilisation since
        # the previous call, so this makes the reading taken after the loop
        # cover the processing below.  The return value itself is not needed.
        psutil.cpu_percent()

        # The header is skipped per line.  There is deliberately no early
        # return when the first sampled line is the header: the sample order is
        # arbitrary, and bailing out would silently drop every record.
        for line in element:
            if line.startswith(self.HEADER_PREFIX):
                continue

            date, max_temp, min_temp = line.split(',')
            avg_temp = (float(max_temp) + float(min_temp)) / 2

            # Emit the result
            yield {'date': date, 'avg_temp': avg_temp}

            # Increment element count metric
            self.element_count.inc()

        # Record processing time metric (milliseconds)
        elapsed_ms = int((time.time() - start_time) * 1000)
        self.processing_time.update(elapsed_ms)

        # Update CPU and memory usage metrics with post-processing readings
        self.cpu_usage.set(psutil.cpu_percent())
        self.memory_usage.set(psutil.virtual_memory().percent)

def run(argv=None):
    """Build and run the 1000-line sampling pipeline once, then print its metrics.

    Reads ``preprocessed_data.csv``, draws a random sample of up to 1000 lines,
    converts them with ``ExtractAndProcess`` and writes the records to files
    prefixed ``output1000``.

    Args:
        argv: optional list of command-line style pipeline options passed to
            ``PipelineOptions``; ``None`` uses the defaults.
    """
    # Define your pipeline options
    pipeline_options = beam.pipeline.PipelineOptions(argv)

    # The pipeline is built without a `with` block on purpose: leaving a
    # `with beam.Pipeline(...)` block already runs the pipeline, so an
    # additional p.run() afterwards executed everything a second time.
    p = beam.Pipeline(options=pipeline_options)

    # Read the CSV file and draw a random sample of up to 1000 lines.  The
    # sample is emitted as ONE element: a list of lines (not the "last" lines,
    # despite the step label).
    subset_lines = (
        p
        | 'Read CSV (Last 1000)' >> beam.io.ReadFromText('preprocessed_data.csv')
        | 'Sample 1000' >> beam.transforms.combiners.Sample.FixedSizeGlobally(1000)
    )

    # Apply the ExtractAndProcess transform to the subset and write the records
    (
        subset_lines
        | 'Extract and Process' >> beam.ParDo(ExtractAndProcess())
        | 'Write Output' >> beam.io.WriteToText('output1000')
    )

    # Run exactly once and block until done so the metrics below are final
    result = p.run()
    result.wait_until_finish()

    # query() returns a dict with 'counters', 'distributions' and 'gauges'
    # lists; the dict itself is always truthy, so the lists are what get checked.
    metrics = result.metrics()
    counters = metrics.query(MetricsFilter().with_name('element_count'))['counters']
    distributions = metrics.query(MetricsFilter().with_name('processing_time'))['distributions']
    cpu_gauges = metrics.query(MetricsFilter().with_name('cpu_usage'))['gauges']
    memory_gauges = metrics.query(MetricsFilter().with_name('memory_usage'))['gauges']

    # Print the metrics that were actually reported
    if counters:
        print(f"Total Element Count: {counters[0].committed}")
    if distributions:
        print(f"Average Processing Time: {distributions[0].committed.mean} milliseconds")
    if cpu_gauges:
        print(f"CPU Usage: {cpu_gauges[0].committed.value:.2f}%")
    if memory_gauges:
        print(f"Memory Usage: {memory_gauges[0].committed.value:.2f}%")

# In a notebook kernel __name__ is '__main__', so executing this cell runs the
# pipeline immediately and prints the metrics shown below.
if __name__ == '__main__':
    run()




Total Element Count: 1000
Average Processing Time: 23.0 milliseconds
CPU Usage: 21.00%
Memory Usage: 63.00%


In [15]:
import apache_beam as beam
from apache_beam.metrics import Metrics, MetricsFilter
import time
import psutil

class ExtractAndProcess(beam.DoFn):
    """Parse sampled CSV lines into average-temperature records and record metrics.

    Each input element is an iterable of CSV line strings (in this notebook it
    is the single list emitted by ``Sample.FixedSizeGlobally``).  Every
    non-header line of the form ``date,max_temp,min_temp`` is turned into
    ``{'date': date, 'avg_temp': (max_temp + min_temp) / 2}``.

    Metrics recorded under this class's namespace:
        element_count   -- counter, one increment per emitted record.
        processing_time -- distribution, milliseconds spent per input element.
        cpu_usage       -- gauge, ``psutil.cpu_percent()`` after processing.
        memory_usage    -- gauge, ``psutil.virtual_memory().percent`` after processing.
    """

    # Prefix identifying the CSV header line, which must not be parsed as data.
    HEADER_PREFIX = 'date,max_temp,min_temp'

    def __init__(self):
        super().__init__()
        self.element_count = Metrics.counter(self.__class__, 'element_count')
        self.processing_time = Metrics.distribution(self.__class__, 'processing_time')
        self.cpu_usage = Metrics.gauge(self.__class__, 'cpu_usage')
        self.memory_usage = Metrics.gauge(self.__class__, 'memory_usage')

    def process(self, element):
        """Yield one ``{'date', 'avg_temp'}`` dict per data line in ``element``.

        Args:
            element: iterable of CSV line strings; may be empty and may contain
                the header line at any position.

        Yields:
            dict with keys ``'date'`` (str) and ``'avg_temp'`` (float).

        Raises:
            ValueError: if a data line does not have exactly three
                comma-separated fields or a temperature is not numeric.
        """
        # Start measuring processing time
        start_time = time.time()

        # Baseline call: a non-blocking cpu_percent() reports utilisation since
        # the previous call, so this makes the reading taken after the loop
        # cover the processing below.  The return value itself is not needed.
        psutil.cpu_percent()

        # The header is skipped per line.  There is deliberately no early
        # return when the first sampled line is the header: the sample order is
        # arbitrary, and bailing out would silently drop every record.
        for line in element:
            if line.startswith(self.HEADER_PREFIX):
                continue

            date, max_temp, min_temp = line.split(',')
            avg_temp = (float(max_temp) + float(min_temp)) / 2

            # Emit the result
            yield {'date': date, 'avg_temp': avg_temp}

            # Increment element count metric
            self.element_count.inc()

        # Record processing time metric (milliseconds)
        elapsed_ms = int((time.time() - start_time) * 1000)
        self.processing_time.update(elapsed_ms)

        # Update CPU and memory usage metrics with post-processing readings
        self.cpu_usage.set(psutil.cpu_percent())
        self.memory_usage.set(psutil.virtual_memory().percent)

def run(argv=None):
    """Build and run the 5000-line sampling pipeline once, then print its metrics.

    Reads ``preprocessed_data.csv``, draws a random sample of up to 5000 lines,
    converts them with ``ExtractAndProcess`` and writes the records to files
    prefixed ``output5000``.

    Args:
        argv: optional list of command-line style pipeline options passed to
            ``PipelineOptions``; ``None`` uses the defaults.
    """
    # Define your pipeline options
    pipeline_options = beam.pipeline.PipelineOptions(argv)

    # The pipeline is built without a `with` block on purpose: leaving a
    # `with beam.Pipeline(...)` block already runs the pipeline, so an
    # additional p.run() afterwards executed everything a second time.
    p = beam.Pipeline(options=pipeline_options)

    # Read the CSV file and draw a random sample of up to 5000 lines.  The
    # sample is emitted as ONE element: a list of lines (not the "last" lines,
    # despite the step label).
    subset_lines = (
        p
        | 'Read CSV (Last 5000)' >> beam.io.ReadFromText('preprocessed_data.csv')
        | 'Sample 5000' >> beam.transforms.combiners.Sample.FixedSizeGlobally(5000)
    )

    # Apply the ExtractAndProcess transform to the subset and write the records
    (
        subset_lines
        | 'Extract and Process' >> beam.ParDo(ExtractAndProcess())
        | 'Write Output' >> beam.io.WriteToText('output5000')
    )

    # Run exactly once and block until done so the metrics below are final
    result = p.run()
    result.wait_until_finish()

    # query() returns a dict with 'counters', 'distributions' and 'gauges'
    # lists; the dict itself is always truthy, so the lists are what get checked.
    metrics = result.metrics()
    counters = metrics.query(MetricsFilter().with_name('element_count'))['counters']
    distributions = metrics.query(MetricsFilter().with_name('processing_time'))['distributions']
    cpu_gauges = metrics.query(MetricsFilter().with_name('cpu_usage'))['gauges']
    memory_gauges = metrics.query(MetricsFilter().with_name('memory_usage'))['gauges']

    # Print the metrics that were actually reported
    if counters:
        print(f"Total Element Count: {counters[0].committed}")
    if distributions:
        print(f"Average Processing Time: {distributions[0].committed.mean} milliseconds")
    if cpu_gauges:
        print(f"CPU Usage: {cpu_gauges[0].committed.value:.2f}%")
    if memory_gauges:
        print(f"Memory Usage: {memory_gauges[0].committed.value:.2f}%")

# In a notebook kernel __name__ is '__main__', so executing this cell runs the
# pipeline immediately and prints the metrics shown below.
if __name__ == '__main__':
    run()




Total Element Count: 5000
Average Processing Time: 93.0 milliseconds
CPU Usage: 24.00%
Memory Usage: 63.00%


In [18]:
import apache_beam as beam
from apache_beam.metrics import Metrics, MetricsFilter
import time
import psutil

class ExtractAndProcess(beam.DoFn):
    """Parse sampled CSV lines into average-temperature records and record metrics.

    Each input element is an iterable of CSV line strings (in this notebook it
    is the single list emitted by ``Sample.FixedSizeGlobally``).  Every
    non-header line of the form ``date,max_temp,min_temp`` is turned into
    ``{'date': date, 'avg_temp': (max_temp + min_temp) / 2}``.

    Metrics recorded under this class's namespace:
        element_count   -- counter, one increment per emitted record.
        processing_time -- distribution, milliseconds spent per input element.
        cpu_usage       -- gauge, ``psutil.cpu_percent()`` after processing.
        memory_usage    -- gauge, ``psutil.virtual_memory().percent`` after processing.
    """

    # Prefix identifying the CSV header line, which must not be parsed as data.
    HEADER_PREFIX = 'date,max_temp,min_temp'

    def __init__(self):
        super().__init__()
        self.element_count = Metrics.counter(self.__class__, 'element_count')
        self.processing_time = Metrics.distribution(self.__class__, 'processing_time')
        self.cpu_usage = Metrics.gauge(self.__class__, 'cpu_usage')
        self.memory_usage = Metrics.gauge(self.__class__, 'memory_usage')

    def process(self, element):
        """Yield one ``{'date', 'avg_temp'}`` dict per data line in ``element``.

        Args:
            element: iterable of CSV line strings; may be empty and may contain
                the header line at any position.

        Yields:
            dict with keys ``'date'`` (str) and ``'avg_temp'`` (float).

        Raises:
            ValueError: if a data line does not have exactly three
                comma-separated fields or a temperature is not numeric.
        """
        # Start measuring processing time
        start_time = time.time()

        # Baseline call: a non-blocking cpu_percent() reports utilisation since
        # the previous call, so this makes the reading taken after the loop
        # cover the processing below.  The return value itself is not needed.
        psutil.cpu_percent()

        # The header is skipped per line.  There is deliberately no early
        # return when the first sampled line is the header: the sample order is
        # arbitrary, and bailing out would silently drop every record.
        for line in element:
            if line.startswith(self.HEADER_PREFIX):
                continue

            date, max_temp, min_temp = line.split(',')
            avg_temp = (float(max_temp) + float(min_temp)) / 2

            # Emit the result
            yield {'date': date, 'avg_temp': avg_temp}

            # Increment element count metric
            self.element_count.inc()

        # Record processing time metric (milliseconds)
        elapsed_ms = int((time.time() - start_time) * 1000)
        self.processing_time.update(elapsed_ms)

        # Update CPU and memory usage metrics with post-processing readings
        self.cpu_usage.set(psutil.cpu_percent())
        self.memory_usage.set(psutil.virtual_memory().percent)

def run(argv=None):
    """Build and run the 15000-line sampling pipeline once, then print its metrics.

    Reads ``preprocessed_data.csv``, draws a random sample of up to 15001
    lines, converts them with ``ExtractAndProcess`` and writes the records to
    files prefixed ``output15000``.

    Args:
        argv: optional list of command-line style pipeline options passed to
            ``PipelineOptions``; ``None`` uses the defaults.
    """
    # Define your pipeline options
    pipeline_options = beam.pipeline.PipelineOptions(argv)

    # The pipeline is built without a `with` block on purpose: leaving a
    # `with beam.Pipeline(...)` block already runs the pipeline, so an
    # additional p.run() afterwards executed everything a second time.
    p = beam.Pipeline(options=pipeline_options)

    # Read the CSV file and draw a random sample.  The sample is emitted as ONE
    # element: a list of lines (not the "last" lines, despite the step label).
    # NOTE(review): the sample size is 15001 while the labels and output prefix
    # say 15000 -- presumably one extra slot for the header line; confirm.
    subset_lines = (
        p
        | 'Read CSV (Last 15000)' >> beam.io.ReadFromText('preprocessed_data.csv')
        | 'Sample 15000' >> beam.transforms.combiners.Sample.FixedSizeGlobally(15001)
    )

    # Apply the ExtractAndProcess transform to the subset and write the records
    (
        subset_lines
        | 'Extract and Process' >> beam.ParDo(ExtractAndProcess())
        | 'Write Output' >> beam.io.WriteToText('output15000')
    )

    # Run exactly once and block until done so the metrics below are final
    result = p.run()
    result.wait_until_finish()

    # query() returns a dict with 'counters', 'distributions' and 'gauges'
    # lists; the dict itself is always truthy, so the lists are what get checked.
    metrics = result.metrics()
    counters = metrics.query(MetricsFilter().with_name('element_count'))['counters']
    distributions = metrics.query(MetricsFilter().with_name('processing_time'))['distributions']
    cpu_gauges = metrics.query(MetricsFilter().with_name('cpu_usage'))['gauges']
    memory_gauges = metrics.query(MetricsFilter().with_name('memory_usage'))['gauges']

    # Print the metrics that were actually reported
    if counters:
        print(f"Total Element Count: {counters[0].committed}")
    if distributions:
        print(f"Average Processing Time: {distributions[0].committed.mean} milliseconds")
    if cpu_gauges:
        print(f"CPU Usage: {cpu_gauges[0].committed.value:.2f}%")
    if memory_gauges:
        print(f"Memory Usage: {memory_gauges[0].committed.value:.2f}%")

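# In a notebook kernel __name__ is '__main__', so executing this cell runs the
# pipeline immediately and prints the metrics shown below.
if __name__ == '__main__':
    run()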




Total Element Count: 15000
Average Processing Time: 228.0 milliseconds
CPU Usage: 17.00%
Memory Usage: 64.00%
