## import libraries

In [1]:
from cadv_exploration.utils import load_dotenv

load_dotenv()
from scripts.python.utils import setup_logger
from llm.langchain.downstream_task_prompt import SQL_QUERY_TASK_DESCRIPTION, ML_INFERENCE_TASK_DESCRIPTION
from inspector.deequ.deequ_inspector_manager import DeequInspectorManager
from llm.langchain import LangChainCADV
from data_models import Constraints

from loader import FileLoader

from cadv_exploration.dq_manager import DeequDataQualityManager
from cadv_exploration.utils import get_project_root
import pandas as pd
import oyaml as yaml

logger = setup_logger("toy_example.log")



## Load the data and utility functions

In [2]:
dq_manager = DeequDataQualityManager()
train_file_path = get_project_root() / "data" / "toy_example" / "files" / "hospitalisations_train.csv"
test_file_path = get_project_root() / "data" / "toy_example" / "files" / "hospitalisations_test.csv"
train_data = FileLoader.load_csv(train_file_path, na_values=["NULL"])
test_data = FileLoader.load_csv(test_file_path, na_values=["NULL"])
spark_train_data, spark_train = dq_manager.spark_df_from_pandas_df(train_data)
spark_test_data, spark_test = dq_manager.spark_df_from_pandas_df(test_data)


def validate_on_test_data(constraints, test_data):
    code_column_map = constraints.get_suggestions_code_column_map(valid_only=False)
    code_list = [item for item in code_column_map.keys()]
    spark_test_data, spark_test = dq_manager.spark_df_from_pandas_df(test_data)
    status_on_test_data = dq_manager.validate_on_spark_df(spark_test, spark_test_data, code_list,
                                                          return_raw=True)
    code_list_for_constraints = [
        (code_list[i], status_on_test_data[i].constraint_status, status_on_test_data[i].constraint_message) for i
        in
        range(len(code_list))]
    return pd.DataFrame(code_list_for_constraints, columns=["column_name", "constraint_status", "constraint_message"])

:: loading settings :: url = jar:file:/Users/haochen/Library/Caches/pypoetry/virtualenvs/cadv-exploration-4wWqlI_J-py3.9/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/haochen/.ivy2/cache
The jars for the packages stored in: /Users/haochen/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3f238f2e-647b-42dc-80e8-93b34470e492;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.7-spark-3.5 in central
	found org.scala-lang#scala-reflect;2.12.10 in local-m2-cache
	found org.scalanlp#breeze_2.12;2.1.0 in local-m2-cache
	found org.scalanlp#breeze-macros_2.12;2.1.0 in local-m2-cache
	found org.typelevel#spire_2.12;0.17.0 in local-m2-cache
	found org.typelevel#spire-macros_2.12;0.17.0 in local-m2-cache
	found org.typelevel#algebra_2.12;2.0.1 in local-m2-cache
	found org.typelevel#cats-kernel_2.12;2.1.1 in local-m2-cache
	found org.typelevel#spire-platform_2.12;0.17.0 in local-m2-cache
	found org.typelevel#spire-util_2.12;0.17.0 in local-m2-cache
	found dev.ludovic.netlib#blas;3.0.1 in local-m2-cache
	found dev.ludovic.netlib#lapack;3.0.1 in local-m2-c

In [3]:
train_data

Unnamed: 0,ssn,gender,race,bloodtype,diagnosis,admission_day,discharge_day,insurance,cost,complications
0,420-64-XXXX,1,white,A pos,cough,10,10,UHG,10,N
1,423-33-XXXX,1,asian,O pos,fraction,10,12,KP,1000,N
2,545-31-XXXX,2,white,A pos,fraction,17,19,UHG,1000,N
3,222-24-XXXX,1,hispanic,O pos,cancer,20,25,CG,10000,N


In [4]:
test_data

Unnamed: 0,ssn,gender,race,bloodtype,diagnosis,admission_day,discharge_day,insurance,cost,complications
0,221-04-XXXX,1.0,black,AB neg,stroke,21.0,21,UHG,0,N
1,434-29-XXXX,,asian,O pos,,,22,KP,1000,Y
2,212-56-XXXX,1.0,white,O pos,bloodknot,22.0,29,UHG,1000,N


## get constraints with deequ

In [5]:
constraints = dq_manager.get_constraints_for_spark_df(spark_train, spark_train_data, spark_test,
                                                      spark_test_data)
print(yaml.dump(constraints.to_dict()))

