In [1]:
## data_fetch.py
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def load_mappings(mapping_file):
    """
    Load the column mappings from the mapping document.
    """
    # Load the first sheet of the mapping document
    mapping_df = pd.read_excel(mapping_file)

    # Create a dictionary of source-to-target mappings
    column_mappings = dict(zip(mapping_df["Source"], mapping_df["Target"]))
    return column_mappings


from pyspark.sql import functions as F

def apply_mappings(column_mappings):
    """
    Parse source columns and target columns from mappings, handling multiple source columns.
    """
    mapped_columns = []
    for source_col, target_col in column_mappings.items():
        source_columns = [col.strip() for col in source_col.split(",")]  # Split multi-source columns
        mapped_columns.append((source_columns, target_col))
    print("Mapped Columns for Validation:", mapped_columns)
    return mapped_columns



def load_data(spark, source_path: str, target_path: str) -> (DataFrame, DataFrame):
    """
    Load source and target datasets.
    """
    def load_file(path: str) -> DataFrame:
        if path.endswith(".csv"):
            return spark.read.option("header", "true").csv(path)
        elif path.endswith(".xlsx"):
            pdf = pd.read_excel(path)
            return spark.createDataFrame(pdf)
        else:
            raise ValueError(f"Unsupported file format: {path}")

    # Load source and target data
    source_df = load_file(source_path)
    target_df = load_file(target_path)

    print(f"Source data loaded with {source_df.count()} records and {len(source_df.columns)} columns.")
    print(f"Target data loaded with {target_df.count()} records and {len(target_df.columns)} columns.")

    return source_df, target_df

In [2]:
def get_primary_key(mapping_file):
    """
    Extract the primary key column from the mapping file.
    Automatically reads the first sheet and handles extra spaces in column names.
    """
    # Load the first sheet of the mapping file
    mapping_df = pd.read_excel(mapping_file)

    # Strip spaces and clean column names
    mapping_df.columns = mapping_df.columns.str.strip()
    print(mapping_df.columns)

    # Check if 'Primary Key' and 'Column Name' exist in the file
    if "Primary Key" not in mapping_df.columns or "Source" not in mapping_df.columns:
        raise ValueError("Required columns 'Primary Key' or 'Source' not found in the mapping file.")

    # Find the primary key column
    primary_key_row = mapping_df[mapping_df["Primary Key"] == "Y"]

    if not primary_key_row.empty:
        primary_key = primary_key_row.iloc[0]["Source"]
        print(f"Detected Primary Key: {primary_key}")
        return primary_key
    else:
        raise ValueError("No primary key detected in the mapping file.")

In [9]:
def perform_dynamic_validation(source_df, target_df, mapped_columns, primary_key):
    validation_results = []
    metrics=[]

    source_columns=source_df.columns
    target_columns=target_df.columns

    # Record Count Validation
    source_count = source_df.count()
    target_count = target_df.count()
    validation_results.append({
        "ValidationType": "Record Count",
        "Source": source_count,
        "Target": target_count
    })

    # Data Type and Null Validation for Mapped Columns
    for source_cols, target_col in mapped_columns:
        # Handle multiple source columns mapped to a single target column
        source_dtype = [
            [f.dataType for f in source_df.schema.fields if f.name == col][0]
            for col in source_cols if col in source_df.columns
        ]
        target_dtype = [f.dataType for f in target_df.schema.fields if f.name == target_col][0]
        validation_results.append({
            "ValidationType": f"Data Type ({', '.join(source_cols)} -> {target_col})",
            "Source": ", ".join(map(str, source_dtype)),
            "Target": str(target_dtype)
        })

        # Null Validation for each source column
        for source_col in source_cols:
            if source_col in source_df.columns:
                source_nulls = source_df.filter(F.col(source_col).isNull()).count()
                target_nulls = target_df.filter(F.col(target_col).isNull()).count()
                validation_results.append({
                    "ValidationType": f"Null Values ({source_col} -> {target_col})",
                    "Source": source_nulls,
                    "Target": target_nulls
                })

    # Primary Key Validation
    source_primary_key_count = source_df.select(primary_key).distinct().count()
    target_primary_key_count = target_df.select(primary_key).distinct().count()

    source_col_count = len(source_columns)
    target_col_count = len(target_columns)

    validation_results.append({
        "ValidationType": "Primary Key Uniqueness",
        "Source": source_primary_key_count,
        "Target": target_primary_key_count
    })

    validation_results.append({
            "ValidationType": "ColumnCount",
            "Source": source_col_count,
            "Target": target_col_count
        })

    metrics.append({
        "ValidationType": "Number of Rows",
        "Source": source_count,
        "Target": target_count
    })

    # Number of Duplicate Rows (Basic Validation)
    source_duplicates = source_df.groupBy(source_df.columns).count().filter(F.col("count") > 1).count()
    target_duplicates = target_df.groupBy(target_df.columns).count().filter(F.col("count") > 1).count()
    metrics.append({
        "ValidationType": "Number of Duplicate Rows",
        "Source": source_duplicates,
        "Target": target_duplicates
    })
    metrics.append({
        "ValidationType": "Transformation Applied",
        "Direct Move": skipped_count,
        "Transformed Data Points" : applied_count
    })

    return validation_results , metrics


