In [13]:
import duckdb
import pandas as pd
import os


In [None]:
import os

def save_sql_to_file(sql_string: str, script_name: str):
    """
    Creates a storage folder (if it doesn't exist) and writes the given SQL string
    to a .sql file within that folder. If a file with the same base name exists,
    it prepends an increasing number (1, 2, 3, ...) to the filename.

    Args:
        sql_string: The SQL query string to be saved.
        script_name: The base name of the script (used for the filename).
    """
    folder_name = "C:\\lopu-kg-test\\project\\src\\main\\sql_for_pipelines"

    # Create the storage folder if it doesn't exist
    if not os.path.exists(folder_name):
        try:
            os.makedirs(folder_name)
            print(f"Folder '{folder_name}' created successfully.")
        except OSError as e:
            print(f"Error creating folder '{folder_name}': {e}")
            return

    base_filename = f"{script_name}.sql"
    counter = 1
    file_path = os.path.join(folder_name, f"{counter}_{base_filename}")

    while os.path.exists(file_path):
        counter += 1
        file_path = os.path.join(folder_name, f"{counter}_{base_filename}")

    # Write the SQL string to the file
    try:
        with open(file_path, "w") as f:
            f.write(sql_string)
        print(f"SQL query successfully written to '{file_path}'.")
    except IOError as e:
        print(f"Error writing to file '{file_path}': {e}")



SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_my_script.sql'.
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\2_my_script.sql'.
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\3_my_script.sql'.


In [1]:
import duckdb

def get_duckdb_query_stats(conn: duckdb.DuckDBPyConnection, query: str) -> dict:
    """Executes a DuckDB query and returns its statistics."""
    try:
        conn.execute(query)
        #Get the latest query execution info.
        result = conn.execute("SELECT rows, execution_time FROM duckdb_queries ORDER BY finished DESC LIMIT 1").fetchone()
        if result:
            rows, execution_time = result
            return {"rows": rows, "execution_time": execution_time}
        else:
            return {"error": "Could not retrieve query statistics."}

    except duckdb.Error as e:
        return {"error": str(e)}

def get_duckdb_copy_stats(conn: duckdb.DuckDBPyConnection, copy_command: str) -> dict:
    """Executes a DuckDB COPY command and returns its statistics."""
    try:
        conn.execute(copy_command)
        #Get the latest copy command execution info.
        result = conn.execute("SELECT rows, bytes_written, execution_time FROM duckdb_queries ORDER BY finished DESC LIMIT 1").fetchone()
        if result:
            rows, bytes_written, execution_time = result
            return {"rows": rows, "bytes_written": bytes_written, "execution_time": execution_time}
        else:
            return {"error": "Could not retrieve copy statistics."}

    except duckdb.Error as e:
        return {"error": str(e)}


In [15]:
import xml.etree.ElementTree as ET
import pandas as pd
import os

def xml_to_dataframe(xml_file_path):
    """
    Parses an XML file and converts it into a pandas DataFrame.

    Args:
        xml_file_path (str): The path to the XML file.

    Returns:
        pandas.DataFrame: The DataFrame containing the XML data, or None if an error occurs.
    """
    try:
        tree = ET.parse(xml_file_path)
        root = tree.getroot()

        data = []
        for action in root:
            action_data = {}
            # Extract Action attributes
            for key, value in action.attrib.items():
                action_data["Action." + key.split("}")[-1]] = value

            for element in action:
                for key, column0 in element.attrib.items():
                    action_data["Customer." + key.split("}")[-1]] = column0 # value, oli siin enne 04.04 muudatus
                if len(element) == 0:  # Simple element
                    action_data[element.tag.split("}")[-1]] = element.text
                else:  # Nested element
                    for sub_element in element:
                        for key, value in sub_element.attrib.items():
                            action_data["Account." + key.split("}")[-1]] = value
                        if len(sub_element) == 0:
                            action_data[element.tag.split("}")[-1] + "." + sub_element.tag.split("}")[-1]] = sub_element.text
                        else:
                            for sub_sub_element in sub_element:
                                if len(sub_sub_element) == 0:
                                    action_data[element.tag.split("}")[-1] + "." + sub_element.tag.split("}")[-1] + "." + sub_sub_element.tag.split("}")[-1]] = sub_sub_element.text
                                else:
                                    for sub_sub_sub_element in sub_sub_element:
                                        action_data[element.tag.split("}")[-1] + "." + sub_element.tag.split("}")[-1] + "." + sub_sub_element.tag.split("}")[-1] + "." + sub_sub_sub_element.tag.split("}")[-1]] = sub_sub_sub_element.text
            data.append(action_data)

        df = pd.DataFrame(data)
        return df

    except Exception as e:
        print(f"Error processing XML file: {e}")
        return None

path_to_xml = "C:\lopu-kg-test\project\src\data\Batch1\CustomerMgmt.xml"

xml_dataframe = xml_to_dataframe(path_to_xml)



In [16]:
def rename_columns(df):
    """
    Renames DataFrame columns by extracting the last part after '.', 
    and appends a counter if duplicates are found.

    Args:
        df (pd.DataFrame): The DataFrame to rename columns.

    Returns:
        pd.DataFrame: The DataFrame with renamed columns.
    """
    new_columns = []
    seen_columns = {}  # Track seen column names and their counts

    for col in df.columns:
        parts = col.split('.')
        new_col = parts[-1]  # Extract the last part

        if new_col in seen_columns:
            seen_columns[new_col] += 1
            new_col = f"{new_col}_{seen_columns[new_col]}"  # Append a counter
        else:
            seen_columns[new_col] = 0

        new_columns.append(new_col)

    df.columns = new_columns
    return df


xml_dataframe = rename_columns(xml_dataframe)


In [None]:
def load_data_to_duckdb(db_path, src_folder):
    data_dict = {
        "Date.txt": "wh_db.DimDate",
        "Time.txt": "wh_db.DimTime",
        "StatusType.txt": "wh_db.StatusType",
        "TaxRate.txt": "wh_db.TaxRate",
        "TradeType.txt": "wh_db.TradeType",
        "HR.csv": "temp_broker", #wh_db moved out to take into account SK_ID
        "Industry.txt": "wh_db.industry"
        # "FinwireCMP.txt": "wh_db.DimCompany"
    }
    
    con = duckdb.connect(database=db_path)
    
    for file_name, table_name in data_dict.items():
        file_path = os.path.join(src_folder, file_name)
        
        if file_name == "HR.csv":
            query = f"""
                CREATE OR REPLACE TEMP TABLE {table_name} AS 
                SELECT * FROM read_csv('{file_path}', delim=',', columns={{
                    'employeeid': 'BIGINT',
                    'managerid': 'BIGINT',
                    'employeefirstname': 'STRING',
                    'employeelastname': 'STRING',
                    'employeemi': 'STRING',
                    'employeejobcode': 'STRING',
                    'employeebranch': 'STRING',
                    'employeeoffice': 'STRING',
                    'employeephone': 'STRING'
                }}, header=False) 
                WHERE employeejobcode = '314';
            """

            insert_target_query = f"""
                INSERT INTO wh_db.DimBroker
                SELECT 
                    employeeid sk_brokerid,
                    employeeid brokerid,
                    managerid,
                    employeefirstname firstname,
                    employeelastname lastname,
                    employeemi middleinitial,
                    employeebranch branch,
                    employeeoffice office,
                    employeephone phone,
                    true iscurrent,
                    1 batchid, --temp, later read from db
                    (SELECT min(datevalue::DATE) as effectivedate FROM wh_db.DimDate) effectivedate,
                    '9999-12-31'::DATE enddate
                    FROM temp_broker;
                """
        else:
            query = f"COPY {table_name} FROM '{file_path}' (DELIMITER '|');"

        con.sql(query)
        save_sql_to_file(query, table_name)
        if file_name == "HR.csv":
            con.sql(insert_target_query)
            save_sql_to_file(insert_target_query, "wh_db.DimBroker")
        print(f"Loaded {file_name} into {table_name}")
        print(file_path)
    
    return con

# Example usage:
db_path = 'initial_db.duckdb'
src_folder = 'src/data/Batch1'
con = load_data_to_duckdb(db_path, src_folder)

# Test loading
#print(con.sql("SELECT * FROM wh_db.DimTime limit 10").fetchdf(10))


SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimDate.sql'.
Loaded Date.txt into wh_db.DimDate
src/data/Batch1\Date.txt
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimTime.sql'.
Loaded Time.txt into wh_db.DimTime
src/data/Batch1\Time.txt
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.StatusType.sql'.
Loaded StatusType.txt into wh_db.StatusType
src/data/Batch1\StatusType.txt
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.TaxRate.sql'.
Loaded TaxRate.txt into wh_db.TaxRate
src/data/Batch1\TaxRate.txt
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.TradeType.sql'.
Loaded TradeType.txt into wh_db.TradeType
src/data/Batch1\TradeType.txt
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_temp_broker.sql'.


