#### `Imports`

In [1]:
import pandas as pd
import glob
import json
from functools import reduce, partial
from sql_metadata import Parser
import sqlparse
import re
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

#### `MAIN extract_queries_and_meta_data`

#### `Methods`

In [13]:
extract_queries_and_meta_data()

2025-04-07 11:14:30,599 - INFO - Started extract_queries_and_meta_data.
2025-04-07 11:14:30,601 - INFO - Started read_in_excel_files from path ./1_migrated_excel_queries/test.xlsx.
2025-04-07 11:14:30,616 - INFO - Successfully read 1 excel file(s) from ./1_migrated_excel_queries/test.xlsx.
2025-04-07 11:14:30,617 - INFO - Started process_query_df.
2025-04-07 11:14:30,618 - INFO - Started preprocess_query_df for migrated queries.
2025-04-07 11:14:32,561 - INFO - Preprocessing of migrated queries successful.
2025-04-07 11:14:32,563 - INFO - Started write_queries_to_json for migrated queries.
2025-04-07 11:14:32,592 - INFO - Queries successfully written to JSON-file.
2025-04-07 11:14:32,592 - INFO - Started extract_query_meta_data from migrated queries.
2025-04-07 11:14:32,594 - INFO - Meta data from migrated queries successfully extracted.
2025-04-07 11:14:32,594 - INFO - Started extract_query_tables from migrated queries.
2025-04-07 11:14:33,119 - INFO - Tables from migrated queries suc

##### `extract_queries_and_meta_data`

In [2]:
def extract_queries_and_meta_data():
    logging.info(f"Started {extract_queries_and_meta_data.__name__}.")

    # Migrated Queries
    MIGRATED_QUERY_SOURCE_PATH = "./1_migrated_excel_queries/test.xlsx"
    MIGRATED_QUERY_TARGET_PATH = "./3_extracted_queries/migrated_queries"
    MIGRATED_QUERY_IDENTIFIER = "migrated"

    migrated_query_df = read_in_excel_files(MIGRATED_QUERY_SOURCE_PATH)
    return process_query_df(migrated_query_df, MIGRATED_QUERY_TARGET_PATH, MIGRATED_QUERY_IDENTIFIER)

    # New Queries
    # NEW_QUERY_SOURCE_PATH = "./2_new_excel_queries/*.xlsx"
    # NEW_QUERY_TARGET_PATH = "./3_extracted_queries/new_queries"
    # NEW_QUERY_IDENTIFIER = "new"

    # new_query_df = read_in_excel_files(NEW_QUERY_SOURCE_PATH)
    # process_query_df(new_query_df, NEW_QUERY_TARGET_PATH, NEW_QUERY_IDENTIFIER)

##### `read_in_excel_files`

In [3]:
def read_in_excel_files(PATH):
    """
    Reads all Excel files from the specified directory path and concatenates them into a single DataFrame.

    Parameters:
    PATH (str): The directory path where the Excel files are located. Supports glob patterns.

    Returns:
    pd.DataFrame: A concatenated DataFrame containing the data from all the Excel files, with missing values filled with "NA".
    """
    logging.info(f"Started {read_in_excel_files.__name__} from path {PATH}.")
    try:
        excel_data = glob.glob(PATH)

        dataframes = [pd.read_excel(data, engine="openpyxl") for data in excel_data]
        query_df = pd.concat(dataframes, ignore_index=True)

        logging.info(f"Successfully read {len(excel_data)} excel file(s) from {PATH}.")

    except Exception as e:
        logging.error(f"Error in reading excel files: {e}")

    return query_df.fillna("NA")

##### `process_query_df`

