In [546]:
import xml.etree.ElementTree as ET
import logging
import re
import os
import pygraphviz as pgv
from pydantic import BaseModel
from typing import Any, Dict, List, Tuple, TypedDict, Annotated
from textwrap import dedent
from crewai_tools import tool, FileWriterTool
from crewai import Agent, Crew, Process, Task
from langchain_ollama.llms import OllamaLLM
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI

In [547]:
pentaho_file_path = '../data/benef_transf.ktr'

In [548]:
def parse_ktr_file(file_path) -> ET.Element:
    try:
        tree = ET.parse(file_path)
        root = tree.getroot()
    except ET.ParseError as e:
        logging.error(f"Error parsing KTR file: {e}")
        raise
    return root

In [549]:
def extract_execution_sequence(root: ET.Element) -> List[Tuple[str, str]]:
    hops = []
    for hop in root.findall('.//hop'):
        from_step = hop.find('from').text
        to_step = hop.find('to').text
        enabled = hop.find('enabled').text
        if enabled == 'Y':
            hops.append((from_step, to_step))
    
    return hops    

In [550]:
def create_flow_graph(hops: List[Tuple[str, str]], output_file: str = "flow_graph.png"):
    graph = pgv.AGraph(directed=True)
    
    for from_step, to_step in hops:
        #print(f'from: {from_step} to {to_step}')

        graph.add_node(from_step, shape="box")
        graph.add_node(to_step, shape="box")
        graph.add_edge(from_step, to_step)
    
    graph.layout(prog='dot')
    graph.draw(output_file)


    logging.info(f"Data flow saved in {output_file}")

In [551]:
def extract_sql_queries(steps: List[Tuple[str, str]]) -> Dict[str, Any]:
    active_steps = set(steps for seq in state["sequence"] for step in seq)
    sql_steps = state["root"].findall(".//step")
    queries = []
    
    for step in sql_steps:
        step_name = step.find("name").text
        step_type = step.find("type").text
        if step_type in ['TableInput', 'DBJoin'] and step_name in active_steps:
            sql_element = step.find("sql")
            if sql_element is not None and sql_element.text:
                queries.append({
                    "step_name": step_name,
                    "step_type": step_type,
                    "sql": sql_element.text
                })
    
    state["queries"] = queries
    return state

In [552]:
root = parse_ktr_file(pentaho_file_path)

In [553]:
hops = extract_execution_sequence(root)

In [554]:
active_steps = []
for i in hops:
    active_steps.append(i[0])

In [555]:
list(dict.fromkeys(active_steps))

['Switch / Case LOCAL FATURAMENTO',
 'Switch / Case PESSOA RESPONSÁVEL',
 'Switch / Case TITULAR RESPONSÁVEL',
 'Dummy (do nothing)',
 'Remover colunas',
 'BENEFICIÁRIO',
 'SAM_FAMILIA_TETO_PF',
 'SEM SETOR',
 'Insert / Update - BN_BENEFICIARIO',
 'Filter rows',
 'BUSCA MICROSIGA',
 'HANDLE_BENEFICIARIO',
 'Blocking Step',
 'QTD_INCATU_BN_BENEFICIARIO',
 'Blocking Step 2',
 'CONTRATO',
 'FAMILIA PESSOA RESPONSÁVEL',
 'FAMILIA TITULAR RESPONSÁVEL',
 'Insert / Update - BN_RESP_FINANCEIRO',
 'LOTAÇÃO',
 'QTD_INCATU_BN_RESP_FINANCEIRO']

## Testes

In [556]:
class KTRTransformation():
    def __init__(self):
        self.name: str = ""
        self.order: List[str] = []
        self.steps: List[Step] = []

class Step:
    def __init__(self):
        self.name: str = ""
        self.type: str = ""

class SQLStep(Step):
    def __init__(self):
        super().__init__()
        self.sql: str = ""
        self.parameters: List[Parameter] = []

