In [7]:
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
import google.auth
from datetime import datetime, timezone

# Function to print human-readable window information
def human_readable_window(window) -> str:
    """Render a Beam window as a short string for log messages.

    A ``beam.window.GlobalWindow`` is rendered with its own ``str()``.
    Any other window is rendered as ``"<start> - <end>"``, where both bounds
    are the UTC datetimes obtained from the window's ``start``/``end``
    timestamps.
    """
    if not isinstance(window, beam.window.GlobalWindow):
        start_utc = window.start.to_utc_datetime()
        end_utc = window.end.to_utc_datetime()
        return f'{start_utc} - {end_utc}'
    return str(window)

# PTransform function to print window information
@beam.ptransform_fn
def PrintWindowInfo(pcollection):
    """Count the elements in each window of ``pcollection`` and log each count.

    Returns the PCollection of per-window counts; every count is logged at
    INFO level together with its window and then emitted unchanged.
    """

    class PrintCountsInfo(beam.DoFn):
        """Logs one per-window element count and passes it through."""

        def process(self, num_elements, window=beam.DoFn.WindowParam):
            window_text = human_readable_window(window)
            logging.info(f'Window [{window_text}] has {num_elements} elements')
            yield num_elements

    # without_defaults(): Beam requires it for CombineGlobally/Count.Globally
    # under non-global windowing (no default value is produced for empty windows).
    per_window_counts = (
        pcollection
        | 'Count elements per window' >> beam.combiners.Count.Globally().without_defaults()
    )
    return per_window_counts | 'Print counts info' >> beam.ParDo(PrintCountsInfo())

# Interactive Beam helpers. `ib.show` is called at the end of this cell, and
# nothing in the cell defined `ib` before, so a fresh kernel raised NameError.
import apache_beam.runners.interactive.interactive_beam as ib

# PrintCountsInfo reports through logging.info, which the root logger drops at
# its default WARNING level. basicConfig attaches a handler only when the root
# logger has none; setLevel covers the case where a handler already exists.
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

# BigQuery query: distinct trips started on 2023-01-01, capped at 100 rows.
# The table path is backtick-quoted (GoogleSQL quoted identifier) because the
# project ID contains dashes.
query = """
    SELECT DISTINCT trip_id, start_station_id, start_time
    FROM `bigquery-public-data.austin_bikeshare.bikeshare_trips`
    WHERE DATE(start_time) = '2023-01-01'
    ORDER BY start_station_id
    LIMIT 100;
"""

# Pipeline options setup (all GCP settings go through one options view).
options = PipelineOptions()
gcp_options = options.view_as(GoogleCloudOptions)
# google.auth.default() returns (credentials, project_id); keep only the project.
_, gcp_options.project = google.auth.default()
gcs_location = 'gs://sandeep-apache'
gcp_options.temp_location = f'{gcs_location}/temp'

# Fixed (non-overlapping) windows, 5 minutes each.
WINDOW_SIZE_SECONDS = 5 * 60

# Create pipeline with InteractiveRunner
p = beam.Pipeline(InteractiveRunner(), options=options)

# Read the query result from BigQuery; each element is one result row.
table_row = p | 'ReadTable' >> ReadFromBigQuery(query=query, use_standard_sql=True)

# Keep the three selected columns and convert start_time to Unix epoch seconds
# so it can serve as the event timestamp below.
# NOTE(review): assumes start_time arrives as a datetime.datetime; replace()
# pins it to UTC before .timestamp() — confirm for the read method in use.
trips = table_row | 'Trip Data' >> beam.Map(lambda x: {
    'trip_id': x['trip_id'],
    'start_station_id': x['start_station_id'],
    'start_time': x['start_time'].replace(tzinfo=timezone.utc).timestamp(),
})

# Attach each trip's start_time as its event timestamp so windowing uses it.
trips_time_stamp = trips | 'With Timestamps' >> beam.Map(
    lambda bikes: beam.window.TimestampedValue(bikes, bikes['start_time']))

# Group trips into fixed windows by event time.
trips_window = trips_time_stamp | 'Window' >> beam.WindowInto(
    beam.window.FixedWindows(WINDOW_SIZE_SECONDS))

# Count trips per window and log each window's count.
printWindowinfo = trips_window | 'Print window info' >> PrintWindowInfo()

# Run the pipeline up to this PCollection and display the per-window counts.
ib.show(printWindowinfo)






In [8]:
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
import google.auth
from datetime import datetime, timezone