25/02/18 15:36:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Python Callback server started!
constraints:
  admission_day:
    code:
    - - .isComplete("admission_day")
      - Invalid
    - - .isNonNegative("admission_day")
      - Valid
    assumptions: []
  bloodtype:
    code:
    - - .isComplete("bloodtype")
      - Valid
    - - .isContainedIn("bloodtype", ["A pos", "O pos"])
      - Invalid
    assumptions: []
  complications:
    code:
    - - .isComplete("complications")
      - Valid
    - - .isContainedIn("complications", ["N"])
      - Invalid
    assumptions: []
  cost:
    code:
    - - .isComplete("cost")
      - Valid
    - - .isNonNegative("cost")
      - Valid
    assumptions: []
  diagnosis:
    code:
    - - .isComplete("diagnosis")
      - Invalid
    assumptions: []
  discharge_day:
    code:
    - - .isComplete("discharge_day")
      - Valid
    - - .isNonNegative("discharge_day")
      - Valid
    - - .isUnique("discharge_day")
      - Valid
    assumptions: []
  gender:
    code:
    - - .isComplete("gender")
      - In

In [6]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,".isComplete(""gender"")",Failure,Value: 0.6666666666666666 does not meet the co...
1,".isNonNegative(""gender"")",Success,
2,".isComplete(""insurance"")",Success,
3,".isComplete(""race"")",Success,
4,".isComplete(""cost"")",Success,
5,".isNonNegative(""cost"")",Success,
6,".isComplete(""discharge_day"")",Success,
7,".isNonNegative(""discharge_day"")",Success,
8,".isUnique(""discharge_day"")",Success,
9,".isContainedIn(""bloodtype"", [""A pos"", ""O pos""])",Failure,Value: 0.6666666666666666 does not meet the co...


# downstream task 1

## prepare context for LLM

In [7]:
column_desc = DeequInspectorManager().spark_df_to_column_desc(spark_train_data, spark_train)
context = """
nonsensitive_df = duckdb.sql("SELECT * EXCLUDE ssn, gender, race
FROM 's3://datalake/latest/hospitalisations.csv'").df()
hosp_df = nonsensitive_df.dropna()
strokes_total = duckdb.sql("SELECT COUNT(*) FROM hosp_df
WHERE diagnosis = 'stroke'").fetch()
strokes_for_rare_bloodtypes = duckdb.sql("SELECT COUNT(*)
FROM hosp_df WHERE diagnosis = 'stroke'
AND bloodtype IN ('AB negative', 'B negative')").fetch()
generate_report(strokes_total, strokes_for_rare_bloodtypes)"""

## run LLM on toy model with default settings

In [8]:
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=SQL_QUERY_TASK_DESCRIPTION,
                   assumption_generation_trick=None, logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  bloodtype:
    code:
    - - .isComplete('bloodtype')
      - Valid
    - - .isContainedIn('bloodtype', ['AB negative', 'B negative'])
      - Invalid
    assumptions:
    - Values in the 'bloodtype' column should be in the set {'AB negative', 'B negative'}
      if they are considered rare blood types for the purpose of filtering.
  diagnosis:
    code:
    - - .isComplete('diagnosis')
      - Invalid
    - - .isContainedIn('diagnosis', ['stroke'])
      - Invalid
    assumptions:
    - Values in the 'diagnosis' column should include 'stroke' for accurate counting
      of strokes.



In [10]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,.isComplete('diagnosis'),Failure,Value: 0.6666666666666666 does not meet the co...
1,".isContainedIn('diagnosis', ['stroke'])",Failure,Value: 0.6666666666666666 does not meet the co...
2,.isComplete('bloodtype'),Success,
3,".isContainedIn('bloodtype', ['AB negative', 'B...",Failure,Value: 0.0 does not meet the constraint requir...


## run LLM on toy model with with_deequ trick

It will add, delete and modify the constraints generated by deequ

In [11]:
deequ_assumptions = dq_manager.get_constraints_for_spark_df(spark_train, spark_train_data).to_string()
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=SQL_QUERY_TASK_DESCRIPTION,
                   assumption_generation_trick='with_deequ', logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context, "deequ_assumptions": deequ_assumptions},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  bloodtype:
    code:
    - - .isContainedIn('bloodtype', ['AB negative', 'B negative'])
      - Invalid
    assumptions:
    - Should potentially include rare blood types like 'AB negative' and 'B negative'
  diagnosis:
    code:
    - - .isContainedIn('diagnosis', ['stroke'])
      - Invalid
    assumptions:
    - Must contain medical diagnoses such as 'stroke'
  gender:
    code:
    - - .isComplete('gender')
      - Invalid
    assumptions:
    - Should be complete for all records
  race:
    code:
    - - .isComplete('race')
      - Valid
    assumptions:
    - Should be complete for all records
  ssn:
    code:
    - - .isComplete('ssn')
      - Valid
    assumptions:
    - Should be complete for all records