class Parameter:
    def __init__(self):
        self.name: str = ""
        self.type: str = ""

class FilterStep(Step):
    def __init__(self):
        super().__init__()
        self.send_true_to: str = ""
        self.send_false_to: str = ""
        self.conditions: List[Condition] = []

class Condition:
    def __init__(self):
        self.leftvalue: str = ""
        self.operator: str = ""
        self.rightvalue: str = ""

class InsertUpdateStep(Step):
    def __init__(self):
        super().__init__()
        self.table: str = ""
        self.keys: List[KeyValue] = []
        self.values: List[KeyValue] = []

class KeyValue:
    def __init__(self):
        self.name: str = ""
        self.field: str = ""

In [557]:
def parse_ktr_file(file_path: str) -> KTRTransformation:
    try:
        tree = ET.parse(file_path)
        root = tree.getroot()
    except ET.ParseError as e:
        logging.error(f"Error parsing KTR file: {e}")
        raise

    transformation = KTRTransformation()
    
    # Extract transformation name
    transformation.name = root.find("info/name").text

    # Extract order and set active steps
    active_steps = set()
    order_element = root.find("order")
    if order_element is not None:
        for hop in order_element.findall("hop"):
            from_step = hop.find("from").text
            to_step = hop.find("to").text
            transformation.order.append((from_step, to_step))
            active_steps.add(from_step)
            active_steps.add(to_step)            

    # Extract steps
    for step_element in root.findall("step"):
        step_name = step_element.find("name").text
        step_type = step_element.find("type").text

        # Only process steps that are in the active_steps set
        if step_name not in active_steps:
            continue

        if step_type == "DBJoin" or step_type == "TableInput":
            step = SQLStep()
            step.name = step_name
            step.type = step_type
            step.sql = step_element.find("sql").text
            for param in step_element.findall("parameter/field"):
                parameter = Parameter()
                parameter.name = param.find("name").text
                parameter.type = param.find("type").text
                step.parameters.append(parameter)

        elif step_type == "FilterRows":
            step = FilterStep()
            step.name = step_name
            step.type = step_type
            step.send_true_to = step_element.find("send_true_to").text
            step.send_false_to = step_element.find("send_false_to").text
            for condition in step_element.findall("conditions"):
                cond = Condition()
                cond.leftvalue = condition.find("leftvalue").text
                cond.operator = condition.find("function").text
                cond.rightvalue = condition.find("value/text").text
                step.conditions.append(cond)

        elif step_type == "InsertUpdate":
            step = InsertUpdateStep()
            step.name = step_name
            step.type = step_type
            step.table = step_element.find("lookup/table").text
            for key in step_element.findall("lookup/key"):
                kv = KeyValue()
                kv.name = key.find("name").text
                kv.field = key.find("field").text
                step.keys.append(kv)
            for value in step_element.findall("lookup/value"):
                kv = KeyValue()
                kv.name = value.find("name").text
                kv.field = value.find("rename").text
                step.values.append(kv)

        else:
            step = Step()
            step.name = step_name
            step.type = step_type

        transformation.steps.append(step)

    return transformation

In [558]:
def step_to_markdown(step: Step) -> str:
    md = f"## Step: {step.name}\n\n"
    md += f"Type: {step.type}\n\n"

    if isinstance(step, SQLStep):
        md += "### SQL Query\n\n"
        md += f"```sql\n{step.sql}\n```\n\n"
        if step.parameters:
            md += "Parameters:\n"
            for param in step.parameters:
                md += f"- {param.name} ({param.type})\n"

    elif isinstance(step, FilterStep):
        md += f"Send True To: {step.send_true_to}\n"
        md += f"Send False To: {step.send_false_to}\n\n"
        md += "Conditions:\n"
        for condition in step.conditions:
            md += f"- {condition.leftvalue} {condition.operator} {condition.rightvalue}\n"

    elif isinstance(step, InsertUpdateStep):
        md += f"Table: {step.table}\n\n"
        md += "Keys:\n"
        for key in step.keys:
            md += f"- {key.name}: {key.field}\n"
        md += "\nValues:\n"
        for value in step.values:
            md += f"- {value.name}: {value.field}\n"

    return md