TypeError: sql(): incompatible function arguments. The following argument types are supported:
    1. (self: duckdb.duckdb.DuckDBPyConnection, query: object, *, alias: str = '', params: object = None) -> duckdb.duckdb.DuckDBPyRelation

Invoked with: <duckdb.duckdb.DuckDBPyConnection object at 0x0000022C2E0F7830>, "\n                INSERT INTO wh_db.DimBroker\n                SELECT \n                    employeeid sk_brokerid,\n                    employeeid brokerid,\n                    managerid,\n                    employeefirstname firstname,\n                    employeelastname lastname,\n                    employeemi middleinitial,\n                    employeebranch branch,\n                    employeeoffice office,\n                    employeephone phone,\n                    true iscurrent,\n                    1 batchid, --temp, later read from db\n                    (SELECT min(datevalue::DATE) as effectivedate FROM wh_db.DimDate) effectivedate,\n                    '9999-12-31'::DATE enddate\n                    FROM temp_broker;\n                ", 'wh_db.DimBroker'

In [21]:
insert_target_query = f"""
                INSERT INTO wh_db.DimBroker
                SELECT 
                    employeeid sk_brokerid,
                    employeeid brokerid,
                    managerid,
                    employeefirstname firstname,
                    employeelastname lastname,
                    employeemi middleinitial,
                    employeebranch branch,
                    employeeoffice office,
                    employeephone phone,
                    true iscurrent,
                    1 batchid, --temp, later read from db
                    (SELECT min(datevalue::DATE) as effectivedate FROM wh_db.DimDate) effectivedate,
                    '9999-12-31'::DATE enddate
                    FROM temp_broker;
                """

con_path = r"initial_db.duckdb"
con = duckdb.connect(con_path)
save_sql_to_file(insert_target_query, "wh_db.DimBroker")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimBroker.sql'.


In [6]:
print(con.execute("select COLUMN_COMMENT from information_schema.columns where table_name = 'DimBroker' limit 1").fetchdf())

             COLUMN_COMMENT
0  Surrogate key for broker


In [7]:
def get_table_schema(db_path, table_name):
    query = f"""
        SELECT 
            column_name AS name, 
            data_type AS type, 
            COLUMN_COMMENT AS description
        FROM information_schema.columns
        WHERE table_name = '{table_name}'
    """

    con = duckdb.connect(database=db_path)
    result = con.sql(query).fetchall()

    # Format result into the required structure
    schema_columns = [
        {"name": row[0], "type": row[1], "description": row[2] if row[2] else ""}
        for row in result
    ]

    return result

# Example usage
db_path = 'initial_db.duckdb'
table_name = 'DimBroker'
schema_columns = get_table_schema(db_path, table_name)

print(schema_columns)


[('sk_brokerid', 'BIGINT', 'Surrogate key for broker'), ('brokerid', 'BIGINT', 'Natural key for broker'), ('managerid', 'BIGINT', 'Natural key for managerâ€™s HR record'), ('firstname', 'VARCHAR', 'First name'), ('lastname', 'VARCHAR', 'Last Name'), ('middleinitial', 'VARCHAR', 'Middle initial'), ('branch', 'VARCHAR', 'Facility in which employee has office'), ('office', 'VARCHAR', 'Office number or description'), ('phone', 'VARCHAR', 'Employee phone number'), ('iscurrent', 'BOOLEAN', 'True if this is the current record'), ('batchid', 'INTEGER', 'Batch ID when this record was inserted'), ('effectivedate', 'DATE', 'Beginning of date range when this record was the current record'), ('enddate', 'DATE', 'Ending of date range when this record was the current record. A record that is not expired will use the date 9999-12-31.')]


In [22]:
stage_CustomerMgmt = """ 
        CREATE OR REPLACE TABLE wh_db_stage.CustomerMgmt  AS  
        SELECT
        try_cast(C_ID as BIGINT) customerid,
        try_cast(CA_ID as BIGINT) accountid,
        try_cast(CA_B_ID as BIGINT) brokerid,
        nullif(C_TAX_ID, '') taxid,
        nullif(CA_NAME, '') accountdesc,
        try_cast(CA_TAX_ST as TINYINT) taxstatus,
        CASE
            WHEN ActionType IN ('NEW', 'ADDACCT', 'UPDACCT', 'UPDCUST') THEN 'Active'
            WHEN ActionType IN ('CLOSEACCT', 'INACT') THEN 'Inactive'
            ELSE NULL
        END AS status,
        nullif(C_L_NAME, '') lastname,
        nullif(C_F_NAME, '') firstname,
        nullif(C_M_NAME, '') middleinitial,
        nullif(upper(C_GNDR), '') gender,
        try_cast(C_TIER as TINYINT) tier,
        try_cast(C_DOB as DATE) dob,
        nullif(C_ADLINE1, '') addressline1,
        nullif(C_ADLINE2, '') addressline2,
        nullif(C_ZIPCODE, '') postalcode,
        nullif(C_CITY, '') city,
        nullif(C_STATE_PROV, '') stateprov,
        nullif(C_CTRY, '') country,
        CASE
            WHEN nullif(C_LOCAL, '') IS NOT NULL THEN
                concat(
                    CASE WHEN nullif(C_CTRY_CODE, '') IS NOT NULL THEN '+' || C_CTRY_CODE || ' ' ELSE '' END,
                    CASE WHEN nullif(C_AREA_CODE, '') IS NOT NULL THEN '(' || C_AREA_CODE || ') ' ELSE '' END,
                    C_LOCAL,
                    COALESCE(C_EXT, '')
                )
            ELSE NULL
        END AS phone1,
        CASE
            WHEN nullif(C_LOCAL_1, '') IS NOT NULL THEN
                concat(
                    CASE WHEN nullif(C_CTRY_CODE_1, '') IS NOT NULL THEN '+' || C_CTRY_CODE_1 || ' ' ELSE '' END,
                    CASE WHEN nullif(C_AREA_CODE_1, '') IS NOT NULL THEN '(' || C_AREA_CODE_1 || ') ' ELSE '' END,
                    C_LOCAL_1,
                    COALESCE(C_EXT_1, '')
                )
            ELSE NULL
        END AS phone2,
        CASE
            WHEN nullif(C_LOCAL_2, '') IS NOT NULL THEN
                concat(
                    CASE WHEN nullif(C_CTRY_CODE_2, '') IS NOT NULL THEN '+' || C_CTRY_CODE_2 || ' ' ELSE '' END,
                    CASE WHEN nullif(C_AREA_CODE_2, '') IS NOT NULL THEN '(' || C_AREA_CODE_2 || ') ' ELSE '' END,
                    C_LOCAL_2,
                    COALESCE(C_EXT_2, '')
                )
            ELSE NULL
        END AS phone3,
        nullif(C_PRIM_EMAIL, '') email1,
        nullif(C_ALT_EMAIL, '') email2,
        nullif(C_LCL_TX_ID, '') lcl_tx_id,
        nullif(C_NAT_TX_ID, '') nat_tx_id,
        try_cast(ActionTS as TIMESTAMP) update_ts,
        ActionType
           
            FROM xml_dataframe"""

con.sql(stage_CustomerMgmt)
save_sql_to_file(insert_target_query, "wh_db_stage.CustomerMgmt")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db_stage.CustomerMgmt.sql'.


In [None]:
save_sql_to_file(insert_target_query, "wh_db.DimBroker")

In [24]:
import os
import re

def find_finwire_files(folder_path):
    """
    Traverses a folder recursively and finds files matching the pattern
    FINWIRE<YYYY>Q<Q> (e.g., FINWIRE2016Q3, FINWIRE2016Q4.txt).

    It specifically looks for filenames where the part *before* the extension
    exactly matches the pattern 'FINWIRE' followed by 4 digits (year),
    'Q', and 1 digit (quarter). Files with additional suffixes before the
    extension (like '_audit' in FINWIRE1967Q2_audit.csv) are skipped.

    Args:
        folder_path (str): The path to the folder to traverse.

    Returns:
        list: A list of full paths to the matching files.
               Returns an empty list if the folder doesn't exist or no
               matching files are found.
    """
    matching_files = []

    # Regex pattern:
    # ^        - anchor to the start of the string
    # FINWIRE  - literal string
    # \d{4}    - exactly 4 digits (for the year)
    # Q        - literal character 'Q'
    # \d       - exactly 1 digit (for the quarter)
    # $        - anchor to the end of the string
    # This pattern ensures the base name matches exactly.
    pattern = re.compile(r"^FINWIRE\d{4}Q\d$")

    # Check if the folder path exists
    if not os.path.isdir(folder_path):
        print(f"Warning: Folder not found at {folder_path}")
        return matching_files

    # os.walk traverses the directory tree (root, dirs, files)
    for root, _, files in os.walk(folder_path):
        for filename in files:
            # Split the filename into the base name and the extension
            base_name, _ = os.path.splitext(filename)

            # Check if the base name matches the compiled regex pattern
            if pattern.match(base_name):
                # If it matches, construct the full path and add it to the list
                full_path = os.path.join(root, filename)
                matching_files.append(full_path)

    return matching_files