In [4]:
def process_query_df(query_df, QUERY_TARGET_PATH, QUERY_IDENTIFIER):
    """
    Processes the query DataFrame by applying a series of transformations and extracting metadata.

    Parameters:
    query_df (pd.DataFrame): The DataFrame containing query data to be processed.
    QUERY_TARGET_PATH (str): The target path where the processed query data will be written as JSON.
    QUERY_IDENTIFIER (str): A unique identifier for the queries being processed.

    Returns:
    None

    Steps:
    1. Apply preprocessing and write queries to JSON using partial functions.
    2. Extract query metadata, tables, and columns from the processed DataFrame.
    3. Write the extracted metadata, tables, and columns to JSON files.

    Example:
    >>> process_query_df(query_df, "/path/to/target", "example_query")
    """
    logging.info(f"Started {process_query_df.__name__}.")

    query_df_processed = reduce(
        lambda accu, func: func(accu),
        [
            partial(
                preprocess_query_df,
                QUERY_IDENTIFIER=QUERY_IDENTIFIER
            ),
            partial(
                write_queries_to_json,
                QUERY_TARGET_PATH=QUERY_TARGET_PATH,
                QUERY_IDENTIFIER=QUERY_IDENTIFIER
            ),
        ],
        query_df
    )

    query_meta_data = extract_query_meta_data(query_df_processed, QUERY_IDENTIFIER)

    query_tables = extract_query_tables(query_df_processed, QUERY_IDENTIFIER)

    query_columns = extract_query_columns(query_df_processed, QUERY_IDENTIFIER)

    write_query_data_to_json(query_meta_data, query_tables, query_columns, QUERY_IDENTIFIER)

    logging.info(f"Completed {process_query_df.__name__} for {QUERY_IDENTIFIER} queries.")

##### `preprocess_query_df`

In [5]:
def preprocess_query_df(query_df, QUERY_IDENTIFIER):
    """
    Preprocesses a DataFrame containing SQL queries by performing several cleaning and formatting operations.

    Parameters:
    query_df (pd.DataFrame): DataFrame containing SQL queries and report names.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    pd.DataFrame: A cleaned and formatted DataFrame with duplicate rows removed.

    The preprocessing steps include:
    - Dropping rows with missing SQL queries and filling other missing values with empty strings.
    - Replacing special characters in SQL queries with their ASCII equivalents.
    - Formatting SQL queries to uppercase, reindenting, removing comments, and applying specific regex patterns.
    - Cleaning the 'Report Name' column using the `clean_string` function.
    - Removing duplicate rows from the DataFrame.
    """
    logging.info(f"Started {preprocess_query_df.__name__} for {QUERY_IDENTIFIER} queries.")

    query_df = query_df.dropna(subset=["SQL"]).fillna("")

    query_df["SQL"] = (query_df["SQL"]
                       .str.replace('ê', 'e').str.replace('é', 'e').str.replace('è', 'e').str.replace('à', 'a').str.replace('ç', 'c')
                       .str.replace('ô', 'o').str.replace('û', 'u').str.replace('ù', 'u').str.replace('î', 'i').str.replace('ï', 'i')
                       .str.replace('â', 'a').str.replace('ä', 'a').str.replace('ö', 'o').str.replace('ü', 'u').str.replace('ÿ', 'y')
                       .str.replace('ñ', 'n').str.replace('É', 'E').str.replace('È', 'E').str.replace('À', 'A').str.replace('Ç', 'C')
                       .str.replace('Ô', 'O').str.replace('Û', 'U').str.replace('Ù', 'U').str.replace('Î', 'I').str.replace('Ï', 'I')
                       .str.replace('Â', 'A').str.replace('Ä', 'A').str.replace('Ö', 'O').str.replace('Ü', 'U').str.replace('Ÿ', 'Y')
                       .str.replace('Ñ', 'N')
    )

    pattern_1 = r'WITH\s+"\w+"\s+AS\s*\(.*?\)\s*SELECT'
    pattern_2 = r'"[^"]*"\.'

    for index, query in enumerate(query_df["SQL"]):
        formatted_query = query.upper()
        formatted_query = sqlparse.format(formatted_query, reindent=True, keyword_case='upper', strip_comments=True).strip()
        formatted_query = re.sub(pattern_1, 'SELECT', formatted_query, flags=re.DOTALL)
        formatted_query = re.sub(pattern_2, '', formatted_query)
        query_df.at[index, 'SQL'] = formatted_query

    query_df["Report Name"] = query_df["Report Name"].apply(clean_string)

    logging.info(f"Preprocessing of {QUERY_IDENTIFIER} queries successful.")

    return query_df.drop_duplicates()

##### `clean_string`

