In [1]:
import os
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text

# Connection settings. The defaults match the local development database
# (db cdi_laboratorio / user bruno / pass bruno); set the CDI_DB_* environment
# variables to point the notebook at another database without editing it.
db = os.environ.get('CDI_DB_NAME', 'cdi_laboratorio')
user = os.environ.get('CDI_DB_USER', 'bruno')
password = os.environ.get('CDI_DB_PASSWORD', 'bruno')

# Percent-encode the credentials so characters such as '@', ':' or '/' in a
# user name or password cannot corrupt the connection URL.
db_uri = f'postgresql://{quote_plus(user)}:{quote_plus(password)}@localhost:5432/{db}'
engine = create_engine(db_uri)

In [2]:
def check_stored_procedure_exists(method_code, granularity='column'):
    """
    Tell whether ``pg_proc`` contains a routine named ``<method_code>_<granularity>``.

    The method code is lower-cased before the lookup; the granularity is used as given.

    :param method_code: Database code for the applied method.
    :param granularity: Options are 'cell', 'column', 'table', 'dataset'.
    :return: The scalar value produced by the ``SELECT EXISTS`` query.
    """
    # The routine name is assembled inside the query from the two bound values.
    existence_query = text("""
            SELECT EXISTS (
                SELECT 1
                FROM pg_proc
                WHERE proname = :method_code || '_' || :type
            )
        """)
    bind_values = {"method_code": method_code.lower(), "type": granularity}

    with engine.connect() as db_conn:
        return db_conn.execute(existence_query, bind_values).scalar()

        

In [3]:
def execute_stored_procedure_column(method_code, table_name, column_name):
    """
    Run the column-level stored procedure ``<method_code>_column`` and return its rows.

    :param method_code: Database code for the applied method. It is interpolated into
        the SQL text as part of the function name, so it must be a trusted identifier.
    :param table_name: Passed to the procedure as its first argument.
    :param column_name: Passed to the procedure as its second argument.
    :return: ``result.fetchall()`` when the call returns rows; ``None`` when the
        procedure does not exist or the call returns no rows.
    """
    # Check existence before checking out a connection: the check opens its own
    # connection, so running it inside the ``with`` block would hold two at once.
    if not check_stored_procedure_exists(method_code, granularity='column'):
        return None

    # A function name cannot be a bind parameter, hence the f-string; the
    # arguments themselves are still sent as bound values.
    sql_command = text(f"SELECT {method_code}_column(:table_name, :column_name)")

    with engine.connect() as connection:
        result = connection.execute(sql_command, {"table_name": table_name, "column_name": column_name})
        return result.fetchall() if result.returns_rows else None

In [4]:
def execute_stored_procedure_cell(method_code, pk_name, table_name, column_name, page_size=1000):
    """
    Executes a stored procedure for cell-level data quality checks.

    The procedure ``<method_code>_cell`` is executed once and its rows are handed
    back in batches. Paging with LIMIT/OFFSET over an unordered result would
    re-run the procedure for every page and could repeat or skip rows.

    :param method_code: Database code for the applied method. It is interpolated into
        the SQL text as part of the function name, so it must be a trusted identifier.
    :param pk_name: Passed to the procedure as its first argument.
    :param table_name: Passed to the procedure as its second argument.
    :param column_name: Passed to the procedure as its third argument.
    :param page_size: Maximum number of rows per yielded page.
    :return: A generator yielding non-empty lists of rows from the stored procedure;
        it yields nothing when the procedure does not exist.
    """
    # The existence check opens its own connection, so do it before checking one out here.
    if not check_stored_procedure_exists(method_code, granularity='cell'):
        return

    sql_command = text(f"""
        SELECT * from {method_code}_cell(:pk_name, :table_name, :column_name)
    """)

    with engine.connect() as connection:
        # stream_results requests a server-side cursor where the driver supports
        # one, so the full result set is not buffered on the client.
        result = connection.execution_options(stream_results=True).execute(sql_command, {
            "pk_name": pk_name,
            "table_name": table_name,
            "column_name": column_name,
        })

        if not result.returns_rows:
            return

        while True:
            rows = result.fetchmany(page_size)
            # An empty batch is the end-of-data signal; rowcount is not
            # dependable for SELECT statements.
            if not rows:
                break
            yield rows

In [5]:
def create_report():
    """
    Insert a new all-defaults row into meta_dq_measurement_report.

    :return: The ``id`` of the inserted row, as returned by the INSERT.
    """
    insert_report = text("""
            INSERT INTO meta_dq_measurement_report default values 
            RETURNING id
        """)

    with engine.connect() as db_conn:
        inserted_row = db_conn.execute(insert_report).fetchone()
        new_report_id = inserted_row[0]
        # Persist the new report before the connection is released.
        db_conn.commit()
    return new_report_id