find_finwire_files(src_folder)


['src/data/Batch1\\FINWIRE1967Q1',
 'src/data/Batch1\\FINWIRE1967Q2',
 'src/data/Batch1\\FINWIRE1967Q3',
 'src/data/Batch1\\FINWIRE1967Q4',
 'src/data/Batch1\\FINWIRE1968Q1',
 'src/data/Batch1\\FINWIRE1968Q2',
 'src/data/Batch1\\FINWIRE1968Q3',
 'src/data/Batch1\\FINWIRE1968Q4',
 'src/data/Batch1\\FINWIRE1969Q1',
 'src/data/Batch1\\FINWIRE1969Q2',
 'src/data/Batch1\\FINWIRE1969Q3',
 'src/data/Batch1\\FINWIRE1969Q4',
 'src/data/Batch1\\FINWIRE1970Q1',
 'src/data/Batch1\\FINWIRE1970Q2',
 'src/data/Batch1\\FINWIRE1970Q3',
 'src/data/Batch1\\FINWIRE1970Q4',
 'src/data/Batch1\\FINWIRE1971Q1',
 'src/data/Batch1\\FINWIRE1971Q2',
 'src/data/Batch1\\FINWIRE1971Q3',
 'src/data/Batch1\\FINWIRE1971Q4',
 'src/data/Batch1\\FINWIRE1972Q1',
 'src/data/Batch1\\FINWIRE1972Q2',
 'src/data/Batch1\\FINWIRE1972Q3',
 'src/data/Batch1\\FINWIRE1972Q4',
 'src/data/Batch1\\FINWIRE1973Q1',
 'src/data/Batch1\\FINWIRE1973Q2',
 'src/data/Batch1\\FINWIRE1973Q3',
 'src/data/Batch1\\FINWIRE1973Q4',
 'src/data/Batch1\\F

In [26]:
# siin pooleli

src_folder = 'src/data/Batch1'
list_of_finwire_files = find_finwire_files(src_folder)
# file_path = os.path.join(src_folder, "FINWIRE2016Q3")


file_path = list_of_finwire_files[1]
fin_wire_sql = f""" 
    INSERT INTO wh_db_stage.FinWire
    SELECT
        CASE
            WHEN SUBSTR(column0, 16, 3) = 'FIN' THEN
                CASE
                    WHEN TRY_CAST(TRIM(SUBSTR(column0, 187, 60)) AS BIGINT) IS NOT NULL THEN 'FIN_COMPANYID'
                    ELSE 'FIN_NAME'
                END
            ELSE SUBSTR(column0, 16, 3)
        END AS rectype,
        STRPTIME(SUBSTR(column0, 1, 8), '%Y%m%d') AS recdate,
        SUBSTR(column0, 19) AS value
    FROM read_csv_auto('{file_path}', HEADER=FALSE, filename=false, all_varchar=true)
    """

for file_path in list_of_finwire_files:
 #   file_path = os.path.join(src_folder, file)
    # õige
    con.sql(fin_wire_sql)
    save_sql_to_file(fin_wire_sql, "wh_db_stage.FinWire")
    print(file_path, "--DONE--")
    break # limit

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db_stage.FinWire.sql'.
src/data/Batch1\FINWIRE1967Q1 --DONE--


In [28]:
# õige
dimcompany_insert_sql = """INSERT INTO wh_db.DimCompany
WITH cmp AS (
    SELECT
        recdate,
        TRIM(SUBSTR(value, 1, 60)) AS CompanyName,
        TRIM(SUBSTR(value, 61, 10)) AS CIK,
        TRIM(SUBSTR(value, 71, 4)) AS Status,
        TRIM(SUBSTR(value, 75, 2)) AS IndustryID,
        TRIM(SUBSTR(value, 77, 4)) AS SPrating,
        TRY_CAST(TRY_CAST(SUBSTRING(value, 81, 8) AS TIMESTAMP) AS DATE) AS FoundingDate,
        TRIM(SUBSTR(value, 89, 80)) AS AddrLine1,
        TRIM(SUBSTR(value, 169, 80)) AS AddrLine2,
        TRIM(SUBSTR(value, 249, 12)) AS PostalCode,
        TRIM(SUBSTR(value, 261, 25)) AS City,
        TRIM(SUBSTR(value, 286, 20)) AS StateProvince,
        TRIM(SUBSTR(value, 306, 24)) AS Country,
        TRIM(SUBSTR(value, 330, 46)) AS CEOname,
        TRIM(SUBSTR(value, 376, 150)) AS Description
    FROM wh_db_stage.FinWire
    WHERE rectype = 'CMP'
)
SELECT
    CAST(strftime(effectivedate, '%Y%m%d') || companyid AS BIGINT) AS sk_companyid,
    companyid,
    status,
    name,
    industry,
    sprating,
    islowgrade,
    ceo,
    addressline1,
    addressline2,
    postalcode,
    city,
    stateprov,
    country,
    description,
    foundingdate,
    CASE WHEN enddate = '9999-12-31'::DATE THEN TRUE ELSE FALSE END AS iscurrent,
    batchid,
    effectivedate,
    enddate
FROM (
    SELECT
        CAST(cik AS BIGINT) AS companyid,
        CASE cmp.status
            WHEN 'ACTV' THEN 'Active'
            WHEN 'CMPT' THEN 'Completed'
            WHEN 'CNCL' THEN 'Canceled'
            WHEN 'PNDG' THEN 'Pending'
            WHEN 'SBMT' THEN 'Submitted'
            WHEN 'INAC' THEN 'Inactive'
            ELSE NULL -- or a default value, if needed
        END AS status,
        CompanyName AS name,
        ind.in_name AS industry,
        CASE
            WHEN SPrating IN ('AAA', 'AA', 'AA+', 'AA-', 'A', 'A+', 'A-', 'BBB', 'BBB+', 'BBB-', 'BB', 'BB+', 'BB-', 'B', 'B+', 'B-', 'CCC', 'CCC+', 'CCC-', 'CC', 'C', 'D') THEN SPrating
            ELSE NULL::VARCHAR
        END AS sprating,
        CASE
            WHEN SPrating IN ('AAA', 'AA', 'A', 'AA+', 'A+', 'AA-', 'A-', 'BBB', 'BBB+', 'BBB-') THEN FALSE
            WHEN SPrating IN ('BB', 'B', 'CCC', 'CC', 'C', 'D', 'BB+', 'B+', 'CCC+', 'BB-', 'B-', 'CCC-') THEN TRUE
            ELSE NULL::BOOLEAN
        END AS islowgrade,
        CEOname AS ceo,
        AddrLine1 AS addressline1,
        AddrLine2 AS addressline2,
        PostalCode AS postalcode,
        City AS city,
        StateProvince AS stateprov,
        Country AS country,
        Description AS description,
        FoundingDate AS foundingdate,
        1 AS batchid,
        recdate AS effectivedate,
        COALESCE(
            LEAD(try_cast(recdate AS DATE)) OVER (PARTITION BY cik ORDER BY recdate),
            try_cast('9999-12-31' AS DATE)
        ) AS enddate
    FROM cmp
    JOIN wh_db.industry ind ON cmp.industryid = ind.in_id
)
WHERE effectivedate < enddate;
"""

con.sql(dimcompany_insert_sql)
save_sql_to_file(dimcompany_insert_sql, "wh_db.DimCompany")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\2_wh_db.DimCompany.sql'.


In [30]:
src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "Prospect.csv")
batch_number = int(''.join(filter(str.isdigit, os.path.basename(src_folder))))

temp_propect = f"""
CREATE OR REPLACE TEMP TABLE temp_propect AS 
SELECT     *, 
    {batch_number} AS batchid 
    FROM read_csv_auto('{file_path}', columns={{
    "agencyid": "STRING",
    "lastname": "STRING",
    "firstname": "STRING",
    "middleinitial": "STRING",
    "gender": "STRING",
    "addressline1": "STRING",
    "addressline2": "STRING",
    "postalcode": "STRING",
    "city": "STRING",
    "state": "STRING",
    "country": "STRING",
    "phone": "STRING",
    "income": "INT",
    "numbercars": "INT",
    "numberchildren": "INT",
    "maritalstatus": "STRING",
    "age": "INT",
    "creditrating": "INT",
    "ownorrentflag": "STRING",
    "employer": "STRING",
    "numbercreditcards": "INT",
    "networth": "INT",
}}, header=False);
"""

