In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

In [0]:
%run ./installs

In [0]:
import polars as pl
import polars.selectors as cs
from sparkpl.converter import spark_to_polars, polars_to_spark

from wadoh_raccoon.utils import helpers
from wadoh_subtyping import transform as tf, qa
from databricks.sdk.runtime import *

# import local functions
import processor
import write
import utils

In [0]:
# Run the main matching process, then the rematch process on the same pulled inputs.
res = processor.match_phl()
rematch = processor.rematch(pull_res=res.pull_res)

# Drop every column whose name ends in "_right", "_" or "_em" from the rematch
# exact matches before they are combined with the main exact matches.
# (`temp` keeps its original name in case other cells reference it.)
temp = rematch.rematch_exact_matched.select(~cs.ends_with("_right", "_", "_em"))

has_main_exact = len(res.exact_matched_full) > 0
has_rematch_exact = len(temp) > 0

# Pick the frame that feeds the roster. The roster pipeline itself is identical
# in every non-empty case, so it is run once below instead of once per branch.
if has_main_exact and has_rematch_exact:
    roster_input = pl.concat([res.exact_matched_full, temp])
    print('rematch records found, append to roster')
elif has_main_exact:
    print("exact matches found in main, no rematches found")
    roster_input = res.exact_matched_full
elif has_rematch_exact:
    print("rematch matches found, no main exact mains though.")
    roster_input = temp
else:
    print('Exit: both rematch and normal roster are empty')
    roster_input = None

if roster_input is None:
    # Nothing matched anywhere: downstream cells check for an empty frame.
    final_roster = pl.DataFrame()
else:
    # Transform the matched records into the roster, then de-duplicate it
    # against the respnet reference data.
    create_roster = tf.create_roster(
        matched_and_transformed_df=roster_input,
        respnet=res.pull_res.respnet,
    )
    final_roster = tf.dedup_roster(
        roster_inp=create_roster,
        reference_inp=res.pull_res.respnet,
    )

In [0]:
# Flag checked by the write cell (`if write_data:`): when True, the review table,
# roster table and internal/rematch outputs are written; set to False to skip all writes.
write_data=True

In [0]:
if write_data:
    # Columns kept for the fuzzy-match review table.
    review_cols = [
        'submission_number',
        'internal_create_date',
        'CASE_ID',
        'first_name_clean',
        'last_name_clean',
        'first_name_clean_right',
        'last_name_clean_right',
        'submitted_dob',
        'submitted_collection_date',
        'reference_collection_date',
        'business_day_count',
        'match_ratio',
        'reverse_match_ratio',
        'matched',
        'reverse_matched'
    ]

    has_rematch_fuzzy = len(rematch.rematch_fuzzy_matched) > 0
    # NOTE(review): emptiness is checked on `res.fuzzy_matched` but the rows are
    # taken from `res.fuzzy_matched_full` (kept as in the original) - confirm
    # the two are always empty/non-empty together.
    has_main_fuzzy = len(res.fuzzy_matched) > 0

    # Bind `review` up front so the length check below never hits a NameError,
    # whichever branch runs and even if building the frame fails.
    review = pl.DataFrame()
    try:
        review_parts = []
        if has_main_fuzzy:
            review_parts.append(
                res.fuzzy_matched_full
                .unique(subset='submission_number')
                .select(review_cols)
            )
        if has_rematch_fuzzy:
            review_parts.append(
                rematch.rematch_fuzzy_matched
                .unique(subset='submission_number')
                .select(review_cols)
            )

        if len(review_parts) == 2:
            # Both sources have fuzzy matches: stack them and keep one row per
            # submission (the main-process row wins, since it is listed first).
            review = (
                pl.concat(review_parts)
                .unique(subset='submission_number', keep='first', maintain_order=True)
            )
        elif len(review_parts) == 1:
            review = review_parts[0]
        else:
            print('\nNothing to add to review table.')
    except ValueError as err:
        # Best effort: skip the review write but continue with the other outputs.
        print('review dataframe creation didnt work')
        print(f'reason: {err}')

    if len(review) > 0:
        # Add the reviewer-facing columns with their initial (unreviewed) values.
        to_write = (
            review
            .with_columns(
                good_match=False,
                resolved=False,
                notes=None
            )
        )

        # Expose the frame to Spark SQL as a temp view, then insert it into the
        # review table using submission_number as the join key.
        review_spark_df = polars_to_spark(to_write)
        review_spark_df.createOrReplaceTempView("to_write_df")
        utils.safe_insert_sql_uc(
            spark=spark,
            catalog='tc_aim_prod',
            schema='diqa_sandbox',
            table='fuzzy_review_resp',
            source_view='to_write_df',
            join_key='submission_number'
        )

    if isinstance(final_roster, pl.DataFrame) and len(final_roster) > 0:
        print('\nWriting roster to roster table')

        # Same temp-view + insert pattern for the final roster.
        roster_spark_df = polars_to_spark(final_roster)
        roster_spark_df.createOrReplaceTempView("final_roster_df")
        utils.safe_insert_sql_uc(
            spark=spark,
            catalog='tc_aim_prod',
            schema='diqa_sandbox',
            table='roster_resp',
            source_view='final_roster_df',
            join_key='submission_number'
        )

    print('\nWriting outputs to internal tables')
    write.write_phl(pull_res=res.pull_res, w_res=res.w_res, main_res=res)

    print('\nWriting rematch outputs to tables')
    write.write_rematch(rematch_res=rematch,pull_res=res.pull_res)