In [26]:
!pip install apache_beam



## **Dataset: Banking Transactions Data**
### File: banking_transactions.csv

* transaction_id: A unique identifier for the transaction.
* account_id: Identifier for the account making the transaction.
* transaction_type: Either "credit" or "debit".
* amount: Amount involved in the transaction.
* balance_after_transaction: Balance of the account after the transaction.
* timestamp: Time of the transaction.

### 1. Composite Transform using Apache Beam
For this example, let's create a composite transform that:
1.   Filters out only "debit" transactions.
2.   Calculates the total amount debited for each account.

In [27]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Filtering Debit Transactions
class FilterDebitTransactions(beam.PTransform):
    """Composite transform that keeps only the records marked as debits."""

    def expand(self, pcoll):
        """Return the records of ``pcoll`` whose ``transaction_type`` is ``'debit'``."""
        keep_debits = beam.Filter(lambda txn: txn['transaction_type'] == 'debit')
        return pcoll | keep_debits

# Sum the transaction amounts per account
class CalculateTotalDebitPerAccount(beam.PTransform):
    """Composite transform that totals the ``amount`` field for every ``account_id``."""

    def expand(self, pcoll):
        """Key each record by its account id, then sum the amounts of each key."""
        keyed_amounts = pcoll | 'Map Account and Amount' >> beam.Map(
            lambda txn: (txn['account_id'], txn['amount']))
        return keyed_amounts | 'Sum Amounts' >> beam.CombinePerKey(sum)

# Composite transform: filter the debits, then total them per account
class DebitAnalysis(beam.PTransform):
    """Composite transform chaining the debit filter and the per-account total."""

    def expand(self, pcoll):
        """Apply ``FilterDebitTransactions`` and then ``CalculateTotalDebitPerAccount``."""
        debits_only = pcoll | 'Filter Debits' >> FilterDebitTransactions()
        return debits_only | 'Total Debit per Account' >> CalculateTotalDebitPerAccount()

In [28]:
# Options shared by every pipeline in this notebook.
# NOTE(review): '--allow_unsafe_triggers' is presumably here for the AfterCount trigger
# example further down, which Beam may otherwise reject as potentially lossy - confirm.
options = PipelineOptions(flags=['--allow_unsafe_triggers'])

# Read the CSV, parse each line into a dict, total the debits per account and write them out.
with beam.Pipeline(options=options) as p:
    transactions = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText('banking_transactions.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(['transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp'], line.split(','))))
        | 'Convert Amount to Float' >> beam.Map(lambda record: {**record, 'amount': float(record['amount'])})
        | 'Analyze Debits' >> DebitAnalysis()
        # An empty shard template plus a single shard makes Beam write exactly
        # 'debit_analysis.txt' (the default would be 'debit_analysis.txt-00000-of-00001'),
        # so the message printed below names the file that is really produced.
        | 'Write Results' >> beam.io.WriteToText('debit_analysis.txt', shard_name_template='', num_shards=1)
    )
print("Written results to debit_analysis.txt file")

Written results to debit_analysis.txt file


### 2. Pipeline I/O
Let's find the total number of credit and debit transactions for each account.

In [29]:
class CountTransactions(beam.PTransform):
    """Composite transform counting records per (account id, transaction type) pair."""

    def expand(self, pcoll):
        """Return one dict per pair with keys ``account_id``, ``transaction_type`` and ``count``."""
        # Emit a 1 for every record, keyed by (account, type), so the sum is the count.
        ones = pcoll | 'Map Account and Transaction Type' >> beam.Map(
            lambda txn: ((txn['account_id'], txn['transaction_type']), 1))
        counts = ones | 'Count Transactions' >> beam.CombinePerKey(sum)
        return counts | 'Format Results' >> beam.Map(
            lambda kv: {'account_id': kv[0][0], 'transaction_type': kv[0][1], 'count': kv[1]})

In [30]:
def format_count_row(row):
    """Render one count dict as a CSV line in the same column order as the file header."""
    columns = ('account_id', 'transaction_type', 'count')
    return ','.join(str(row[column]) for column in columns)