con.sql(temp_propect)
save_sql_to_file(temp_propect, "wh_db_stage.ProspectIncremental")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db_stage.ProspectIncremental.sql'.


In [31]:
temp_propect_marketingnameplate = """
CREATE OR REPLACE TEMP TABLE temp_propect_marketingnameplate AS 
SELECT
    *,
    CASE 
        WHEN LENGTH(
            CONCAT(
                CASE WHEN networth > 1000000 OR income > 200000 THEN 'HighValue+' ELSE '' END,
                CASE WHEN numberchildren > 3 OR numbercreditcards > 5 THEN 'Expenses+' ELSE '' END,
                CASE WHEN age > 45 THEN 'Boomer+' ELSE '' END,
                CASE WHEN income < 50000 OR creditrating < 600 OR networth < 100000 THEN 'MoneyAlert+' ELSE '' END,
                CASE WHEN numbercars > 3 OR numbercreditcards > 7 THEN 'Spender+' ELSE '' END,
                CASE WHEN age < 25 AND networth > 1000000 THEN 'Inherited+' ELSE '' END
            )
        ) > 0 
        THEN LEFT(
            CONCAT(
                CASE WHEN networth > 1000000 OR income > 200000 THEN 'HighValue+' ELSE '' END,
                CASE WHEN numberchildren > 3 OR numbercreditcards > 5 THEN 'Expenses+' ELSE '' END,
                CASE WHEN age > 45 THEN 'Boomer+' ELSE '' END,
                CASE WHEN income < 50000 OR creditrating < 600 OR networth < 100000 THEN 'MoneyAlert+' ELSE '' END,
                CASE WHEN numbercars > 3 OR numbercreditcards > 7 THEN 'Spender+' ELSE '' END,
                CASE WHEN age < 25 AND networth > 1000000 THEN 'Inherited+' ELSE '' END
            ),
            LENGTH(
                CONCAT(
                    CASE WHEN networth > 1000000 OR income > 200000 THEN 'HighValue+' ELSE '' END,
                    CASE WHEN numberchildren > 3 OR numbercreditcards > 5 THEN 'Expenses+' ELSE '' END,
                    CASE WHEN age > 45 THEN 'Boomer+' ELSE '' END,
                    CASE WHEN income < 50000 OR creditrating < 600 OR networth < 100000 THEN 'MoneyAlert+' ELSE '' END,
                    CASE WHEN numbercars > 3 OR numbercreditcards > 7 THEN 'Spender+' ELSE '' END,
                    CASE WHEN age < 25 AND networth > 1000000 THEN 'Inherited+' ELSE '' END
                )
            ) - 1
        )
        ELSE NULL 
    END AS marketingnameplate
FROM temp_propect;
"""

con.sql(temp_propect_marketingnameplate)
save_sql_to_file(temp_propect_marketingnameplate, "wh_db_stage.ProspectIncremental")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\2_wh_db_stage.ProspectIncremental.sql'.


In [14]:
## täpselt ei tea, mille järgi tuvastada ridu - st kas agency_id, firstname_lastname piisab
con.sql(""" 
ALTER TABLE wh_db_stage.ProspectIncremental
    ADD CONSTRAINT ProspectIncremental_pk PRIMARY KEY (agencyid, lastname, firstname);
""")

In [32]:
ProspectIncremental = """
INSERT INTO wh_db_stage.ProspectIncremental (
    agencyid, lastname, firstname, middleinitial, gender, addressline1, 
    addressline2, postalcode, city, state, country, phone, income, 
    numbercars, numberchildren, maritalstatus, age, creditrating, 
    ownorrentflag, employer, numbercreditcards, networth, 
    marketingnameplate, recordbatchid, batchid
)
SELECT
    tp.agencyid, tp.lastname, tp.firstname, tp.middleinitial, tp.gender, tp.addressline1, 
    tp.addressline2, tp.postalcode, tp.city, tp.state, tp.country, tp.phone, tp.income, 
    tp.numbercars, tp.numberchildren, tp.maritalstatus, tp.age, tp.creditrating, 
    tp.ownorrentflag, tp.employer, tp.numbercreditcards, tp.networth, 
    tp.marketingnameplate, tp.batchid, tp.batchid
FROM temp_propect_marketingnameplate AS tp
ON CONFLICT (agencyid, lastname, firstname) DO UPDATE SET
    middleinitial = excluded.middleinitial,
    gender = excluded.gender,
    addressline1 = excluded.addressline1,
    addressline2 = excluded.addressline2,
    postalcode = excluded.postalcode,
    city = excluded.city,
    state = excluded.state,
    country = excluded.country,
    phone = excluded.phone,
    income = excluded.income,
    numbercars = excluded.numbercars,
    numberchildren = excluded.numberchildren,
    maritalstatus = excluded.maritalstatus,
    age = excluded.age,
    creditrating = excluded.creditrating,
    ownorrentflag = excluded.ownorrentflag,
    employer = excluded.employer,
    numbercreditcards = excluded.numbercreditcards,
    networth = excluded.networth,
    marketingnameplate = excluded.marketingnameplate,
    recordbatchid = excluded.batchid;
"""
con.sql(ProspectIncremental)
save_sql_to_file(ProspectIncremental, "wh_db_stage.ProspectIncremental")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\3_wh_db_stage.ProspectIncremental.sql'.


In [16]:
con.sql("select * from wh_db_stage.CustomerMgmt where customerid = 20")

┌────────────┬───────────┬──────────┬─────────────┬───────────────────────────────────────────────┬───────────┬──────────┬──────────┬───────────┬───────────────┬─────────┬──────┬────────────┬──────────────────┬──────────────┬────────────┬───────────────┬───────────┬──────────────────────────┬────────────────┬──────────┬───────────────────┬───────────────────────────┬───────────────────────────────┬───────────┬───────────┬─────────────────────┬────────────┐
│ customerid │ accountid │ brokerid │    taxid    │                  accountdesc                  │ taxstatus │  status  │ lastname │ firstname │ middleinitial │ gender  │ tier │    dob     │   addressline1   │ addressline2 │ postalcode │     city      │ stateprov │         country          │     phone1     │  phone2  │      phone3       │          email1           │            email2             │ lcl_tx_id │ nat_tx_id │      update_ts      │ ActionType │
│   int64    │   int64   │  int64   │   varchar   │                    varchar

In [17]:
con.sql(""" 
select distinct * from 
wh_db_stage.CustomerMgmt a
LEFT 
join wh_db.TaxRate b on
        a.nat_tx_id  = b.TX_ID
where a.customerid = 20
order by update_ts asc

""")

┌────────────┬───────────┬──────────┬─────────────┬───────────────────────────────────────────────┬───────────┬──────────┬──────────┬───────────┬───────────────┬─────────┬──────┬────────────┬──────────────────┬──────────────┬────────────┬───────────────┬───────────┬──────────────────────────┬────────────────┬──────────┬───────────────────┬───────────────────────────┬───────────────────────────────┬───────────┬───────────┬─────────────────────┬────────────┬─────────┬────────────────────────────────────────────┬─────────┐
│ customerid │ accountid │ brokerid │    taxid    │                  accountdesc                  │ taxstatus │  status  │ lastname │ firstname │ middleinitial │ gender  │ tier │    dob     │   addressline1   │ addressline2 │ postalcode │     city      │ stateprov │         country          │     phone1     │  phone2  │      phone3       │          email1           │            email2             │ lcl_tx_id │ nat_tx_id │      update_ts      │ ActionType │  tx_id  │    

In [33]:
# Cust historical pipeline

customers = """ 
CREATE OR REPLACE TEMP TABLE customers AS
  SELECT
    customerid,
    taxid,
    status,
    lastname,
    firstname,
    middleinitial,
    gender,
    tier,
    dob,
    addressline1,
    addressline2,
    postalcode,
    city,
    stateprov,
    country,
    phone1,
    phone2,
    phone3,
    email1,
    email2,
    lcl_tx_id,
    nat_tx_id,
    1 batchid,
    update_ts
  FROM
    wh_db_stage.CustomerMgmt c
  WHERE
    ActionType in ('NEW', 'INACT', 'UPDCUST')
"""

con.sql(customers)
save_sql_to_file(customers, "wh_db.DimCustomer")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimCustomer.sql'.


