In [1]:
import apache_beam as beam
import csv
from graphviz import Digraph

# Define Apache Beam pipeline components

class ParseCSVLine(beam.PTransform):
    """Composite transform turning raw CSV text lines into dict rows.

    Each input element is one line of CSV text; each output element is a
    dict keyed by the fixed column names ``name``, ``age`` and ``city``.
    """

    def expand(self, pcoll):
        """Map ``parse_csv`` over every line of ``pcoll``."""
        parse_step = beam.Map(self.parse_csv)
        return pcoll | 'Parse CSV Lines' >> parse_step

    @staticmethod
    def parse_csv(line):
        """Parse one CSV-formatted line into a ``{column: value}`` dict.

        The line is handed to ``csv.reader`` so quoted fields are handled
        by the standard parser. Values are paired with the column names
        positionally; ``zip`` stops at the shorter sequence, so surplus
        values are dropped and missing ones leave their keys absent.
        """
        columns = ["name", "age", "city"]
        values = next(csv.reader([line]))
        return {column: value for column, value in zip(columns, values)}


class LowercaseName(beam.PTransform):
    """Composite transform that lowercases the ``name`` field of each row.

    Input and output elements are dict rows that contain a ``'name'`` key
    whose value supports ``.lower()``.
    """

    def expand(self, pcoll):
        """Map ``lowercase_name`` over every row of ``pcoll``."""
        return (
            pcoll
            | 'Lowercase Name Field' >> beam.Map(self.lowercase_name)
        )

    @staticmethod
    def lowercase_name(row):
        """Return a copy of ``row`` with its ``'name'`` value lowercased.

        Args:
            row: Mapping with a ``'name'`` entry.

        Returns:
            A new dict with the same keys (in the same order) as ``row``;
            only the ``'name'`` value differs.

        Raises:
            KeyError: If ``row`` has no ``'name'`` key.
        """
        # Beam requires user functions to leave their input elements
        # unmodified, so build a shallow copy instead of editing in place.
        updated = dict(row)
        updated['name'] = row['name'].lower()
        return updated


class UppercaseCity(beam.PTransform):
    """Composite transform that uppercases the ``city`` field of each row.

    Input and output elements are dict rows that contain a ``'city'`` key
    whose value supports ``.upper()``.
    """

    def expand(self, pcoll):
        """Map ``uppercase_city`` over every row of ``pcoll``."""
        return (
            pcoll
            | 'Uppercase City Field' >> beam.Map(self.uppercase_city)
        )

    @staticmethod
    def uppercase_city(row):
        """Return a copy of ``row`` with its ``'city'`` value uppercased.

        Args:
            row: Mapping with a ``'city'`` entry.

        Returns:
            A new dict with the same keys (in the same order) as ``row``;
            only the ``'city'`` value differs.

        Raises:
            KeyError: If ``row`` has no ``'city'`` key.
        """
        # Beam requires user functions to leave their input elements
        # unmodified, so build a shallow copy instead of editing in place.
        updated = dict(row)
        updated['city'] = row['city'].upper()
        return updated


def format_as_csv_line(row):
    """
    Format each row as a CSV line.

    The ``name``, ``age`` and ``city`` values are written in that order
    through ``csv.writer``, so a value containing a comma or a double
    quote is quoted/escaped instead of silently producing extra columns.
    Values without special characters come out exactly as a plain
    comma-joined string.

    Args:
        row: Mapping with ``'name'``, ``'age'`` and ``'city'`` entries.

    Returns:
        A single CSV-encoded line with no trailing line terminator.

    Raises:
        KeyError: If any of the three keys is missing from ``row``.
    """
    import io  # local import: `io` is not among the file-level imports

    buffer = io.StringIO()
    # Empty lineterminator keeps the result a bare line, as before.
    writer = csv.writer(buffer, lineterminator='')
    writer.writerow([row['name'], row['age'], row['city']])
    return buffer.getvalue()

# Define the Apache Beam pipeline
# Reads sample_data.csv (skipping its header line), normalises the name and
# city fields, and writes the rows back out as CSV text. No header line is
# written to the output.
output_file = "output_data.csv"  # Output CSV file name

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read CSV File' >> beam.io.ReadFromText('sample_data.csv', skip_header_lines=1)
        | 'Parse CSV' >> ParseCSVLine()                      # Step 1: Parse the CSV
        | 'Transform Name to Lowercase' >> LowercaseName()   # Step 2: Lowercase the 'name' field
        | 'Transform City to Uppercase' >> UppercaseCity()   # Step 3: Uppercase the 'city' field
        | 'Format as CSV Line' >> beam.Map(format_as_csv_line)  # Format each row as a CSV line
        # Step 4: Write to CSV.
        # output_file already carries the ".csv" extension, so no extra
        # file_name_suffix is added. A single shard with an empty shard
        # template makes the result land in exactly `output_file` rather
        # than "output_data.csv-00000-of-00001.csv".
        | 'Write to CSV' >> beam.io.WriteToText(
            output_file,
            num_shards=1,
            shard_name_template='',
        )
    )

# Graphviz pipeline visualization
dot = Digraph(comment="Apache Beam CSV Processing Pipeline")

# One (node id, label) pair per pipeline step, in execution order
graph_nodes = [
    ("A", "Read CSV File"),
    ("B", "Parse CSV Lines"),
    ("C", "Lowercase Name Field"),
    ("D", "Uppercase City Field"),
    ("E", "Format as CSV Line"),
    ("F", "Write to CSV"),
]
for node_id, node_label in graph_nodes:
    dot.node(node_id, node_label)

# (tail, head, label) triples describing what flows between the steps
graph_edges = [
    ("A", "B", "Input lines"),
    ("B", "C", "Parsed rows"),
    ("C", "D", "Name in lowercase"),
    ("D", "E", "City in uppercase"),
    ("E", "F", "Formatted CSV lines"),
]
for tail, head, edge_label in graph_edges:
    dot.edge(tail, head, label=edge_label)

# Render and save the graph to a file
# NOTE(review): graphviz's render() saves the DOT source under `file_path`
# and the image under `file_path` + ".png"; the message below prints the
# former — confirm that is the intended path to report.
file_path = "beam_pipeline_graph"
dot.render(file_path, format="png")
print(f"Graph saved to {file_path}")


Graph saved to beam_pipeline_graph
