In [1]:
import pandas as pd
import glob
import json
from functools import reduce, partial
from sql_metadata import Parser
import sqlparse
import re
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [80]:
extract_queries_and_meta_data()

2025-04-02 10:47:01,580 - INFO - Started extract_queries_and_meta_data.
2025-04-02 10:47:01,594 - INFO - Started read_in_excel_files from path ./1_migrated_excel_queries/test.xlsx.


2025-04-02 10:47:01,954 - INFO - Successfully read 1 excel file(s) from ./1_migrated_excel_queries/test.xlsx.
2025-04-02 10:47:01,956 - INFO - Started process_query_df.
2025-04-02 10:47:01,958 - INFO - Started preprocess_query_df for migrated queries.
2025-04-02 10:47:03,123 - INFO - Preprocessing of migrated queries successful.
2025-04-02 10:47:03,123 - INFO - Started write_queries_to_json for migrated queries.
2025-04-02 10:47:03,149 - INFO - Queries successfully written to JSON-file.
2025-04-02 10:47:03,151 - INFO - Started extract_query_meta_data from migrated queries.
2025-04-02 10:47:03,154 - INFO - Meta data from migrated queries successfully extracted.
2025-04-02 10:47:03,155 - INFO - Started extract_query_tables from migrated queries.
2025-04-02 10:47:03,652 - INFO - Tables from migrated queries successfully extracted.
2025-04-02 10:47:03,652 - INFO - Started extract_query_columns from migrated queries.
2025-04-02 10:47:04,135 - INFO - Columns from migrated queries successfull

##### `extract_queries_and_meta_data`

In [79]:
def extract_queries_and_meta_data():
    logging.info(f"Started {extract_queries_and_meta_data.__name__}.")

    # Migrated Queries
    MIGRATED_QUERY_SOURCE_PATH = "./1_migrated_excel_queries/test.xlsx"
    MIGRATED_QUERY_TARGET_PATH = "./3_extracted_queries/migrated_queries"
    MIGRATED_QUERY_IDENTIFIER = "migrated"

    migrated_query_df = read_in_excel_files(MIGRATED_QUERY_SOURCE_PATH)
    process_query_df(migrated_query_df, MIGRATED_QUERY_TARGET_PATH, MIGRATED_QUERY_IDENTIFIER)

    # New Queries
    # NEW_QUERY_SOURCE_PATH = "./2_new_excel_queries/*.xlsx"
    # NEW_QUERY_TARGET_PATH = "./3_extracted_queries/new_queries"
    # NEW_QUERY_IDENTIFIER = "new"

    # new_query_df = read_in_excel_files(NEW_QUERY_SOURCE_PATH)
    # process_query_df(new_query_df, NEW_QUERY_TARGET_PATH, NEW_QUERY_IDENTIFIER)

##### `read_in_excel_files`

In [3]:
def read_in_excel_files(PATH):
    logging.info(f"Started {read_in_excel_files.__name__} from path {PATH}.")
    try:
        excel_data = glob.glob(PATH)

        dataframes = [pd.read_excel(data, engine="openpyxl") for data in excel_data]
        query_df = pd.concat(dataframes, ignore_index=True)

        logging.info(f"Successfully read {len(excel_data)} excel file(s) from {PATH}.")

    except Exception as e:
        logging.error(f"Error in reading excel files: {e}")

    return query_df

##### `process_query_df`

In [72]:
def process_query_df(query_df, QUERY_TARGET_PATH, QUERY_IDENTIFIER):
    logging.info(f"Started {process_query_df.__name__}.")

    query_df_processed = reduce(
        lambda accu, func: func(accu),
        [
            partial(
                preprocess_query_df,
                QUERY_IDENTIFIER=QUERY_IDENTIFIER
            ),
            partial(
                write_queries_to_json,
                QUERY_TARGET_PATH=QUERY_TARGET_PATH,
                QUERY_IDENTIFIER=QUERY_IDENTIFIER
            ),
        ],
        query_df
    )

    query_meta_data = extract_query_meta_data(query_df_processed, QUERY_IDENTIFIER)

    query_tables = extract_query_tables(query_df_processed, QUERY_IDENTIFIER)

    query_columns = extract_query_columns(query_df_processed, QUERY_IDENTIFIER)

    write_query_data_to_json(query_meta_data, query_tables, query_columns, QUERY_IDENTIFIER)

    logging.info(f"Completed {process_query_df.__name__} for {QUERY_IDENTIFIER} queries.")