In [34]:
customers_final = """CREATE OR REPLACE TEMP TABLE customers_final AS
SELECT
    customerid,
    COALESCE(taxid, last_value(taxid ORDER BY update_ts DESC) OVER w) AS taxid,
    status,
    COALESCE(lastname, last_value(lastname ORDER BY update_ts DESC) OVER w) AS lastname,
    COALESCE(firstname, last_value(firstname ORDER BY update_ts DESC) OVER w) AS firstname,
    COALESCE(middleinitial, last_value(middleinitial ORDER BY update_ts DESC) OVER w) AS middleinitial,
    COALESCE(gender, last_value(gender ORDER BY update_ts DESC) OVER w) AS gender,
    COALESCE(tier, last_value(tier ORDER BY update_ts DESC) OVER w) AS tier,
    COALESCE(dob, last_value(dob ORDER BY update_ts DESC) OVER w) AS dob,
    COALESCE(addressline1, last_value(addressline1 ORDER BY update_ts DESC) OVER w) AS addressline1,
    COALESCE(addressline2, last_value(addressline2 ORDER BY update_ts DESC) OVER w) AS addressline2,
    COALESCE(postalcode, last_value(postalcode ORDER BY update_ts DESC) OVER w) AS postalcode,
    COALESCE(CITY, last_value(CITY ORDER BY update_ts DESC) OVER w) AS CITY,
    COALESCE(stateprov, last_value(stateprov ORDER BY update_ts DESC) OVER w) AS stateprov,
    COALESCE(country, last_value(country ORDER BY update_ts DESC) OVER w) AS country,
    COALESCE(phone1, last_value(phone1 ORDER BY update_ts DESC) OVER w) AS phone1,
    COALESCE(phone2, last_value(phone2 ORDER BY update_ts DESC) OVER w) AS phone2,
    COALESCE(phone3, last_value(phone3 ORDER BY update_ts DESC) OVER w) AS phone3,
    COALESCE(email1, last_value(email1 ORDER BY update_ts DESC) OVER w) AS email1,
    COALESCE(email2, last_value(email2 ORDER BY update_ts DESC) OVER w) AS email2,
    COALESCE(LCL_TX_ID, last_value(LCL_TX_ID ORDER BY update_ts DESC) OVER w) AS LCL_TX_ID,
    COALESCE(NAT_TX_ID, last_value(NAT_TX_ID ORDER BY update_ts DESC) OVER w) AS NAT_TX_ID,
    batchid,
    CASE 
        WHEN NULLIF(lead(update_ts) OVER w, NULL) IS NULL THEN 'Y' 
        ELSE 'N' 
    END AS iscurrent,
    update_ts::DATE AS effectivedate,
    COALESCE(lead(update_ts::DATE) OVER w, '9999-12-31'::DATE) AS enddate
FROM
    customers
WINDOW w AS (PARTITION BY customerid ORDER BY update_ts);"""

con.sql(customers_final)
save_sql_to_file(customers_final, "wh_db.DimCustomer")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\2_wh_db.DimCustomer.sql'.


In [35]:
# 
dimcustomer = """
INSERT INTO wh_db.DimCustomer (
    sk_customerid,
    customerid,
    taxid,
    status,
    lastname,
    firstname,
    middleinitial,
    gender,
    tier,
    dob,
    addressline1,
    addressline2,
    postalcode,
    city,
    stateprov,
    country,
    phone1,
    phone2,
    phone3,
    email1,
    email2,
    nationaltaxratedesc,
    nationaltaxrate,
    localtaxratedesc,
    localtaxrate,
    agencyid,
    creditrating,
    networth,
    marketingnameplate,
    iscurrent,
    batchid,
    effectivedate,
    enddate
)
WITH MaxSK AS (
    SELECT COALESCE(MAX(sk_customerid), 0) AS max_sk_customerid
    FROM wh_db.DimCustomer
),
CustomerData AS (
    SELECT 
        c.customerid,
        c.taxid,
        c.status,
        c.lastname,
        c.firstname,
        c.middleinitial,
        c.gender,
        c.tier,
        c.dob,
        c.addressline1,
        c.addressline2,
        c.postalcode,
        c.city,
        c.stateprov,
        c.country,
        c.phone1,
        c.phone2,
        c.phone3,
        c.email1, 
        c.email2,
        r_nat.TX_NAME as nationaltaxratedesc,
        r_nat.TX_RATE as nationaltaxrate,
        r_lcl.TX_NAME as localtaxratedesc,
        r_lcl.TX_RATE as localtaxrate,
        p.agencyid,
        p.creditrating,
        p.networth,
        p.marketingnameplate,
        c.iscurrent,
        c.batchid,
        c.effectivedate,
        c.enddate 
    FROM customers_final c
    JOIN wh_db.TaxRate r_lcl 
        ON c.lcl_tx_id = r_lcl.TX_ID
    JOIN wh_db.TaxRate r_nat 
        ON c.nat_tx_id = r_nat.TX_ID
    LEFT JOIN wh_db_stage.ProspectIncremental p 
        ON 
            UPPER(p.lastname) = UPPER(c.lastname)
            AND UPPER(p.firstname) = UPPER(c.firstname)
            AND UPPER(p.addressline1) = UPPER(c.addressline1)
            AND UPPER(NULLIF(p.addressline2, '')) = UPPER(NULLIF(c.addressline2, ''))
            AND UPPER(p.postalcode) = UPPER(c.postalcode)
    WHERE c.effectivedate < c.enddate
)
SELECT 
    ROW_NUMBER() OVER () + (SELECT max_sk_customerid FROM MaxSK) + 1 AS sk_customerid,
    c.customerid,
    c.taxid,
    c.status,
    c.lastname,
    c.firstname,
    c.middleinitial,
    IF(c.gender IN ('M', 'F'), c.gender, 'U') AS gender,
    c.tier,
    c.dob,
    c.addressline1,
    c.addressline2,
    c.postalcode,
    c.city,
    c.stateprov,
    c.country,
    c.phone1,
    c.phone2,
    c.phone3,
    c.email1, 
    c.email2,
    nationaltaxratedesc,
    nationaltaxrate,
    localtaxratedesc,
    localtaxrate,
    agencyid,
    creditrating,
    networth,
    marketingnameplate,
    iscurrent,
    batchid,
    effectivedate,
    enddate 
FROM CustomerData c;
 """

con.sql(dimcustomer)
save_sql_to_file(dimcustomer, "wh_db.DimCustomer")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\3_wh_db.DimCustomer.sql'.


In [36]:
# kontrolli kui kõik FINWIRE on laetud
DimSecurity = """ 
INSERT INTO wh_db.DimSecurity
WITH SEC AS (
    SELECT
        recdate AS effectivedate,
        TRIM(SUBSTR(value, 1, 15)) AS Symbol,
        TRIM(SUBSTR(value, 16, 6)) AS issue,
        TRIM(SUBSTR(value, 22, 4)) AS Status,
        TRIM(SUBSTR(value, 26, 70)) AS Name,
        TRIM(SUBSTR(value, 96, 6)) AS exchangeid,
        TRY_CAST(SUBSTR(value, 102, 13) AS BIGINT) AS sharesoutstanding,
        TRY_CAST(STRPTIME(SUBSTR(value, 115, 8), '%Y%m%d') AS DATE) AS firsttrade,
        TRY_CAST(STRPTIME(SUBSTR(value, 123, 8), '%Y%m%d') AS DATE) AS firsttradeonexchange,
        TRY_CAST(SUBSTR(value, 131, 12) AS DOUBLE) AS Dividend,
        TRIM(CASE WHEN  regexp_matches(SUBSTR(value, -10), '^[0-9]+$') THEN  REGEXP_REPLACE(SUBSTR(value, -10), '^0+', '')
            ELSE SUBSTR(value, -60)
        end) as conameorcik
    FROM wh_db_stage.FinWire
    WHERE rectype = 'SEC'
),
dc AS (
    SELECT
        sk_companyid,
        name AS conameorcik,
        EffectiveDate,
        EndDate
    FROM wh_db.DimCompany
    UNION ALL
    SELECT
        sk_companyid,
        CAST(companyid AS VARCHAR) AS conameorcik,
        EffectiveDate,
        EndDate
    FROM wh_db.DimCompany
),
SEC_prep AS (
    SELECT
        SEC.* EXCLUDE (Status, conameorcik),
        COALESCE(TRY_CAST(conameorcik AS BIGINT)::VARCHAR, conameorcik) AS conameorcik,
        CASE 
            WHEN status = 'ACTV' THEN 'Active'
            WHEN status = 'CMPT' THEN 'Completed'
            WHEN status = 'CNCL' THEN 'Canceled'
            WHEN status = 'PNDG' THEN 'Pending'
            WHEN status = 'SBMT' THEN 'Submitted'
            WHEN status = 'INAC' THEN 'Inactive'
            ELSE NULL -- Or handle other cases
        END AS status,
        COALESCE(
            LEAD(effectivedate) OVER (PARTITION BY Symbol ORDER BY effectivedate),
            ('9999-12-31')::DATE
        ) AS enddate
    FROM SEC
),
        SEC_final AS (
    SELECT
        SEC.Symbol,
        SEC.issue,
        SEC.status,
        SEC.Name,
        SEC.exchangeid,
        dc.sk_companyid,
        SEC.sharesoutstanding,
        SEC.firsttrade,
        SEC.firsttradeonexchange,
        SEC.Dividend,
        CASE WHEN SEC.effectivedate < dc.EffectiveDate THEN dc.EffectiveDate ELSE SEC.effectivedate END AS effectivedate,
        CASE WHEN SEC.enddate > dc.EndDate THEN dc.EndDate ELSE SEC.enddate END AS enddate
    FROM SEC_prep SEC
    JOIN dc
        ON SEC.conameorcik = dc.conameorcik
        AND SEC.effectivedate < dc.EndDate
        AND SEC.enddate > dc.EffectiveDate
)
SELECT
    ROW_NUMBER() OVER () AS sk_securityid,
    Symbol,
    issue,
    status,
    Name,
    exchangeid,
    sk_companyid,
    sharesoutstanding,
    firsttrade,
    firsttradeonexchange,
    Dividend,
    CASE WHEN enddate = '9999-12-31'::DATE THEN TRUE ELSE FALSE END AS iscurrent,
    1 AS batchid,
    effectivedate,
    enddate
FROM SEC_final
WHERE effectivedate < enddate;


"""