In [6]:
def create_results(results, report_id):
    """
    Inserts results in the meta_dq_results table.

    All rows are sent in one executemany call and committed together, instead of
    one statement round-trip per row.

    :param results: iterable of tuples
        (applied_method_id, granularity, table_name, table_schema, column_name, pk_id, result);
        ``pk_id`` is written to the ``row_id`` column.
    :param report_id: id of the measurement report the rows are attached to.
    :return: None
    """
    rows = [
        {
            "report_id": report_id,
            "applied_method_id": applied_method_id,
            "granularity": granularity,
            "table_name": table_name,
            "table_schema": table_schema,
            "column_name": column_name,
            "result": result,
            "pk_id": pk_id,
        }
        for applied_method_id, granularity, table_name, table_schema, column_name, pk_id, result in results
    ]
    if not rows:
        # Nothing to insert; an empty parameter list must not reach execute().
        return

    sql_command = text("""
        INSERT INTO meta_dq_results (report_id, applied_method_id, granularity, table_name, table_schema, column_name, row_id, result)
        VALUES (:report_id, :applied_method_id, :granularity, :table_name, :table_schema, :column_name, :pk_id, :result)
    """)

    with engine.connect() as connection:
        # Passing a list of parameter dictionaries runs the statement once per
        # dictionary through the driver's executemany path.
        connection.execute(sql_command, rows)
        connection.commit()  # Commit the transaction to save changes
    
    
    

In [7]:
from sqlalchemy import text

def get_methods_to_apply(granularity='column'):
    """
    List the applied methods registered for the given granularity.

    :param granularity: Value matched against ``rel.granularity`` in
        meta_dq_applied_method_applied_to_rel.
    :return: ``fetchall()`` rows of
        (applied_method_id, code, table_name, table_schema, column_name).
    """
    # Join each applied method to the targets it is attached to.
    methods_query = text("""
            SELECT am.id applied_method_id, am.code code, rel.table_name, rel.table_schema, rel.column_name
            FROM meta_dq_applied_method am
            JOIN meta_dq_applied_method_applied_to_rel rel ON am.id = rel.applied_method_id
            WHERE rel.granularity = :granularity
        """)
    bind_values = {"granularity": granularity}

    with engine.connect() as db_conn:
        return db_conn.execute(methods_query, bind_values).fetchall()

In [8]:
import uuid

# Every result written below is attached to this single measurement report.
report_id = create_report()

# Column-level methods: the first value of the first returned row is stored as the score.
for applied_method_id, code, table, table_schema, column in get_methods_to_apply(granularity='column'):
    rows = execute_stored_procedure_column(code, table, column)
    result = rows[0][0] if rows else None  # Extract the first value if any rows came back
    # Compare against None rather than truthiness: a score of 0 is a legitimate
    # measurement and must be stored, not silently dropped.
    if result is not None:
        create_results([
            (applied_method_id, 'column', table, table_schema, column, None, result)
        ], report_id)
        print(f"Processed {code} on {table}.{column} with result: {result}")


# Cell-level methods: each page holds (pk_id, value) pairs, one per row of the table.
page_size = 500000  # rows fetched and inserted per batch
for applied_method_id, code, table, table_schema, column in get_methods_to_apply(granularity='cell'):
    # NOTE(review): '_id' is assumed to be the primary-key column of every
    # measured table - confirm against the schema.
    for page in execute_stored_procedure_cell(code, '_id', table, column, page_size=page_size):
        cell_results = [
            (applied_method_id, 'cell', table, table_schema, column, pk_id, float(val))
            for pk_id, val in page
        ]
        create_results(cell_results, report_id)
        print(f"Processed {len(cell_results)} rows for {code} on {table}.{column}")
    
    


Processed MET_AP_1 on books.isbn with result: 0.560934670624519
Processed MET_AP_1 on users.id with result: 0.999999223744115
Processed MET_AP_1 on users.age with result: 0.130485509243267
Processed MET_AP_5 on users.location with result: 0.212878861387868
Processed MET_AP_4 on books.isbn with result: 0.559841162219595
Processed MET_AP_3 on books.published_date with result: 0.175302420188356
Processed MET_AP_2 on books.authors with result: 0.00241440041011733
Processed MET_AP_2 on books.categories with result: 0.00161442356190208
Processed MET_AP_7 on books.rating_count with result: 2.0671236387990837e-06
Processed MET_AP_9 on users.name with result: 0.7639988045659372
Processed MET_AP_8 on users.age with result: 0.13010203883608193
Processed MET_AP_10 on books.isbn with result: 0.5609346706245194
