<h2>Building a custom pipeline</h2>

Imagine a user-friendly UI where users build custom ETL pipelines by dragging and dropping nodes.<br> 

In a graph representation, the pipeline is a graph with nodes as vertices and the connections between them as edges.<br> 

Nodes can be categorized as Source/Target, Transform, Join/Split, Custom, and Subgraph. Each node has a unique id and its own configuration details. The user defines the nodes and connects them to define the workflow. This is the example pipeline assumed for this task:<br>
<b>Oracle CDC >> Data ingestion, transformation, denormalization >> Delta Lake </b><br>
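A minimal sketch of this graph model in plain Python, assuming the node categories above become an enum and edges are stored as (upstream id, downstream id) pairs. The class and field names are hypothetical. `graphlib` (standard library, Python 3.9+) supplies a topological order, so every node runs after its inputs, and it rejects cycles a user might accidentally wire up in the UI.

```python
from dataclasses import dataclass, field
from enum import Enum
from graphlib import CycleError, TopologicalSorter


class NodeCategory(Enum):
    SOURCE_TARGET = "source_target"
    TRANSFORM = "transform"
    JOIN_SPLIT = "join_split"
    CUSTOM = "custom"
    SUBGRAPH = "subgraph"


@dataclass
class Node:
    id: str
    category: NodeCategory
    config: dict = field(default_factory=dict)


@dataclass
class Pipeline:
    nodes: dict = field(default_factory=dict)  # node id -> Node (vertices)
    edges: list = field(default_factory=list)  # (upstream id, downstream id)

    def add_node(self, node):
        # Node ids must be unique, since edges refer to nodes by id
        if node.id in self.nodes:
            raise ValueError(f"duplicate node id: {node.id}")
        self.nodes[node.id] = node

    def connect(self, upstream, downstream):
        if upstream not in self.nodes or downstream not in self.nodes:
            raise KeyError(f"unknown node in edge {upstream} -> {downstream}")
        self.edges.append((upstream, downstream))

    def execution_order(self):
        # Every node is registered, then each edge adds the upstream node as a predecessor;
        # static_order() raises CycleError if the graph contains a cycle
        sorter = TopologicalSorter({node_id: set() for node_id in self.nodes})
        for upstream, downstream in self.edges:
            sorter.add(downstream, upstream)
        return list(sorter.static_order())


# The example pipeline: two Oracle sources feed one transform node, which feeds Delta Lake
p = Pipeline()
for node_id, category in [
    ("customerSource", NodeCategory.SOURCE_TARGET),
    ("ordersSource", NodeCategory.SOURCE_TARGET),
    ("transformNode", NodeCategory.TRANSFORM),
    ("destinationNode", NodeCategory.SOURCE_TARGET),
]:
    p.add_node(Node(node_id, category))
p.connect("customerSource", "transformNode")
p.connect("ordersSource", "transformNode")
p.connect("transformNode", "destinationNode")
print(p.execution_order())
```

The same order is what an interpreter would follow when it walks the nodes to generate Spark code.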


The pipeline can be represented as a JSON object containing an array of nodes, where each node represents a step in the ETL process.<br> 

To compile and execute:<br> 
The JSON representation is interpreted, and the corresponding Spark code is generated to execute the ETL pipeline. The function of each node is mapped from its SQL logic to the equivalent Spark operations.<br> 
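A minimal sketch of the interpretation step, assuming rows are plain Python dicts rather than Spark DataFrames so the dispatch pattern can run without a cluster. Each `operation` string from the JSON is looked up in a registry of handlers; the handlers below are simplified stand-ins for a few of the tasks, not the Spark implementations, and the registry and function names are hypothetical.

```python
import json

# Registry: operation name from the JSON -> handler(rows, task) returning new rows
OPERATIONS = {}


def operation(name):
    """Decorator that registers a handler under an operation name."""
    def register(handler):
        OPERATIONS[name] = handler
        return handler
    return register


@operation("percent_off")
def percent_off_op(rows, task):
    factor = 1 - task["percentage"] / 100
    return [{**r, task["column"]: r[task["column"]] * factor} for r in rows]


@operation("trim")
def trim_op(rows, task):
    return [{**r, task["column"]: r[task["column"]].strip()} for r in rows]


@operation("sort")
def sort_op(rows, task):
    return sorted(rows, key=lambda r: tuple(r[c] for c in task["columns"]))


@operation("rename")
def rename_op(rows, task):
    mapping = task["columns"]
    return [{mapping.get(k, k): v for k, v in r.items()} for r in rows]


def run_tasks(rows, tasks):
    """Apply each task in order, failing loudly on operations with no handler."""
    for task in tasks:
        handler = OPERATIONS.get(task["operation"])
        if handler is None:
            raise ValueError(f"unsupported operation {task['operation']!r} in {task['id']}")
        rows = handler(rows, task)
    return rows


# Task definitions in the same shape as the pipeline JSON
tasks = json.loads("""[
  {"id": "task10", "operation": "percent_off", "column": "order_price", "percentage": 10},
  {"id": "task16", "operation": "trim", "column": "order_name"},
  {"id": "task18", "operation": "sort", "columns": ["customer_id"]},
  {"id": "task19", "operation": "rename", "columns": {"seq_id": "sequence_id"}}
]""")
rows = [
    {"customer_id": 2, "order_name": "  lamp ", "order_price": 50.0, "seq_id": 7},
    {"customer_id": 1, "order_name": "desk", "order_price": 200.0, "seq_id": 3},
]
result = run_tasks(rows, tasks)
```

A registry keeps the interpreter open for extension: supporting a new operation means registering one more handler instead of growing an if/elif chain.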

Oracle CDC over a JDBC connection can be used to ingest the data into a staging layer, or to transform it further and load it into Delta Lake.<br> 
<b>Important config details:</b><br>
Source database authentication<br>
Source and target tables, their primary keys, and the incremental column<br>
Number of partitions/mappers and estimated record size<br>
Hive authentication, if a Hive connection is needed to query from Presto<br>
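One way to turn these settings into Spark JDBC reader options is sketched below. `partitionColumn`, `lowerBound`, `upperBound`, `numPartitions` and `fetchsize` are Spark's own JDBC options; the helper name, the bounds and the fetch size used here are illustrative assumptions. "Mappers" is Sqoop terminology, and `numPartitions` is the closest Spark equivalent. The bounds only decide how the partition column is split into ranges; they do not filter rows.

```python
# Oracle format model for ISO-8601 strings such as 2023-11-25T00:00:00Z.
# The quoted "T" and "Z" are literals, so the value is read without a time zone
# (assumption: the watermark is in the same zone as the column).
ORACLE_ISO_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS"Z"'


def build_jdbc_options(conn, table, cdc=None, partition_column=None,
                       bounds=None, num_partitions=None, fetch_size=None):
    """Translate a source node's config into Spark JDBC reader options (all values as strings)."""
    dbtable = table
    if cdc and cdc.get("enabled"):
        # Incremental read: an inline view so Oracle only returns rows changed since the last run
        dbtable = (f"(SELECT * FROM {table} WHERE {cdc['incrementalColumn']} > "
                   f"TO_TIMESTAMP('{cdc['lastSuccessfulJobTime']}', '{ORACLE_ISO_FORMAT}')) {table}_cdc")

    options = {
        "url": conn["url"],
        "driver": "oracle.jdbc.OracleDriver",
        "dbtable": dbtable,
        "user": conn["username"],
        "password": conn["password"],
    }

    if partition_column is not None:
        # Spark needs all four partitioning options together for a parallel read
        if bounds is None or num_partitions is None:
            raise ValueError("partition_column requires bounds and num_partitions")
        lower, upper = bounds
        options.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower),
            "upperBound": str(upper),
            "numPartitions": str(num_partitions),
        })

    if fetch_size is not None:
        # Rows fetched per round trip; the estimated record size can guide this value
        options["fetchsize"] = str(fetch_size)
    return options


conn = {"url": "jdbc:oracle:thin:@//localhost:1521/ORCL",
        "username": "your_username", "password": "your_password"}
cdc = {"enabled": True, "incrementalColumn": "last_modified_ts",
       "lastSuccessfulJobTime": "2023-11-25T00:00:00Z"}
# Hypothetical bounds and fetch size, for illustration only
opts = build_jdbc_options(conn, "customer", cdc, partition_column="customer_id",
                          bounds=(1, 1_000_000), num_partitions=5, fetch_size=10_000)
# In Spark: df = spark.read.format("jdbc").options(**opts).load()
```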

<b>Oracle CDC - Change data capture from last successful job time</b><br>
From the Oracle database, only the rows inserted, updated, or deleted since the last successful job time are processed. This incremental processing requires a column that captures when each change happened, such as insert_timestamp. One challenge is a lag of one run when performing incremental joins: a main-table row whose matching reference row has not arrived yet joins to null, and the match only appears in a later run. This is more likely with complex joins, window functions, etc. Example:<br>
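1st run<br>
<table>
<tr><th colspan="2">Main</th><th>Reference</th><th>Join</th></tr>
<tr><th>Address</th><th>Zipcode</th><th>Zipcode</th><th>addr_zip</th></tr>
<tr><td>1</td><td>60601</td><td>60601</td><td>60601 - 60601</td></tr>
<tr><td>2</td><td>60602</td><td>No Match</td><td>60602 - null</td></tr>
</table>
2nd run<br>
<table>
<tr><th colspan="2">Main</th><th>Reference</th><th>Join</th></tr>
<tr><th>Address</th><th>Zipcode</th><th>Zipcode</th><th>addr_zip</th></tr>
<tr><td>3</td><td>60603</td><td>60602</td><td>60601 - 60601</td></tr>
<tr><td>4</td><td>60604</td><td>60603</td><td>60602 - 60602</td></tr>
</table>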
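The lag can be reproduced with a small plain-Python simulation of this example. A purely incremental run only joins the new main rows, so address 2 would keep its null match forever. Carrying unmatched rows into the next run (one common mitigation, which the pipeline here does not already do) lets 60602 match one run late, which is the one-run lag in the table. The function names are hypothetical.

```python
def left_join(main_rows, reference_keys):
    """Left-join (address, zipcode) rows against the set of reference zipcodes."""
    return [(addr, zipcode, zipcode if zipcode in reference_keys else None)
            for addr, zipcode in main_rows]


def run_incremental(batch, reference_keys, unmatched_carry_over):
    """Join this run's new rows plus rows left unmatched by earlier runs.

    Returns (joined_rows, still_unmatched) so the caller can retry the leftovers next run.
    """
    rows = unmatched_carry_over + batch
    joined = left_join(rows, reference_keys)
    still_unmatched = [(addr, z) for addr, z, ref in joined if ref is None]
    return joined, still_unmatched


# Run 1: zipcode 60602 is not in the reference data yet
reference = {"60601"}
run1, pending = run_incremental([(1, "60601"), (2, "60602")], reference, [])

# Run 2: reference rows 60602 and 60603 arrive, together with new addresses 3 and 4
reference |= {"60602", "60603"}
run2, pending = run_incremental([(3, "60603"), (4, "60604")], reference, pending)

# Without the carry-over, run 2 would only see addresses 3 and 4
naive_run2 = left_join([(3, "60603"), (4, "60604")], reference)
```

In Spark, the carry-over would typically be a small staging table of unmatched keys that is unioned into the next batch; that design is an assumption here, not part of the pipeline above.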


<b>Transformation logic assumed: </b><br>
Task 1 - Explode the array column of the customer table (one row per array element)<br>
Task 2 - Left join the customer and orders tables<br>
Task 3 - Filter only rows where delete_ind = 0<br>
Task 4 - Select only distinct records<br>
Task 5 - Get the count of distinct orders for each customer<br>
Task 6 - Aggregate by customer id and get the max date for the latest order<br>
Task 7 - Window partition by customer id and get the max bill sequence id<br>
Task 8 - Flag order discount == "BlackFriday" as True, otherwise False<br>
Task 9 - Write True as 1 and False as 0<br>
Task 10 - Take 10% off the order price <br>
Task 11 - Create a UDF that takes customer id, order id and partition id and concatenates them into a UUID using |<br>
Task 12 - Check if the order price is null<br>
Task 13 - Coalesce the order price (replace nulls with 0)<br>
Task 14 - Create an order count column with the number of orders for each unique customer<br>
Task 15 - Broadcast join the small order category table<br>
Task 16 - Trim the order name column strings<br>
Task 17 - Derive the purchase year and keep only records after 2018<br>
Task 18 - Sort by customer id<br>
Task 19 - Rename column seq_id to sequence_id<br>
Task 20 - Format and denormalize the data to write to Delta Lake<br>
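Before writing Spark code, it can help to pin down the row-level tasks (8 to 13, 16 and 17) as a plain-Python reference that later serves as a test oracle for the Spark output. This is a sketch under assumptions: column names follow the task list, task 12 writes a separate `order_price_is_null` flag (a hypothetical column), a null price becomes 0 in task 13, and the "UUID" of task 11 is the pipe-joined string the task describes rather than an RFC 4122 UUID.

```python
def apply_row_tasks(row):
    """Return the transformed row, or None if task 17 filters it out."""
    out = dict(row)
    # Tasks 8 and 9: flag Black Friday orders, stored as 1/0 (a missing discount counts as 0)
    out["BlackFriday"] = 1 if row.get("order_discount") == "BlackFriday" else 0
    # Task 10: 10% off; a null price stays null, as in Spark arithmetic
    price = row.get("order_price")
    price = None if price is None else price * 0.9
    # Task 12: remember whether the price was missing
    out["order_price_is_null"] = price is None
    # Task 13: coalesce a missing price to 0
    out["order_price"] = 0 if price is None else price
    # Task 11: customer id | order id | partition id
    out["cust_uuid"] = "|".join(str(row[c]) for c in ("customer_id", "order_id", "partition_id"))
    # Task 16: trim surrounding whitespace
    out["order_name"] = row["order_name"].strip()
    # Task 17: keep only purchases after 2018 (purchase_date assumed to be "YYYY-MM-DD")
    out["purchase_year"] = int(row["purchase_date"][:4])
    if out["purchase_year"] <= 2018:
        return None
    return out


row = {"customer_id": 1, "order_id": 10, "partition_id": 3, "order_discount": "BlackFriday",
       "order_price": 100.0, "order_name": " chair ", "purchase_date": "2023-11-24"}
print(apply_row_tasks(row))
```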

<h4>Pipeline representation as JSON input:</h4><br>
Each task in the transform node specifies an operation, which is later executed as part of the Spark job.

{
  "pipeline": [
    {
      "id": "customerSource",
      "type": "jdbc",
      "config": {
        "connection": {
          "url": "jdbc:oracle:thin:@//localhost:1521/ORCL",
          "username": "your_username",
          "password": "your_password"
        },
        "table": "customer",
        "cdc": {
          "enabled": true,
          "incrementalColumn": "last_modified_ts",
          "lastSuccessfulJobTime": "2023-11-25T00:00:00Z",
          "primaryKey": ["customer_id"]
        },
        "partitions": 5,
        "mappers": 3
      },
      "hive": {
        "connection": {
          "url": "jdbc:hive2://hive-server:10000/default",
          "username": "hive_username",
          "password": "hive_password"
        }
      }
    },
    {
      "id": "ordersSource",
      "type": "jdbc",
      "config": {
        "connection": {
          "url": "jdbc:oracle:thin:@//localhost:1521/ORCL",
          "username": "your_username",
          "password": "your_password"
        },
        "table": "orders",
        "cdc": {
          "enabled": true,
          "incrementalColumn": "last_modified_ts",
          "lastSuccessfulJobTime": "2023-11-25T00:00:00Z",
          "primaryKey": ["order_id"],
          "pollingInterval": "5000"
        },
        "partitions": 5,
        "mappers": 3
      },
      "hive": {
        "connection": {
          "url": "jdbc:hive2://hive-server:10000/default",
          "username": "hive_username",
          "password": "hive_password"
        }
      }
    },
    {
      "id": "transformNode",
      "type": "transform",
      "config": {
        "transformType": "Spark",
        "script": "transform_code.py",
        "tasks": [
          {
            "id": "task1",
            "operation": "explode",
            "table": "customer",
            "arrayColumn": "customer_details"
          },
          {
            "id": "task2",
            "operation": "left_join",
            "leftTable": "customer",
            "rightTable": "orders",
            "on": "customer.customer_id = orders.customer_id",
            "joinType": "left"
          },
          {
            "id": "task3",
            "operation": "filter",
            "condition": "delete_ind == 0"
          },
          {
            "id": "task4",
            "operation": "distinct"
          },
          {
            "id": "task5",
            "operation": "count",
            "column": "order_id",
            "distinct": true,
            "groupBy": ["customer_id"],
            "alias": "distinct_orders_count"
          },
          {
            "id": "task6",
            "operation": "aggregate",
            "groupBy": ["customer_id"],
            "aggregations": {
              "latest_order_date": "max(order_date)"
            }
          },
          {
            "id": "task7",
            "operation": "window_partition",
            "partitionBy": ["customer_id"],
            "orderBy": ["bill_sequence_id"],
            "alias": "max_bill_sequence_id"
          },
          {
            "id": "task8",
            "operation": "map",
            "column": "order_discount",
            "alias": "BlackFriday",
            "mapping": {
              "BlackFriday": true,
              "Other": false
            }
          },
          {
            "id": "task9",
            "operation": "map",
            "column": "BlackFriday",
            "mapping": {
              "true": 1,
              "false": 0
            }
          },
          {
            "id": "task10",
            "operation": "percent_off",
            "column": "order_price",
            "percentage": 10
          },
          {
            "id": "task11",
            "operation": "udf",
            "name": "concatenate_uuid",
            "columns": ["customer_id", "order_id", "partition_id"],
            "alias": "cust_uuid"
          },
          {
            "id": "task12",
            "operation": "check_null",
            "column": "order_price",
            "alias": "order_price_is_null"
          },
          {
            "id": "task13",
            "operation": "coalesce",
            "columns": ["order_price"],
            "default": 0
          },
          {
            "id": "task14",
            "operation": "count",
            "groupBy": ["customer_id"],
            "alias": "order_count"
          },
          {
            "id": "task15",
            "operation": "broadcast",
            "table": "order_category",
            "on": "order_id"
          },
          {
            "id": "task16",
            "operation": "trim",
            "column": "order_name"
          },
          {
            "id": "task17",
            "operation": "filter",
            "condition": "purchase_year > 2018"
          },
          {
            "id": "task18",
            "operation": "sort",
            "columns": ["customer_id"]
          },
          {
            "id": "task19",
            "operation": "rename",
            "columns": {"seq_id": "sequence_id"}
          },
          {
            "id": "task20",
            "operation": "format_and_denormalize"
          }
        ]
      }
    },
    {
      "id": "destinationNode",
      "type": "destination",
      "config": {
        "destinationType": "deltaLake",
        "path": "/path/to/delta/lake",
        "cdc": {
          "enabled": true,
          "incrementalColumn": "last_modified_ts",
          "lastSuccessfulJobTime": "2023-11-25T00:00:00Z"
        }
      }
    }
  ]
}
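Since users assemble this JSON through the UI, validating it before compiling catches mistakes early. The sketch below checks for duplicate node ids and for tasks that are missing the keys their operation needs. The `REQUIRED_KEYS` table is inferred from the example above and is an assumption, not a fixed schema; the function name is hypothetical.

```python
import json

# Keys each operation needs, inferred from the example pipeline (extend as operations are added)
REQUIRED_KEYS = {
    "explode": {"table", "arrayColumn"},
    "left_join": {"leftTable", "rightTable", "on", "joinType"},
    "filter": {"condition"},
    "distinct": set(),
    "count": {"groupBy", "alias"},
    "aggregate": {"groupBy", "aggregations"},
    "window_partition": {"partitionBy", "orderBy", "alias"},
    "map": {"column", "mapping"},
    "percent_off": {"column", "percentage"},
    "udf": {"name", "columns", "alias"},
    "check_null": {"column"},
    "coalesce": {"columns"},
    "broadcast": {"table"},
    "trim": {"column"},
    "sort": {"columns"},
    "rename": {"columns"},
    "format_and_denormalize": set(),
}


def validate_pipeline(pipeline):
    """Return a list of human-readable problems; an empty list means the config looks valid."""
    errors = []
    ids = [node["id"] for node in pipeline["pipeline"]]
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    if duplicates:
        errors.append(f"duplicate node ids: {duplicates}")

    for node in pipeline["pipeline"]:
        if node["type"] != "transform":
            continue
        for task in node["config"].get("tasks", []):
            op = task.get("operation")
            if op not in REQUIRED_KEYS:
                errors.append(f"{task.get('id')}: unknown operation {op!r}")
                continue
            missing = REQUIRED_KEYS[op] - set(task)
            if missing:
                errors.append(f"{task.get('id')}: missing {sorted(missing)}")
    return errors


sample = json.loads("""{"pipeline": [
  {"id": "customerSource", "type": "jdbc", "config": {}},
  {"id": "transformNode", "type": "transform", "config": {"tasks": [
    {"id": "task16", "operation": "trim", "column": "order_name"},
    {"id": "task99", "operation": "trim"},
    {"id": "task98", "operation": "pivot"}
  ]}}
]}""")
problems = validate_pipeline(sample)
print(problems)
```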

<h4>Creating the Spark job from the JSON:</h4>

import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StringType


# Helper to read a table (or inline view) from Oracle over JDBC
def read_jdbc(spark, conn, dbtable):
    return (
        spark.read.format("jdbc")
        .option("url", conn["url"])
        .option("driver", "oracle.jdbc.OracleDriver")
        .option("dbtable", dbtable)
        .option("user", conn["username"])
        .option("password", conn["password"])
        .load()
    )

# Function to perform explode operation - task 1 (one row per array element)
def explode_task(df, column_name):
    return df.withColumn("exploded_column", f.explode(f.col(column_name)))

# Function to perform left join operation - task 2
# Both sides are aliased by table name so an SQL-style condition such as
# "customer.customer_id = orders.customer_id" resolves; duplicate right-side columns are dropped
def left_join_task(df_left, df_right, left_name, right_name, join_condition, join_type):
    joined = df_left.alias(left_name).join(df_right.alias(right_name), f.expr(join_condition), join_type)
    right_only = [f.col(f"{right_name}.{c}") for c in df_right.columns if c not in df_left.columns]
    return joined.select(f"{left_name}.*", *right_only)

# Function to perform filter operation - tasks 3 and 17 (SQL condition string)
def filter_task(df, condition):
    return df.filter(condition)

# Function to perform distinct operation - task 4
def distinct_task(df):
    return df.distinct()

# Function to perform count operation - tasks 5 and 14
# Counts are added as a window column so the row-level columns survive for later tasks
def count_task(df, group_by_columns, alias, column=None, distinct=False):
    w = Window.partitionBy(*group_by_columns)
    if column and distinct:
        # countDistinct is not supported over a window; size(collect_set(...)) is the usual workaround
        return df.withColumn(alias, f.size(f.collect_set(column).over(w)))
    return df.withColumn(alias, f.count(f.col(column) if column else f.lit(1)).over(w))

# Function to perform aggregate operation - task 6
# Expressions such as "max(order_date)" are evaluated per group as window expressions
def aggregate_task(df, group_by_columns, aggregations):
    partition = ", ".join(group_by_columns)
    for alias, agg_expr in aggregations.items():
        df = df.withColumn(alias, f.expr(f"{agg_expr} OVER (PARTITION BY {partition})"))
    return df

# Function to perform window partition operation - task 7 (max of the ordering column per partition)
def window_partition_task(df, partition_by_columns, order_by_columns, alias):
    w = Window.partitionBy(*partition_by_columns)
    return df.withColumn(alias, f.max(f.col(order_by_columns[0])).over(w))

# Function to perform map operation - tasks 8 and 9
# Keys are compared as strings (booleans cast to "true"/"false"); an "Other" key,
# as in task 8, is treated as the fallback for unmatched values
def map_task(df, column, mapping, alias=None):
    pairs = [x for k, v in mapping.items() if k != "Other" for x in (f.lit(str(k)), f.lit(v))]
    mapped = f.create_map(*pairs)[f.col(column).cast("string")]
    if "Other" in mapping:
        mapped = f.coalesce(mapped, f.lit(mapping["Other"]))
    return df.withColumn(alias or column, mapped)

# Function to perform percent off operation - task 10
def percent_off_task(df, column, percentage):
    return df.withColumn(column, f.col(column) * (1 - percentage / 100.0))

# Function to perform UDF operation - task 11 (name is informational here)
# f.concat_ws("|", ...) avoids UDF overhead but skips nulls instead of writing "None"
def udf_task(df, name, columns, alias):
    concatenate_uuid_udf = f.udf(lambda *args: "|".join(str(arg) for arg in args), StringType())
    return df.withColumn(alias, concatenate_uuid_udf(*[f.col(c) for c in columns]))

# Function to perform check null operation - task 12
# The flag goes to a separate column so the original values survive for task 13
def check_null_task(df, column, alias=None):
    return df.withColumn(alias or f"{column}_is_null", f.col(column).isNull())

# Function to perform coalesce operation - task 13 (first non-null column, then an optional default)
def coalesce_task(df, columns, default=None):
    exprs = [f.col(c) for c in columns]
    if default is not None:
        exprs.append(f.lit(default))
    return df.withColumn(columns[0], f.coalesce(*exprs))

# Function to perform broadcast operation - task 15 (small lookup table sent to every executor)
def broadcast_task(df, lookup_df, join_key):
    return df.join(f.broadcast(lookup_df), join_key, "left")

# Function to perform trim operation - task 16
def trim_task(df, column):
    return df.withColumn(column, f.trim(f.col(column)))

# Function to perform sort operation - task 18
def sort_task(df, columns):
    return df.sort(*columns)

# Function to perform rename operation - task 19
def rename_task(df, columns):
    for old_col, new_col in columns.items():
        df = df.withColumnRenamed(old_col, new_col)
    return df

# Function to perform format and denormalize operation - task 20
def format_denormalize_task(df):
    selected_columns = ["customer_id", "latest_order_date", "max_bill_sequence_id", "BlackFriday", "order_price", "cust_uuid",
    "order_count", "order_category", "order_name", "purchase_year", "sequence_id"]
    return df.select(*selected_columns)

# Function to write to Delta Lake; the CDC filter was already applied when reading the sources
def write_to_delta_lake(df, path):
    df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)

# Main function: interpret the pipeline JSON and execute it as one Spark job
def execute_spark_job(pipeline_config):
    spark = SparkSession.builder.appName("CustomETL").getOrCreate()
    tables = {}         # source DataFrames keyed by table name
    df = None           # DataFrame flowing through the transform tasks
    source_conn = None  # last source connection, reused for lookup tables

    for node in pipeline_config["pipeline"]:
        node_type = node["type"]
        node_config = node["config"]

        if node_type == "jdbc":
            source_conn = node_config["connection"]
            table = node_config["table"]
            cdc = node_config.get("cdc", {})
            dbtable = table
            if cdc.get("enabled"):
                # Push the CDC filter down to Oracle so only rows changed since the last run are read
                dbtable = (
                    f"(SELECT * FROM {table} WHERE {cdc['incrementalColumn']} > "
                    f"TO_TIMESTAMP('{cdc['lastSuccessfulJobTime']}', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"')) {table}_cdc"
                )
            tables[table] = read_jdbc(spark, source_conn, dbtable)

        elif node_type == "transform":
            # node_config["script"] names the generated script; here the tasks are interpreted directly
            for task in node_config["tasks"]:
                operation = task["operation"]

                if operation == "explode":
                    tables[task["table"]] = explode_task(tables[task["table"]], task["arrayColumn"])
                    df = tables[task["table"]]
                elif operation == "left_join":
                    df = left_join_task(tables[task["leftTable"]], tables[task["rightTable"]],
                                        task["leftTable"], task["rightTable"], task["on"], task["joinType"])
                elif operation == "filter":
                    df = filter_task(df, task["condition"])
                elif operation == "distinct":
                    df = distinct_task(df)
                elif operation == "count":
                    df = count_task(df, task["groupBy"], task["alias"],
                                    task.get("column"), task.get("distinct", False))
                elif operation == "aggregate":
                    df = aggregate_task(df, task["groupBy"], task["aggregations"])
                elif operation == "window_partition":
                    df = window_partition_task(df, task["partitionBy"], task["orderBy"], task["alias"])
                elif operation == "map":
                    df = map_task(df, task["column"], task["mapping"], task.get("alias"))
                elif operation == "percent_off":
                    df = percent_off_task(df, task["column"], task["percentage"])
                elif operation == "udf":
                    df = udf_task(df, task["name"], task["columns"], task["alias"])
                elif operation == "check_null":
                    df = check_null_task(df, task["column"], task.get("alias"))
                elif operation == "coalesce":
                    df = coalesce_task(df, task["columns"], task.get("default"))
                elif operation == "broadcast":
                    # The lookup table is assumed to live in the same Oracle schema as the sources
                    lookup_df = read_jdbc(spark, source_conn, task["table"])
                    df = broadcast_task(df, lookup_df, task.get("on", "order_id"))
                elif operation == "trim":
                    df = trim_task(df, task["column"])
                elif operation == "sort":
                    df = sort_task(df, task["columns"])
                elif operation == "rename":
                    df = rename_task(df, task["columns"])
                elif operation == "format_and_denormalize":
                    df = format_denormalize_task(df)
                else:
                    raise ValueError(f"Unsupported operation '{operation}' in {task['id']}")

        elif node_type == "destination" and node_config["destinationType"] == "deltaLake":
            # Write to Delta Lake
            write_to_delta_lake(df, node_config["path"])


# Example usage, assuming the JSON above is saved as pipeline.json (hypothetical file name):
# with open("pipeline.json") as fh:
#     execute_spark_job(json.load(fh))

<h4>The same Spark job written manually, without reading from the JSON:</h4>

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder.appName("ETLJob").getOrCreate()

JDBC_URL = "jdbc:oracle:thin:@//localhost:1521/ORCL"
LAST_SUCCESSFUL_JOB_TIME = "2023-11-25T00:00:00Z"


def read_oracle(dbtable):
    # Read a table or inline view from Oracle over JDBC (options must be set before load())
    return (spark.read.format("jdbc")
            .option("url", JDBC_URL)
            .option("driver", "oracle.jdbc.OracleDriver")
            .option("dbtable", dbtable)
            .option("user", "your_username")
            .option("password", "your_password")
            .load())


def cdc_view(table):
    # Inline view returning only rows changed since the last successful run.
    # Oracle rejects AS before a table alias, and the ISO timestamp string is converted explicitly.
    return (f"(SELECT * FROM {table} WHERE latest_modified_ts > "
            f"TO_TIMESTAMP('{LAST_SUCCESSFUL_JOB_TIME}', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"')) {table}_cdc")


# Read the incremental customer and orders data from Oracle using a JDBC connection
customer_df = read_oracle(cdc_view("customer"))
orders_df = read_oracle(cdc_view("orders"))

# Task 1 - Explode the customer_details array (one row per element)
customer_df = customer_df.withColumn("exploded_column", f.explode(f.col("customer_details")))

# Task 2 - Left join customer and orders table
joined_df = customer_df.join(orders_df, "customer_id", "left")

# Task 3 - Filter only rows where delete_ind = 0
filtered_df = joined_df.filter(f.col("delete_ind") == 0)

# Task 4 - Select only distinct records
distinct_df = filtered_df.distinct()

# Per-customer window: tasks 5-7 and 14 add columns instead of collapsing rows,
# so every later task (and the final select) still sees them
customer_window = Window.partitionBy("customer_id")

# Task 5 - Count of distinct orders per customer (countDistinct is not allowed over a window)
distinct_orders_count_df = distinct_df.withColumn(
    "distinct_orders_count", f.size(f.collect_set("order_id").over(customer_window)))

# Task 6 - Aggregate by customer id and get max date for the latest order
aggregated_df = distinct_orders_count_df.withColumn(
    "latest_order_date", f.max("order_date").over(customer_window))

# Task 7 - Window partition by customer id and get max bill sequence id
max_bill_sequence_id_df = aggregated_df.withColumn(
    "max_bill_sequence_id", f.max("bill_sequence_id").over(customer_window))

# Task 8 - True when order discount == "BlackFriday", otherwise False
df_with_true_false = max_bill_sequence_id_df.withColumn(
    "BlackFriday", f.when(f.col("order_discount") == "BlackFriday", True).otherwise(False))

# Task 9 - Write True as 1 and False as 0
mapped_df = df_with_true_false.withColumn("BlackFriday", f.when(f.col("BlackFriday"), 1).otherwise(0))

# Task 10 - Take 10% off the order price
discounted_df = mapped_df.withColumn("order_price", f.col("order_price") * 0.9)

# Task 11 - Create UDF to take customer id, order id, partition id, and concatenate into UUID using |
def concatenate_uuid(c_id, o_id, p_id):
    return f"{c_id}|{o_id}|{p_id}"

concatenate_uuid_udf = f.udf(concatenate_uuid, StringType())
df_with_uuid = discounted_df.withColumn(
    "cust_uuid", concatenate_uuid_udf(f.col("customer_id"), f.col("order_id"), f.col("partition_id")))

# Task 12 - Flag rows where the order price is null
df_checked_null = df_with_uuid.withColumn("order_price_is_null", f.col("order_price").isNull())

# Task 13 - Coalesce null order prices to 0 (literals must be wrapped in f.lit)
df_coalesced = df_checked_null.withColumn("order_price", f.coalesce(f.col("order_price"), f.lit(0)))

# Task 14 - Create column order count to count the number of orders for each unique customer
df_with_order_count = df_coalesced.withColumn("order_count", f.count("order_id").over(customer_window))

# Task 15 - Broadcast join the small order_category table (read from Oracle over JDBC)
order_category = read_oracle("order_category")
df_with_broadcasted_category = df_with_order_count.join(f.broadcast(order_category), "order_id", "left")

# Task 16 - Trim the order name column strings
df_trimmed = df_with_broadcasted_category.withColumn("order_name", f.trim(f.col("order_name")))

# Task 17 - Derive the purchase year and keep only records after 2018
df_purchase_year_filtered = df_trimmed.withColumn("purchase_year", f.col("purchase_date").substr(1, 4).cast("int")) \
    .filter(f.col("purchase_year") > 2018)

# Task 18 - Sort by the customer id
sorted_df = df_purchase_year_filtered.orderBy("customer_id")

# Task 19 - Rename column seq_id to sequence_id
df_renamed_sequence_id = sorted_df.withColumnRenamed("seq_id", "sequence_id")

# Task 20 - Format and denormalize to write data to delta lake
df_denormalized = df_renamed_sequence_id.select(
    "customer_id", "latest_order_date", "max_bill_sequence_id", "BlackFriday", "order_price", "cust_uuid",
    "order_count", "order_category", "order_name", "purchase_year", "sequence_id"
)

# Append the incremental batch; the CDC filter was already applied when reading from Oracle
df_denormalized.write.format("delta").mode("append").option("mergeSchema", "true") \
    .save("/path/to/delta/lake")


# Stop Spark session
spark.stop()
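Appending writes a new row every time a record changes at the source. If the Delta table should hold only the latest version of each record, a MERGE (upsert) on the primary key is the usual alternative. A minimal sketch using the delta-spark `DeltaTable` API, assuming the delta-spark package is installed and that the chosen key is unique within each batch (for this output, `cust_uuid` is a plausible candidate, which is an assumption). The helper names are hypothetical.

```python
def build_merge_condition(primary_keys, target="t", source="s"):
    """Build the MERGE ON clause, e.g. 't.customer_id = s.customer_id'."""
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in primary_keys)


def upsert_to_delta(spark, df, path, primary_keys):
    """Apply an incremental batch to a Delta table: update matched keys, insert new ones."""
    # Imported lazily so the condition builder can be used without delta-spark installed
    from delta.tables import DeltaTable

    if not DeltaTable.isDeltaTable(spark, path):
        # First run: there is no table yet, so create it from the batch
        df.write.format("delta").save(path)
        return

    (DeltaTable.forPath(spark, path).alias("t")
        .merge(df.alias("s"), build_merge_condition(primary_keys))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
```

Usage would be `upsert_to_delta(spark, df_denormalized, "/path/to/delta/lake", ["cust_uuid"])` in place of the append, before `spark.stop()`. MERGE fails if several source rows match the same target row, which is why the key must be unique per batch.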


<h4>Example JSON response generated by the API:</h4>

{
  "status": "success",
  "message": "Data retrieval successful",
  "data": {
    "customer": [
      {
        "customer_id": 1,
        "name": "Gughapriyaa Elango",
        "email": "gug@myemail.com",
        "latest_modified_ts": "2023-11-26T08:00:00Z"
      },
      {
        "customer_id": 2,
        "name": "Priya Elango",
        "email": "priya@someemail.com",
        "latest_modified_ts": "2023-11-26T09:30:00Z"
      }, ...