con.sql(DimSecurity)
save_sql_to_file(DimSecurity, "wh_db.DimSecurity")


SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimSecurity.sql'.


In [24]:
src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "TradeHistory.txt")
con.sql(f""" 

SELECT
*
FROM read_csv_auto('{file_path}', HEADER=FALSE, filename=false, all_varchar=true, columns = {{
    "tradeid": "BIGINT",
    "th_dts": "TIMESTAMP",
    "status": "STRING"
}})
""")


┌─────────┬─────────────────────┬─────────┐
│ tradeid │       th_dts        │ status  │
│  int64  │      timestamp      │ varchar │
├─────────┼─────────────────────┼─────────┤
│       0 │ 2012-07-07 00:00:47 │ SBMT    │
│       0 │ 2012-07-07 00:02:08 │ CMPT    │
│       1 │ 2012-07-07 00:06:15 │ SBMT    │
│       1 │ 2012-07-07 00:11:02 │ CMPT    │
│       2 │ 2012-07-07 00:06:57 │ PNDG    │
│       2 │ 2012-07-08 12:07:35 │ SBMT    │
│       2 │ 2012-07-08 12:11:23 │ CMPT    │
│       3 │ 2012-07-07 00:09:25 │ PNDG    │
│       3 │ 2012-09-15 11:35:22 │ SBMT    │
│       3 │ 2012-09-15 11:38:32 │ CMPT    │
│       · │          ·          │  ·      │
│       · │          ·          │  ·      │
│       · │          ·          │  ·      │
│    3938 │ 2012-07-18 01:46:57 │ PNDG    │
│    3938 │ 2012-07-30 02:40:00 │ SBMT    │
│    3938 │ 2012-07-30 02:41:57 │ CMPT    │
│    3939 │ 2012-07-18 01:51:32 │ PNDG    │
│    3939 │ 2012-09-08 16:39:12 │ CNCL    │
│    3940 │ 2012-07-18 01:52:23 

In [37]:
# DimAccount
DimAccount = """ 
INSERT INTO wh_db.DimAccount 
WITH account AS (
    SELECT
    accountid,
    customerid,
    accountdesc,
    taxstatus,
    brokerid,
    status,
    update_ts,
    1 batchid
from wh_db_stage.CustomerMgmt
where ActionType NOT IN ('UPDCUST', 'INACT')
  ),
account_final AS (
  SELECT
    accountid, -- Kept accountid as it's the partitioning key
    customerid,
    COALESCE(
        accountdesc,
        last_value(accountdesc) OVER w -- Retained  for correct logic
    ) AS accountdesc,
    COALESCE(
        taxstatus,
        last_value(taxstatus)  OVER w -- Retained 
    ) AS taxstatus,
    COALESCE(
        brokerid,
        last_value(brokerid)  OVER w -- Retained 
    ) AS brokerid,
    COALESCE(
        status,
        last_value(status)  OVER w -- Retained 
    ) AS status,
    batchid,
    CASE
        WHEN lead(update_ts) OVER w IS NULL THEN 'Y' -- Check if it's the last record in the partition
        ELSE 'N'
    END AS iscurrent,
    update_ts::DATE AS effectivedate, -- Using target format's casting style
    COALESCE(
        lead(update_ts::DATE) OVER w, -- Get next record's date within the partition
        '9999-12-31'::DATE           -- Default for the last record
    ) AS enddate
FROM
    account a -- Using the original table name
WINDOW w AS (
    PARTITION BY accountid -- Partitioning by the key from the original query
    ORDER BY update_ts     -- Ordering by the timestamp from the original query
 )
),
  account_cust_updates AS (
  SELECT
    a.* EXCLUDE (effectivedate, enddate, customerid),
    c.sk_customerid,
    if(
      a.effectivedate < c.effectivedate,
      c.effectivedate,
      a.effectivedate
    ) effectivedate,
    if(a.enddate > c.enddate, c.enddate, a.enddate) enddate
  FROM account_final a
  FULL OUTER JOIN wh_db.DimCustomer c 
    ON a.customerid = c.customerid
    AND c.enddate > a.effectivedate
    AND c.effectivedate < a.enddate
  WHERE a.effectivedate < a.enddate
)
SELECT
    CAST(strftime(a.effectivedate, '%Y%m%d') || a.accountid AS BIGINT),
    a.accountid,
    b.sk_brokerid, 
    a.sk_customerid,
    a.accountdesc,
    a.TaxStatus,
    a.status,
    -- Using standard CASE instead of IF() for iscurrent calculation
    CASE
        WHEN a.enddate = '9999-12-31'::DATE THEN true
        ELSE false
    END AS iscurrent,
    a.batchid,
    a.effectivedate,
    a.enddate
FROM
    account_cust_updates a
JOIN
    wh_db.DimBroker b ON a.brokerid = b.brokerid;
"""

con.sql(DimAccount)
save_sql_to_file(DimAccount, "wh_db.DimAccount")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimAccount.sql'.


In [38]:
src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "Trade.txt")

# DimTrade
tradetxt = f""" 

CREATE OR REPLACE TABLE wh_db_stage.tradetxt AS 
SELECT * from  read_csv('{file_path}', HEADER=FALSE, filename=false, all_varchar=true, delim='|',strict_mode=false , columns = {{
    "tradeid": "BIGINT", 
    "t_dts": "TIMESTAMP",
    "status": "STRING", 
    "t_tt_id": "STRING",
    "cashflag": "TINYINT",
    "t_s_symb": "STRING",
    "quantity": "INT",
    "bidprice": "DOUBLE",
    "t_ca_id": "BIGINT",
    "executedby": "STRING",
    "tradeprice": "DOUBLE",
    "fee": "DOUBLE",
    "commission": "DOUBLE",
    "tax": "DOUBLE"}});

"""

src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "TradeHistory.txt")
tradehistory = f""" 
CREATE OR REPLACE TABLE wh_db_stage.tradehistory AS 
select * FROM read_csv_auto('{file_path}', HEADER=FALSE, filename=false, all_varchar=true, columns = {{
    "tradeid": "BIGINT", --TH_T_ID
    "th_dts": "TIMESTAMP", --TH_DTS
    "status": "STRING"}}); --TH_ST_ID

"""

con.sql(tradetxt)
save_sql_to_file(tradetxt, "wh_db_stage.tradehistory")
con.sql(tradehistory)
save_sql_to_file(tradehistory, "wh_db_stage.tradehistory")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db_stage.tradehistory.sql'.
SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\2_wh_db_stage.tradehistory.sql'.


