In [1]:
from DQA import metadata_builder
import pandas as pd

In [2]:
help(metadata_builder)

Help on module DQA.metadata_builder in DQA:

NAME
    DQA.metadata_builder

CLASSES
    builtins.object
        RSIMetadataBuilder

    class RSIMetadataBuilder(builtins.object)
     |  RSIMetadataBuilder(file_name, api_key, meta_path, model_name='openai/gpt-4o', max_retries=3, retry_delay=2)
     |
     |  Methods defined here:
     |
     |  __init__(self, file_name, api_key, meta_path, model_name='openai/gpt-4o', max_retries=3, retry_delay=2)
     |      Parameters
     |      ----------
     |      file_name : str
     |          name of the data file (without extension) to build metadata for
     |      api_key : str
     |          LLM API key
     |      model_name : str
     |          Model to use (e.g., "openai/gpt-4o")
     |      meta_path : str
     |          Path to knowledge base file (RSI memory)
     |      max_retries : int
     |          Number of retries for API call failures
     |      retry_delay : int
     |          Seconds to wait between retries
     |
    

In [3]:
filename = 'employees'
data_path = 'data/' + f'{filename}.csv'
df = pd.read_csv(data_path)
df.iloc[:5]

Unnamed: 0,Emp_ID,Name,Gender,Age,City,Department,Salary,Email
0,1,Tariq,Male,47.0,Karachi,Sales,123609.72,tariq76@gmail.com
1,2,Hina,Female,26.0,Quetta,HR,125683.94,hina90@gmail.com
2,3,Usman,Male,36.0,Lahore,Sales,,usman71@gmail.com
3,4,Sana,Female,21.0,Quetta,Finance,147901.41,sana72@gmail.com
4,5,Ayesha,Male,40.0,lahore,HR,123880.8,ayesha25@gmail.com


In [None]:
api_key = "--"

builder = metadata_builder.RSIMetadataBuilder(api_key=api_key, file_name=filename, 
                                              model_name="openai/gpt-4o", \
                                                meta_path=f"config/metadata/")
first_order = builder.build_first_order_metadata(df)

In [None]:

enriched = builder.ai_augment_metadata(first_order)
builder.save_metadata(enriched)

#Example of manual feedback
# builder.update_feedback("amt", {
    # "semantic_type": "currency",
    # "possible_meaning": "Transaction amount in AUD",
    # "expected_format": "float",
    # "potential_issues": "Missing or negative values"
# })
# 

In [None]:
pip install pyarrow

In [4]:
import dask.dataframe as dd
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime
import re

def load_data_from_csv(file_path):
    return dd.read_csv(file_path)

def load_data_from_sql(table_name, connection_string, index_col):
    """
    Load table from database using SQLAlchemy + Dask
    """
    engine = create_engine(connection_string)
    # Dask reads SQL tables in parallel for big data
    ddf = dd.read_sql_table(table_name, con=engine, index_col=index_col)
    return ddf


#===================================================================


In [8]:
load_data_from_csv(data_path).head()

Unnamed: 0,Emp_ID,Name,Gender,Age,City,Department,Salary,Email
0,1,Tariq,Male,47.0,Karachi,Sales,123609.72,tariq76@gmail.com
1,2,Hina,Female,26.0,Quetta,HR,125683.94,hina90@gmail.com
2,3,Usman,Male,36.0,Lahore,Sales,,usman71@gmail.com
3,4,Sana,Female,21.0,Quetta,Finance,147901.41,sana72@gmail.com
4,5,Ayesha,Male,40.0,lahore,HR,123880.8,ayesha25@gmail.com


In [None]:
def check_not_null(ddf, column):
    missing = ddf[column].isnull().sum().compute()
    return missing

def check_unique(ddf, column):
    # Count duplicates manually
    counts = ddf.groupby(column)[column].count()
    duplicates = counts[counts > 1].sum().compute()
    return duplicates

def check_range(ddf, column, min_val, max_val):
    invalid = ddf[(ddf[column] < min_val) | (ddf[column] > max_val)][column].count().compute()
    return invalid

def check_regex(ddf, column, pattern):
    invalid = ddf[~ddf[column].str.match(pattern, na=False)][column].count().compute()
    return invalid

def run_validations(ddf, table_name, table_rules, rules_config):
    results = []
    for column, checks in table_rules.items():
        for check in checks:
            if check == "not_null":
                res = check_not_null(ddf, column)
                results.append((column, check, res))
            elif check == "unique":
                res = check_unique(ddf, column)
                results.append((column, check, res))
            elif check == "range":
                min_val, max_val = rules_config["ranges"][column]
                res = check_range(ddf, column, min_val, max_val)
                results.append((column, check, res))
            elif check == "regex":
                pattern = rules_config["regex_patterns"][column]
                res = check_regex(ddf, column, pattern)
                results.append((column, check, res))
    return results

def generate_report(results, table_name):
    df = pd.DataFrame(results, columns=["Column", "Check", "Failed_Count"])
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"reports/{table_name}_data_quality_{timestamp}.csv"
    df.to_csv(filename, index=False)
    print(f"Report saved to {filename}")