# Function to print human-readable window information
# (same behavior as the definition in cell In [7], repeated so this cell runs on its own)
def human_readable_window(window) -> str:
    """Render a Beam window as a short string for log messages.

    A ``beam.window.GlobalWindow`` is rendered with its own ``str()``.
    Any other window is rendered as ``"<start> - <end>"`` using the UTC
    datetimes of its ``start``/``end`` timestamps.
    """
    if not isinstance(window, beam.window.GlobalWindow):
        lower = window.start.to_utc_datetime()
        upper = window.end.to_utc_datetime()
        return f'{lower} - {upper}'
    return str(window)

# PTransform function to print window information
# (same behavior as the definition in cell In [7], repeated so this cell runs on its own)
@beam.ptransform_fn
def PrintWindowInfo(pcollection):
    """Count the elements in each window of ``pcollection`` and log each count.

    Returns the PCollection of per-window counts; every count is logged at
    INFO level with its window and then emitted unchanged.
    """

    class PrintCountsInfo(beam.DoFn):
        """Logs one per-window element count and passes it through."""

        def process(self, num_elements, window=beam.DoFn.WindowParam):
            window_text = human_readable_window(window)
            logging.info(f'Window [{window_text}] has {num_elements} elements')
            yield num_elements

    # without_defaults(): Beam requires it for Count.Globally under
    # non-global windowing (no default value is produced for empty windows).
    per_window_counts = (
        pcollection
        | 'Count elements per window' >> beam.combiners.Count.Globally().without_defaults()
    )
    return per_window_counts | 'Print counts info' >> beam.ParDo(PrintCountsInfo())

# Interactive Beam helpers. `ib.show` is called at the end of this cell, and
# nothing in the cell defined `ib` before, so a fresh kernel raised NameError.
import apache_beam.runners.interactive.interactive_beam as ib

# PrintCountsInfo reports through logging.info, which the root logger drops at
# its default WARNING level. basicConfig attaches a handler only when the root
# logger has none; setLevel covers the case where a handler already exists.
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

# BigQuery query: distinct trips started on 2023-01-01, capped at 100 rows.
# The table path is backtick-quoted (GoogleSQL quoted identifier) because the
# project ID contains dashes.
query = """
    SELECT DISTINCT trip_id, start_station_id, start_time
    FROM `bigquery-public-data.austin_bikeshare.bikeshare_trips`
    WHERE DATE(start_time) = '2023-01-01'
    ORDER BY start_station_id
    LIMIT 100;
"""

# Pipeline options setup (all GCP settings go through one options view).
options = PipelineOptions()
gcp_options = options.view_as(GoogleCloudOptions)
# google.auth.default() returns (credentials, project_id); keep only the project.
_, gcp_options.project = google.auth.default()
gcs_location = 'gs://sandeep-apache'
gcp_options.temp_location = f'{gcs_location}/temp'

# Sliding windows: each window is 5 minutes long and a new one starts every
# 2.5 minutes, so windows overlap and every trip lands in two of them.
SLIDING_WINDOW_SIZE_SECONDS = 5 * 60
SLIDING_WINDOW_PERIOD_SECONDS = 150

# Create pipeline with InteractiveRunner
p = beam.Pipeline(InteractiveRunner(), options=options)

# Read the query result from BigQuery; each element is one result row.
table_row = p | 'ReadTable' >> ReadFromBigQuery(query=query, use_standard_sql=True)

# Keep the three selected columns and convert start_time to Unix epoch seconds
# so it can serve as the event timestamp below.
# NOTE(review): assumes start_time arrives as a datetime.datetime; replace()
# pins it to UTC before .timestamp() — confirm for the read method in use.
trips = table_row | 'Trip Data' >> beam.Map(lambda x: {
    'trip_id': x['trip_id'],
    'start_station_id': x['start_station_id'],
    'start_time': x['start_time'].replace(tzinfo=timezone.utc).timestamp(),
})

# Attach each trip's start_time as its event timestamp so windowing uses it.
trips_time_stamp = trips | 'With Timestamps' >> beam.Map(
    lambda bikes: beam.window.TimestampedValue(bikes, bikes['start_time']))

# Group trips into overlapping sliding windows by event time.
trips_window = trips_time_stamp | 'Window' >> beam.WindowInto(
    beam.window.SlidingWindows(SLIDING_WINDOW_SIZE_SECONDS, SLIDING_WINDOW_PERIOD_SECONDS))

# Count trips per window and log each window's count.
printWindowinfo = trips_window | 'Print window info' >> PrintWindowInfo()

# Run the pipeline up to this PCollection and display the per-window counts.
ib.show(printWindowinfo)








