In [1]:
import pandas as pd
from sqlalchemy import create_engine
from snowflake.connector import connect
import data_diff
import logging

In [2]:
sflk_user = "SAPI_WORKSPACE_971362817"
sflk_password = "{{PWD FOR WORKSPACE}}"
sflk_account = "owb79125.us-east-1"

sflk_database = "SAPI_9609"
sflk_schema = "WORKSPACE_971362817"
sflk_warehouse = "KEBOOLA"

keboola_project_id = "SAPI_9609"


## Connection Tests

In [3]:
def test_conn():
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/{database_name}/{schema_name}?warehouse={warehouse_name}&role={role_name}'.format(
            user=sflk_user,
            password=sflk_password,
            account_identifier=sflk_account,
            database_name=sflk_database,
            schema_name=sflk_schema,
            warehouse_name=sflk_warehouse,
            role_name=sflk_user
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()

In [4]:
def get_table_names(branch_id: str, prod_table_name: str) -> tuple:
    """
    String operator - handling Keboola storage naming convention
    Returns the table names for prod and dev environments based on the given branch ID.
    """

    table_name_tmp = prod_table_name

    # figuring out and removing the storage stage for now
    keboola_storage_ids = ["in.c-", "out.c-"]
    storage_stage = ""
    for word in keboola_storage_ids:
        if word in table_name_tmp:
            #print(f"Found '{word}' in '{table_name}'")
            storage_stage = word
            table_name_tmp = table_name_tmp.replace(word, "")
        else:
            pass
    
    #print(storage_stage)

    #replace comment, we will put them back
    table_name_tmp = table_name_tmp.replace('"', '')
    #print(table_name_tmp)
    
    # Split the table name into schema and table parts
    schema_name, table_name = table_name_tmp.split(".")
    #print(storage_stage, schema_name, table_name)

    # Prod Schema
    prod_schema_name = f'"{storage_stage}{schema_name}"'
    #print(prod_schema_name)
    
    # Construct the dev environment schema name by appending the branch ID to the prod schema name
    dev_schema_name = f'"{storage_stage}{branch_id}-{schema_name}"'
    
    # Construct the dev environment table name by appending the branch ID to the prod table name
    dev_table_name = f'{dev_schema_name}."{table_name}"'
    
    #"""
    logging.info("Dev table id: {}".format(dev_table_name))
    logging.info("Prod table id: {}".format(prod_table_name))
    logging.info("Dev schema name: {}".format(dev_schema_name))
    logging.info("Prod schema name: {}".format(prod_schema_name))
    print("Dev table id: {}".format(dev_table_name))
    print("Prod table id:", prod_table_name)
    print("Dev schema name:", dev_schema_name)
    print("Prod schema name:", prod_schema_name)
    #"""

    # Return both table names as a tuple
    return (dev_table_name, prod_table_name, dev_schema_name, prod_schema_name)


def check_columns(table_name_prod, schema_name_prod):
    """
    Connect to Snowflake and get column list
    By loading data sample in pandas
    # https://docs.snowflake.com/developer-guide/python-connector/python-connector-pandas
    """

    sflk_dataset = table_name_prod.split(".")[2]
    sflk_dataset = sflk_dataset.replace('"','')



    conn = connect(
            user=sflk_user,
            password=sflk_password,
            account=sflk_account
        )

    dataset = '"{database_name}"."{schema_name}"."{dataset_name}"'.format(
            database_name=sflk_database,
            schema_name=schema_name_prod,
            dataset_name=sflk_dataset
        )


    #sql = 'SELECT * FROM {} LIMIT 10'.format(dataset)
    sql = 'DESCRIBE TABLE {}'.format(dataset)

    logging.info("Executing Query: {}".format(sql))
    #print("Executing Query: {}".format(sql))

    # Execute SQL
    cursor = conn.cursor()
    cursor.execute(sql)

    # Convert output to a dataframe
    df = pd.DataFrame(cursor.fetchall(), columns=['name', 'type', 'kind', 'null', 'default', 'primary key', 'unique key', 'check', 'expression', 'comment', 'policy name'])

    #df = cursor.fetch_pandas_all()
    cursor.close()

    #col_list = df.columns.tolist()

    return df


# Start

In [5]:
test_conn()

  results = connection.execute('select current_version()').fetchone()


7.15.0


In [6]:
branch_id = "719410"
table_name_prod = '"in.c-data-diff-prep"."orders"'


## Data Diff

In [8]:
## Table Prep

dev_table_id, prod_table_id, dev_schema_name, prod_schema_name = get_table_names(branch_id, table_name_prod)

## grab table info from describe
schema_name_prod = prod_schema_name.replace('"','')
table_describe = check_columns(table_name_prod, schema_name_prod)
schema_name_dev = dev_schema_name.replace('"','')

#print(table_describe)

## column list
col_list = table_describe.iloc[:, 0].tolist()
logging.info("Available columns: {}".format(col_list))
print("Available columns: {}".format(col_list))

# Filter rows containing "TIMESTAMP" in the "type" column
ts_filtered = table_describe[table_describe["type"].str.contains("TIMESTAMP")]
ts_filtered = ts_filtered.iloc[:, 0].tolist()
logging.info("Timestamp columns: {}".format(ts_filtered))
print("Timestamp columns: {}".format(ts_filtered))

# Filter rows identified as Primary Key in the "type" column
pk_filtered = table_describe[table_describe["primary key"].str.contains("Y")]
pk_filtered = pk_filtered.iloc[:, 0].tolist()
logging.info("Primary Key columns: {}".format(pk_filtered))
print("Primary Key columns: {}".format(pk_filtered))