In [12]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,".isContainedIn('diagnosis', ['stroke'])",Failure,Value: 0.6666666666666666 does not meet the co...
1,".isContainedIn('bloodtype', ['AB negative', 'B...",Failure,Value: 0.0 does not meet the constraint requir...
2,.isComplete('ssn'),Success,
3,.isComplete('gender'),Failure,Value: 0.6666666666666666 does not meet the co...
4,.isComplete('race'),Success,


## run LLM on toy model with with_experience trick

In [13]:
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=SQL_QUERY_TASK_DESCRIPTION,
                   assumption_generation_trick="with_experience", logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  bloodtype:
    code:
    - - .isComplete('bloodtype')
      - Valid
    - - .isContainedIn('bloodtype', ['O pos', 'A pos', 'AB negative', 'B negative'])
      - Invalid
    assumptions:
    - Value should be within ['O pos', 'A pos', 'AB negative', 'B negative'].
    - Should not be NULL.
  diagnosis:
    code:
    - - .isComplete('diagnosis')
      - Invalid
    - - .isContainedIn('diagnosis', ['cancer', 'cough', 'fraction', 'stroke'])
      - Invalid
    assumptions:
    - Value should be within ['cancer', 'cough', 'fraction', 'stroke'].
    - Should not be NULL.



In [14]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,".isContainedIn('diagnosis', ['cancer', 'cough'...",Failure,Value: 0.6666666666666666 does not meet the co...
1,.isComplete('diagnosis'),Failure,Value: 0.6666666666666666 does not meet the co...
2,".isContainedIn('bloodtype', ['O pos', 'A pos',...",Failure,Value: 0.6666666666666666 does not meet the co...
3,.isComplete('bloodtype'),Success,


# Downstream task 2

## prepare context for LLM

In [15]:
column_desc = DeequInspectorManager().spark_df_to_column_desc(spark_train_data, spark_train)
context = """
df = pd.read_csv("s3://datalake/latest/hospitalisations.csv") 
df['cost_smoothed'] = np.log(df['cost'])
df['admission_day'].fillna(df['discharge_day'])
df['duration'] = df['discharge_day'] - df['admission_day']
categorical_cols = ['diagnosis', 'insurance']
for col in categorical_cols:
 df[col] = pd.get_dummies(df[col], dummy_na=True)
features = df[categorical_cols + ['duration', 'cost_smoothed']]
labels = label_binarize(df['complications'], classes=['Y', 'N'])
model = sklearn.tree.DecisionTreeClassifier()
model.fit(train_features, train_labels)
deploy_to_production(model)
"""

## run LLM on toy model with default settings

In [16]:
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=ML_INFERENCE_TASK_DESCRIPTION,
                   assumption_generation_trick=None, logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  admission_day:
    code:
    - - .isComplete('admission_day')
      - Invalid
    assumptions:
    - The 'admission_day' column should contain non-null values, as missing values
      are filled with 'discharge_day' which assumes 'admission_day' can be null.
  complications:
    code:
    - - .isContainedIn('complications', ['Y', 'N'])
      - Valid
    assumptions:
    - The 'complications' column should only contain the values 'Y' or 'N', as it
      is being binarized with these classes.
  cost:
    code:
    - - .isPositive('cost')
      - Invalid
    assumptions:
    - The 'cost' column should contain positive values, as taking the logarithm of
      non-positive numbers is undefined.
  diagnosis:
    code:
    - - '.hasApproxCountDistinct(''diagnosis'', lambda x: x < 100)'
      - Valid
    assumptions:
    - The 'diagnosis' column is expected to be categorical with a limited set of distinct
      values, as it is used in one-hot encoding.
  discharge_day:
    code

In [17]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,.isPositive('cost'),Failure,Value: 0.6666666666666666 does not meet the co...
1,.isComplete('admission_day'),Failure,Value: 0.6666666666666666 does not meet the co...
2,.isComplete('discharge_day'),Success,
3,".hasApproxCountDistinct('diagnosis', lambda x:...",Success,
4,".hasApproxCountDistinct('insurance', lambda x:...",Success,
5,".isContainedIn('complications', ['Y', 'N'])",Success,


## run LLM on toy model with with_deequ trick

It will add, delete and modify the constraints generated by deequ