# Count the transactions per account and transaction type and write them as a CSV file.
with beam.Pipeline(options=options) as p:
    results = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText('banking_transactions.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(['transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp'], line.split(','))))
        | 'Count Transactions per Type' >> CountTransactions()
        # WriteToText writes str(element); without this step every row would be a
        # Python dict repr sitting under a CSV header.
        | 'Format as CSV Row' >> beam.Map(format_count_row)
        # With an empty shard template all shards share one file name, so force a single shard.
        | 'Write Results to CSV' >> beam.io.WriteToText('transaction_counts_per_account', file_name_suffix='.csv', header='account_id,transaction_type,count', shard_name_template='', num_shards=1)
    )

print("Analysis complete. Results written to 'transaction_counts_per_account.csv'")

Analysis complete. Results written to 'transaction_counts_per_account.csv'


### 3. ParDo
Let's use ParDo to categorize transactions based on the amount:

1.   Small: Amount < $100

2.   Medium: $100 <= Amount < $500

3.   Large: Amount >= $500






In [31]:
class CategorizeTransaction(beam.DoFn):
    """DoFn that tags each transaction with a size category derived from its amount."""

    def process(self, transaction):
        """Yield a copy of ``transaction`` with an added ``'category'`` key.

        Amounts below 100 are 'Small', amounts below 500 are 'Medium' and
        everything else is 'Large'.
        """
        value = float(transaction['amount'])

        # Start from the open-ended bucket and narrow down on the first bound that fits.
        label = 'Large'
        for upper_bound, name in ((100, 'Small'), (500, 'Medium')):
            if value < upper_bound:
                label = name
                break

        # Copy the record so the input element itself is left untouched.
        tagged = dict(transaction)
        tagged['category'] = label
        yield tagged

In [32]:
def format_categorized_row(record):
    """Render one categorized transaction as a CSV line in the same column order as the file header."""
    columns = ('transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp', 'category')
    return ','.join(str(record[column]) for column in columns)


# Categorize every transaction by amount and write the enriched records as a CSV file.
with beam.Pipeline(options=options) as p:
    categorized_transactions = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText('banking_transactions.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(['transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp'], line.split(','))))
        | 'Categorize Transactions' >> beam.ParDo(CategorizeTransaction())
        # WriteToText writes str(element); without this step every row would be a
        # Python dict repr sitting under a CSV header.
        | 'Format as CSV Row' >> beam.Map(format_categorized_row)
        # With an empty shard template all shards share one file name, so force a single shard.
        | 'Write Results to CSV' >> beam.io.WriteToText('categorized_transactions', file_name_suffix='.csv', header='transaction_id,account_id,transaction_type,amount,balance_after_transaction,timestamp,category', shard_name_template='', num_shards=1)
    )

print("Analysis complete. Results written to 'categorized_transactions.csv'")

Analysis complete. Results written to 'categorized_transactions.csv'


### 4. Windowing
We'll window our transactions into fixed windows of 1 day based on their timestamp and compute the total amount transacted per day.

In [33]:
class ExtractTimestamp(beam.DoFn):
    """DoFn that attaches an event timestamp parsed from each record's ``'timestamp'`` field.

    ``datetime`` and ``TimestampedValue`` are imported in the cell that runs the
    pipeline; they only need to exist by the time ``process`` is called.
    """

    def process(self, element):
        """Yield ``element`` wrapped in a ``TimestampedValue``.

        The ``'timestamp'`` string must match ``'%Y-%m-%d %H:%M:%S'``;
        ``datetime.strptime`` raises ``ValueError`` otherwise.
        """
        timestamp_obj = datetime.strptime(element['timestamp'], '%Y-%m-%d %H:%M:%S')
        # Interpret the naive timestamp as UTC instead of the machine's local time zone
        # (which time.mktime would use), so the epoch seconds - and therefore the window
        # a record lands in - do not depend on where the notebook runs.
        unix_timestamp = (timestamp_obj - datetime(1970, 1, 1)).total_seconds()
        yield TimestampedValue(element, unix_timestamp)