In [6]:
def clean_string(string):
    """
    Cleans a given string by removing specific characters and extra whitespace, and converting it to uppercase.

    Parameters:
    string (str): The input string to be cleaned.

    Returns:
    str: The cleaned and formatted string.

    The cleaning steps include:
    - Removing characters specified in the pattern: square brackets, hash, asterisk, plus, underscore, dot, and digits.
    - Stripping leading and trailing whitespace.
    - Replacing multiple spaces with a single space.
    - Converting the string to uppercase and stripping any remaining leading or trailing whitespace.
    """
    pattern = r'[\[\]#*+_.\d]'
    cleaned_string = re.sub(pattern, ' ', string)
    cleaned_string = cleaned_string.strip()
    cleaned_string = re.sub(r'\s+', ' ', cleaned_string)
    return cleaned_string.upper().strip()

##### `write_queries_to_json`

In [7]:
def write_queries_to_json(query_df, QUERY_TARGET_PATH, QUERY_IDENTIFIER):
    """
    Writes SQL queries from a DataFrame to individual JSON files.

    Parameters:
    query_df (pd.DataFrame): DataFrame containing SQL queries.
    QUERY_TARGET_PATH (str): Path where the JSON files will be saved.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    pd.DataFrame: The original DataFrame after attempting to write queries to JSON files.

    The writing steps include:
    - Iterating over the SQL queries in the DataFrame.
    - Writing each query to a separate JSON file named with the QUERY_IDENTIFIER and query index.
    """
    logging.info(f"Started {write_queries_to_json.__name__} for {QUERY_IDENTIFIER} queries.")

    for index, query in enumerate(query_df["SQL"]):
        try:
            with open(f"{QUERY_TARGET_PATH}/{QUERY_IDENTIFIER}_query_{index}.sql", "w", encoding='utf-8') as file:
                file.write(query)

        except Exception as e:
            logging.error(f"Error writing query {index} to JSON-file: {e}")

    logging.info(f"Queries successfully written to JSON-file.")

    return query_df

##### `extract_query_meta_data`

In [8]:
def extract_query_meta_data(query_df, QUERY_IDENTIFIER):
    """
    Extracts metadata from a DataFrame containing SQL queries and returns it as a dictionary.

    Parameters:
    query_df (pd.DataFrame): DataFrame containing SQL queries and associated metadata.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    dict: A dictionary containing the extracted metadata for each query.

    The extraction steps include:
    - Iterating over the rows of the DataFrame.
    - Extracting and converting to uppercase the values for 'Datasource', 'Product Name', 'Report Name', and 'Report Path'.
    - Using 'nA' as a default value if any of these fields are missing.
    - Storing the extracted metadata in a dictionary with keys formatted as 'report_{index}'.
    """
    logging.info(f"Started {extract_query_meta_data.__name__} from {QUERY_IDENTIFIER} queries.")

    meta_data = {}
    for index, (idx, row) in enumerate(query_df.iterrows()):
        if "Datasource" not in row:
            datasource = "NA"
        else:
            datasource = row["Datasource"].upper()
        if "Product Name" not in row:
            product_name = "NA"
        else:
            product_name = row["Product Name"].upper()
        if "Report Name" not in row:
            report_name = "NA"
        else:
            report_name = row["Report Name"].upper()
        if "Report Path" not in row:
            report_path = "NA"
        else:
            report_path = row["Report Path"].upper()

        meta_data[report_name] = {
                f"product_name": product_name,
                f"report_path": report_path,
                f"datasource": datasource,
            }

    logging.info(f"Meta data from {QUERY_IDENTIFIER} queries successfully extracted.")

    return meta_data

##### `extract_query_tables`

