In [1]:
%env PROJECT_ID = my-project
%env GCP_REGION = "EU"
%env STAGING_LOCATION = "gs://my-bucket/pos/stg"
%env TEMP_LOCATION = "gs://my-bucket/pos/temp"
%env TOPIC_NAME = my_topic
%env DATASET_ID = dataset_id
%env ACTIVITY_TABLE = activity_table
%env LINE_TABLE = line_table
import time
import apache_beam as beam
import typing
from apache_beam.utils.timestamp import Timestamp
import json
from datetime import datetime
import logging
import random
from uuid6 import uuid7 as uuid
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys, FlatMap, Map
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, GlobalWindows
from apache_beam.runners import DataflowRunner, DirectRunner
from apache_beam.options.pipeline_options import GoogleCloudOptions,PipelineOptions,StandardOptions
import apache_beam.runners.interactive.interactive_beam as ib
import os
# from apache_beam.runners.interactive.interactive_runner import InteractiveRunner


# Interactive Beam recording limits for unbounded sources (here the Pub/Sub
# read below): stop recording after 10 minutes of data ...
ib.options.recording_duration = '10m'
# ... or once the recorded data reaches 1e9 bytes (1 GB), whichever comes first.
ib.options.recording_size_limit = 1e9

class LineLog(typing.NamedTuple):
    """One line item of a point-of-sale activity, as written to ``LINE_TABLE``.

    Built by ``MapLineLog`` from each entry of a message's ``lines`` array and
    encoded with Beam's ``RowCoder`` (registered below), so these annotations
    define the row schema. ``MapLineLog`` sets every field it does not find in
    the message to None; the defaults here only apply when a ``LineLog`` is
    constructed directly.
    """

    id: int = None
    product_id: int = None
    quantity: float = None
    price: float = None
    # Free-form custom columns; MapLineLog always fills them with strings.
    custom2: str = ' '
    custom3: str = ' '
    custom4: str = ' '
    custom5: str = ' '
    custom6: str = ' '
    custom7: str = ' '
    custom8: str = ' '
    custom9: str = ' '
    # Timestamps arrive as JSON strings and are passed through unconverted.
    created_at: str = None
    updated_at: str = None
    size: str = None
    color: str = None
    discount: float = None
    discount_type: str = None
    product_code: str = None
    total: float = None
    company_id: int = None
    activity_code: str = None
    weight: int = None
    description: str = None
    pending_code_sync: bool = None
    total_cost: float = None
    total_tax: float = None
    product_ean: str = None

class ActivityLog(typing.NamedTuple):
    """One point-of-sale activity record, as written to ``ACTIVITY_TABLE``.

    Built by ``MapActivityLog`` from a decoded Pub/Sub JSON payload and encoded
    with Beam's ``RowCoder`` (registered below), so these annotations define
    the row schema. Values come straight from ``json.loads`` and are never
    converted (``ingest_datetime`` is an ``strftime`` string), so the
    timestamp-like fields are typed ``str`` to match the data actually produced.
    """

    id: int = None
    customer_id: int = None
    location_id: int = None
    company_id: int = None
    created_at: str = None
    updated_at: str = None
    name: str = None
    day_week: int = None
    session: str = None
    total: float = None
    code: str = None
    processed: bool = None
    returned: bool = None
    barcode: str = None
    customer_code: str = None
    location_code: str = None
    staff_code: str = None
    custom1: str = ' '
    pending_code_sync: bool = None
    points: int = None
    returned_at: str = None
    synchronised_at: str = None
    profile_id: int = None
    custom2: str = ' '
    custom3: str = ' '
    custom4: str = ' '
    custom5: str = ' '
    custom6: str = ' '
    custom7: str = ' '
    custom8: str = ' '
    total_tax: float = None
    # NOTE(review): an id typed float — confirm against the table schema.
    associate_id: float = None
    # Set by MapActivityLog at processing time (UTC, "%Y-%m-%d %H:%M:%S.%f").
    ingest_datetime: str = None
    # Set by MapActivityLog to a freshly generated UUID string.
    uuid: str = None

