Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines. It is particularly useful for organizations dealing with both batch and streaming data, or those looking for flexibility in their data processing infrastructure. It allows developers to write data processing logic once and run it on various execution engines, providing portability and future-proofing for data pipelines.

### Step 1: Install Apache Beam
First, install the Apache Beam Python SDK.

In [1]:
!pip install apache-beam[gcp]


Collecting apache-beam[gcp]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[gcp])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m961.7 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro<2,>=0.2

In [2]:
!pip install kaggle



In [3]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [5]:
!kaggle datasets download -d altruistdelhite04/loan-prediction-problem-dataset

Dataset URL: https://www.kaggle.com/datasets/altruistdelhite04/loan-prediction-problem-dataset
License(s): unknown
Downloading loan-prediction-problem-dataset.zip to /content
  0% 0.00/12.6k [00:00<?, ?B/s]
100% 12.6k/12.6k [00:00<00:00, 24.3MB/s]


In [6]:
!unzip loan-prediction-problem-dataset.zip

Archive:  loan-prediction-problem-dataset.zip
  inflating: test_Y3wMUE5_7gLdaTN.csv  
  inflating: train_u6lujuX_CVtuZ9i.csv  


### Step 2: Set Up the Apache Beam Pipeline
We'll start by importing the necessary modules and creating a simple pipeline. Here's how to structure the basic components and include composite transforms, windowing, and ParDo.

In [8]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import time
from apache_beam.transforms import window

# Define PipelineOptions
options = PipelineOptions()

# Composite Transform Example: Split and Extract Fields
class ExtractFields(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Split CSV' >> beam.ParDo(SplitCSVRow())
            | 'Extract Loan Fields' >> beam.Map(lambda fields: {
                'Loan_ID': fields[0],
                'Gender': fields[1],
                'Married': fields[2],
                'ApplicantIncome': fields[5],
                'LoanAmount': fields[8],
                'Loan_Status': fields[12]
            })
        )

# ParDo Example: Custom DoFn to Split CSV Rows
class SplitCSVRow(beam.DoFn):
    def process(self, element):
        # Split each line into CSV columns
        reader = csv.reader([element])
        for row in reader:
            yield row

# ParDo Example: Custom DoFn to Print Elements
class PrintElement(beam.DoFn):
    def process(self, element):
        print(element)
        yield element

# Function to run the pipeline
def run_pipeline():
    with beam.Pipeline(options=options) as p:
        # Read the Kaggle CSV file
        csv_data = p | 'Read CSV' >> beam.io.ReadFromText('train_u6lujuX_CVtuZ9i.csv', skip_header_lines=1)

        # Composite Transform: Extract fields from CSV
        extracted_data = csv_data | 'Extract Loan Data' >> ExtractFields()

        # ParDo: Print the data
        extracted_data | 'Print Extracted Data' >> beam.ParDo(PrintElement())

        # Windowing Example: Apply fixed windowing of 10 seconds
        windowed_data = (
            extracted_data
            | 'Apply Fixed Windowing' >> beam.WindowInto(window.FixedWindows(10))
        )

        # Trigger Example: After processing time of 5 seconds
        triggered_data = windowed_data | 'Apply Trigger' >> beam.WindowInto(
            window.FixedWindows(10),
            trigger=beam.trigger.AfterProcessingTime(5),
            accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
        )

        # Write results to a file
        triggered_data | 'Write to Output' >> beam.io.WriteToText('loan_output.txt')

run_pipeline()




