The job needs to iterate through two directories, housed in mounted folders in Databricks, and run both pair-wise and field-level comparisons between each set of two files (one modern and one legacy).

 Legacy and modern directories:
 - 2 sets of output files
 - 1 from legacy code and 1 from modern code, with the output files written to different S3 directories.


- output directory
- prefix/suffix
- agency/segment/state code


irmf_payee_state_41762

For the other output files there are 3 types of reports. These report output files will not be loaded back into Databricks; we only want to complete a file-level comparison between related legacy and modern files.

Report 1:  417-62-40 will produce 1 report per segment (AA-AZ)
file naming convention:

I41766-F040.<SS>.L<YYYY><CC>
 
Report 2:  417-63-40 will produce 1 report per segment (AA - AZ)
file naming convention:

I41767-F040.<SS>.L<YYYY><CC>
 
Report 3 :  417-64-99  will produce 1 report per segment (AA-AZ)
file naming convention:

I41767-F099.<SS>.L<YYYY><CC>


TODO: Support both structured and unstructured comparisons. Unstructured is a string-by-string comparison using difflib, so we need a way to detect whether a file is structured or unstructured and route it to the appropriate comparison.


In [0]:

from pyspark.sql.types import StructType, StructField, StringType


# Placeholder schema applied to every report CSV read below: two nullable
# string columns.
schema = StructType(
    [
        StructField("column1", StringType(), nullable=True),
        StructField("column2", StringType(), nullable=True),
        # Add more fields as per the actual CSV structure
    ]
)


# Load each report's CSV files from its volume directory using the shared schema.
df_417_62_40 = (
    spark.read
    .schema(schema)
    .csv("/Volumes/labuser11156715_1755179084/default/i41766-f040/*.csv")
)
df_417_63_40 = (
    spark.read
    .schema(schema)
    .csv("/Volumes/labuser11156715_1755179084/default/i41767-f040/*.csv")
)
df_417_64_99 = (
    spark.read
    .schema(schema)
    .csv("/Volumes/labuser11156715_1755179084/default/i41767-f099/*.csv")
)

# Save each DataFrame as a table in overwrite mode with the mergeSchema
# option turned on.
(
    df_417_62_40.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("labuser11156715_1755179084.default.irmf_payee_state_41766_f040")
)
(
    df_417_63_40.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("labuser11156715_1755179084.default.irmf_payee_state_41767_f040")
)
(
    df_417_64_99.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("labuser11156715_1755179084.default.irmf_payee_state_41767_f099")
)

from pyspark.sql.functions import concat_ws

# Collapse every column of df_417_62_40 into one " | "-delimited string column
# so each row renders like a line of a text table.
_all_columns_joined = concat_ws(" | ", *df_417_62_40.columns)
df_417_62_40_concat = df_417_62_40.select(_all_columns_joined.alias("concatenated_row"))

display(df_417_62_40_concat)

# Compute summary statistics for each of the three report DataFrames.
summary_417_62_40, summary_417_63_40, summary_417_64_99 = (
    report_df.summary()
    for report_df in (df_417_62_40, df_417_63_40, df_417_64_99)
)

# Display the summary statistics
#display(summary_417_62_40)
#display(summary_417_63_40)
#display(summary_417_64_99)

In [0]:
# Line up the summary statistics of each pair of reports side by side by
# inner-joining the summary DataFrames on their shared 'summary' column.
comparison_62_63 = summary_417_62_40.join(
    summary_417_63_40,
    on="summary",
    how="inner",
)
comparison_63_99 = summary_417_63_40.join(
    summary_417_64_99,
    on="summary",
    how="inner",
)
comparison_62_99 = summary_417_62_40.join(
    summary_417_64_99,
    on="summary",
    how="inner",
)

# Render the three pairwise comparisons in the order they were built.
for _pairwise_comparison in (comparison_62_63, comparison_63_99, comparison_62_99):
    display(_pairwise_comparison)

In [0]:
# Recreate the `I41767-F099_text` volume in labuser11156715_1755179084.default.
# The DROP runs first and unconditionally, so re-running this cell removes any
# existing volume of that name before the CREATE.
# NOTE(review): this volume was previously described as holding 41766 text
# files, although its name says I41767-F099 — confirm the intended contents,
# and that dropping the existing volume on every run is acceptable.
spark.sql("""
DROP VOLUME IF EXISTS labuser11156715_1755179084.default.`I41767-F099_text`
""")
spark.sql("""
CREATE VOLUME labuser11156715_1755179084.default.`I41767-F099_text`
""")

In [0]:
import difflib

# Paths of the two report text files to compare.
test_file = '/Volumes/labuser11156715_1755179084/default/i41767-f099_text/417-62_report.txt'
compare_file = '/Volumes/labuser11156715_1755179084/default/i41767-f099_text/417-63_report.txt'