def convert_to_filename(input_string):
    s = re.sub(r'[^a-zA-Z0-9_\-\.~]', '_', input_string)
    return s.lower()


def transformation_to_markdown(transformation: KTRTransformation) -> str:
    md = f"# Transformation: {transformation.name}\n\n"
    md += "## Execution Order\n\n"
    for step_name in transformation.order:
        md += f"- {step_name}\n"
    md += "\n## Steps\n\n"
    for step in transformation.steps:
        md += step_to_markdown(step) + "\n"
    # return md

    md_file_name = convert_to_filename(transformation.name)+'.md'

    try:
        with open(md_file_name, 'w', encoding='utf-8') as f:
            f.write(md)
        logging.info(f"Markdown documentation exported to {md_file_name}")
    except Exception as e:
        logging.error(f"Error exporting markdown to file: {e}")
        raise
    return md

In [559]:
T = parse_ktr_file(pentaho_file_path)

In [560]:
transformation_to_markdown(T)

"# Transformation: BN_BENEFICIARIO - insert/update\n\n## Execution Order\n\n- ('Switch / Case LOCAL FATURAMENTO', 'Switch / Case TITULAR RESPONSÁVEL')\n- ('Switch / Case PESSOA RESPONSÁVEL', 'Dummy (do nothing) 2')\n- ('Switch / Case PESSOA RESPONSÁVEL', 'FAMILIA PESSOA RESPONSÁVEL')\n- ('Switch / Case TITULAR RESPONSÁVEL', 'FAMILIA TITULAR RESPONSÁVEL')\n- ('Switch / Case TITULAR RESPONSÁVEL', 'Switch / Case PESSOA RESPONSÁVEL')\n- ('Dummy (do nothing)', 'Remover colunas')\n- ('Remover colunas', 'Switch / Case LOCAL FATURAMENTO')\n- ('BENEFICIÁRIO', 'SAM_FAMILIA_TETO_PF')\n- ('SAM_FAMILIA_TETO_PF', 'Dummy (do nothing)')\n- ('SEM SETOR', 'Insert / Update - BN_BENEFICIARIO')\n- ('Insert / Update - BN_BENEFICIARIO', 'QTD_INCATU_BN_BENEFICIARIO')\n- ('Filter rows', 'BUSCA MICROSIGA')\n- ('BUSCA MICROSIGA', 'Insert / Update - BN_BENEFICIARIO')\n- ('HANDLE_BENEFICIARIO', 'BENEFICIÁRIO')\n- ('Filter rows', 'SEM SETOR')\n- ('Blocking Step', 'Set Variables')\n- ('QTD_INCATU_BN_BENEFICIARIO', '

# CrewAI

In [561]:
# model = ChatGroq(
#     model="llama3-8b-8192",
#     temperature = 0,
# )

In [562]:
model = ChatGoogleGenerativeAI(
    model='gemini-1.5-pro-exp-0801',
    temperature=0
)

In [563]:
type(T)

__main__.KTRTransformation

In [564]:
import json
import csv

In [565]:
class ResultCollector:
    def __init__(self, output_directory='output'):
        # self.results = []
        self.result = ""
        self.output_directory = output_directory
        os.makedirs(self.output_directory, exist_ok=True)

    def add_result(self, task_name, result, step_name):
        print(f"Adding result for task: {task_name}, step: {step_name}")
        #self.results.append({"result": result})
        self.result = result
        self.export_to_csv(f"{self.output_directory}/{step_name}.csv")

    def export_to_json(self, filename):
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)

    def export_to_csv(self, filename):
        if not self.result:
            print("No results to export.")
            return

        #keys = self.results[-1].keys()  # Use the last result to get the keys
        print(f"Exporting result to {filename}")
        with open(filename, 'w', newline='') as f:
            #writer = csv.DictWriter(f, fieldnames=keys)
            writer = csv.writer(f)
            #writer.writeheader()
            #writer.writerow(self.results[-1])  # Write only the last result
            writer.writerow(self.result)