def save_dynamic_script(source_columns, target_columns, mapped_columns, primary_key):
    """
    Save the dynamic validation logic as a .py file with user-provided values.
    """
    script_content = f"""from pyspark.sql import SparkSession, functions as F

def perform_dynamic_validation(source_df, target_df):
    print(\"\\n=== Starting Validation ===\\n\")

    # Record Count Validation
    source_count = source_df.count()
    target_count = target_df.count()
    print(f\"Record Count Comparison:\\n  Source: {{source_count}}\\n  Target: {{target_count}}\\n\")

    # Column Count Validation
    source_column_count = len(source_df.columns)
    target_column_count = len(target_df.columns)
    print(f\"Column Count Comparison:\\n  Source: {{source_column_count}}\\n  Target: {{target_column_count}}\\n\")

    # Data Type Validation
"""
    for source_col, target_col in mapped_columns:
        script_content += f"""
    for source_col, target_col in [(\"{source_col}\", \"{target_col}\")]:
        source_dtype = [f.dataType for f in source_df.schema.fields if f.name == source_col][0]
        target_dtype = [f.dataType for f in target_df.schema.fields if f.name == target_col][0]
        print(f\"Data Type Comparison for '{{target_col}}':\\n  Source: {{source_dtype}}\\n  Target: {{target_dtype}}\\n\")
"""

    script_content += """
    # Null Values Validation
"""
    for source_col, target_col in mapped_columns:
        script_content += f"""
    for source_col, target_col in [(\"{source_col}\", \"{target_col}\")]:
        source_nulls = source_df.filter(F.col(source_col).isNull()).count()
        target_nulls = target_df.filter(F.col(target_col).isNull()).count()
        print(f\"Null Values Comparison for '{{target_col}}':\\n  Source: {{source_nulls}} nulls\\n  Target: {{target_nulls}} nulls\\n\")
"""

    script_content += f"""
    # Primary Key Uniqueness Validation
    source_primary_key_count = source_df.select(\"{primary_key}\").distinct().count()
    target_primary_key_count = target_df.select(\"{primary_key}\").distinct().count()
    print(f\"Primary Key Uniqueness Validation:\\n  Source Unique Keys: {{source_primary_key_count}}\\n  Target Unique Keys: {{target_primary_key_count}}\\n\")

if __name__ == \"__main__\":
    spark = SparkSession.builder \\
        .appName(\"Dynamic Validation\") \\
        .master(\"local[*]\") \\
        .getOrCreate()

    source_data = [(\"A1\", \"Value1\", None),
                   (\"A2\", \"Value2\", \"123\"),
                   (\"A3\", \"Value3\", \"456\")]

    target_data = [(\"A1\", \"ValueX\", None),
                   (\"A2\", \"Value2\", None),
                   (\"A3\", None, \"456\")]

    source_df = spark.createDataFrame(source_data, {source_columns})
    target_df = spark.createDataFrame(target_data, {target_columns})

    perform_dynamic_validation(source_df, target_df)
"""

    # Save script to file
    script_path = "generated_dynamic_validation.py"
    with open(script_path, "w") as script_file:
        script_file.write(script_content)
    print(f"Validation script has been saved to: {script_path}")

In [4]:
from pyspark.sql import functions as F

# def perform_full_validation(source_df, target_df, primary_key):
#     """
#     Perform full validation between source and target datasets.
#     Outputs mismatched records with details in the desired format, sorted by PrimaryKey.
#     """
#     if not hasattr(source_df, "columns") or not hasattr(target_df, "columns"):
#         raise TypeError("Both source_df and target_df must be valid PySpark DataFrames.")

#     # Identify common columns for validation
#     common_columns = [col for col in source_df.columns if col in target_df.columns]
#     if not common_columns:
#         raise ValueError("No common columns found between source and target DataFrames.")

#     # Standardize data types for comparison
#     for col in common_columns:
#         source_df = source_df.withColumn(col, F.col(col).cast("string"))
#         target_df = target_df.withColumn(col, F.col(col).cast("string"))

#     # Perform full outer join on the primary key
#     joined_df = source_df.alias("source").join(
#         target_df.alias("target"),
#         F.col(f"source.{primary_key}") == F.col(f"target.{primary_key}"),
#         how="full_outer"
#     )

#     mismatched_records = []
#     missing_keys = set()  # To track keys already marked as missing

#     # Handle missing keys in source
#     missing_in_source = joined_df.filter(F.col(f"source.{primary_key}").isNull()) \
#         .select(
#             F.col(f"target.{primary_key}").alias("PrimaryKey"),
#             *[F.col(f"target.{col}").alias(col) for col in common_columns]
#         )

