In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from queue import Queue
import threading

# Create (or reuse, via getOrCreate) the Spark session used by the rest of this notebook.
spark = (
    SparkSession.builder
    .appName("metadata_driven_etl")
    .getOrCreate()
)

# Data-quality rules to apply, declared as (field, rule) pairs and expanded
# into {"field": ..., "rule": ...} dicts. Add more pairs as needed.
# NOTE(review): "not_null" does not catch empty strings — the empty account_id
# row passes it in the displayed output; confirm whether that is intended.
metadata = [
    dict(field=field_name, rule=rule_name)
    for field_name, rule_name in (
        ("account_id", "not_null"),
        ("balance", "is_positive"),
        ("last_updated", "no_nan"),
        ("account_name", "non_empty_string"),
    )
]

# Row schema for the sample account records: every column is nullable and
# only balance is numeric.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("account_id", StringType()),
            ("balance", FloatType()),
            ("last_updated", StringType()),
            ("account_name", StringType()),
        )
    ]
)

# Sample account rows, assembled column-by-column and zipped into tuples in
# schema order (account_id, balance, last_updated, account_name).
# Rows 2-4 each contain an empty or non-positive value, presumably to exercise
# the quality rules.
data = list(zip(
    ("123", "", "125", "126"),                          # account_id
    (100.0, 200.0, -50.0, 300.0),                       # balance
    ("2024-04-28", "2024-04-29", "", "2024-04-30"),     # last_updated
    ("Account1", "Account2", "Account3", ""),           # account_name
))

# Unbounded FIFO that carries records from the producer thread to the consumer.
queue = Queue(maxsize=0)

def stream_data(queue, data):
    """Producer for the simulated stream: enqueue every record of *data*, in order.

    Args:
        queue: a queue-like object exposing ``put``.
        data: iterable of records to enqueue one at a time.
    """
    enqueue = queue.put
    for item in iter(data):
        enqueue(item)

# Rule name -> (build "passed" Column, build "failed" Column, issue message).
# Each builder receives the Spark Column of the field being checked.
_RULE_CHECKS = {
    "not_null": (lambda c: c.isNotNull(), lambda c: c.isNull(), "null value"),
    "is_positive": (lambda c: c > 0, lambda c: c <= 0, "non-positive value"),
    # NOTE(review): isnan is applied to whatever type the field has; for the
    # StringType last_updated column the displayed output shows an empty value
    # still passing, so this check may not do what is intended — confirm.
    "no_nan": (lambda c: ~isnan(c), lambda c: isnan(c), "NaN value"),
    "non_empty_string": (
        lambda c: trim(c) != "",
        lambda c: trim(c) == "",
        "empty string",
    ),
    # Add more rules as needed
}


def _apply_quality_rules(df, rules):
    """Add a ``<field>_flag`` and ``<field>_issue`` column to *df* for each rule.

    Args:
        df: Spark DataFrame containing every field named in *rules*.
        rules: iterable of ``{"field": ..., "rule": ...}`` dicts, applied in order.

    Returns:
        A new DataFrame. ``<field>_flag`` holds the rule's boolean result
        (Spark yields null for a null field value, except for ``not_null``);
        ``<field>_issue`` holds the issue message when the rule fails, else null.

    Raises:
        ValueError: if a rule name has no entry in ``_RULE_CHECKS``.
    """
    for entry in rules:
        field, rule = entry["field"], entry["rule"]
        try:
            passed, failed, message = _RULE_CHECKS[rule]
        except KeyError:
            raise ValueError(
                f"unknown data quality rule {rule!r} for field {field!r}"
            ) from None
        column = col(field)
        df = (
            df.withColumn(f"{field}_flag", passed(column))
            .withColumn(f"{field}_issue", when(failed(column), message))
        )
    return df


# Fail fast on a misspelled or unsupported rule before any record is
# processed; otherwise that field would silently go unchecked.
_unknown_rules = sorted(
    {entry["rule"] for entry in metadata if entry["rule"] not in _RULE_CHECKS}
)
if _unknown_rules:
    raise ValueError(f"unknown data quality rule(s) in metadata: {_unknown_rules}")

# Marks the end of the simulated stream; a fresh object() never compares
# equal to a real record tuple.
_END_OF_STREAM = object()


def _produce():
    """Run ``stream_data`` on the shared queue, then enqueue the end marker."""
    try:
        stream_data(queue, data)
    finally:
        # Always signal completion so the consumer loop below cannot block forever.
        queue.put(_END_OF_STREAM)


# Start a new thread to simulate data streaming
producer = threading.Thread(target=_produce)
producer.start()

# Consume until the end marker arrives. Polling `queue.empty()` instead would
# race with the producer thread: it can report empty before the first put (or
# between puts), ending the loop early and silently skipping records.
for record in iter(queue.get, _END_OF_STREAM):
    # Single-row DataFrame for this record
    account_data = spark.createDataFrame([record], schema)
    account_data = _apply_quality_rules(account_data, metadata)

    # `display` comes from the notebook runtime; it is not defined in this file.
    display(account_data)

producer.join()


account_id,balance,last_updated,account_name,account_id_flag,account_id_issue,balance_flag,balance_issue,last_updated_flag,last_updated_issue,account_name_flag,account_name_issue
123,100.0,2024-04-28,Account1,True,,True,,True,,True,


account_id,balance,last_updated,account_name,account_id_flag,account_id_issue,balance_flag,balance_issue,last_updated_flag,last_updated_issue,account_name_flag,account_name_issue
,200.0,2024-04-29,Account2,True,,True,,True,,True,


account_id,balance,last_updated,account_name,account_id_flag,account_id_issue,balance_flag,balance_issue,last_updated_flag,last_updated_issue,account_name_flag,account_name_issue
125,-50.0,,Account3,True,,False,non-positive value,True,,True,


account_id,balance,last_updated,account_name,account_id_flag,account_id_issue,balance_flag,balance_issue,last_updated_flag,last_updated_issue,account_name_flag,account_name_issue
126,300.0,2024-04-30,,True,,True,,True,,False,empty string