In [9]:
def extract_query_tables(query_df, QUERY_IDENTIFIER):
    """
    Extracts table names from SQL queries in a DataFrame and returns them in a structured format.

    Parameters:
    query_df (pd.DataFrame): DataFrame containing SQL queries.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    list: A list of lists, where each inner list contains two lists:
          - The first list contains the raw table names extracted from the SQL queries.
          - The second list contains the cleansed table names (i.e., without schema prefixes).

    The extraction steps include:
    - Iterating over the SQL queries in the DataFrame.
    - Using a SQL parser to extract table names from each query.
    - Converting table names to uppercase and filtering out invalid table names.
    - Cleansing table names by removing schema prefixes.
    - Sorting the raw and cleansed table names.
    - Appending the sorted lists to the `extracted_tables` list.
    """
    logging.info(f"Started {extract_query_tables.__name__} from {QUERY_IDENTIFIER} queries.")

    INVALID_TABLE_NAMES = [
        "BOTH",
        "TRIM",
        "SUBSTR",
        "SUM",
        "CAST",
        "CASE",
        "TRUNC",
        "CONCAT",
        "TO_CHAR",
        "ROUND",
        "DISTINCT",
        "TRAILING",
        "LENGTH",
        "BOTH"
    ]

    extracted_tables = []

    for index, query in query_df["SQL"].items():
        try:
            raw_query_tables = []
            raw_query_tables_cleansed = []

            parser = Parser(query)
            raw_query_tables = parser.tables
            raw_query_tables = [table.upper() for table in raw_query_tables if table.upper() not in INVALID_TABLE_NAMES]
            raw_query_tables_cleansed = [table.split(".")[-1] for table in raw_query_tables]

            raw_query_tables.sort()
            raw_query_tables_cleansed.sort()

            extracted_tables.append([raw_query_tables, raw_query_tables_cleansed])

        except Exception as e:
            logging.error(f"Error extracting table {query}: {e}")

    logging.info(f"Tables from {QUERY_IDENTIFIER} queries successfully extracted.")

    return extracted_tables

##### `extract_query_columns`

In [10]:
def extract_query_columns(query_df, QUERY_IDENTIFIER):
    """
    Extracts column names from SQL queries in a DataFrame and returns them in a structured format.

    Parameters:
    query_df (pd.DataFrame): DataFrame containing SQL queries.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    list: A list of lists, where each inner list contains two lists:
          - The first list contains the raw column names extracted from the SQL queries.
          - The second list contains the cleansed column names (i.e., without table prefixes).

    The extraction steps include:
    - Iterating over the SQL queries in the DataFrame.
    - Using a SQL parser to extract column names from each query.
    - Cleansing column names by removing table prefixes.
    - Sorting and deduplicating the raw and cleansed column names.
    - Appending the sorted lists to the `extracted_columns` list.
    """
    logging.info(f"Started {extract_query_columns.__name__} from {QUERY_IDENTIFIER} queries.")

    extracted_columns = []
    error_queries = []

    for index, query in query_df["SQL"].items():
        try:
            processed_query_columns = []
            processed_query_columns_cleansed = []

            parser = Parser(query)
            processed_query_columns = parser.columns
            processed_query_columns_cleansed = [col.split(".")[-1] for col in processed_query_columns]

            processed_query_columns = sorted(set(processed_query_columns))
            processed_query_columns_cleansed = sorted(set(processed_query_columns_cleansed))

            extracted_columns.append([processed_query_columns, processed_query_columns_cleansed])

        except Exception as e:
            logging.error(f"Error extracting columns in query at index {index}: {e}.")
            error_queries.append({'query_identifier': QUERY_IDENTIFIER, 'index': index, 'query': query, 'error': str(e)})

    logging.info(f"Columns from {QUERY_IDENTIFIER} queries successfully extracted.")

    return extracted_columns

##### `write_query_data_to_json`

In [11]:
def write_query_data_to_json(meta_data, tables, columns, QUERY_IDENTIFIER):
    """
    Writes query metadata, tables, and columns to a JSON file.

    Parameters:
    meta_data (dict): Dictionary containing metadata for each query.
    tables (list): List of lists containing table names and cleansed table names for each query.
    columns (list): List of lists containing column names and cleansed column names for each query.
    QUERY_IDENTIFIER (str): Identifier for the type of queries being processed.

    Returns:
    None

    The writing steps include:
    - Iterating over the metadata dictionary using report names as keys.
    - Updating the metadata with table and column information.
    - Writing the updated metadata to a JSON file at the specified path.
    """
    logging.info(f"Started {write_query_data_to_json.__name__} for {QUERY_IDENTIFIER} queries.")

    JSON_PATH = f"./4_json_results/{QUERY_IDENTIFIER}_query_data.json"

    for report_name, data in meta_data.items():
        index = list(meta_data.keys()).index(report_name)
        data["tables"] = tables[index][0]
        data["tables_cleansed"] = tables[index][1]
        data["columns"] = columns[index][0]
        data["columns_cleansed"] = columns[index][1]

    with open(JSON_PATH, "w", encoding="utf-8") as outfile:
        json.dump(meta_data, outfile, indent=4, ensure_ascii=False)

    logging.info(f"{len(meta_data.keys())} queries successfully written to JSON-file. Path: {JSON_PATH}")