#     for row in missing_in_source.collect():
#         row_dict = row.asDict()
#         missing_keys.add(row_dict["PrimaryKey"])  # Track the key as missing
#         mismatched_records.append({
#             "PrimaryKey": row_dict["PrimaryKey"],
#             "Column": "All Columns",
#             "SourceValue": None,
#             "TargetValue": {col: row_dict[col] for col in common_columns},
#             "Reason": "Missing Key in Source"
#         })

#     # Handle missing keys in target
#     missing_in_target = joined_df.filter(F.col(f"target.{primary_key}").isNull()) \
#         .select(
#             F.col(f"source.{primary_key}").alias("PrimaryKey"),
#             *[F.col(f"source.{col}").alias(col) for col in common_columns]
#         )

#     for row in missing_in_target.collect():
#         row_dict = row.asDict()
#         missing_keys.add(row_dict["PrimaryKey"])  # Track the key as missing
#         mismatched_records.append({
#             "PrimaryKey": row_dict["PrimaryKey"],
#             "Column": "All Columns",
#             "SourceValue": {col: row_dict[col] for col in common_columns},
#             "TargetValue": None,
#             "Reason": "Missing Key in Target"
#         })

#     # Handle column mismatches, skipping keys already marked as missing
#     for col in common_columns:
#         mismatched_df = joined_df.filter(
#             (F.col(f"source.{col}") != F.col(f"target.{col}")) |
#             (F.col(f"source.{col}").isNull() & F.col(f"target.{col}").isNotNull()) |
#             (F.col(f"source.{col}").isNotNull() & F.col(f"target.{col}").isNull())
#         ).select(
#             F.coalesce(F.col(f"source.{primary_key}"), F.col(f"target.{primary_key}")).alias("PrimaryKey"),
#             F.lit(col).alias("Column"),
#             F.col(f"source.{col}").alias("SourceValue"),
#             F.col(f"target.{col}").alias("TargetValue")
#         )

#         for row in mismatched_df.collect():
#             row_dict = row.asDict()
#             primary_key_value = row_dict["PrimaryKey"]

#             # Skip if the key is already marked as missing
#             if primary_key_value in missing_keys:
#                 continue

#             reason = "Null Mismatch" if row_dict["SourceValue"] is None or row_dict["TargetValue"] is None else "Value Mismatch"
#             mismatched_records.append({
#                 "PrimaryKey": primary_key_value,
#                 "Column": row_dict["Column"],
#                 "SourceValue": row_dict["SourceValue"],
#                 "TargetValue": row_dict["TargetValue"],
#                 "Reason": reason
#             })

#     # Sort the final records by PrimaryKey
#     mismatched_records.sort(key=lambda x: x["PrimaryKey"])

#     return mismatched_records


from pyspark.sql import functions as F