# Read both files fully into memory as lists of lines; the handles are only
# needed for the reads, so everything else happens after they are closed.
with open(test_file, 'r') as f1, open(compare_file, 'r') as f2:
    file1_lines = f1.readlines()
    file2_lines = f2.readlines()

# File-level verdict: the files match only if every line is identical.
match_bool = file1_lines == file2_lines
print(f"Testfile: {test_file}")
print(f"Compare File: {compare_file}")
print(f"Match = {match_bool}")

# Print a unified diff of the two files. The lines come from readlines() and
# keep their own line endings, hence end=''.
difference = difflib.unified_diff(file1_lines, file2_lines, fromfile=test_file, tofile=compare_file)
for line in difference:
    print(line, end='')

TODO: Make compare_dbfs_files_distributed iterate over directories, and add error handling to that function.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def compare_dbfs_files_distributed(
    file1_path: str,
    file2_path: str,
    output_csv_path: str = None,
    show_differences: bool = True,
    show_limit: int = 50
):
    """
    Compare two text files line by line using Spark DataFrames.

    Both files are read with ``spark.read.text``, their lines are numbered,
    and the two sides are full-outer-joined on the line number. A line counts
    as a mismatch when the two texts differ or when only one of the files has
    a line at that position. The file paths and an overall ``Match = ...``
    verdict are always printed.

    Args:
        file1_path (str): Path of the first file (e.g., "dbfs:/FileStore/...").
        file2_path (str): Path of the second file.
        output_csv_path (str): Optional path to write the mismatches to as CSV
            with a header row. Nothing is written when it is None or when the
            files match. Spark's CSV writer is given this path as its output
            location, and existing output there is overwritten.
        show_differences (bool): If True, print the mismatched rows.
        show_limit (int): Maximum number of mismatched rows to print.

    Returns:
        DataFrame: Spark DataFrame with columns ``line_no``, ``line1`` and
        ``line2`` holding the mismatched lines, ordered by ``line_no``.
    """
    def _read_numbered(path, line_column):
        # Number the lines 1..N in read order so the two files can be aligned
        # by position.
        position = Window.orderBy(F.monotonically_increasing_id())
        lines = spark.read.text(path).withColumnRenamed("value", line_column)
        return lines.withColumn("line_no", F.row_number().over(position))

    left = _read_numbered(file1_path, "line1")
    right = _read_numbered(file2_path, "line2")

    # The outer join keeps positions that exist in only one of the files.
    aligned = left.join(right, on="line_no", how="outer").orderBy("line_no")

    # A null side means that file has no line at this position.
    left_line, right_line = F.col("line1"), F.col("line2")
    differs = left_line.isNull() | right_line.isNull() | (left_line != right_line)
    mismatches = aligned.filter(differs)

    mismatch_count = mismatches.count()
    print(f"File 1: {file1_path}")
    print(f"File 2: {file2_path}")
    print(f"Match = {mismatch_count == 0}")

    # Nothing to show or write when the files are identical.
    if mismatch_count == 0:
        return mismatches

    if show_differences:
        print("\n--- Differences (showing up to", show_limit, "rows) ---")
        mismatches.show(show_limit, truncate=False)
        print("\n-------------------")

    if output_csv_path is not None:
        # coalesce(1) funnels the mismatches into a single output partition.
        csv_writer = mismatches.coalesce(1).write.mode("overwrite").option("header", "true")
        csv_writer.csv(output_csv_path)
        print(f"Mismatches written to: {output_csv_path}")

    return mismatches

In [0]:
# Inputs: the two report text files to compare and the location that receives
# the mismatch CSV output.
test_file = '/Volumes/labuser11156715_1755179084/default/i41767-f099_text/417-62_report.txt'
compare_file = '/Volumes/labuser11156715_1755179084/default/i41767-f099_text/417-63_report.txt'
csv_file = '/Volumes/labuser11156715_1755179084/default/file_compare_csv/output.csv'

# Run the line-by-line comparison, then render the mismatched lines.
df_mismatches = compare_dbfs_files_distributed(
    file1_path=test_file,
    file2_path=compare_file,
    output_csv_path=csv_file,
)
display(df_mismatches)

In [0]:
import difflib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def parse_with_difflib(file_path, schema):
    """Read text file(s) and add a placeholder ``difflib_parsed`` column.

    The file(s) are read with ``spark.read.text`` and a UDF is applied to each
    row. The UDF logic is still a placeholder: it diffs the row's text against
    itself, so the resulting diff is always empty.

    Args:
        file_path: Path or glob passed to ``spark.read.text``.
        schema: Currently unused; kept so existing callers keep working.

    Returns:
        DataFrame: the text DataFrame plus a string column ``difflib_parsed``.
    """
    df = spark.read.text(file_path)

    def parse_row(*values):
        # The UDF is invoked with one argument per column (``*df.columns``),
        # so accept varargs. A single ``row`` parameter would receive the
        # column's string and iterate over its characters, and would fail
        # outright if the DataFrame had more than one column.
        text = " ".join(str(value) for value in values)
        # Example: get a unified diff of the text with itself (placeholder logic)
        return "\n".join(difflib.unified_diff([text], [text]))

    parse_udf = udf(parse_row, StringType())
    return df.withColumn("difflib_parsed", parse_udf(*df.columns))