##### `preprocess_query_df`

In [None]:
def preprocess_query_df(query_df, QUERY_IDENTIFIER):
    logging.info(f"Started {preprocess_query_df.__name__} for {QUERY_IDENTIFIER} queries.")

    query_df = query_df.dropna(subset=["SQL"]).fillna("")

    query_df["SQL"] = (query_df["SQL"]
                       .str.replace('ê', 'e').str.replace('é', 'e').str.replace('è', 'e').str.replace('à', 'a').str.replace('ç', 'c')
                       .str.replace('ô', 'o').str.replace('û', 'u').str.replace('ù', 'u').str.replace('î', 'i').str.replace('ï', 'i')
                       .str.replace('â', 'a').str.replace('ä', 'a').str.replace('ö', 'o').str.replace('ü', 'u').str.replace('ÿ', 'y')
                       .str.replace('ñ', 'n').str.replace('É', 'E').str.replace('È', 'E').str.replace('À', 'A').str.replace('Ç', 'C')
                       .str.replace('Ô', 'O').str.replace('Û', 'U').str.replace('Ù', 'U').str.replace('Î', 'I').str.replace('Ï', 'I')
                       .str.replace('Â', 'A').str.replace('Ä', 'A').str.replace('Ö', 'O').str.replace('Ü', 'U').str.replace('Ÿ', 'Y')
                       .str.replace('Ñ', 'N')
    )

    pattern_1 = r'WITH\s+"\w+"\s+AS\s*\(.*?\)\s*SELECT'
    pattern_2 = r'"[^"]*"\.'

    for index, query in enumerate(query_df["SQL"]):
        formatted_query = query.upper()
        formatted_query = sqlparse.format(formatted_query, reindent=True, keyword_case='upper', strip_comments=True).strip()
        formatted_query = re.sub(pattern_1, 'SELECT', formatted_query, flags=re.DOTALL)
        formatted_query = re.sub(pattern_2, '', formatted_query)
        query_df.at[index, 'SQL'] = formatted_query

    logging.info(f"Preprocessing of {QUERY_IDENTIFIER} queries successful.")

    return query_df

##### `write_queries_to_json`

In [6]:
def write_queries_to_json(query_df, QUERY_TARGET_PATH, QUERY_IDENTIFIER):
    logging.info(f"Started {write_queries_to_json.__name__} for {QUERY_IDENTIFIER} queries.")

    for index, query in enumerate(query_df["SQL"]):
        try:
            with open(f"{QUERY_TARGET_PATH}/{QUERY_IDENTIFIER}_query_{index}.sql", "w", encoding='utf-8') as file:
                file.write(query)

        except Exception as e:
            logging.error(f"Error writing query {index} to JSON-file: {e}")

    logging.info(f"Queries successfully written to JSON-file.")

    return query_df

##### `extract_query_meta_data`

In [77]:
def extract_query_meta_data(query_df, QUERY_IDENTIFIER):
    logging.info(f"Started {extract_query_meta_data.__name__} from {QUERY_IDENTIFIER} queries.")

    meta_data = {}
    for index, (idx, row) in enumerate(query_df.iterrows()):
        if "Datasource" not in row:
            datasource = "nA"
        else:
            datasource = row["Datasource"].upper()
        if "Product Name" not in row:
            product_name = "nA"
        else:
            product_name = row["Product Name"].upper()
        if "Report Name" not in row:
            report_name = "nA"
        else:
            report_name = row["Report Name"].upper()
        if "Report Path" not in row:
            report_path = "nA"
        else:
            report_path = row["Report Path"].upper()

        meta_data[f"report_{index}"] = {
                f"report_name": report_name,
                f"product_name": product_name,
                f"report_path": report_path,
                f"datasource": datasource,
            }

    logging.info(f"Meta data from {QUERY_IDENTIFIER} queries successfully extracted.")

    return meta_data