{'Loan_ID': 'LP001002', 'Gender': 'Male', 'Married': 'No', 'ApplicantIncome': 'No', 'LoanAmount': '', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001003', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '128', 'Loan_Status': 'N'}
{'Loan_ID': 'LP001005', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'Yes', 'LoanAmount': '66', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001006', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '120', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001008', 'Gender': 'Male', 'Married': 'No', 'ApplicantIncome': 'No', 'LoanAmount': '141', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001011', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'Yes', 'LoanAmount': '267', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001013', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '95', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001014', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '158', 'Loan_Status': 'N

Explanation of the Features
Composite Transform:

1. The ExtractFields class is a composite transform that applies multiple transforms, including SplitCSVRow (which splits the CSV row into fields) and a Map function (which extracts specific columns such as Loan_ID, ApplicantIncome, etc.).
Pipeline I/O:

2. The beam.io.ReadFromText() function reads the Kaggle CSV file (train.csv).
The beam.io.WriteToText() function writes the output to a text file (loan_output.txt).
ParDo:

3. The SplitCSVRow class demonstrates a ParDo operation that processes each element (CSV row) by splitting it into individual fields.
Another ParDo, PrintElement, simply prints the extracted fields for demonstration.
Windowing:

4. The beam.WindowInto(window.FixedWindows(10)) applies fixed-time windowing, which groups elements that arrive within 10-second intervals.
Triggers:

5. A trigger is applied using beam.WindowInto() with a trigger condition AfterProcessingTime(5) that emits the results 5 seconds after processing begins.

### Step 3: Check the Output
You can check the output file loan_output.txt

In [9]:
!cat loan_output.txt-00000-of-00001


{'Loan_ID': 'LP001002', 'Gender': 'Male', 'Married': 'No', 'ApplicantIncome': 'No', 'LoanAmount': '', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001003', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '128', 'Loan_Status': 'N'}
{'Loan_ID': 'LP001005', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'Yes', 'LoanAmount': '66', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001006', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '120', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001008', 'Gender': 'Male', 'Married': 'No', 'ApplicantIncome': 'No', 'LoanAmount': '141', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001011', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'Yes', 'LoanAmount': '267', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001013', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '95', 'Loan_Status': 'Y'}
{'Loan_ID': 'LP001014', 'Gender': 'Male', 'Married': 'Yes', 'ApplicantIncome': 'No', 'LoanAmount': '158', 'Loan_Status': 'N

There are several additional Apache Beam features that you can explore with your Kaggle dataset to enhance the complexity and flexibility of your data pipeline. Here are some key advanced features

### GroupByKey and CoGroupByKey
These transformations are useful for grouping or joining data. For example, if you have multiple datasets (like loan data and applicant demographic data), you can use GroupByKey to group data by a specific key (e.g., Loan_ID), or CoGroupByKey to join two datasets based on a common key.

Group Loan Data by Status :
You can group loans by their Loan_Status to analyze them by categories like "Approved" or "Rejected".

In [11]:
def run_group_by_pipeline():
    with beam.Pipeline(options=options) as p:
        # Read CSV file
        csv_data = p | 'Read CSV' >> beam.io.ReadFromText('train_u6lujuX_CVtuZ9i.csv', skip_header_lines=1)

        # Extract Loan_ID and Loan_Status
        loan_data = (
            csv_data
            | 'Extract Loan Data' >> beam.ParDo(SplitCSVRow())
            | 'Key by Loan Status' >> beam.Map(lambda row: (row[12], row))  # Using Loan_Status as key
        )

        # Group by Loan_Status (Approved/Rejected)
        grouped_data = loan_data | 'Group by Loan Status' >> beam.GroupByKey()

        # Print grouped data
        grouped_data | 'Print Grouped Data' >> beam.Map(print)

run_group_by_pipeline()




('Y', [['LP001002', 'Male', 'No', '0', 'Graduate', 'No', '5849', '0', '', '360', '1', 'Urban', 'Y'], ['LP001005', 'Male', 'Yes', '0', 'Graduate', 'Yes', '3000', '0', '66', '360', '1', 'Urban', 'Y'], ['LP001006', 'Male', 'Yes', '0', 'Not Graduate', 'No', '2583', '2358', '120', '360', '1', 'Urban', 'Y'], ['LP001008', 'Male', 'No', '0', 'Graduate', 'No', '6000', '0', '141', '360', '1', 'Urban', 'Y'], ['LP001011', 'Male', 'Yes', '2', 'Graduate', 'Yes', '5417', '4196', '267', '360', '1', 'Urban', 'Y'], ['LP001013', 'Male', 'Yes', '0', 'Not Graduate', 'No', '2333', '1516', '95', '360', '1', 'Urban', 'Y'], ['LP001018', 'Male', 'Yes', '2', 'Graduate', 'No', '4006', '1526', '168', '360', '1', 'Urban', 'Y'], ['LP001024', 'Male', 'Yes', '2', 'Graduate', 'No', '3200', '700', '70', '360', '1', 'Urban', 'Y'], ['LP001027', 'Male', 'Yes', '2', 'Graduate', '', '2500', '1840', '109', '360', '1', 'Urban', 'Y'], ['LP001028', 'Male', 'Yes', '2', 'Graduate', 'No', '3073', '8106', '200', '360', '1', 'Urban',

### Side Inputs
Side inputs allow you to pass additional data to a ParDo transformation. This is useful if you need to perform operations using external or constant data, such as applying reference data (e.g., loan interest rates or eligibility rules) to each record in your main dataset.



In [15]:
class FilterByIncome(beam.DoFn):
    def process(self, element, income_threshold):
        applicant_income = element['ApplicantIncome']
        try:
            applicant_income = int(applicant_income)
        except ValueError:
            applicant_income = 0  # or any other default value you want to use

        if applicant_income >= income_threshold:
            yield element

def run_side_input_pipeline():
    with beam.Pipeline(options=options) as p:
        # Read the CSV file
        csv_data = p | 'Read CSV' >> beam.io.ReadFromText('train_u6lujuX_CVtuZ9i.csv', skip_header_lines=1)

        # Extract loan data
        loan_data = csv_data | 'Extract Loan Data' >> ExtractFields()

        # Define an income threshold
        income_threshold = p | 'Create income threshold' >> beam.Create([5000])

        # Apply filter using Side Input
        filtered_data = loan_data | 'Filter by income' >> beam.ParDo(FilterByIncome(), beam.pvalue.AsSingleton(income_threshold))

        # Write filtered results to a file
        filtered_data | 'Write filtered data' >> beam.io.WriteToText('filtered_loan_output.txt')

run_side_input_pipeline()




### Combining Data with CombinePerKey
You can use CombinePerKey to perform aggregations, such as summing, averaging, or counting data elements based on a specific key.

In [16]:
def run_combine_pipeline():
    with beam.Pipeline(options=options) as p:
        # Read the CSV file
        csv_data = p | 'Read CSV' >> beam.io.ReadFromText('train_u6lujuX_CVtuZ9i.csv', skip_header_lines=1)

        # Extract (Loan_Status, LoanAmount)
        loan_amounts = (
            csv_data
            | 'Extract Loan Amounts' >> beam.ParDo(SplitCSVRow())
            | 'Key by Loan Status' >> beam.Map(lambda row: (row[12], float(row[8] or 0)))  # (Loan_Status, LoanAmount)
        )

        # Sum LoanAmounts by Loan_Status
        total_loan_by_status = loan_amounts | 'Sum by Loan Status' >> beam.CombinePerKey(sum)

        # Write the result to an output file
        total_loan_by_status | 'Write Total Loan Amount' >> beam.io.WriteToText('total_loan_by_status.txt')

run_combine_pipeline()




### Sessions Windowing
You can use session windowing when you're dealing with streaming data and want to group events that occur close to each other in time. This is ideal for datasets where user activity is bursty and happens in distinct sessions.

In [17]:
def run_sessions_windowing_pipeline():
    with beam.Pipeline(options=options) as p:
        # Simulate a stream of loan application events (timestamps added)
        loan_applications = p | 'Create Loan Applications' >> beam.Create([
            {'Loan_ID': 'LP001', 'Timestamp': 1},
            {'Loan_ID': 'LP002', 'Timestamp': 3},
            {'Loan_ID': 'LP001', 'Timestamp': 5},
            {'Loan_ID': 'LP003', 'Timestamp': 15}
        ])

        # Apply session windowing
        sessioned_data = (
            loan_applications
            | 'Session window' >> beam.WindowInto(window.Sessions(gap_size=10))
        )

        # Write the sessioned output to a file
        sessioned_data | 'Write Sessioned Output' >> beam.io.WriteToText('sessioned_output.txt')

run_sessions_windowing_pipeline()




### Side Outputs (Multi-output ParDo)
You can use multi-output ParDo to produce multiple output streams from a single input stream, allowing you to split your data based on certain criteria.

In [20]:
class SplitLoanStatus(beam.DoFn):
    def process(self, element):
        if element['Loan_Status'] == 'Y':
            yield beam.pvalue.TaggedOutput('approved', element)
        else:
            yield beam.pvalue.TaggedOutput('rejected', element)

def run_split_output_pipeline():
    with beam.Pipeline(options=options) as p:
        loan_data = p | 'Read CSV' >> beam.io.ReadFromText('train_u6lujuX_CVtuZ9i.csv', skip_header_lines=1)

        extracted_data = loan_data | 'Extract Loan Data' >> ExtractFields()

        # Apply the multi-output ParDo
        split_result = extracted_data | 'Split by Loan Status' >> beam.ParDo(SplitLoanStatus()).with_outputs('approved', 'rejected')

        # Write approved loans
        split_result.approved | 'Write Approved Loans' >> beam.io.WriteToText('approved_loans.txt')

        # Write rejected loans
        split_result.rejected | 'Write Rejected Loans' >> beam.io.WriteToText('rejected_loans.txt')

run_split_output_pipeline()