# Encode both record types with Beam's schema-aware RowCoder.
for _record_type in (LineLog, ActivityLog):
    beam.coders.registry.register_coder(_record_type, beam.coders.RowCoder)
del _record_type

# Public attribute names of each NamedTuple (its fields, in declaration order);
# the mappers use these to decide which message keys to keep.
lineKeys = [name for name in LineLog.__dict__ if not name.startswith("_")]
activityKeys = [name for name in ActivityLog.__dict__ if not name.startswith("_")]

def _load_bq_schema(path):
    """Read a BigQuery JSON schema file and return it as ``"name:TYPE,..."``.

    The file must contain an object with a ``fields`` list whose entries carry
    ``name`` and ``type`` keys. Only those two keys are used: any ``mode``
    (e.g. REQUIRED/REPEATED) or nested ``fields`` entries are ignored by this
    compact string form.
    """
    # Explicit encoding: the platform default is locale-dependent.
    with open(path, encoding="utf-8") as schema_file:
        fields = json.load(schema_file)["fields"]
    return ",".join(f'{field["name"]}:{field["type"]}' for field in fields)


activity_table_schema = _load_bq_schema('schemas/activity_table_schema.json')
lines_table_schema = _load_bq_schema('schemas/lines_table_schema.json')

# Extra table options passed to WriteToBigQuery when it creates a table:
# partition by day on the created_at column. Clustering could be added with
# e.g. clustering={'fields': ['country']}.
additional_bq_parameters = dict(
    timePartitioning=dict(field='created_at', type='DAY'),
)

class GroupMessagesByFixedWindows(PTransform):
    """Group elements into fixed windows, spread across random shard keys.

    Each element is assigned to a ``FixedWindows`` window based on its element
    timestamp, given a random shard key in ``[0, num_shards)``, and grouped by
    that key. The output contains one iterable of elements per
    (window, shard) pair; the shard key itself is discarded.

    Args:
        window_size: window length in minutes. It may be fractional and is
            truncated to whole seconds (e.g. ``0.2`` -> 12 s).
        num_shards: number of random keys used per window.

    Raises:
        ValueError: if the window is shorter than one second or
            ``num_shards`` is less than 1.
    """

    def __init__(self, window_size, num_shards=5):
        super().__init__()
        # Minutes -> whole seconds (truncated); FixedWindows takes seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards
        # Fail at construction instead of later inside the running pipeline
        # (FixedWindows rejects a zero size; randint(0, -1) raises per element).
        if self.window_size <= 0:
            raise ValueError(
                f"window_size must be at least 1/60 minute (1 s), got {window_size!r}"
            )
        if num_shards < 1:
            raise ValueError(f"num_shards must be >= 1, got {num_shards!r}")

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using its timestamp (the
            # Pub/Sub publish time when no timestamp_attribute is configured).
            | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
            # Random shard key so one window's elements are split across groups.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # All elements of one (window, shard) group must fit in worker
            # memory; otherwise use apache_beam.transforms.util.BatchElements.
            | "Group by key" >> GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )


