In [6]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_postgres import splitters
from beam_postgres.io import ReadFromPostgres
from datetime import datetime

class FilterWeekends(beam.DoFn):
    """DoFn that lets through only elements dated on a Saturday or Sunday."""

    def process(self, element):
        """Yield ``element`` unchanged when its 'datetime' value is a weekend day.

        The value stored under the 'datetime' key must provide ``weekday()``;
        elements falling on any other day produce no output.
        """
        # weekday() counts from Monday = 0, which puts Saturday/Sunday at 5/6.
        day_index = element['datetime'].weekday()
        if day_index < 5:
            return
        yield element

class FilterWeekdays(beam.DoFn):
    """DoFn that lets through only elements dated Monday through Friday."""

    def process(self, element):
        """Yield ``element`` unchanged when its 'datetime' value is a weekday.

        The value stored under the 'datetime' key must provide ``weekday()``;
        elements falling on a Saturday or Sunday produce no output.
        """
        # weekday() counts from Monday = 0, so indexes 0-4 are the working week.
        day_index = element['datetime'].weekday()
        if day_index >= 5:
            return
        yield element

class FormatData(beam.DoFn):
    """DoFn that renders a row's timestamp and velocity as display strings."""

    def process(self, element):
        """Yield a new dict with 'datetime' and 'velocity' formatted as text.

        Args:
            element: Mapping with the keys 'id', 'datetime', 'street_time',
                'count' and 'velocity'. 'datetime' must be a datetime object
                and 'velocity' a number accepted by the '.2f' format spec.

        Yields:
            A dict with the same five keys. 'id', 'street_time' and 'count'
            are copied as-is; 'datetime' becomes a
            'YYYY-MM-DD HH:MM:SS UTC' string and 'velocity' a string with
            two decimal places.
        """
        dt = element['datetime']

        # The output string carries a literal "UTC" suffix. A timezone-aware
        # value with a non-zero offset would otherwise be printed with its
        # local wall-clock time and mislabelled, so shift it to UTC first.
        # Naive values have no offset information and are formatted unchanged
        # (i.e. they are taken to already be in UTC).
        offset = dt.utcoffset()
        if offset is not None:
            dt = dt.replace(tzinfo=None) - offset

        yield {
            'id': element['id'],
            'datetime': dt.strftime('%Y-%m-%d %H:%M:%S UTC'),
            'street_time': element['street_time'],
            'count': element['count'],
            'velocity': format(element['velocity'], '.2f'),
        }

# Options object built without any explicit arguments.
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:
    # Source transform: selects every row of public.fifteen_minutes from the
    # test_db database on localhost.
    read_from_postgres = ReadFromPostgres(
        query="SELECT * FROM public.fifteen_minutes;",
        host="localhost",
        database="test_db",
        user="postgres",
        password="postgres",
        # The splitter controls how the query is split for performance;
        # NoSplitter is the one selected here.
        splitter=splitters.NoSplitter(),
    )

    # 'data' is the PCollection holding the rows read from PostgreSQL.
    data = p | "ReadFromPostgres" >> read_from_postgres

    # Fan the same input out into a weekend branch and a weekday branch.
    weekend_data = data | "FilterWeekends" >> beam.ParDo(FilterWeekends())
    weekday_data = data | "FilterWeekdays" >> beam.ParDo(FilterWeekdays())

    # Turn each branch's rows into their human-friendly representation.
    formatted_weekend_data = (
        weekend_data | "FormatWeekendData" >> beam.ParDo(FormatData())
    )
    formatted_weekday_data = (
        weekday_data | "FormatWeekdayData" >> beam.ParDo(FormatData())
    )

    # Print both branches to stdout, weekend first. Further steps (for
    # example a beam.io.WriteToText sink such as 'data/weekend_output' /
    # 'data/weekday_output') could be attached to these collections instead.
    stdout_sinks = (
        ("WriteWeekendToStdout", formatted_weekend_data),
        ("WriteWeekdayToStdout", formatted_weekday_data),
    )
    for step_label, formatted_rows in stdout_sinks:
        formatted_rows | step_label >> beam.Map(print)


INFO:beam_postgres.client:Successfully execute query: EXPLAIN SELECT * FROM (SELECT * FROM public.fifteen_minutes) as subq
INFO:beam_postgres.client:Successfully execute query: SELECT * FROM public.fifteen_minutes


{'id': 2120983, 'datetime': '2019-01-31 21:15:00 UTC', 'street_time': 1266, 'count': 21, 'velocity': '82.10'}
{'id': 2120984, 'datetime': '2019-02-09 07:15:00 UTC', 'street_time': 1222, 'count': 23, 'velocity': '84.61'}
{'id': 2120985, 'datetime': '2019-01-04 06:00:00 UTC', 'street_time': 145, 'count': 2, 'velocity': '14.50'}
{'id': 2120986, 'datetime': '2019-02-12 12:30:00 UTC', 'street_time': 2188, 'count': 10, 'velocity': '33.50'}
{'id': 2120987, 'datetime': '2019-01-08 03:15:00 UTC', 'street_time': 470, 'count': 2, 'velocity': '9.50'}
{'id': 2120988, 'datetime': '2019-01-08 06:15:00 UTC', 'street_time': 2818, 'count': 65, 'velocity': '69.62'}
{'id': 2120989, 'datetime': '2019-01-08 07:45:00 UTC', 'street_time': 10, 'count': 1, 'velocity': '23.00'}
{'id': 2120990, 'datetime': '2019-02-14 16:15:00 UTC', 'street_time': 213, 'count': 38, 'velocity': '30.07'}
{'id': 2120991, 'datetime': '2019-01-18 07:30:00 UTC', 'street_time': 663, 'count': 74, 'velocity': '74.29'}
{'id': 2120992, 'dat