## Beaming up to the Flow

This notebook contains a step-by-step guide to creating a <b>streaming</b> data pipeline that engineers time-based features for a real-time fraud detection system.


<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?name=create-time-based-features&download_url=https://raw.githubusercontent.com/iamthuya/apache-beam-notebooks/main/beaming-up-to-the-flow/create_time_based_features.ipynb" target="_blank">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/iamthuya/apache-beam-notebooks/blob/main/beaming-up-to-the-flow/create_time_based_features.ipynb" target="_blank">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/iamthuya/apache-beam-notebooks/blob/main/beaming-up-to-the-flow/create_time_based_features.ipynb" target="_blank">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">View on GitHub
    </a>
  </td>
</table>

### Overview


We will first use [Apache Beam](https://beam.apache.org/) to create a data pipeline that works with a batch of data. After that, we will change the input source to pull streaming data directly from a [Pub/Sub](https://cloud.google.com/pubsub) topic. Once the pipeline works locally, we will deploy it to [Dataflow](https://cloud.google.com/dataflow).

As for the features, we will create:
- customer spending features (last 15, 30, and 60 minutes)
- terminal transacting features (last 15, 30, and 60 minutes)

Note: Before creating features, you should perform an exploratory data analysis (EDA) to better understand the statistics and correlations in the data. EDA is out of scope for this notebook.

### Dataset

The dataset is synthesized by using the code from [Machine Learning for Credit Card Fraud Detection - Practical Handbook project from Kaggle](https://github.com/Fraud-Detection-Handbook/fraud-detection-handbook).

### Objective

In the following notebook, you will learn to:

- Use Apache Beam to create data pipelines
- Apply Apache Beam's windowing and aggregation functions to create features
- Deploy the Apache Beam pipelines to Dataflow

## Setting up the environment

### Enlarge the display

First, let's widen the Jupyter notebook display for easier viewing.

In [None]:
from IPython.display import display, HTML

display(HTML("<style>.container { width:95% !important; }</style>"))

### Install additional packages

Before we get started, let's install the required packages to execute this notebook.

In [None]:
!pip install -q --upgrade "apache-beam[gcp]" pandas

### Restart runtime

Restart the Jupyter kernel so that the current runtime picks up the newly installed packages.

In [None]:
import IPython

IPython.Application.instance().kernel.do_shutdown(True) # automatically restarts kernel

## Creating data pipelines


### Import libraries

Import all the libraries required for the rest of this notebook.

In [None]:
import json
import time

from typing import Tuple, Any, List

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn

### Define constant variables

Constants used in the rest of this notebook are defined here.

In [None]:
MINUTES_WINDOWS = [15, 30, 60]
SECONDS_WINDOWS = [x * 60 for x in sorted(MINUTES_WINDOWS)]
WINDOW_SIZE = SECONDS_WINDOWS[-1]  # the largest window
WINDOW_PERIOD = 1 * 60  # 1 minute

### Defining auxiliary functions and classes

Here we define the auxiliary functions and classes used for data transformation in the pipelines later.

In [None]:
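def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:
    """Converts a time string into Unix time (seconds since the epoch)."""
    # time.mktime interprets the parsed time in the local time zone of the machine running this code
    time_tuple = time.strptime(time_str, time_format)
    return int(time.mktime(time_tuple))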


class PrintElementInfo(beam.DoFn):
    """Prints an element and passes it through unchanged."""
    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        print(f"{element}\n")
        # yield the element itself, not a 1-tuple wrapping it
        yield element


class AddAddtionalInfo(beam.DoFn):
    """Adds per-window composite keys and the element's distance to the window end."""
    def process(self, element: Tuple, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam) -> Tuple:
        # window end formatted as YYYYmmddHHMMSS, used to make the keys unique per window
        window_end_dt = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        new_element = {
            'TX_ID': element['TX_ID'],
            'TX_TS': element['TX_TS'],
            'CUSTOMER_ID': element['CUSTOMER_ID'],
            'TERMINAL_ID': element['TERMINAL_ID'],
            'TX_AMOUNT': element['TX_AMOUNT'],
            'TX_FRAUD': element['TX_FRAUD'],
            'CID_COMPOSITE_KEY': f"{element['CUSTOMER_ID']}_{window_end_dt}",
            'TID_COMPOSITE_KEY': f"{element['TERMINAL_ID']}_{window_end_dt}",
            # time between the element's event timestamp and the (exclusive) window end
            'TS_DIFF': window.end - timestamp
        }
        return (new_element,)

    
class Reformat(beam.DoFn):
    """Flattens a CoGroupByKey result into one dictionary per new transaction."""
    def process(self, element: Tuple) -> Tuple:
        # element is (composite_key, {'new_records': [...], 'aggregated_customer_id': [...]})
        new_records = element[1]['new_records']
        # GroupBy emits exactly one aggregated row per composite key
        aggregated = element[1]['aggregated_customer_id'][0]

        for row in new_records:
            # new records have TS_DIFF <= 60 s, so the 15/30-min counts are at least 1 (no division by zero)
            new_element = {
                'TX_ID': row.TX_ID,
                'TX_TS': row.TX_TS,
                'CUSTOMER_ID': row.CUSTOMER_ID,
                'TERMINAL_ID': row.TERMINAL_ID,
                'TX_AMOUNT': row.TX_AMOUNT,
                'TX_FRAUD': row.TX_FRAUD,
                'CID_NUM_TX_15MIN': aggregated.CID_NUM_TX_15MIN,
                'CID_AVG_AMOUNT_15MIN': aggregated.CID_SUM_AMOUNT_15MIN / aggregated.CID_NUM_TX_15MIN,
                'CID_NUM_TX_30MIN': aggregated.CID_NUM_TX_30MIN,
                'CID_AVG_AMOUNT_30MIN': aggregated.CID_SUM_AMOUNT_30MIN / aggregated.CID_NUM_TX_30MIN,
                'CID_NUM_TX_60MIN': aggregated.CID_NUM_TX_60MIN,
                'CID_AVG_AMOUNT_60MIN': aggregated.CID_AVG_AMOUNT_60MIN,
            }
            # yield the dictionary itself, not a 1-tuple wrapping it
            yield new_element
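`to_unix_time` relies on `time.mktime`, which interprets the timestamp string in the local time zone of whichever machine runs the code (your notebook, or a Dataflow worker). If `TX_TS` is recorded in UTC, which is an assumption here and not something the dataset description confirms, a timezone-independent variant could use `calendar.timegm` instead. A minimal sketch; the name `to_unix_time_utc` is hypothetical:

```python
import calendar
import time


def to_unix_time_utc(time_str: str, time_format: str = '%Y-%m-%d %H:%M:%S') -> int:
    """Converts a time string, assumed to be in UTC, into Unix time."""
    # strptime only parses the string; timegm treats the resulting struct_time as UTC
    time_tuple = time.strptime(time_str, time_format)
    return calendar.timegm(time_tuple)


print(to_unix_time_utc('2022-08-04 08:36:00'))  # 1659602160
```

Because `AddAddtionalInfo` builds its composite keys from `window.end.to_utc_datetime()`, a UTC-based conversion keeps the key suffix aligned with the wall-clock time written in `TX_TS`, regardless of the machine's time zone.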


## Working with batch data

We start by creating dummy data that resembles the actual data. Apache Beam uses the same programming model for batch and streaming data, and batch data is easier to debug, so we develop the pipeline against a small in-memory batch first.

In [None]:
dummy_data = [
    {'TX_ID': '01', 'TX_TS': '2022-08-04 08:36:00', 'CUSTOMER_ID': 'A1', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 10.00, 'TX_FRAUD': 0},
    {'TX_ID': '02', 'TX_TS': '2022-08-04 08:36:30', 'CUSTOMER_ID': 'B2', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 20.00, 'TX_FRAUD': 0},
    {'TX_ID': '03', 'TX_TS': '2022-08-04 08:37:00', 'CUSTOMER_ID': 'A1', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 10.00, 'TX_FRAUD': 0},
    {'TX_ID': '04', 'TX_TS': '2022-08-04 08:37:30', 'CUSTOMER_ID': 'B2', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 20.00, 'TX_FRAUD': 0},
    {'TX_ID': '05', 'TX_TS': '2022-08-04 08:38:00', 'CUSTOMER_ID': 'A1', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 10.00, 'TX_FRAUD': 0},
    {'TX_ID': '06', 'TX_TS': '2022-08-04 08:38:30', 'CUSTOMER_ID': 'B2', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 20.00, 'TX_FRAUD': 0},
    {'TX_ID': '07', 'TX_TS': '2022-08-04 08:39:00', 'CUSTOMER_ID': 'A1', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 10.00, 'TX_FRAUD': 0},
    {'TX_ID': '08', 'TX_TS': '2022-08-04 08:39:30', 'CUSTOMER_ID': 'B2', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 20.00, 'TX_FRAUD': 0},
    {'TX_ID': '09', 'TX_TS': '2022-08-04 08:40:00', 'CUSTOMER_ID': 'A1', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 10.00, 'TX_FRAUD': 0},
    {'TX_ID': '10', 'TX_TS': '2022-08-04 08:40:30', 'CUSTOMER_ID': 'B2', 'TERMINAL_ID': 'T4', 'TX_AMOUNT': 20.00, 'TX_FRAUD': 0},
]

Here we define the pipeline options to pass to our data pipeline. `DirectRunner` runs the pipeline locally. Let's check that loading the dummy data works.

In [None]:
# define pipeline options
local_batch_options = PipelineOptions(flags=[], 
                                      type_check_additional='all',
                                      save_main_session=True,
                                      runner='DirectRunner')

# run the pipeline
with beam.Pipeline(options=local_batch_options) as pipeline:
    source = (
        pipeline
        | 'Create dummy data' >> beam.Create(dummy_data)
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )

Next, we enrich the data: we attach an event timestamp to each element, assign it to the sliding windows defined earlier, and add window information (composite keys and the time difference to the window end). The results are converted to `beam.Row` objects so that they can be aggregated easily later.

In [None]:
with beam.Pipeline(options=local_batch_options) as pipeline:
    source = (
        pipeline
        | 'Create dummy data' >> beam.Create(dummy_data)
    )
    
    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )
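To see what `beam.WindowInto(beam.window.SlidingWindows(...))` does to a single element, here is a plain-Python sketch of sliding-window assignment. It follows the arithmetic of Beam's `SlidingWindows` as we understand it (window starts aligned to the period after taking the offset modulo the period), but it is an illustration, not Beam's code. The helper `sliding_windows_for` is hypothetical, and the constants are redefined with the notebook's values so the block runs on its own. It also computes the `TS_DIFF` that `AddAddtionalInfo` attaches for each window.

```python
from typing import List, Tuple


def sliding_windows_for(ts: int, size: int, period: int, offset: int = 0) -> List[Tuple[int, int]]:
    """Returns (start, end) of every sliding window that contains ts (all in seconds)."""
    offset = offset % period  # only the offset modulo the period matters
    start = ts - (ts - offset) % period  # latest window start at or before ts
    windows = []
    # walk backwards one period at a time while the window still covers ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


WINDOW_SIZE, WINDOW_PERIOD = 60 * 60, 60  # same values as the notebook constants
ts = 1659602160  # '2022-08-04 08:36:00' interpreted as UTC

windows = sliding_windows_for(ts, WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE)
ts_diffs = [end - ts for _, end in windows]  # what AddAddtionalInfo stores as TS_DIFF
print(len(windows))  # 60
print(sum(1 for d in ts_diffs if d <= WINDOW_PERIOD))  # 1
```

Each transaction therefore lands in `WINDOW_SIZE / WINDOW_PERIOD = 60` overlapping windows, and exactly one of them has `TS_DIFF <= WINDOW_PERIOD`. Note also that `offset=WINDOW_SIZE` reduces to 0 modulo the 60-second period, so it does not shift the windows.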

Now we are ready to create the required features. We use `beam.GroupBy` with `aggregate_field` to perform the aggregations per customer and window. For the 15- and 30-minute features, a row is counted only if its time difference to the window end (`TS_DIFF`) is within that window size; the 60-minute features use every row in the window.

In [None]:
with beam.Pipeline(options=local_batch_options) as pipeline:
    source = (
        pipeline
        | 'Create dummy data' >> beam.Create(dummy_data)
    )
    
    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )
    
    aggregated_customer_id = (
        enriched_source
        | 'Group by customer id composite key column' >> beam.GroupBy(CID_COMPOSITE_KEY='CID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, 'CID_NUM_TX_15MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum,'CID_SUM_AMOUNT_15MIN')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_NUM_TX_30MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_SUM_AMOUNT_30MIN')
            .aggregate_field('TX_ID', CountCombineFn(), 'CID_NUM_TX_60MIN')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CID_AVG_AMOUNT_60MIN')
        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )
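The `GroupBy(...).aggregate_field(...)` chain can be hard to read at first. The plain-Python sketch below computes the same kind of features for the rows of a single key and window, given each row's `TS_DIFF` and `TX_AMOUNT`: a count and an average amount over the rows within 15, 30, and 60 minutes of the window end. It illustrates the logic only; the function `window_features` and the sample rows are hypothetical. The `prefix` argument suggests how the terminal features from the overview might reuse the same logic keyed by `TID_COMPOSITE_KEY`, which is an assumption since this notebook only implements the customer features.

```python
from typing import Dict, List, Tuple

MINUTES_WINDOWS = [15, 30, 60]  # same values as the notebook constant


def window_features(rows: List[Tuple[int, float]], prefix: str = 'CID') -> Dict[str, float]:
    """Computes count and average-amount features for one key and window.

    rows holds (ts_diff_seconds, amount) pairs.
    """
    features = {}
    for minutes in MINUTES_WINDOWS:
        # keep only rows whose distance to the window end fits in this sub-window
        amounts = [amount for ts_diff, amount in rows if ts_diff <= minutes * 60]
        features[f'{prefix}_NUM_TX_{minutes}MIN'] = len(amounts)
        # guard against empty sub-windows instead of dividing by zero
        features[f'{prefix}_AVG_AMOUNT_{minutes}MIN'] = sum(amounts) / len(amounts) if amounts else 0.0
    return features


sample_rows = [(60, 10.0), (600, 20.0), (1200, 30.0), (3000, 40.0)]
print(window_features(sample_rows))
```

In the Beam version, the 15- and 30-minute averages are derived later in `Reformat` as sum divided by count, while the 60-minute average comes straight from `MeanCombineFn`. Since every row of a 60-minute window has `TS_DIFF <= 3600`, the unfiltered 60-minute aggregates match the filtered ones computed here.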

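Because the window slides forward by only one minute (`WINDOW_PERIOD`), each transaction appears in 60 overlapping windows. To emit each transaction once, we keep only the records that arrived within the last minute of each window (`TS_DIFF <= WINDOW_PERIOD`) and join them with that window's aggregated features using `CoGroupByKey`.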

In [None]:
with beam.Pipeline(options=local_batch_options) as pipeline:
    source = (
        pipeline
        | 'Create dummy data' >> beam.Create(dummy_data)
    )
    
    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )
    
    aggregated_customer_id = (
        enriched_source
        | 'Group by customer id composite key column' >> beam.GroupBy(CID_COMPOSITE_KEY='CID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, 'CID_NUM_TX_15MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum,'CID_SUM_AMOUNT_15MIN')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_NUM_TX_30MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_SUM_AMOUNT_30MIN')
            .aggregate_field('TX_ID', CountCombineFn(), 'CID_NUM_TX_60MIN')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CID_AVG_AMOUNT_60MIN')
        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )
    
    new_records = (
        enriched_source
        | 'Filter only new elements' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)
        | 'Assign key for new elements' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )

    result = (
        ({
            'new_records': new_records, 
            'aggregated_customer_id': aggregated_customer_id
        })
        | 'Merge pcollections (customer id)' >> beam.CoGroupByKey()
        | 'Filter empty rows (customer id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | 'Reformat elements (customer id)' >> beam.ParDo(Reformat())
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )
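`CoGroupByKey` behaves like a join on keys: for every key it emits `(key, {tag: values})`, with an empty collection for any input that has no values for that key. The sketch below imitates that output shape in plain Python to show why the 'Filter empty rows' step is needed: windows that received no new transaction still carry aggregates. The function `co_group_by_key` is hypothetical and is not Beam's implementation; the sample keys and values are modeled on `dummy_data`.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def co_group_by_key(tagged: Dict[str, Iterable[Tuple[str, Any]]]) -> Dict[str, Dict[str, List[Any]]]:
    """Groups several keyed collections by key, one value list per tag."""
    # every key gets a list for every tag, even when that tag has no values for it
    grouped = defaultdict(lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


aggregated_customer_id = [
    ('A1_20220804084100', {'CID_NUM_TX_15MIN': 5}),
    ('B2_20220804084200', {'CID_NUM_TX_15MIN': 5}),  # no new B2 transaction in this window
]
new_records = [('A1_20220804084100', {'TX_ID': '09'})]

joined = co_group_by_key({'new_records': new_records,
                          'aggregated_customer_id': aggregated_customer_id})
# keep only windows that actually received a new transaction
result = {key: value for key, value in joined.items() if value['new_records']}
print(list(result))  # ['A1_20220804084100']
```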

## Setting up a Google Cloud project

- Step 1: Go to the [Google Cloud Console](https://console.cloud.google.com/projectcreate), create a project, and select it.

- Step 2: Activate [Cloud Shell](https://console.cloud.google.com/?cloudshell=true) and run the following commands (authorize Cloud Shell if prompted):
```
gcloud services enable pubsub.googleapis.com dataflow.googleapis.com
gcloud pubsub subscriptions create "ff-txlabels-sub" --topic="ff-txlabels" --topic-project="cymbal-fraudfinder"
export PROJECT_ID=$(gcloud config get-value project)
gsutil mb -c STANDARD -l US gs://${PROJECT_ID}
```

- Step 3: Create a [service account key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) in JSON format and download it.

- Step 4: Go to [IAM & Admin](https://console.cloud.google.com/iam-admin) and click `ADD`. Enter your service account's email address under `New Principals`, select the `Owner` role, and click `Save`.

### Set Google Application Credentials

If the previous steps succeeded, you should now have a JSON key file. Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to its path and you are all set.

In [None]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path-to-your-service-account-key.json" # update this value with yours

### Define Google Cloud variables

Google Cloud variables used in the rest of this notebook are defined here. You can find your project ID in the `project_id` field of the JSON key file you just downloaded.

In [None]:
PROJECT_ID = 'your-project-id'  # update this value with yours
REGION = 'us-central1'  # update this value with yours
SUBSCRIPTION_NAME = "ff-txlabels-sub"
SUBSCRIPTION_PATH = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_NAME}"
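Since the service account key is a JSON file with a `project_id` field, you could read the project ID from it instead of typing it by hand. A minimal sketch, assuming the key file is the one you pointed `GOOGLE_APPLICATION_CREDENTIALS` at; both helpers, `project_id_from_key` and `subscription_path`, are hypothetical:

```python
import json


def project_id_from_key(key_path: str) -> str:
    """Reads the project ID from a service account key file."""
    with open(key_path) as key_file:
        return json.load(key_file)['project_id']


def subscription_path(project_id: str, subscription_name: str) -> str:
    """Builds the fully qualified subscription path used by ReadFromPubSub."""
    return f"projects/{project_id}/subscriptions/{subscription_name}"


# Usage, once GOOGLE_APPLICATION_CREDENTIALS points at a real key file:
# PROJECT_ID = project_id_from_key(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
# SUBSCRIPTION_PATH = subscription_path(PROJECT_ID, "ff-txlabels-sub")
```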

## Working with streaming data


### Reading sample messages from the Pub/Sub topic

Pull a few sample messages from the Pub/Sub subscription to verify that the upstream data is arriving as intended.

In [None]:
import json

from google.api_core import retry
from google.cloud import pubsub_v1

def read_from_sub(project_id, subscription_name, messages=10):
    """Pulls up to `messages` messages, acknowledges them, and returns their decoded payloads."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    # Wrap the subscriber in a 'with' block to automatically call close() to
    # close the underlying gRPC channel when done.
    with subscriber:
        # The subscriber pulls a specific number of messages. The actual
        # number of messages pulled may be smaller than max_messages.
        response = subscriber.pull(
            subscription=subscription_path,
            max_messages=messages,
            retry=retry.Retry(deadline=300),
        )

        if len(response.received_messages) == 0:
            print("no messages")
            return []  # an empty list keeps `messages_tx[:5]` below from failing

        ack_ids = []
        msg_data = []
        for received_message in response.received_messages:
            # payloads are JSON, the same format the pipeline decodes with json.loads
            msg = json.loads(received_message.message.data.decode('utf-8'))
            msg_data.append(msg)
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )

        return msg_data


messages_tx = read_from_sub(project_id=PROJECT_ID,
                            subscription_name=SUBSCRIPTION_NAME,
                            messages=5)

messages_tx[:5]
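The pipeline decodes each Pub/Sub payload with `json.loads(row.decode('utf-8'))` and then reads fields such as `TX_TS`, `CUSTOMER_ID`, and `TX_AMOUNT`. If you want to fail fast on malformed messages while inspecting samples, a small validation helper like the hypothetical `decode_transaction` below could help. The required-field list is an assumption taken from the fields this notebook's code reads, and the sample payload is made up in the shape of `dummy_data`.

```python
import json
from typing import Any, Dict

# fields the pipeline reads from each message (assumed from this notebook's code)
REQUIRED_FIELDS = ('TX_ID', 'TX_TS', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_AMOUNT', 'TX_FRAUD')


def decode_transaction(data: bytes) -> Dict[str, Any]:
    """Decodes a UTF-8 JSON payload and checks that the expected fields are present."""
    record = json.loads(data.decode('utf-8'))
    missing = [field for field in REQUIRED_FIELDS if field not in record]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return record


payload = (b'{"TX_ID": "01", "TX_TS": "2022-08-04 08:36:00", "CUSTOMER_ID": "A1", '
           b'"TERMINAL_ID": "T4", "TX_AMOUNT": 10.0, "TX_FRAUD": 0}')
print(decode_transaction(payload)['CUSTOMER_ID'])  # A1
```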

### Running streaming pipeline using DirectRunner

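To switch to streaming data, we read from Pub/Sub with `beam.io.ReadFromPubSub`, decode each message from JSON, and add one more option to `PipelineOptions`: `streaming=True`. The rest of the pipeline stays the same. A streaming pipeline keeps running until you stop it, so interrupt the kernel once you have seen enough output.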

In [None]:
local_stream_options = PipelineOptions(flags=[], 
                                       type_check_additional='all',
                                       save_main_session=True,
                                       runner='DirectRunner', 
                                       streaming=True)

with beam.Pipeline(options=local_stream_options) as pipeline:
    source = (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION_PATH)
        | 'Decode byte array to json dict' >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
    )
    
    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )
    
    aggregated_customer_id = (
        enriched_source
        | 'Group by customer id composite key column' >> beam.GroupBy(CID_COMPOSITE_KEY='CID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, 'CID_NUM_TX_15MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum,'CID_SUM_AMOUNT_15MIN')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_NUM_TX_30MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_SUM_AMOUNT_30MIN')
            .aggregate_field('TX_ID', CountCombineFn(), 'CID_NUM_TX_60MIN')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CID_AVG_AMOUNT_60MIN')
        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )
    
    new_records = (
        enriched_source
        | 'Filter only new elements' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)
        | 'Assign key for new elements' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )

    result = (
        ({
            'new_records': new_records, 
            'aggregated_customer_id': aggregated_customer_id
        })
        | 'Merge pcollections (customer id)' >> beam.CoGroupByKey()
        | 'Filter empty rows (customer id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | 'Reformat elements (customer id)' >> beam.ParDo(Reformat())
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )

### Running streaming pipeline using DataflowRunner

Deploying the pipeline to Dataflow is straightforward. We only need to change `runner='DirectRunner'` to `runner='DataflowRunner'` and add Google Cloud-specific options such as `project`, `region`, and `temp_location`. On Dataflow, the output of `PrintElementInfo` goes to the job's worker logs rather than to this notebook.

In [None]:
dataflow_stream_options = PipelineOptions(flags=[],
                                          type_check_additional='all',
                                          save_main_session=True,
                                          runner='DataflowRunner',
                                          streaming=True,
                                          project=PROJECT_ID,
                                          region=REGION,
                                          temp_location=f"gs://{PROJECT_ID}/tmp")

with beam.Pipeline(options=dataflow_stream_options) as pipeline:
    source = (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION_PATH)
        | 'Decode byte array to json dict' >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
    )
    
    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )
    
    aggregated_customer_id = (
        enriched_source
        | 'Group by customer id composite key column' >> beam.GroupBy(CID_COMPOSITE_KEY='CID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum, 'CID_NUM_TX_15MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[0] else 0, sum,'CID_SUM_AMOUNT_15MIN')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_NUM_TX_30MIN')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= SECONDS_WINDOWS[1] else 0, sum, 'CID_SUM_AMOUNT_30MIN')
            .aggregate_field('TX_ID', CountCombineFn(), 'CID_NUM_TX_60MIN')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CID_AVG_AMOUNT_60MIN')
        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )
    
    new_records = (
        enriched_source
        | 'Filter only new elements' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)
        | 'Assign key for new elements' >> beam.WithKeys(lambda row: row.CID_COMPOSITE_KEY)
    )

    result = (
        ({
            'new_records': new_records, 
            'aggregated_customer_id': aggregated_customer_id
        })
        | 'Merge pcollections (customer id)' >> beam.CoGroupByKey()
        | 'Filter empty rows (customer id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | 'Reformat elements (customer id)' >> beam.ParDo(Reformat())
        | 'Print elements' >> beam.ParDo(PrintElementInfo())
    )

Congrats! Now the job should be running on [Dataflow](https://console.cloud.google.com/dataflow/jobs)!