def perform_full_validation(source_df, target_df, primary_key):
    """
    Perform full validation between source and target datasets.
    Outputs mismatched records with details in the desired format, sorted by PrimaryKey.
    """

    if not hasattr(source_df, "columns") or not hasattr(target_df, "columns"):
        raise TypeError("Both source_df and target_df must be valid PySpark DataFrames.")

    # Identify common columns for validation
    common_columns = [col for col in source_df.columns if col in target_df.columns]
    if not common_columns:
        raise ValueError("No common columns found between source and target DataFrames.")

    # Standardize data types for comparison
    for col in common_columns:
        source_df = source_df.withColumn(col, F.col(col).cast("string"))
        target_df = target_df.withColumn(col, F.col(col).cast("string"))

    # Check for duplicates in source and target
    duplicate_in_source = source_df.groupBy(primary_key).count().filter(F.col("count") > 1)
    duplicate_in_target = target_df.groupBy(primary_key).count().filter(F.col("count") > 1)

    # Perform full outer join on the primary key
    joined_df = source_df.alias("source").join(
        target_df.alias("target"),
        F.col(f"source.{primary_key}") == F.col(f"target.{primary_key}"),
        how="full_outer"
    )

    mismatched_records = []
    missing_keys = set()  # To track keys already marked as missing

    # Handle missing keys in source
    missing_in_source = joined_df.filter(F.col(f"source.{primary_key}").isNull()) \
        .select(
            F.col(f"target.{primary_key}").alias("PrimaryKey"),
            *[F.col(f"target.{col}").alias(col) for col in common_columns]
        )

    for row in missing_in_source.collect():
        row_dict = row.asDict()
        missing_keys.add(row_dict["PrimaryKey"])  # Track the key as missing
        mismatched_records.append({
            "PrimaryKey": row_dict["PrimaryKey"],
            "Column": "All Columns",
            "SourceValue": None,
            "TargetValue": {col: row_dict[col] for col in common_columns},
            "Reason": "Missing Key in Source"
        })

    # Handle missing keys in target
    missing_in_target = joined_df.filter(F.col(f"target.{primary_key}").isNull()) \
        .select(
            F.col(f"source.{primary_key}").alias("PrimaryKey"),
            *[F.col(f"source.{col}").alias(col) for col in common_columns]
        )

    for row in missing_in_target.collect():
        row_dict = row.asDict()
        missing_keys.add(row_dict["PrimaryKey"])  # Track the key as missing
        mismatched_records.append({
            "PrimaryKey": row_dict["PrimaryKey"],
            "Column": "All Columns",
            "SourceValue": {col: row_dict[col] for col in common_columns},
            "TargetValue": None,
            "Reason": "Missing Key in Target"
        })

    # Handle column mismatches, skipping keys already marked as missing
    for col in common_columns:
        mismatched_df = joined_df.filter(
            (F.col(f"source.{col}") != F.col(f"target.{col}")) |
            (F.col(f"source.{col}").isNull() & F.col(f"target.{col}").isNotNull()) |
            (F.col(f"source.{col}").isNotNull() & F.col(f"target.{col}").isNull())
        ).select(
            F.coalesce(F.col(f"source.{primary_key}"), F.col(f"target.{primary_key}")).alias("PrimaryKey"),
            F.lit(col).alias("Column"),
            F.col(f"source.{col}").alias("SourceValue"),
            F.col(f"target.{col}").alias("TargetValue")
        )

        for row in mismatched_df.collect():
            row_dict = row.asDict()
            primary_key_value = row_dict["PrimaryKey"]

            # Skip if the key is already marked as missing
            if primary_key_value in missing_keys:
                continue

            reason = "Null Mismatch" if row_dict["SourceValue"] is None or row_dict["TargetValue"] is None else "Value Mismatch"
            mismatched_records.append({
                "PrimaryKey": primary_key_value,
                "Column": row_dict["Column"],
                "SourceValue": row_dict["SourceValue"],
                "TargetValue": row_dict["TargetValue"],
                "Reason": reason
            })

    # Handle duplicate keys
    duplicate_source_keys = [row.asDict()[primary_key] for row in duplicate_in_source.select(primary_key).collect()]
    duplicate_target_keys = [row.asDict()[primary_key] for row in duplicate_in_target.select(primary_key).collect()]

    for key in duplicate_source_keys:
        mismatched_records.append({
            "PrimaryKey": key,
            "Column": "All Columns",
            "SourceValue": None,
            "TargetValue": None,
            "Reason": "Duplicate Key in Source"
        })

    for key in duplicate_target_keys:
        mismatched_records.append({
            "PrimaryKey": key,
            "Column": "All Columns",
            "SourceValue": None,
            "TargetValue": None,
            "Reason": "Duplicate Key in Target"
        })

    # Sort the final records by PrimaryKey
    mismatched_records.sort(key=lambda x: x["PrimaryKey"])

    return mismatched_records


def save_full_validation_script(source_columns, target_columns, mapped_columns, primary_key):
    """
    Save the full validation logic as a .py file with user-provided values.
    """
    script_content = f"""from pyspark.sql import SparkSession, functions as F

def perform_full_validation(source_df, target_df):
    print(\"\\n=== Starting Full Validation ===\\n\")

    mismatched_records = []

    # Prepare join condition using the primary key
    join_condition = source_df[\"{primary_key}\"] == target_df[\"{primary_key}\"]

    # Perform a full outer join
    joined_df = source_df.alias(\"source\").join(
        target_df.alias(\"target\"),
        on=join_condition,
        how=\"full_outer\"
    )

    # Check for mismatched columns
"""
    for source_col, target_col in mapped_columns:
        script_content += f"""
    condition = (
        (F.col(f\"source.{source_col}\") != F.col(f\"target.{target_col}\")) |
        (F.col(f\"source.{source_col}\").isNull() & F.col(f\"target.{target_col}\").isNotNull()) |
        (F.col(f\"source.{source_col}\").isNotNull() & F.col(f\"target.{target_col}\").isNull())
    )

    mismatched_df = joined_df.filter(condition).select(
        F.coalesce(F.col(f\"source.{primary_key}\"), F.col(f\"target.{primary_key}\")).alias(\"Primary Key\"),
        F.col(f\"source.{source_col}\").alias(f\"Source {source_col}\"),
        F.col(f\"target.{target_col}\").alias(f\"Target {target_col}\")
    )

    for row in mismatched_df.collect():
        mismatched_records.append({{
            \"Primary Key\": row[\"Primary Key\"],
            f\"Source {source_col}\": row[f\"Source {source_col}\"],
            f\"Target {target_col}\": row[f\"Target {target_col}\"]
        }})
"""

    script_content += """
    # Output mismatched records
    print(\"\\n=== Mismatched Records ===\\n\")
    for record in mismatched_records:
        print(record)
"""

    # Save script to file
    script_path = "generated_full_validation.py"
    with open(script_path, "w") as script_file:
        script_file.write(script_content)
    print(f"Full validation script has been saved to: {script_path}")