In [18]:
deequ_assumptions = dq_manager.get_constraints_for_spark_df(spark_train, spark_train_data).to_string()
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=ML_INFERENCE_TASK_DESCRIPTION,
                   assumption_generation_trick='with_deequ', logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context, "deequ_assumptions": deequ_assumptions},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  admission_day:
    code:
    - - .isComplete('admission_day')
      - Invalid
    - - .isNonNegative('admission_day')
      - Valid
    assumptions:
    - The 'admission_day' column should be complete with no missing values.
    - The 'admission_day' column should have non-negative values indicating valid
      admission days.
  complications:
    code:
    - - .isComplete('complications')
      - Valid
    - - .isContainedIn('complications', ['Y', 'N'])
      - Valid
    assumptions:
    - The 'complications' column should be complete with no missing values.
    - 'The ''complications'' column should only contain values from the expected set:
      [''Y'', ''N''] for proper label binarization.'
  cost:
    code:
    - - .isComplete('cost')
      - Valid
    - - .isNonNegative('cost')
      - Valid
    assumptions:
    - The 'cost' column should be complete with no missing values.
    - The 'cost' column should have non-negative values to ensure a valid log transformatio

In [19]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,.isComplete('cost'),Success,
1,.isNonNegative('cost'),Success,
2,.isComplete('admission_day'),Failure,Value: 0.6666666666666666 does not meet the co...
3,.isNonNegative('admission_day'),Success,
4,.isComplete('discharge_day'),Success,
5,.isNonNegative('discharge_day'),Success,
6,".isGreaterThanOrEqualTo('discharge_day', 'admi...",Failure,Value: 0.6666666666666666 does not meet the co...
7,.isComplete('diagnosis'),Failure,Value: 0.6666666666666666 does not meet the co...
8,".isContainedIn('diagnosis', ['cancer', 'cough'...",Failure,Value: 0.3333333333333333 does not meet the co...
9,.isComplete('insurance'),Success,


## run LLM on toy model with with_experience trick

In [20]:
lc = LangChainCADV(model_name="gpt-4o", downstream_task_description=ML_INFERENCE_TASK_DESCRIPTION,
                   assumption_generation_trick="with_experience", logger=logger)

relevant_columns_list, expectations, suggestions = lc.invoke(
    input_variables={"column_desc": column_desc, "script": context},
    num_stages=3,
    max_retries=3
)
code_list_for_constraints = [item for v in suggestions.values() for item in v]

# Validate the constraints on the original data to see if they are grammarly correct
code_list_for_constraints_valid = dq_manager.filter_constraints(code_list_for_constraints, spark_test,
                                                                spark_test_data)
constraints = Constraints.from_llm_output(relevant_columns_list, expectations, suggestions,
                                          code_list_for_constraints_valid)

print(yaml.dump(constraints.to_dict()))

constraints:
  admission_day:
    code:
    - - '.hasMax(''admission_day'', lambda x: x <= 20)'
      - Invalid
    - - '.hasMean(''admission_day'', lambda x: x > 0)'
      - Valid
    - - '.hasMin(''admission_day'', lambda x: x >= 10)'
      - Valid
    - - '.hasStandardDeviation(''admission_day'', lambda x: x > 0)'
      - Valid
    - - .isNonNegative('admission_day')
      - Valid
    assumptions:
    - The column 'admission_day' should be non-negative.
    - The column 'admission_day' should be numeric and is expected to have a mean
      and standard deviation.
    - The column 'admission_day' should have a minimum value constraint of 10.
    - The column 'admission_day' should have a maximum value constraint of 20.
  complications:
    code:
    - - .isContainedIn('complications', ['Y', 'N'])
      - Valid
    assumptions:
    - The column 'complications' should have a value within the set {'Y', 'N'} based
      on its use in label binarization.
  cost:
    code:
    - - '.hasMax

In [21]:
validate_on_test_data(constraints, test_data)

Unnamed: 0,column_name,constraint_status,constraint_message
0,".isContainedIn('diagnosis', ['cancer', 'cough'...",Failure,Value: 0.3333333333333333 does not meet the co...
1,".isContainedIn('insurance', ['CG', 'UHG', 'KP'])",Success,
2,.isNonNegative('cost'),Success,
3,".hasMean('cost', lambda x: x > 0)",Success,
4,".hasStandardDeviation('cost', lambda x: x > 0)",Success,
5,".hasMin('cost', lambda x: x >= 10)",Failure,Value: 0.0 does not meet the constraint requir...
6,".hasMax('cost', lambda x: x <= 10000)",Success,
7,.isNonNegative('admission_day'),Success,
8,".hasMean('admission_day', lambda x: x > 0)",Success,
9,".hasStandardDeviation('admission_day', lambda ...",Success,


## stop spark

In [22]:
spark_train.sparkContext._gateway.shutdown_callback_server()
spark_train.stop()
spark_test.sparkContext._gateway.shutdown_callback_server()
spark_test.stop()