In [566]:
result_collector = ResultCollector()

In [567]:
T.steps

[<__main__.SQLStep at 0x7f1d982c4e20>,
 <__main__.SQLStep at 0x7f1d80078130>,
 <__main__.Step at 0x7f1d80079d20>,
 <__main__.Step at 0x7f1d8007b490>,
 <__main__.SQLStep at 0x7f1d8007a3e0>,
 <__main__.Step at 0x7f1d8007a2f0>,
 <__main__.Step at 0x7f1d8007b1c0>,
 <__main__.SQLStep at 0x7f1d8007b1f0>,
 <__main__.SQLStep at 0x7f1d80078070>,
 <__main__.FilterStep at 0x7f1d800792a0>,
 <__main__.SQLStep at 0x7f1d8007a440>,
 <__main__.InsertUpdateStep at 0x7f1d800780a0>,
 <__main__.InsertUpdateStep at 0x7f1d8007b6d0>,
 <__main__.SQLStep at 0x7f1d800c4c10>,
 <__main__.Step at 0x7f1d800e0340>,
 <__main__.Step at 0x7f1d800e07f0>,
 <__main__.Step at 0x7f1d800e05e0>,
 <__main__.SQLStep at 0x7f1d800e09d0>,
 <__main__.SQLStep at 0x7f1d800e06a0>,
 <__main__.Step at 0x7f1d800e1f00>,
 <__main__.Step at 0x7f1d800e19f0>,
 <__main__.Step at 0x7f1d800e1a50>,
 <__main__.Step at 0x7f1d800e0b50>,
 <__main__.Step at 0x7f1d800e0790>]

In [568]:
def query_list(steps: list):
    """ 
    Input a tranformed ktr structure, output a list of queries.
    """
    queries = []  

    for step in T.steps:
        if step.type in ['DBJoin', 'TableInput']:
            queries.append({
                'step_name': step.name,
                'sql_code': step.sql 
            })  
    return queries

In [569]:
list_of_queries = query_list(T.steps)

In [570]:
list_of_queries