# NOTE(review): all three calls read the same glob, so the three DataFrames
# have identical contents — confirm whether each report should point at its
# own files. These assignments also rebind the df_417_* names that the CSV
# reads earlier in this notebook assigned.
_text_reports_glob = "/Volumes/labuser11156715_1755179084/default/i41767-f099_text/*"

df_417_62_40 = parse_with_difflib(_text_reports_glob, schema)
df_417_63_40 = parse_with_difflib(_text_reports_glob, schema)
df_417_64_99 = parse_with_difflib(_text_reports_glob, schema)

# Render each parsed DataFrame in turn.
for _parsed_df in (df_417_62_40, df_417_63_40, df_417_64_99):
    display(_parsed_df)

In [0]:
%sql
-- Defines (or replaces) the irmf_payee_state_41766_f040 table with an explicit,
-- all-varchar column layout. The two commented-out CREATE lines below are the
-- same statement pointed at the 41767_f040 and 41767_f099 tables: swap which
-- line is active to build those tables with the identical column list.
-- NOTE(review): CREATE OR REPLACE replaces any existing table of this name,
-- including one written by a saveAsTable call elsewhere in this notebook —
-- confirm the intended run order.
CREATE OR REPLACE TABLE labuser11156715_1755179084.default.irmf_payee_state_41766_f040 
-- CREATE OR REPLACE TABLE labuser11156715_1755179084.default.irmf_payee_state_41767_f040 
--CREATE OR REPLACE TABLE labuser11156715_1755179084.default.irmf_payee_state_41767_f099
(
        separate_cd varchar(3),
        valid_ind varchar(1),
        primary_tin varchar(9),
        student_ind varchar(1),
        dist_code_1099k varchar(2),
        merch_category_cd varchar(4),
        student_ind_grad varchar(1),
        reserve2 varchar(1),
        ind1099k varchar(1),
        document_code varchar(2),
        payer_tin varchar(9),
        payer_tin_type varchar(1),
        amended_indicator varchar(1),
        tax_year varchar(4),
        -- payment_cd / payment_sign / payment_amt repeat for slots 1 through 13
        payment_cd_1 varchar(3),
        payment_sign_1 varchar(1),
        payment_amt_1 varchar(11),
        payment_cd_2 varchar(3),
        payment_sign_2 varchar(1),
        payment_amt_2 varchar(11),
        payment_cd_3 varchar(3),
        payment_sign_3 varchar(1),
        payment_amt_3 varchar(11),
        payment_cd_4 varchar(3),
        payment_sign_4 varchar(1),
        payment_amt_4 varchar(11),
        payment_cd_5 varchar(3),
        payment_sign_5 varchar(1),
        payment_amt_5 varchar(11),
        payment_cd_6 varchar(3),
        payment_sign_6 varchar(1),
        payment_amt_6 varchar(11),
        payment_cd_7 varchar(3),
        payment_sign_7 varchar(1),
        payment_amt_7 varchar(11),
        payment_cd_8 varchar(3),
        payment_sign_8 varchar(1),
        payment_amt_8 varchar(11),
        payment_cd_9 varchar(3),
        payment_sign_9 varchar(1),
        payment_amt_9 varchar(11),
        payment_cd_10 varchar(3),
        payment_sign_10 varchar(1),
        payment_amt_10 varchar(11),
        payment_cd_11 varchar(3),
        payment_sign_11 varchar(1),
        payment_amt_11 varchar(11),
        payment_cd_12 varchar(3),
        payment_sign_12 varchar(1),
        payment_amt_12 varchar(11),
        payment_cd_13 varchar(3),
        payment_sign_13 varchar(1),
        payment_amt_13 varchar(11),
        payer_acct_num varchar(20),
        perfection_history varchar(2),
        change_tin_ind varchar(1),
        chng_nm_ctrl_ind varchar(1),
        payee_name_line_1 varchar(50),
        payee_name_line_2 varchar(40),
        payee_address varchar(40),
        payee_city_state varchar(40),
        payee_zip_cd varchar(9),
        payer_name_line_1 varchar(40),
        payer_name_line_2 varchar(40),
        payer_address_line varchar(40),
        payer_city_state_zip varchar(40),
        direct_sales varchar(1),
        gambling_cd varchar(1),
        item_desc varchar(38),
        bond_cusip varchar(13),
        job_id varchar(50),
        source_file_nm varchar(50)
    );