In [34]:
from apache_beam.transforms.window import FixedWindows, TimestampedValue
from datetime import datetime
import time

def format_daily_total(total, window=beam.DoFn.WindowParam):
    """Render one window's total as ``<window start date (UTC)>,<total>``.

    Beam fills ``window`` with the window the element belongs to.
    """
    day = window.start.to_utc_datetime().date().isoformat()
    return f'{day},{total}'


# Sum the transaction amounts inside fixed one-day windows and write one line per day.
with beam.Pipeline(options=options) as p:
    daily_totals = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText('banking_transactions.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(['transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp'], line.split(','))))
        | 'Extract Timestamp' >> beam.ParDo(ExtractTimestamp())
        | 'Window into Days' >> beam.WindowInto(FixedWindows(24*60*60))  # 1 day fixed windows
        | 'Map Amounts' >> beam.Map(lambda record: float(record['amount']))
        | 'Compute Daily Total' >> beam.CombineGlobally(sum).without_defaults()
        # A bare list of sums cannot be matched to days, so prefix each with its window's date.
        | 'Label with Day' >> beam.Map(format_daily_total)
        # An empty shard template plus a single shard makes Beam write exactly
        # 'daily_totals.txt' rather than 'daily_totals.txt-00000-of-00001'.
        | 'Write Results' >> beam.io.WriteToText('daily_totals.txt', shard_name_template='', num_shards=1)
    )

print("Analysis complete. Results written to 'daily_totals.txt'")

Analysis complete. Results written to 'daily_totals.txt'


### 5. Triggers
We'll window our transactions into fixed windows of 1 week based on their timestamp. We'll then set a trigger to fire once it has seen at least 10 elements in a window. This is an example of an element count trigger.

In [35]:
from apache_beam.transforms.trigger import AfterCount

class ExtractTimestamp(beam.DoFn):
    """DoFn that attaches an event timestamp parsed from each record's ``'timestamp'`` field.

    This repeats the definition from the windowing section so the trigger example
    can be read on its own.
    """

    def process(self, element):
        """Yield ``element`` wrapped in a ``TimestampedValue``.

        The ``'timestamp'`` string must match ``'%Y-%m-%d %H:%M:%S'``;
        ``datetime.strptime`` raises ``ValueError`` otherwise.
        """
        timestamp_obj = datetime.strptime(element['timestamp'], '%Y-%m-%d %H:%M:%S')
        # Interpret the naive timestamp as UTC instead of the machine's local time zone
        # (which time.mktime would use), so the epoch seconds - and therefore the window
        # a record lands in - do not depend on where the notebook runs.
        unix_timestamp = (timestamp_obj - datetime(1970, 1, 1)).total_seconds()
        yield TimestampedValue(element, unix_timestamp)

In [36]:
# Count transactions inside fixed one-week windows, using an element-count trigger.
with beam.Pipeline(options=options) as p:
    transaction_counts = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText('banking_transactions.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(lambda line: dict(zip(['transaction_id', 'account_id', 'transaction_type', 'amount', 'balance_after_transaction', 'timestamp'], line.split(','))))
        | 'Extract Timestamp' >> beam.ParDo(ExtractTimestamp())
        # 7-day windows; the trigger is AfterCount(10) and fired panes are discarded
        # rather than accumulated.
        | 'Window into Weeks' >> beam.WindowInto(FixedWindows(7*24*60*60), trigger=AfterCount(10), accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
        | 'Count Transactions' >> beam.combiners.Count.Globally().without_defaults()
        # An empty shard template plus a single shard makes Beam write exactly
        # 'transaction_counts_with_trigger.txt' instead of appending '-00000-of-00001',
        # so the message printed below names the file that is really produced.
        | 'Write Results' >> beam.io.WriteToText('transaction_counts_with_trigger.txt', shard_name_template='', num_shards=1)
    )

print("Analysis complete. Results written to 'transaction_counts_with_trigger.txt'")



Analysis complete. Results written to 'transaction_counts_with_trigger.txt'