class MapLineLog(beam.DoFn):
    """Explode a Pub/Sub message into one ``LineLog`` per entry of its ``lines``.

    The payload is decoded as UTF-8 JSON. For each line item only keys that are
    ``LineLog`` fields are kept, missing fields become None, and the
    ``customN`` fields are coerced to strings (``' '`` when absent or empty).
    Payloads that cannot be decoded, or that are not an object with a ``lines``
    list, are logged at DEBUG level and dropped.
    """

    # LineLog's free-form custom columns, derived from its field names so the
    # coercion below always matches the NamedTuple definition.
    _CUSTOM_KEYS = tuple(key for key in lineKeys if key.startswith("custom"))

    @staticmethod
    def _as_custom_str(value):
        """Return *value* as a string, or ``' '`` when it is None or empty."""
        if value is None or value == "":
            return " "
        return str(value)

    def setup(self):
        """Raise the root logger to INFO once per DoFn instance."""
        logging.getLogger().setLevel(logging.INFO)

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Yield a ``LineLog`` for each object in the message's ``lines`` array.

        Args:
            element: raw Pub/Sub message payload (bytes, per the
                ``with_output_types(bytes)`` on the read).
            publish_time: element timestamp bound by Beam at runtime; unused.

        Yields:
            LineLog: one record per valid line item.
        """
        try:
            lines = json.loads(element.decode("utf-8"))["lines"]
        except (ValueError, AttributeError, KeyError, TypeError) as e:
            # ValueError covers UnicodeDecodeError and json.JSONDecodeError;
            # KeyError/TypeError cover JSON that is not an object with "lines".
            # DEBUG only: MapActivityLog already logs bad JSON for the same message.
            logging.debug("[Invalid Data] (%s) - %s", e, element)
            return
        if not isinstance(lines, list):
            logging.debug("[Invalid Data] (lines is not a list) - %s", element)
            return
        for line in lines:
            if not isinstance(line, dict):
                logging.debug("[Invalid Data] (line is not an object) - %s", element)
                continue
            # Keep exactly the LineLog fields; absent ones default to None.
            record = {key: line.get(key) for key in lineKeys}
            for key in self._CUSTOM_KEYS:
                record[key] = self._as_custom_str(record[key])
            yield LineLog(**record)

class MapActivityLog(beam.DoFn):
    """Convert a Pub/Sub message into a single ``ActivityLog`` record.

    The payload is decoded as ISO-8859-1 JSON and stamped with an
    ``ingest_datetime`` (UTC, ``%Y-%m-%d %H:%M:%S.%f``) and a fresh ``uuid``.
    It is then reduced to the ``ActivityLog`` fields: missing ones become None,
    and the ``customN`` fields are coerced to strings (``' '`` when absent or
    empty). Payloads that are not a JSON object are logged at INFO level and
    dropped.
    """

    # ActivityLog's free-form custom columns, derived from its field names.
    _CUSTOM_KEYS = tuple(key for key in activityKeys if key.startswith("custom"))

    @staticmethod
    def _as_custom_str(value):
        """Return *value* as a string, or ``' '`` when it is None or empty."""
        if value is None or value == "":
            return " "
        return str(value)

    def setup(self):
        """Raise the root logger to INFO once per DoFn instance."""
        logging.getLogger().setLevel(logging.INFO)

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Yield one ``ActivityLog`` built from the message payload.

        Args:
            element: raw Pub/Sub message payload (bytes, per the
                ``with_output_types(bytes)`` on the read).
            publish_time: element timestamp bound by Beam at runtime; unused.

        Yields:
            ActivityLog: the mapped record, unless the payload is invalid.
        """
        try:
            # NOTE(review): MapLineLog decodes the same payload as UTF-8. If the
            # producer sends UTF-8, ISO-8859-1 turns non-ASCII text into
            # mojibake here; confirm the producer's encoding and use one for both.
            message = json.loads(element.decode("iso-8859-1"))
            message["ingest_datetime"] = datetime.utcfromtimestamp(
                time.time()
            ).strftime("%Y-%m-%d %H:%M:%S.%f")
        except (ValueError, AttributeError, TypeError) as e:
            # TypeError: valid JSON that is not an object (list, string, null...).
            logging.info("[Invalid Data] (%s) - %s", e, element)
            return
        message["uuid"] = str(uuid())
        # Keep exactly the ActivityLog fields; absent ones default to None.
        record = {key: message.get(key) for key in activityKeys}
        for key in self._CUSTOM_KEYS:
            record[key] = self._as_custom_str(record[key])
        yield ActivityLog(**record)


options = PipelineOptions(save_main_session=True, streaming=True)


def _env(name):
    """Return environment variable *name* without surrounding whitespace/quotes.

    IPython's ``%env NAME = "value"`` stores the quote characters verbatim, so
    the quoted values in the first cell (GCP_REGION, STAGING_LOCATION,
    TEMP_LOCATION) would otherwise be passed on as ``"EU"`` / ``"gs://..."``.
    Returns None when the variable is unset, like ``os.getenv``.
    """
    value = os.getenv(name)
    if value is None:
        return None
    return value.strip().strip("\"'")