In [5]:
!pip install langchain-openai

Collecting langchain-openai
  Downloading langchain_openai-0.3.3-py3-none-any.whl.metadata (2.7 kB)
Collecting langchain-core<0.4.0,>=0.3.33 (from langchain-openai)
  Downloading langchain_core-0.3.33-py3-none-any.whl.metadata (6.3 kB)
Collecting tiktoken<1,>=0.7 (from langchain-openai)
  Downloading tiktoken-0.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Downloading langchain_openai-0.3.3-py3-none-any.whl (54 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.5/54.5 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading langchain_core-0.3.33-py3-none-any.whl (412 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m412.7/412.7 kB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tiktoken-0.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m31.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collect

In [6]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


In [7]:
import pandas as pd
import os
from pyspark.sql import functions as F
from langchain_openai import AzureChatOpenAI
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Azure OpenAI setup
azure_openai_endpoint = os.getenv("azure_openai_endpoint")
azure_openai_api_key = os.getenv("azure_openai_api_key")
azure_openai_deployment_name = os.getenv("azure_openai_deployment_name")
azure_openai_api_version = os.getenv("azure_openai_api_version")

# Initialize AzureChatOpenAI
llm = AzureChatOpenAI(
    azure_endpoint=azure_openai_endpoint,
    api_key=azure_openai_api_key,
    deployment_name=azure_openai_deployment_name,
    api_version=azure_openai_api_version,
    temperature=0,
    max_tokens=4096
)

def sanitize_output(response):
    """
    Cleans the output to ensure it only contains valid PySpark transformation logic.
    Removes unwanted newlines and ensures single-line expressions.
    """
    sanitized = response.strip().replace("```python", "").replace("```", "").strip()
    return " ".join(sanitized.splitlines())

def azure_translate_logic(logic):
    """
    Translates logic into a single-line PySpark expression using Azure OpenAI.
    """
    prompt = f"""
    Translate the following transformation logic into a concise and valid PySpark expression.
    Ensure the following:
    - Use appropriate PySpark functions such as F.col, F.when, F.to_date, F.date_format, etc., for date columns.
    - Handle dates explicitly in the format 'dd-MM-yyyy' where necessary.
    - Return the transformation logic expression as a single line (e.g., F.col(...), F.to_date(...)).
    - Do not include DataFrame references, assignments, or import statements.
    - Avoid multiline expressions or line continuations.
    Transformation Logic: {logic}
    """

    try:
        response = llm.invoke(prompt.strip())
        return sanitize_output(response.content)
    except Exception as e:
        return f"# Error translating logic: {str(e)}"

def generate_logic_safe(row):
    """
    Generate a concise, single-line PySpark expression for a given row.
    """
    transformation_logic = row.get("Transformation Logic", "").strip()

    if "Direct Move" in transformation_logic:
        return "# Skipped: No valid transformation logic provided"

    # Translate the logic using Azure OpenAI
    return azure_translate_logic(transformation_logic)

def apply_transformations_with_date_handling(source_df, mapping_df):
    """
    Apply transformations to the source DataFrame with explicit handling for date columns in 'dd-MM-yyyy' format.
    """
    for _, row in mapping_df.iterrows():
        target_col = row["Target"]
        pyspark_logic = row["PySpark Logic"]

        if pd.notna(pyspark_logic) and pyspark_logic.strip() and pyspark_logic != "# Skipped: No valid transformation logic provided":
            print(f"\n=== Applying Transformation for Column: '{target_col}' ===")
            try:
                print(f"Logic for '{target_col}': {pyspark_logic.strip()}")
                # Evaluate and apply the transformation logic
                transformed_expr = eval(pyspark_logic.strip())

                # Explicitly cast to date if the logic involves date handling
                if "to_date" in pyspark_logic or "date_format" in pyspark_logic:
                    source_df = source_df.withColumn(target_col, F.to_date(F.col(target_col), "dd-MM-yyyy"))
                else:
                    source_df = source_df.withColumn(target_col, transformed_expr)

                print(f"Transformation applied successfully for column: '{target_col}'")
            except Exception as e:
                print(f"Error applying logic for column '{target_col}': {e}")
        else:
            print(f"Skipping transformation for column '{target_col}' as no valid PySpark Logic is provided.")
    return source_df

def process_mapping_logic(mapping_df):
    """
    Process the mapping DataFrame row by row to generate concise PySpark expressions.
    """
    mapping_df["PySpark Logic"] = mapping_df.apply(generate_logic_safe, axis=1)
    mapping_df.to_csv("transformed.csv", index=False)
    return mapping_df

def process_all_sheets(file_path):
    """
    Process all sheets in the Excel file and save the results in a single Excel file.
    """
    sheets = pd.read_excel(file_path, sheet_name=None)  # Load all sheets as a dictionary
    writer = pd.ExcelWriter("Processed_ETL_Mapping.xlsx", engine="xlsxwriter")

    for sheet_name, df in sheets.items():
        print(f"\nProcessing sheet: {sheet_name}")
        processed_df = process_mapping_logic(df)
        processed_df.to_excel(writer, sheet_name=sheet_name, index=False)

    writer.close()
    print("\nAll sheets processed and saved to 'Processed_ETL_Mapping.xlsx'.")


In [10]:
import pandas as pd
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.window import Window
import json
import os ,sys


def is_json_string(value):
    """
    Check if a given string is a valid JSON.
    """
    if isinstance(value, str):
        try:
            json.loads(value)
            return True
        except json.JSONDecodeError:
            return False
    return False


def normalize_json_fields(df):
    """
    Normalize all JSON-like fields in a DataFrame.
    Detect JSON-like columns and clean their values.
    """
    json_columns = []

    for col in df.columns:
        if df[col].apply(lambda x: is_json_string(x)).any():
            json_columns.append(col)
            # Normalize the column
            df[col] = df[col].apply(lambda x: json.loads(x) if is_json_string(x) else x)

    #print(f"Detected JSON columns: {json_columns}")
    return df, json_columns


def detect_and_drop_datetime_columns(df):
    """
    Detect columns with values resembling date/datetime strings and drop them.
    """
    datetime_columns = []
    for col in df.columns:
        # Check if column values match date/datetime patterns
        sample_values = df.select(col).filter(F.col(col).isNotNull()).limit(100).rdd.flatMap(lambda x: x).collect()
        for value in sample_values:
            if isinstance(value, str):
                try:
                    # Attempt parsing with a common date format
                    pd.to_datetime(value, format="%d-%m-%Y", errors='raise')
                    datetime_columns.append(col)
                    break
                except:
                    pass
    if datetime_columns:
        #print(f"Detected and Dropping DateTime columns: {datetime_columns}")
        df = df.drop(*datetime_columns)
    return df


def clean_dataframe(df):
    """
    Clean and normalize JSON-like fields in a Spark DataFrame.
    """
    for col in df.columns:
        df = df.withColumn(col, F.udf(lambda x: json.dumps(json.loads(x)) if is_json_string(x) else x, T.StringType())(F.col(col)))
    return df

def validate_metadata(source_df, target_df, mapped_columns):
    """
    Validate if the mapped columns exist in the source and target DataFrames.
    """
    source_columns = set(source_df.columns)
    target_columns = set(target_df.columns)
    mapped_columns_set = set(mapped_columns)

    missing_in_source = mapped_columns_set - source_columns
    missing_in_target = mapped_columns_set - target_columns

    if missing_in_source or missing_in_target:
        return False, missing_in_source, missing_in_target
    return True, None, None


# def apply_transformations_with_debugging(source_df, mapping_df):
#     """
#     Apply transformations to the source DataFrame based on PySpark Logic with detailed debugging.
#     For multi-column transformations, append the same transformed values back to all involved Source columns.
#     Also, count the successfully applied transformations and those skipped.
#     """
#     from pyspark.sql import functions as F

#     # Step 1: Create a copy of the source DataFrame for transformations
#     transformed_df = source_df  # This will hold all transformed values
#     original_df = source_df  # Keep the original DataFrame intact

#     applied_count = 0
#     skipped_count = 0

#     for _, row in mapping_df.iterrows():
#         source_col = row["Source"]  # Source column name(s)
#         target_col = row["Target"]  # Target column name
#         pyspark_logic = row["PySpark Logic"]  # PySpark logic to apply

#         if pd.notna(pyspark_logic) and pyspark_logic.strip() and pyspark_logic != "# No valid transformation logic available":
#             print(f"\n=== Applying Transformation for Target Column: '{target_col}' ===")
#             try:
#                 # Parse the source columns (e.g., split "Col1, Col2" into a list)
#                 source_columns = [col.strip() for col in source_col.split(",")]

#                 # Evaluate the transformation logic dynamically
#                 print(f"Logic for '{target_col}': {pyspark_logic.strip()}")
#                 transformed_expr = eval(pyspark_logic.strip(), {"F": F, "source_df": original_df})

#                 # Add the transformed column to the result DataFrame
#                 transformed_df = transformed_df.withColumn(target_col, transformed_expr)

#                 # Append the same transformed value back to all Source columns involved
#                 for col in source_columns:
#                     if col in transformed_df.columns:
#                         transformed_df = transformed_df.withColumn(col, F.col(target_col))
#                         print(f"Appended transformed value to Source column: '{col}'")

#                 applied_count += 1
#                 print(f"Transformation applied successfully for column: '{target_col}'")

#             except Exception as e:
#                 print(f"Error applying logic for column '{target_col}': {e}")
#                 skipped_count += 1
#         else:
#             print(f"Skipping transformation for column '{target_col}' as no valid PySpark Logic is provided.")
#             skipped_count += 1

#     return transformed_df, applied_count, skipped_count


# def export_transformed_df(transformed_df, output_path: str = "Transformed_Data.csv"):
#     """
#     Export the transformed PySpark DataFrame to a CSV file.
#     """
#     transformed_df.toPandas().to_csv(output_path, index=False)
#     print(f"Transformed DataFrame exported to {output_path}.")

def apply_transformations_with_debugging(source_df, mapping_df):
    """
    Apply transformations to a copy of the source DataFrame with detailed debugging.
    Use the original source data as a reference during transformations.
    Handle multi-column transformations by assigning the transformed value to all involved columns.
    """
    from pyspark.sql import functions as F

    # Step 1: Create a copy of the source DataFrame for transformations
    transformed_df = source_df.alias("transformed_copy")  # Copy for applying transformations
    original_df = source_df.alias("original_source")  # Keep the original source data intact

    applied_count = 0
    skipped_count = 0

    for _, row in mapping_df.iterrows():
        source_col = row["Source"]  # Source column name(s)
        target_col = row["Target"]  # Target column name
        pyspark_logic = row["PySpark Logic"]  # PySpark logic to apply

        if pd.notna(pyspark_logic) and pyspark_logic.strip() and pyspark_logic != "# No valid transformation logic available":
            print(f"\n=== Applying Transformation for Target Column: '{target_col}' ===")
            try:
                # Parse the source columns (e.g., split "Col1, Col2" into a list)
                source_columns = [col.strip() for col in source_col.split(",")]

                # Build the transformation logic using the original DataFrame as reference
                print(f"Logic for '{target_col}': {pyspark_logic.strip()}")
                transformed_expr = eval(pyspark_logic.strip(), {"F": F, "source_df": original_df})

                # Add the transformed column to the transformed DataFrame
                transformed_df = transformed_df.withColumn(target_col, transformed_expr)

                # If multiple source columns are involved, ensure their transformed values remain intact in the reference
                for col in source_columns:
                    if col in transformed_df.columns:
                        transformed_df = transformed_df.withColumn(col, F.col(target_col))
                        print(f"Updated Source column '{col}' with transformed value.")

                applied_count += 1
                print(f"Transformation applied successfully for column: '{target_col}'")

            except Exception as e:
                print(f"Error applying logic for column '{target_col}': {e}")
                skipped_count += 1
        else:
            print(f"Skipping transformation for column '{target_col}' as no valid PySpark Logic is provided.")
            skipped_count += 1

    print(f"\nTotal Transformations Applied: {applied_count}")
    print(f"Total Transformations Skipped: {skipped_count}")

    return transformed_df, applied_count, skipped_count

def export_transformed_df(transformed_df, output_path: str = "Transformed_Data.csv"):
    """
    Export the transformed PySpark DataFrame to a CSV file.
    """
    transformed_df.toPandas().to_csv(output_path, index=False)
    print(f"Transformed DataFrame exported to {output_path}.")


def debug_all_transformations(mapping_df):
    """
    Debug transformation logic for all columns in the mapping DataFrame.
    """
    print("\n=== Debugging All Transformations ===")
    for _, row in mapping_df.iterrows():
        target_col = row["Target"]
        pyspark_logic = row["PySpark Logic"]

        print(f"\n=== Debugging Column: '{target_col}' ===")
        if pd.notna(pyspark_logic) and pyspark_logic.strip():
            print(f"Logic: {pyspark_logic.strip()}")
        else:
            print(f"No valid logic for column: '{target_col}'")

import io

def write_validation_results_to_excel(validation_results):
    """
    Write validation results to an Excel file using an in-memory buffer.
    JSON-like fields in the results are flattened into separate columns.

    Args:
        validation_results (list): List of validation result dictionaries.

    Returns:
        BytesIO: In-memory file buffer containing the Excel data.
    """
    # Flatten JSON-like fields if necessary
    flattened_records = []

    for record in validation_results:
        primary_key = record.get("primary_key", "")
        mismatched_columns = record.get("mismatched_columns", {})

        if mismatched_columns:
            for col, details in mismatched_columns.items():
                flattened_records.append({
                    "PrimaryKey": primary_key,
                    "Column": col,
                    "SourceValue": details.get("Source", ""),
                    "TargetValue": details.get("Target", ""),
                    "Reason": details.get("Reason", "")
                })
        else:
            # If there are no mismatched columns, keep the original record structure
            flattened_records.append(record)

    # Convert to DataFrame
    validation_df = pd.DataFrame(flattened_records)

    # Write to an in-memory buffer
    file_buffer = io.BytesIO()
    validation_df.to_excel(file_buffer, index=False, engine='openpyxl')

    file_buffer.seek(0)  # Reset the buffer's position to the beginning
    print("Validation results written to buffer")
    return file_buffer


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("DataValidationWithMappings") \
        .master("local[*]") \
        .config("spark.sql.caseSensitive", "true") \
        .getOrCreate()


    source_path = "/content/source_trail_1.csv"
    target_path = "/content/target_trail_1 - Copy.csv"
    mapping_path = "/content/mapping_document 1 (1) (5) (2).xlsx"


    # Load data into Pandas for JSON detection and cleaning
    source_pdf = pd.read_csv(source_path)
    target_pdf = pd.read_csv(target_path)
    mapping_df = pd.read_excel(mapping_path)

    mapping_df = process_mapping_logic(mapping_df)

    # Normalize JSON fields in source and target data
    source_pdf, source_json_columns = normalize_json_fields(source_pdf)
    target_pdf, target_json_columns = normalize_json_fields(target_pdf)

    # Save cleaned JSON back as CSV
    source_path_cleaned = "/content/data/source_cleaned_final.csv"
    target_path_cleaned = "/content/data/target_cleaned_final.csv"
    os.makedirs(os.path.dirname(source_path_cleaned), exist_ok=True)

    source_pdf.to_csv(source_path_cleaned, index=False)
    target_pdf.to_csv(target_path_cleaned, index=False)

    # Load cleaned data into Spark DataFrames
    source_df = spark.read.csv(source_path_cleaned, header=True)
    target_df = spark.read.csv(target_path_cleaned, header=True)

    # Clean the DataFrames
    source_df_cleaned = clean_dataframe(source_df)
    target_df_cleaned = clean_dataframe(target_df)


    source_df_cleaned , applied_count, skipped_count = apply_transformations_with_debugging(source_df_cleaned, mapping_df)
    export_transformed_df(source_df_cleaned, "Transformed_Data_2.csv")


    # ============================================================
    # STEP 1: Check if Primary Key is present in both DataFrames
    # ============================================================
    primary_key = get_primary_key(mapping_path)
    source_cols = set(source_df_cleaned.columns)
    print(source_cols)
    target_cols = set(target_df_cleaned.columns)

    if primary_key not in source_cols or primary_key not in target_cols:
        print("\n=== MetaData Validation Error ===")
        print(f"Primary key '{primary_key}' not found in Source or Target.")
        #print("MetaData of Source and Target doesn't match. Skipping validations.")
        sys.exit()

    # ============================================================
    # STEP 2: Check if mapped columns exist in both DataFrames
    # ============================================================

    column_mappings = load_mappings(mapping_path)
    print("Column Mappings:", column_mappings)

    is_valid_metadata, missing_in_source, missing_in_target = validate_metadata(
        source_df_cleaned,
        target_df_cleaned,
        column_mappings
    )

    # ============================================================
    # STEP 3: If all checks pass, proceed with validations
    # ============================================================
    print("\n=== Proceeding with Validations ===")

    # Get mapped columns for validation
    mapped_columns = apply_mappings(column_mappings)

    # Perform Basic Validation
    print("\n=== Basic Validation ===")
    basic_results , metrics = perform_dynamic_validation(source_df_cleaned, target_df_cleaned, mapped_columns, primary_key)
    print(metrics)
    save_dynamic_script(source_cols, target_cols, mapped_columns, primary_key)

# Convert the JSON results into a pandas DataFrame and print as a table
    if isinstance(basic_results, list):
      basic_df = pd.DataFrame(basic_results)
      basic_df.to_excel("basic_valid.xlsx",index=False)
      print(basic_df.to_string(index=False))
    else:
      print("Basic validation results are not in the expected format (list of dictionaries).")

# Perform Full Validation
    print("\n=== Full Validation ===")
    mismatched_records = perform_full_validation(source_df_cleaned, target_df, primary_key)
    print(mismatched_records)
    result_excel = write_validation_results_to_excel(mismatched_records)
    print(result_excel)


=== Applying Transformation for Target Column: 'PatientID' ===
Logic for 'PatientID': # Skipped: No valid transformation logic provided
Error applying logic for column 'PatientID': invalid syntax (<string>, line 1)

=== Applying Transformation for Target Column: 'SampleID' ===
Logic for 'SampleID': # Skipped: No valid transformation logic provided
Error applying logic for column 'SampleID': invalid syntax (<string>, line 1)

=== Applying Transformation for Target Column: 'StudyID' ===
Logic for 'StudyID': # Skipped: No valid transformation logic provided
Error applying logic for column 'StudyID': invalid syntax (<string>, line 1)

=== Applying Transformation for Target Column: 'Age' ===
Logic for 'Age': F.lit(2025) - F.year(F.to_date(F.col("DateOfBirth"), "dd-MM-yyyy"))
Updated Source column 'DateOfBirth' with transformed value.
Transformation applied successfully for column: 'Age'

=== Applying Transformation for Target Column: 'BiomarkerIndex' ===
Logic for 'BiomarkerIndex': F.round