# Lets figure out leftover columns
# all - pks
other_col = list(set(col_list).difference(set(pk_filtered)))
# diff - timestamp
if "_timestamp" in col_list:
    diff_col = list(set(other_col).difference(set(["_timestamp"])))
else:
    print("Column '_timestamp' has not been found in the dataset. Has the table been created within Keboola platform?")
    logging.warning("Column '_timestamp' has not been found in the dataset. Has the table been created within Keboola platform?")
    diff_col = other_col
logging.info("Other columns: {}".format(diff_col))
print("Other columns: {}".format(diff_col))

# plain table name
table_name = table_name_prod.split(".")[2]
table_name = table_name.replace('"','')
logging.info("Table name: {}".format(table_name))
print("Table name: {}".format(table_name))

# This is a result checking columns in the previous step, basically list of table columns except "_timestamp" and PKs
sflk_key_columns = tuple(pk_filtered)
sflk_dataset_columns = tuple(diff_col)
# ('order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date','order_delivered_customer_date', 'order_estimated_delivery_date')


## Forming Data Diff conn directories
table_source_dict = {
    "driver": "snowflake",
    "user": sflk_user,
    "password": sflk_password,
    "account": sflk_account,
    "role": sflk_user,
    "warehouse": sflk_warehouse,
    "database": keboola_project_id,
    "schema": schema_name_prod
}
table_source_name = table_name

table_destination_dict = {
    "driver": "snowflake",
    "user": sflk_user,
    "password": sflk_password,
    "account": sflk_account,
    "role": sflk_user,
    "warehouse": sflk_warehouse,
    "database": keboola_project_id,
    "schema": schema_name_dev
}

table_destination_name = table_name

table_source = data_diff.connect_to_table(
    table_source_dict
    ,table_source_name
    ,key_columns=sflk_key_columns
    ,update_column="_timestamp"
    ,extra_columns=sflk_dataset_columns
    )
table_destination=data_diff.connect_to_table(
    table_destination_dict
    ,table_destination_name
    ,key_columns=sflk_key_columns
    ,update_column="_timestamp"
    ,extra_columns=sflk_dataset_columns
    )

#print("----------------------")
#print("Using data:")
#print(table_source, "\n", table_destination)
logging.info("Using data:")
logging.debug(table_source)
logging.debug(table_destination)


Dev table id: "in.c-719410-data-diff-prep"."orders"
Prod table id: "in.c-data-diff-prep"."orders"
Dev schema name: "in.c-719410-data-diff-prep"
Prod schema name: "in.c-data-diff-prep"
Available columns: ['order_id', 'customer_id', 'order_status', 'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date', '_timestamp']
Timestamp columns: ['_timestamp']
Primary Key columns: ['order_id', 'customer_id']
Other columns: ['order_delivered_customer_date', 'order_purchase_timestamp', 'order_status', 'order_approved_at', 'order_estimated_delivery_date', 'order_delivered_carrier_date']
Table name: orders


In [9]:
## Lets diff
#print("----------------------")
#print("Running DIFF")
logging.info("Running DIFF")
diff_result = data_diff.diff_tables(table_source, table_destination, threaded=True, max_threadpool_size=6)
diff_list = list(diff_result)



In [10]:
print("----------------------")
print("printing diff sample:")
for a in diff_list[:8]:
    print(a)

print("----------------------")
diff_df = pd.DataFrame(diff_list)
print("Number of observed differences: {}".format(len(diff_df)))
print("----------------------")
# Get unique value counts as a DataFrame
value_counts = pd.DataFrame(diff_df[0].value_counts()).reset_index()
# Rename columns
value_counts.columns = ["observation", "count"]
# Print the resulting DataFrame
print("General Stats:")
print(value_counts)
print("----------------------")
print("Relevant columns:")
print(table_source.relevant_columns)
print("----------------------")

----------------------
printing diff sample:
('-', ('00042b26cf59d7ce69dfabb4e55b4fd9', '58dbd0b2d70206bf40e62cd34e84d795', '2023-04-25 21:01:14.000000', '2017-03-01 16:42:31.000', '2017-02-04 13:57:51.000', 'delivered', '2017-02-04 14:10:13.000', '2017-03-17 00:00:00.000', '2017-02-16 09:46:09.000'))
('-', ('000e906b789b55f64edcb1f84030f90d', '6a3b2fc9f270df258605e22bef19fd88', '2023-04-25 21:01:14.000000', '2017-12-09 17:27:23.000', '2017-11-21 18:54:23.000', 'delivered', '2017-11-21 19:09:02.000', '2017-12-07 00:00:00.000', '2017-11-22 20:46:54.000'))
('-', ('0010b2e5201cc5f1ae7e9c6cc8f5bd00', '57ef317d4818cb42680fc9dfd13867ce', '2023-04-25 21:01:14.000000', '2017-09-23 13:21:21.000', '2017-09-11 17:39:33.000', 'delivered', '2017-09-11 18:04:37.000', '2017-09-27 00:00:00.000', '2017-09-12 17:02:57.000'))
('-', ('00119ff934e539cf26f92b9ef0cdfed8', '7dd2e283f47deac853cf70f3b63c8d86', '2023-04-25 21:01:14.000000', '2017-08-16 17:29:59.000', '2017-08-06 00:42:49.000', 'delivered', '2017