project = _env('PROJECT_ID')
region = _env('GCP_REGION')
staging_location = _env('STAGING_LOCATION')
temp_location = _env('TEMP_LOCATION')
topic_name = _env('TOPIC_NAME')
dataset_id = _env('DATASET_ID')
activity_table = _env('ACTIVITY_TABLE')
line_table = _env('LINE_TABLE')

# A single view is enough: views returned by view_as share the same options.
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = project
# NOTE(review): "EU" is a BigQuery multi-region, not a Dataflow region such as
# europe-west1; this only matters if the runner is switched to DataflowRunner.
gcp_options.region = region
gcp_options.staging_location = staging_location
gcp_options.temp_location = temp_location
gcp_options.job_name = f'pos-minute-pipeline-{time.time_ns()}'
# https://beam.apache.org/releases/pydoc/2.43.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions.ALL_KNOWN_RUNNERS
options.view_as(StandardOptions).runner = 'InteractiveRunner'
input_topic = f'projects/{project}/topics/{topic_name}'
# Fully qualified "dataset.table" names for WriteToBigQuery.
activity_table = f'{dataset_id}.{activity_table}'
line_table = f'{dataset_id}.{line_table}'

output_path = None
window_size = 0.2  # minutes, i.e. 12-second windows
num_shards = 5

# pipeline = Pipeline(options=options)
# Build the streaming graph: one Pub/Sub read fanned out into two branches
# (line items -> line_table, activities -> activity_table). Leaving the `with`
# block runs the pipeline and waits for it to finish; with a streaming source
# this presumably blocks until the job is stopped — TODO confirm under
# InteractiveRunner.
with Pipeline(options=options) as pipeline:
    # Raw message payloads as bytes. Both branches consume this PCollection, so
    # every message is parsed twice (by MapLineLog and by MapActivityLog).
    messages = pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)#,id_label=)
    windowed_lines = (
        messages
        # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
        # binds the publish time returned by the Pub/Sub server for each message
        # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
        # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
        | "Map to LineLog" >> ParDo(MapLineLog().with_output_types(LineLog))
        | f"Window lines into {window_size} minutes" >> GroupMessagesByFixedWindows(window_size, num_shards)
    )
    # Line-item branch: GroupMessagesByFixedWindows emits one iterable per
    # (window, shard). Flatten it back into single LineLog rows, convert each
    # row to a dict and write it to BigQuery.
    (
        windowed_lines 
        # | "Re-window" >> beam.WindowInto(GlobalWindows())
        # | "Combine-window" >> beam.CombineGlobally(json.dumps)
        | 'Flatten line lists' >> FlatMap(lambda elements: elements)
        | 'Line serialization' >> Map(lambda element: element._asdict())
        # | "Print" >> Map(print)
        | "Write Line To BigQuery" >> io.WriteToBigQuery(
            line_table,
            schema=lines_table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            additional_bq_parameters=additional_bq_parameters
        )
    )
    # Activity branch: one ActivityLog per message, windowed the same way.
    windowed_activities = (
            messages
            | "Map to Activity" >> ParDo(MapActivityLog().with_output_types(ActivityLog))
            | f"Window activities into {window_size} minutes" >> GroupMessagesByFixedWindows(window_size, num_shards)

        )
    # Same flatten -> dict -> BigQuery shape as the line-item branch.
    (
        windowed_activities 
        # | "Re-window" >> beam.WindowInto(GlobalWindows())
        | 'Flatten activity lists' >> FlatMap(lambda elements: elements)
        | 'Activity serialization' >> Map(lambda element: element._asdict())
        # | "Print" >> Map(print)
        | "Write Activity To BigQuery" >> io.WriteToBigQuery(
            activity_table,
            schema=activity_table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            additional_bq_parameters=additional_bq_parameters
        )
    )
    # Make the PCollections defined in this scope visible to Interactive Beam,
    # then render the pipeline graph.
    ib.watch(locals())
    # ib.collect(messages)
    # ib.collect(windowed_lines)
    # ib.collect(windowed_activities)
    ib.show_graph(pipeline)
    # ib.show(windowed_activities, include_window_info=True, n=100, duration=60)

env: GOOGLE_CLOUD_PROJECT=streaming-west




/opt/homebrew/Caskroom/mambaforge/base/envs/beam/bin/dot