In [39]:
DimTrade = f""" 
INSERT INTO wh_db.DimTrade
WITH tradehistorical AS (
select 
    a.tradeid,
    --brokerid
    CASE
    		WHEN b.status = 'SBMT' AND a.t_tt_id IN ( 'TMB', 'TMS' ) OR b.status = 'PNDG' THEN b.TH_DTS
			WHEN b.status IN ( 'CMPT', 'CNCL' ) THEN NULL
		END AS SK_CreateDateID
		, CASE 
			WHEN b.status = 'SBMT' AND a.t_tt_id IN ( 'TMB', 'TMS' ) OR b.status = 'PNDG' THEN b.TH_DTS
			WHEN b.status IN ( 'CMPT', 'CNCL' ) THEN NULL
		END AS SK_CreateTimeID
		, CASE 
			WHEN b.status = 'SBMT' AND a.t_tt_id IN ( 'TMB', 'TMS' ) OR b.status = 'PNDG' THEN NULL
			WHEN b.status IN ( 'CMPT', 'CNCL' ) THEN b.TH_DTS
		END AS SK_CloseDateID
		, CASE 
			WHEN b.status = 'SBMT' AND a.t_tt_id IN ( 'TMB', 'TMS' ) OR b.status = 'PNDG' THEN NULL
			WHEN b.status IN ( 'CMPT', 'CNCL' ) THEN b.TH_DTS
    END AS SK_CloseTimeID,
    c.st_name,
    CASE t_tt_id
      WHEN 'TMB' THEN 'Market Buy'
      WHEN 'TMS' THEN 'Market Sell'
      WHEN 'TSL' THEN 'Stop Loss'
      WHEN 'TLS' THEN 'Limit Sell'
      WHEN 'TLB' THEN 'Limit Buy'
    ELSE NULL -- Or some default value if needed
END AS type,
  a.cashflag,
  --ds.sk_securityid
  --ds.sk_companyid
  a.quantity,
  a.bidprice,
   -- sk_customerid
  -- sk_accountid
  a.executedby,
  a.tradeprice,
  a.fee,
  a.commission,
  a.tax,
  1 batchid,
  a.t_s_symb,
  a.t_ca_id
  from wh_db_stage.tradetxt a
  join wh_db_stage.tradehistory b
on a.tradeid = b.tradeid
  join wh_db.StatusType c -- ainult üks praegu
    ON a.status = c.st_id
)
select 
  trade.tradeid
  ,da.sk_brokerid
  ,CAST(strftime(trade.SK_CreateDateID, '%Y%m%d') || da.accountid || da.sk_brokerid AS BIGINT)
  ,CAST(strftime(trade.SK_CreateTimeID, '%Y%m%d') || da.accountid || da.sk_brokerid AS BIGINT)
  ,CAST(strftime(trade.SK_CloseDateID, '%Y%m%d') || da.accountid || da.sk_brokerid AS BIGINT)
  ,CAST(strftime(trade.SK_CloseTimeID, '%Y%m%d') || da.accountid || da.sk_brokerid AS BIGINT)
  ,trade.st_name
  ,trade.type
  ,trade.cashflag
  ,ds.sk_securityid
  ,ds.sk_companyid
  ,trade.quantity
  ,trade.bidprice
  ,da.sk_customerid
  ,da.sk_accountid
  ,trade.executedby
  ,trade.tradeprice
  ,trade.fee
  ,trade.commission
  ,trade.tax
  ,1 batchid
  
from tradehistorical trade
JOIN wh_db.DimSecurity ds
  ON 
    ds.symbol = trade.t_s_symb
    AND SK_CreateDateID::DATE >= ds.effectivedate 
    AND SK_CreateDateID::DATE < ds.enddate
JOIN wh_db.DimAccount da
  ON 
    trade.t_ca_id = da.accountid 
    AND SK_CreateDateID::DATE >= da.effectivedate 
    AND SK_CreateDateID::DATE < da.enddate;
        """

con.sql(DimTrade)
save_sql_to_file(DimTrade, "wh_db.DimTrade")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.DimTrade.sql'.


In [40]:
# FactCashBalances
src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "CashTransaction.txt")
FactCashBalances = f""" 
INSERT INTO wh_db.FactCashBalances
WITH historical AS (
    SELECT
        accountid,
        ct_dts::DATE AS datevalue,
        SUM(ct_amt) AS account_daily_total,
        1 batchid
    FROM read_csv(
        '{file_path}',
        HEADER=FALSE,
        columns={{'accountid': 'BIGINT', 'ct_dts': 'TIMESTAMP', 'ct_amt': 'DOUBLE', 'ct_name': 'VARCHAR'}}
    )
    GROUP BY ALL
  )
SELECT
    a.sk_customerid,
    a.sk_accountid,
    CAST(STRFTIME(datevalue, '%Y%m%d') AS BIGINT) AS sk_dateid,
    SUM(account_daily_total) OVER (PARTITION BY c.accountid ORDER BY datevalue) AS cash,
    c.batchid
FROM historical c
JOIN wh_db.DimAccount a
    ON c.accountid = a.accountid
    AND c.datevalue >= a.effectivedate
    AND c.datevalue < a.enddate;

    """

con.sql(FactCashBalances)
save_sql_to_file(FactCashBalances, "wh_db.FactCashBalances")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.FactCashBalances.sql'.


In [None]:
# replace filename
FactHoldings = f""" 
INSERT INTO wh_db.FactHoldings
WITH Holdings AS (
    SELECT
        *,
        1 AS batchid
    FROM read_csv(
        'src/data/Batch1\\HoldingHistory.txt',
        HEADER=FALSE,
        columns={{'hh_h_t_id': 'INTEGER', 'hh_t_id': 'INTEGER', 'hh_before_qty': 'INTEGER', 'hh_after_qty': 'INTEGER'}}
    )
)
SELECT
  hh_h_t_id tradeid,
  hh_t_id currenttradeid,
  sk_customerid,
  sk_accountid,
  sk_securityid,
  sk_companyid,
  sk_closedateid sk_dateid,
  sk_closetimeid sk_timeid,
  tradeprice currentprice,
  hh_after_qty currentholding,
  h.batchid
FROM Holdings h
  JOIN wh_db.DimTrade dt 
    ON tradeid = hh_t_id;
     """
con.sql(FactHoldings)
save_sql_to_file(FactHoldings, "wh_db.FactHoldings")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.FactHoldings.sql'.


In [42]:
#replace table name
FactWatches = f""" 
INSERT INTO wh_db.FactWatches
WITH watchhistory AS (
    SELECT
        *,
        1 AS batchid
    FROM read_csv(
        'src/data/Batch1\\WatchHistory.txt',
        HEADER=FALSE,
        columns={{'w_c_id': 'BIGINT', 'w_s_symb': 'VARCHAR', 'w_dts': 'TIMESTAMP', 'w_action': 'VARCHAR'}}
    )
),
watches AS (
    SELECT
        wh.w_c_id AS customerid,
        wh.w_s_symb AS symbol,
        MIN(w_dts)::DATE AS dateplaced,
        CASE WHEN w_action = 'CNCL' THEN w_dts ELSE NULL END::DATE AS dateremoved,
        MIN(batchid) AS batchid
    FROM watchhistory wh
    GROUP BY ALL
)
select
  c.sk_customerid sk_customerid,
  s.sk_securityid sk_securityid,
  CAST(strftime(dateplaced, '%Y%m%d') AS BIGINT)  sk_dateid_dateplaced,
  CAST(strftime(dateremoved, '%Y%m%d') AS BIGINT) sk_dateid_dateremoved,
  wh.batchid 
from watches wh
JOIN wh_db.DimSecurity s 
  ON 
    s.symbol = wh.symbol
    AND wh.dateplaced >= s.effectivedate 
    AND wh.dateplaced < s.enddate
JOIN wh_db.DimCustomer c 
  ON
    wh.customerid = c.customerid
    AND wh.dateplaced >= c.effectivedate 
    AND wh.dateplaced < c.enddate;
"""
con.sql(FactWatches)
save_sql_to_file(FactWatches, "wh_db.FactWatches")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.FactWatches.sql'.


In [43]:

Financial = f""" 
INSERT INTO wh_db.Financial
WITH finwire_parsed AS (
    -- Parse the fixed-width data and extract relevant fields including conditional id/cname
    SELECT
        CAST(SUBSTRING(value FROM 1 FOR 4) AS INTEGER) AS fi_year,
        CAST(SUBSTRING(value FROM 5 FOR 1) AS INTEGER) AS fi_qtr,
        -- Explicitly cast strptime result to DATE if effectivedate/enddate are DATEs
        strptime(SUBSTRING(value FROM 6 FOR 8), '%Y%m%d')::DATE AS fi_qtr_start_date,
        CAST(SUBSTRING(value FROM 22 FOR 17) AS DOUBLE) AS fi_revenue,
        CAST(SUBSTRING(value FROM 39 FOR 17) AS DOUBLE) AS fi_net_earn,
        CAST(SUBSTRING(value FROM 56 FOR 12) AS DOUBLE) AS fi_basic_eps,
        CAST(SUBSTRING(value FROM 68 FOR 12) AS DOUBLE) AS fi_dilut_eps,
        CAST(SUBSTRING(value FROM 80 FOR 12) AS DOUBLE) AS fi_margin,
        CAST(SUBSTRING(value FROM 92 FOR 17) AS DOUBLE) AS fi_inventory,
        CAST(SUBSTRING(value FROM 109 FOR 17) AS DOUBLE) AS fi_assets,
        CAST(SUBSTRING(value FROM 126 FOR 17) AS DOUBLE) AS fi_liability,
        CAST(SUBSTRING(value FROM 143 FOR 13) AS BIGINT) AS fi_out_basic,
        CAST(SUBSTRING(value FROM 156 FOR 13) AS BIGINT) AS fi_out_dilut,
        -- Conditionally extract ID (as TEXT) or Name
        -- Ensure id and cname columns are mutually exclusive (one is NULL if other is not)
        CASE
            WHEN regexp_matches(SUBSTRING(value FROM -10), '^[0-9]+$')
            -- Remove leading zeros and trim; result is TEXT
            THEN trim(regexp_replace(SUBSTRING(value FROM -10), '^0+', ''))
            ELSE NULL
        END AS company_id_text,
        CASE
            WHEN NOT regexp_matches(SUBSTRING(value FROM -10), '^[0-9]+$')
            THEN trim(SUBSTRING(value FROM -60))
            ELSE NULL
        END AS company_name,
        -- Keep recdate for joining condition
        recdate
    FROM wh_db_stage.FinWire
    -- Filter record types early in the CTE
    WHERE rectype IN ('FIN_COMPANYID', 'FIN_NAME')
)
SELECT
    -- Select the SK from the successfully joined DimCompany record
    dc.sk_companyid,
    -- Select all parsed financial columns, excluding intermediate fields
    fp.* EXCLUDE (company_id_text, company_name, recdate)
FROM finwire_parsed AS fp
-- Use a single INNER JOIN (or LEFT JOIN if you need un-matched finwire rows)
JOIN wh_db.DimCompany AS dc
    -- Apply the date range filter first (can help with partitioning/pruning)
    ON fp.recdate >= dc.effectivedate
   AND fp.recdate < dc.enddate
    -- Apply the conditional join logic: match EITHER id OR name
   AND (
        (fp.company_id_text IS NOT NULL AND fp.company_id_text = dc.companyid::TEXT) -- Cast dc.companyid to TEXT if needed
        OR
        (fp.company_name IS NOT NULL AND fp.company_name = dc.name)
   );
"""

con.sql(Financial)
save_sql_to_file(Financial, "wh_db.Financial")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.Financial.sql'.


In [44]:
# Prospect
Prospect = """
INSERT INTO wh_db.Prospect
WITH cust AS (
    SELECT
        lastname,
        firstname,
        addressline1,
        addressline2,
        postalcode
    FROM wh_db.DimCustomer
    WHERE iscurrent = 'Y'
)
SELECT
    p.agencyid,
    CAST(STRFTIME(recdate.batchdate, '%Y%m%d') AS BIGINT) AS sk_recorddateid,
    CAST(STRFTIME(origdate.batchdate, '%Y%m%d') AS BIGINT) AS sk_updatedateid,
    p.batchid,
    CASE WHEN c.LastName IS NOT NULL THEN TRUE ELSE FALSE END AS iscustomer,
    p.lastname,
    p.firstname,
    p.middleinitial,
    p.gender,
    p.addressline1,
    p.addressline2,
    p.postalcode,
    city,
    state,
    country,
    phone,
    income,
    numbercars,
    numberchildren,
    maritalstatus,
    age,
    creditrating,
    ownorrentflag,
    employer,
    numbercreditcards,
    networth,
    marketingnameplate
FROM wh_db_stage.ProspectIncremental p
JOIN wh_db.BatchDate recdate
    ON p.recordbatchid = recdate.batchid
JOIN wh_db.BatchDate origdate
    ON p.batchid = origdate.batchid
LEFT JOIN cust c
    ON
         UPPER(p.LastName) = UPPER(c.lastname)
        AND UPPER(p.FirstName) = UPPER(c.firstname)
        AND UPPER(p.AddressLine1) = UPPER(c.addressline1)
        AND UPPER(COALESCE(p.addressline2, '')) = UPPER(COALESCE(c.addressline2, ''))
        AND UPPER(p.PostalCode) = UPPER(c.postalcode)
         ;
"""

con.sql(Prospect)
save_sql_to_file(Prospect, "wh_db.Prospect")

SQL query successfully written to 'C:\lopu-kg-test\project\src\main\sql_for_pipelines\1_wh_db.Prospect.sql'.


In [34]:
con_path = r"initial_db.duckdb"
con = duckdb.connect(con_path)
con.sql("CALL start_ui();")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────────────────────────────────┐
│                result                │
│               varchar                │
├──────────────────────────────────────┤
│ UI started at http://localhost:4213/ │
└──────────────────────────────────────┘

In [None]:
# Customer hakkab alles batch2-st?

src_folder = 'src/data/Batch2'
file_path = os.path.join(src_folder, "Customer.txt")
batch_number = int(''.join(filter(str.isdigit, os.path.basename(src_folder))))

columns = {
    "cdc_flag": "VARCHAR",
    "cdc_dsn": "BIGINT",
    "customerid": "BIGINT",
    "taxid": "VARCHAR",
    "status": "VARCHAR",
    "lastname": "VARCHAR",
    "firstname": "VARCHAR",
    "middleinitial": "VARCHAR",
    "gender": "VARCHAR",
    "tier": "TINYINT",
    "dob": "DATE",
    "addressline1": "VARCHAR",
    "addressline2": "VARCHAR",
    "postalcode": "VARCHAR",
    "city": "VARCHAR",
    "stateprov": "VARCHAR",
    "country": "VARCHAR",
    "c_ctry_1": "VARCHAR",
    "c_area_1": "VARCHAR",
    "c_local_1": "VARCHAR",
    "c_ext_1": "VARCHAR",
    "c_ctry_2": "VARCHAR",
    "c_area_2": "VARCHAR",
    "c_local_2": "VARCHAR",
    "c_ext_2": "VARCHAR",
    "c_ctry_3": "VARCHAR",
    "c_area_3": "VARCHAR",
    "c_local_3": "VARCHAR",
    "c_ext_3": "VARCHAR",
    "email1": "VARCHAR",
    "email2": "VARCHAR",
    "lcl_tx_id": "VARCHAR",
    "nat_tx_id": "VARCHAR"
}

df = con.sql(f"SELECT * FROM read_csv('{file_path}', columns = $columns)", params={"columns": columns}).df()

print(df.dtypes)
print(df.head())

cdc_flag                 object
cdc_dsn                   int64
customerid                int64
taxid                    object
status                   object
lastname                 object
firstname                object
middleinitial            object
gender                   object
tier                       int8
dob              datetime64[us]
addressline1             object
addressline2             object
postalcode               object
city                     object
stateprov                object
country                  object
c_ctry_1                 object
c_area_1                 object
c_local_1                object
c_ext_1                  object
c_ctry_2                 object
c_area_2                 object
c_local_2                object
c_ext_2                  object
c_ctry_3                 object
c_area_3                 object
c_local_3                object
c_ext_3                  object
email1                   object
email2                   object
lcl_tx_i

In [36]:
# peale batch ingestionit täita ka wh_db.BatchDate

src_folder = 'src/data/Batch1'
file_path = os.path.join(src_folder, "BatchDate.txt")
batch_number = int(''.join(filter(str.isdigit, os.path.basename(src_folder))))

con.sql(f"""
INSERT INTO wh_db.BatchDate
SELECT batchdate::DATE,
    {batch_number} AS batchid 
    FROM read_csv_auto('{file_path}', columns={{
    "batchdate": "DATE"
}}, header=False);
""")

In [45]:
con.commit()

<duckdb.duckdb.DuckDBPyConnection at 0x25ea65385b0>

In [45]:
template_name = "llm_prompt_for_column_level_lineage_easy"
llm_answers_dir = f"C:\lopu-kg-test\project\src\LLM_answers/{template_name}"
os.makedirs(llm_answers_dir, exist_ok=True)

In [46]:
con = duckdb.connect(database=db_path)
#con.sql("select * from sqlite_temp_schema ")
con.sql("select column_name from information_schema.columns where table_name = 'ProspectIncremental'").df()

Unnamed: 0,column_name
0,agencyid
1,lastname
2,firstname
3,middleinitial
4,gender
5,addressline1
6,addressline2
7,postalcode
8,city
9,state


In [None]:
print(con.execute("select table_name, COLUMN_COMMENT from information_schema.columns").fetchdf())

    COLUMN_COMMENT
0             None
1             None
2             None
3             None
4             None
..             ...
421           None
422           None
423           None
424           None
425           None

[426 rows x 1 columns]


1. Information schemast tabelid ja skeemad sisse
2. Data pipelineid ja nende logimine, et pärast võrrelda LLM-i tulemusi päris tulemustega.
3. 

Küsimused:
1. Kas a la risk metric summa on alati numbriline väärtus?
2. Meil oleks vaja luua kontroll, et statustype ei tohi olla D, milliseid samme peaksime muutma?