[{'step_name': 'BENEFICIÁRIO',
  'sql_code': "SELECT \n       BEN.HANDLE                                                                         AS ID_BENEFICIARIO\n      ,FAM.HANDLE                                                                         AS ID_FAMILIA\n      ,CON.HANDLE                                                                         AS ID_CONTRATANTE\n      ,NVL(LOT.HANDLE,0)                                                                  AS ID_CONTRATANTE_LOT\n      ,PLA.HANDLE                                                                         AS ID_PLANO\n      ,(SELECT MAX(BEN_TIT.HANDLE) \n        FROM   SAM_BENEFICIARIO BEN_TIT \n        WHERE  BEN_TIT.FAMILIA   = BEN.FAMILIA \n        AND    BEN_TIT.EHTITULAR = 'S')                                                   AS ID_BENEFICIARIO_RESP\n      ,CASE\n         WHEN CON.LOCALFATURAMENTO = 'C' THEN CPES.HANDLE --CPES\n         WHEN CON.LOCALFATURAMENTO = 'L' THEN LPES.HANDLE --LPES\n         WHEN CON

In [571]:
file_write_tool = FileWriterTool()

In [572]:
sql_analyst = Agent(
    role = "Senior Data Analyst",
    goal = "Analyse and complex sql queries and extract table name vs column name relation from all tables on the querie",
    backstory=dedent(
        """
        You're a highly specialized developed to dissect and understand complex SQL queries,
        you could quickly and accurately extract essential information from intricate SQL statements.
        Your key traits are Analytical prowess, Attention to detail, Vast knowledge of SQL syntax 
        across multiple database systems.
        """
    ),
    llm=model,
    allow_delegation=False
)

In [573]:
report_writer = Agent(
    role = "Senior Business Analyst",
    goal = "Write a document based on the work of analyst",
    backstory = dedent(
        """
        You writing still is well known for clear and effective communication.
        You summarize Pentahor workflow artifacts and sql queries business rules into bullet point contain de most import details.
        """
    ),
    llm=model,
    allow_delegation=False
)

In [574]:
extract_tables_columns = Task(
    description=dedent(
        """
        Analyse this SQL querie {sql_code}.
        Is very important use table name, not its nick name. Then extract tables e columns names.
        Get only table name and column name following this patterns:
        columns_name;table_name
        table1;columnName_n1
        table1;columnName_n2
        table2;columnName_n1
        table2;columnName_n2
        tableN;columnName_n1
        tableN;columnName_n1
        """
    ),
    expected_output="CSV file",
    agent=sql_analyst,
    #callback=lambda result: result_collector.add_result(result)
)

In [575]:
export_tables_columns = Task(
    description = "Get only table name and column.",
    expected_output = "CSV file",
    agent = sql_analyst,
    tools=[file_write_tool],
    context = [extract_tables_columns],
    #output_file='output/{step_name}.csv',
    #create_directory = True,
    #callback=lambda result: result_collector.add_result("export_tables_columns", result)
)

In [576]:
analyze_data = Task(
    description = "Analyze the query {sql_code} and write an analysis.",
    expected_output = "Detailed analysis from text for non technical public.",
    agent = sql_analyst,

)

In [577]:
write_report = Task(
    description=dedent(
        """
        Write an tecnical report from previous analysis, this document will be used 
        to migrate the actual project to cloud enviroment. 
        """
    ),
    expected_output = "Markdown report",
    agent=report_writer,
    context=[analyze_data]
)

In [578]:
# crew = Crew(
#     agents = [sql_analyst, report_writer],
#     tasks = [extract_tables_columns,format_queries,analyze_data,write_report],
#     process = Process.sequential,
#     verbose=1,
#     memory=False,
#     output_log_file="crew.log",
# )

In [579]:
crew = Crew(
    agents = [sql_analyst],
    tasks = [extract_tables_columns],
    process = Process.sequential,
    verbose = 0,
    memory=False,
    output_log_file="crew.log",
)



In [580]:
list_of_queries

[{'step_name': 'BENEFICIÁRIO',
  'sql_code': "SELECT \n       BEN.HANDLE                                                                         AS ID_BENEFICIARIO\n      ,FAM.HANDLE                                                                         AS ID_FAMILIA\n      ,CON.HANDLE                                                                         AS ID_CONTRATANTE\n      ,NVL(LOT.HANDLE,0)                                                                  AS ID_CONTRATANTE_LOT\n      ,PLA.HANDLE                                                                         AS ID_PLANO\n      ,(SELECT MAX(BEN_TIT.HANDLE) \n        FROM   SAM_BENEFICIARIO BEN_TIT \n        WHERE  BEN_TIT.FAMILIA   = BEN.FAMILIA \n        AND    BEN_TIT.EHTITULAR = 'S')                                                   AS ID_BENEFICIARIO_RESP\n      ,CASE\n         WHEN CON.LOCALFATURAMENTO = 'C' THEN CPES.HANDLE --CPES\n         WHEN CON.LOCALFATURAMENTO = 'L' THEN LPES.HANDLE --LPES\n         WHEN CON

In [581]:
def convert_to_filename(input_string):
    s = re.sub(r'[^a-zA-Z0-9_\-\.~]', '_', input_string)
    return s.lower()

In [582]:
##self.export_to_csv()

def save_to_csv(data, filename):
    # Split the string by newlines to get rows
    rows = data.split('\n')
    
    # Split each row by semicolons to get columns
    formatted_data = [row.split(';') for row in rows]
    
    # Output filename
    output_filename = f"{'output'}/{filename}.csv"

    # Write the formatted data to a CSV file
    with open(output_filename, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Column', 'Table'])  # Write header
        writer.writerows(formatted_data)

In [583]:
for query in list_of_queries:
    print(query['step_name'])
    file_name = convert_to_filename(query['step_name'])
    result = crew.kickoff(inputs=query)
    output = extract_tables_columns.output
    save_to_csv(output.raw, file_name)
    #result_collector.add_result('export_tables_columns',output.raw,file_name)

BENEFICIÁRIO
BUSCA MICROSIGA
CONTRATO
FAMILIA PESSOA RESPONSÁVEL
FAMILIA TITULAR RESPONSÁVEL
HANDLE_BENEFICIARIO
LOTAÇÃO




KeyboardInterrupt: 

In [529]:
output = extract_tables_columns.output

In [530]:
output.raw

'SETOR_UNIMED;SAM_BENEFICIARIO\nENDERECORESIDENCIAL;SAM_BENEFICIARIO\nHANDLE;SAM_BENEFICIARIO\nDDD1;SAM_ENDERECO\nPREFIXO1;SAM_ENDERECO\nNUMERO1;SAM_ENDERECO\nTELEFONE;SAM_ENDERECO'

In [347]:
#result = crew.kickoff(inputs=list_of_queries)

Adding result for task: extract_tables_columns, step: my_step_name
Exporting result to output/my_step_name.csv


In [339]:
#result = crew.kickoff_for_each(inputs=list_of_queries)

In [None]:
result_collector.results[3]

{'task': 'extract_tables_columns',
 'result': TaskOutput(description="\nAnalyse SELECT --DISTINCT\n  'P' AS TIPO_RESPONSAVEL,\n  FPES.HANDLE AS ID_RESP_FINANCEIRO,\n  FPES_ENDC.BAIRRO AS BAIRRO,\n  FPES_ENDC.CEP AS CEP,\n  FPES_MUNENDC.NOME AS CIDADE,\n  FPES.CNPJCPF AS CNPJ_CPF,\n  FPES_ENDC.COMPLEMENTO AS COMPLEMENTO,\n  FPES.NOME AS CONTRAT_PAGADOR_NOME,\n  FPES.DATASAIDA AS DATA_EXCLUSAO,\n  FPES.DATAENTRADA AS DATA_INCLUSAO,\n  FPES.DATANASCIMENTO AS DATA_NASCIMENTO,\n  FPES_ENDC.DDDCELULAR AS DDD_CELULAR,\n  FPES_ENDC.DDD1 AS DDD,\n  FPES_ENDC.DDD1 AS DDD_COMERCIAL,\n  CASE\n    WHEN (SELECT 1 FROM SFN_CONTAFIN FPES_CFIN, SFN_CONTAFIN_COMPLEMENTO FPES_CFIC WHERE FPES_CFIN.PESSOA = FPES.HANDLE AND FPES_CFIN.HANDLE = FPES_CFIC.CONTAFINANCEIRA AND FPES_CFIC.DATAINICIAL <= TRUNC(SYSDATE) AND ( FPES_CFIC.DATAFINAL IS NULL OR FPES_CFIC.DATAFINAL >= TRUNC(SYSDATE) ) AND NVL(FPES_CFIC.CONTACORRENTE, '0') <> '0' AND ROWNUM = 1) = 1 THEN 'S' \n    ELSE 'N'\n  END AS DEBITO_AUT,\n  (SELECT 

In [102]:
teste2 = teste['result']

In [107]:
teste2.csv

AttributeError: 'TaskOutput' object has no attribute 'csv'