##### `extract_query_tables`

In [8]:
def extract_query_tables(query_df, QUERY_IDENTIFIER):
    logging.info(f"Started {extract_query_tables.__name__} from {QUERY_IDENTIFIER} queries.")

    INVALID_TABLE_NAMES = [
        "BOTH",
        "TRIM",
        "SUBSTR",
        "SUM",
        "CAST",
        "CASE",
        "TRUNC",
        "CONCAT",
        "TO_CHAR",
        "ROUND",
        "DISTINCT",
        "TRAILING",
        "LENGTH",
        "BOTH"
    ]

    extracted_tables = []

    for index, query in query_df["SQL"].items():
        try:
            raw_query_tables = []
            raw_query_tables_cleansed = []

            parser = Parser(query)
            raw_query_tables = parser.tables
            raw_query_tables = [table.upper() for table in raw_query_tables if table.upper() not in INVALID_TABLE_NAMES]
            raw_query_tables_cleansed = [table.split(".")[-1] for table in raw_query_tables]

            raw_query_tables.sort()
            raw_query_tables_cleansed.sort()

            extracted_tables.append([raw_query_tables, raw_query_tables_cleansed])

            # logging.info(f"Extracted tables from query at index {index}.")

        except Exception as e:
            logging.error(f"Error extracting table {query}: {e}")

    logging.info(f"Tables from {QUERY_IDENTIFIER} queries successfully extracted.")

    return extracted_tables

##### `extract_query_columns`

In [None]:
def extract_query_columns(query_df, QUERY_IDENTIFIER):
    logging.info(f"Started {extract_query_columns.__name__} from {QUERY_IDENTIFIER} queries.")

    extracted_columns = []
    error_queries = []

    for index, query in query_df["SQL"].items():
        try:
            processed_query_columns = []
            processed_query_columns_cleansed = []

            parser = Parser(query)
            processed_query_columns = parser.columns
            processed_query_columns_cleansed = [col.split(".")[-1] for col in processed_query_columns]

            processed_query_columns = sorted(set(processed_query_columns))
            processed_query_columns_cleansed = sorted(set(processed_query_columns_cleansed))

            extracted_columns.append([processed_query_columns, processed_query_columns_cleansed])

        except Exception as e:
            logging.error(f"Error extracting columns in query at index {index}: {e}.")
            error_queries.append({'query_identifier': QUERY_IDENTIFIER, 'index': index, 'query': query, 'error': str(e)})

    logging.info(f"Columns from {QUERY_IDENTIFIER} queries successfully extracted.")

    return extracted_columns

##### `write_query_data_to_json`

In [10]:
def write_query_data_to_json(meta_data, tables, columns, QUERY_IDENTIFIER):
    logging.info(f"Started {write_query_data_to_json.__name__} for {QUERY_IDENTIFIER} queries.")

    JSON_PATH = f"./4_json_results/{QUERY_IDENTIFIER}_query_data.json"

    for index, (table, column) in enumerate(zip(tables, columns)):
        meta_data[f"report_{index}"][f"tables"] = table[0]
        meta_data[f"report_{index}"][f"tables_cleansed"] = table[1]
        meta_data[f"report_{index}"][f"columns"] = column[0]
        meta_data[f"report_{index}"][f"columns_cleansed"] = column[1]

    with open(JSON_PATH, "w", encoding="utf-8") as outfile:
        json.dump(meta_data, outfile, indent=4, ensure_ascii=False)

    logging.info(f"{len(meta_data.keys())} queries successfully written to JSON-file. Path: {JSON_PATH}")