In [47]:
from typing import Optional, Any
from __future__ import annotations
from sqlglot import parse_one, Expression, column, select, condition, alias

In [56]:
class miniconsulta_sql:
    """
        Clase que controla la ejecucion de una consulta 'simple' de SQL usando un LLM.

        Tenga en cuenta que estas consultas solo trabajan sobre una unica tabla. Si
        En la condiciones de la consulta se hace referencia a otra tabla quiere decir
        que esta consulta depende de otra, y por lo tanto el atributo dependencia 
        debe ser distinto de None

        atributos
        ------------

        tabla: Un string que almacena el nombre original de la tabla SQL.

        alias: Un string que almacena el alias relacionado a la tabla SQL.

        proyecciones: Una lista de expresiones las cuales son todas las proyecciones
                      del select de la consulta SQL.

        condiciones: Una lista de expresiones las cuales son todas las condiciones
                     del where de la consulta SQL.
                     
        condiciones_join: Una lista de expresiones las cuales son condiciones que 
                          estaban en algun ON de un JOIN en la consulta original 
                          SQL, estas condiciones deben ir en el where de esta
                          miniconsulta, e indica la forma en la que se debe
                          juntar el resultado de esta consulta con otra.
        
        status: Un string que indica el estado de ejecicion de la peticion a SQL.

        dependencia: Una miniconsulta de la cual depende esta miniconsulta.
    """
    # Toda la información necesaria para construir la 
    # consulta SQL
    tabla:str
    alias: str
    proyecciones: list[Expression]
    condiciones: list[Expression]
    condiciones_join: Optional[list[Expression]]
    
    # Status disponibles: En espera, Ejecutando, Finalizado
    status: str
    dependencia: Optional[miniconsulta_sql]

    def __init__(self, 
                tabla: str, 
                proyecciones: list[Expression],
                condiciones: list[Expression],
                alias:str = '',
                condiciones_join: Optional[list[Expression]] = None,
                dependencia: Optional[miniconsulta_sql] = None):
        
        self.tabla = tabla
        self.alias = alias
        self.proyecciones = proyecciones
        self.condiciones = condiciones
        self.condiciones_join = condiciones_join
        self.dependencia = dependencia
        self.status = "En espera"


    def crear_prompt(self):
        pass
    
    def _crear_representacion_SQL(self) -> str:
        condicion = condition(self.condiciones[0])
        for otra_condicion in self.condiciones[1:] + self.condiciones_join:
            condicion = condicion.and_(otra_condicion)

        tabla_form = self.tabla
        
        if self.alias != '':
            tabla_form = alias(f'{self.tabla} AS {self.alias}',self.alias,dialect='postgres',)

        return select(*self.proyecciones).from_(tabla_form).where(condicion).sql(dialect='postgres')

    def __str__(self) -> str:
        return self._crear_representacion_SQL()
    
    def __repr__(self) -> str:
        return self._crear_representacion_SQL()

In [4]:
def obtener_tablas(consulta_sql_ast: Expression) -> tuple[list[str], dict[str,str]]:
    """
        Dada un ast de una consulta SQL postgres obtiene todas las tablas 
        de la consulta. Esta función tiene en cuenta el form y los joins. 
        Ademas tiene en cuenta los alias

        Parametros
        --------------
        consulta_sql_ast: Un objeto Expression de sqlglot. Representa un 
                          ast de una consulta SQL

        Retorna
        --------------
            Una lista con el nombre de todas las tablas de la consulta.

            Un diccionario cuyos key son los alias de cada tabla y los 
            valores son el nombre original de la tabla
    """
    
    tablas = []
    tablas_alias = {}

    if consulta_sql_ast.key != 'select':
        raise Exception('La consulta SQL necesita tener un "SELECT"')

    if consulta_sql_ast.args.get('from') == None:
        raise Exception('La consulta SQL necesita tener un "FORM"')

    # Obtenemos la tabla que esta en el from   
    elementos_a_revisar = [consulta_sql_ast.args['from']]

    # Si tiene joins tenemos en cuenta esas tablas
    if consulta_sql_ast.args.get('joins') != None:
        elementos_a_revisar += consulta_sql_ast.args['joins']
    
    # Conseguimos los nombres originales de las tablas y sus alias
    # si es que tienen 
    
    for elemento in elementos_a_revisar:
        if elemento.key == 'from' or elemento.key == 'join':
            
            nombre_tabla = elemento.this.this.this
            alias_tabla = elemento.this.alias
            
            if nombre_tabla not in tablas:
                tablas.append(nombre_tabla)

            if alias_tabla != '':
                tablas_alias[alias_tabla] = nombre_tabla
    return tablas, tablas_alias

In [5]:
def obtener_proyecciones(consulta_sql_ast: Expression, 
                         tablas: list[str], 
                         tablas_alias: dict[str, str]) -> dict[str, list[column]]:
    """
        Dada un ast de una consulta SQL postgres obtiene todas las proyecciones
        que hay en el SELECT de la consulta.

        Tenga en cuenta que si la consulta tiene un * como unica proyeccion
        la funcion lanzara un error.

        Parametros
        --------------
        consulta_sql_ast: Un objeto Expression de sqlglot. Representa un 
                          ast de una consulta SQL
        
        tabla: Una lista con el nombre de todas las tablas de la consulta

        tablas_alias: Un diccionario cuyos key son los alias de cada tabla y los 
            valores son el nombre original de la tabla 

        Retorna
        --------------
            Un diccionario cuyas key son la tabla (o alias de tablas) con la que 
            esta relacionado una o varias proyecciones en el select. Y los valores
            son una lista de columnas de la tabla.
    """
    proyecciones = {}

    # Revisamos si la unica proyeccion es un *
    if (len(consulta_sql_ast.args['expressions']) == 1 and 
        consulta_sql_ast.args['expressions'][0].key == 'star'):
        raise Exception('El select tiene un * que haremos?')
    
    for proyeccion in consulta_sql_ast.args['expressions']:
        if proyeccion.key == 'column':
            if (proyeccion.table not in tablas and 
                tablas_alias.get(proyeccion.table) == None):
                raise Exception(f'No existe la tabla o alias de tabla "{proyeccion.table}"')
            
            if proyecciones.get(proyeccion.table) == None:
                proyecciones[proyeccion.table] = []

            proyecciones[proyeccion.table].append(proyeccion)
    
    return proyecciones

In [6]:
def obtener_condiciones(consulta_sql_ast: Expression, 
                        tablas: list[str], 
                        tablas_alias: dict[str, str]) -> dict[str, list[Expression]]:
    """
        Dada un ast de una consulta SQL postgres obtiene todas las condiciones
        del WHERE de la consulta

        Tenga en cuenta que esta funcion espera que en el WHERE solo hayan operadores
        AND

        TODO
        ------------
            - Hacer que pueda tener OR en el WHERE

        Parametros
        --------------
        consulta_sql_ast: Un objeto Expression de sqlglot. Representa un 
                          ast de una consulta SQL
        
        tabla: Una lista con el nombre de todas las tablas de la consulta

        tablas_alias: Un diccionario cuyos key son los alias de cada tabla y los 
            valores son el nombre original de la tabla 

        Retorna
        --------------
            Un diccionario cuyas key son la tabla (o alias de tablas) con la que 
            esta relacionado una o varias condicones en el WHERE. Y los valores
            son una lista de dichas condiciones.
    """
    
    # Obtenemos todas las condiciones
    # Ten en cuenta que el and asocia a izquierda esta vez
    if consulta_sql_ast.args.get('where') == None:
         raise Exception('La consulta debe tener un WHERE')
    
    conectores = [consulta_sql_ast.args['where'].this]
    condiciones = []

    # Pasamos recursivamente por todos los operadores del WHERE
    # Y obtenemos todas las condiciones
    while conectores != []:
        conector_actual = conectores.pop(0)
        
        # Caso base
        if conector_actual.key != 'and':
            condiciones.append(conector_actual)
            break

        # Revisamos la parte izquierda del and
        if conector_actual.this.key != 'and':
            condiciones.append(conector_actual.this)
        else:
            conectores.append(conector_actual.this)
        
        # Agregamos la parte derecha del and
        if conector_actual.args['expression'].key != 'and':
            condiciones.append(conector_actual.args['expression'])
    
    # Clasificamos las condiciones
    # Si una condicion depende de dos tablas lo clasificaremos con la tabla de la 
    # izquierda
    condiciones_por_tablas = {}
    for condicion in condiciones:
        tabla_izquierda = ''
        tabla_derecha = ''

        nodo_izquierdo = condicion.this

        nodo_derecho = condicion.args['expression']

        if nodo_izquierdo.key == 'column':
            tabla_izquierda = nodo_izquierdo.table
        
        if nodo_derecho.key == 'column':
            tabla_derecha = nodo_derecho.table

        if tabla_derecha == '' and tabla_izquierda == '':
            raise Exception(f'La condicion {condicion} no es valida')
        
        # verificamos que las tablas del lado izquierdo y derecho existan
        if (tabla_izquierda != '' and
            tabla_izquierda not in tablas and 
            tablas_alias.get(tabla_izquierda) == None):
                raise Exception(f'No existe la tabla o alias de tabla "{tabla_izquierda}"')

        if (tabla_derecha != '' and
            tabla_derecha not in tablas and 
            tablas_alias.get(tabla_derecha) == None):
                raise Exception(f'No existe la tabla o alias de tabla "{tabla_derecha}"')
    
        if tabla_izquierda != '':
            if condiciones_por_tablas.get(tabla_izquierda) == None:
                condiciones_por_tablas[tabla_izquierda] = []
            
            condiciones_por_tablas[tabla_izquierda].append(condicion)
            continue
        
        if tabla_derecha != '':
            if condiciones_por_tablas.get(tabla_derecha) == None:
                condiciones_por_tablas[tabla_derecha] = []
            
            condiciones_por_tablas[tabla_derecha].append(condicion)
            continue 
    
    return condiciones_por_tablas

In [7]:
def obtener_condiciones_joins(consulta_sql_ast: Expression, 
                              tablas: list[str], 
                              tablas_alias: dict[str, str],
                              condiciones:list[Expression])-> dict[str, list[Expression]]:
    """
        Dada un ast de una consulta SQL postgres obtiene todas las condiciones
        de los distintos JOINs

        Esta función solo tiene en cuenta las condiciones que 
        son de igualdad

        Tenga en cuenta que esta funcion tiene en cuenta el numero de 
        condiciones del WHERE que este relacionado a una tabla para 
        saber a que tabla debe asignar la condicion del JOIN. 

        Esta funcion asigna la condicion del JOIN a la tabla que tenga
        menos condiciones (contando las condiciones del WHERE o del JOIN si 
        ya se el asigno alguna)

        TODO:
            - Manejar los casos donde las condiciones no son de igualdad

        Parametros
        --------------
        consulta_sql_ast: Un objeto Expression de sqlglot. Representa un 
                          ast de una consulta SQL
        
        tabla: Una lista con el nombre de todas las tablas de la consulta

        tablas_alias: Un diccionario cuyos key son los alias de cada tabla y los 
            valores son el nombre original de la tabla 

        condiciones: Un diccionario cuyas key son la tabla (o alias de tabla) con la que 
                     esta relacionado una o varias condicones en el WHERE. Y los valores
                     son una lista de dichas condiciones.

        Retorna
        --------------
            Un diccionario cuyas key son la tabla (o alias de tablas) con la que 
            esta relacionado una o varias condiciones en los JOINs. Y los valores
            son una lista de dichas condiciones.
    """
    
    elementos_a_revisar = []

    if consulta_sql_ast.args.get('joins') != None:
        elementos_a_revisar = consulta_sql_ast.args['joins']
    
    condiciones_joins = []
    for elemento in elementos_a_revisar:
        if elemento.args.get('on') == None:
            raise Exception('Todo JOIN debe tener un ON')
        
        condiciones_joins.append(elemento.args['on'])
    
    condiciones_por_tablas = {}
    
    for condicion in condiciones_joins:
        for nodo in [condicion.this, condicion.args['expression']]:
            if nodo.key != 'column':
                raise Exception(f'La condicion de JOIN {condicion} debe involucrar dos tablas')
                        
            if nodo.table == '':
                raise Exception(f'La condicion {condicion} no es valida')
            
            if (nodo.table not in tablas and 
                tablas_alias.get(nodo.table) == None):
                raise Exception(f'No existe la tabla o alias de tabla "{nodo.table}"')

        # Calculamos cuantas condiciones estan relacionada con cada una de las tablas
        # que estan involucaradas en la condicion        
        numero_condiciones_tabla_izquierda = len(condiciones[condicion.this.table])
        if condiciones_por_tablas.get(condicion.this.table) != None:
            numero_condiciones_tabla_izquierda += len(condiciones_por_tablas[condicion.this.table])

        numero_condiciones_tabla_derecha = len(condiciones[condicion.args['expression'].table])
        if condiciones_por_tablas.get(condicion.args['expression'].table) != None:
            numero_condiciones_tabla_derecha += len(condiciones_por_tablas[condicion.args['expression'].table])

        # Le añadimos la condicion a la tabla que tenga menos condiciones, para asi 
        # acotar mas el dominio de la consulta
        if numero_condiciones_tabla_izquierda < numero_condiciones_tabla_derecha:
            if condiciones_por_tablas.get(condicion.this.table) == None:
                condiciones_por_tablas[condicion.this.table] = []

            condiciones_por_tablas[condicion.this.table].append(condicion)
        else:
            if condiciones_por_tablas.get(condicion.this.table) == None:
                condiciones_por_tablas[condicion.args['expression'].table] = []

            condiciones_por_tablas[condicion.args['expression'].table].append(condicion)
        
    return condiciones_por_tablas

In [8]:
def obtener_proyecciones_joins(consulta_sql_ast: Expression, 
                              tablas: list[str], 
                              tablas_alias: dict[str, str]) -> dict[str, list[Expression]]:
    """
        Dada un ast de una consulta SQL postgres obtiene todas las condiciones
        de los distintos JOINs y devuelve las columnas de las tablas utilizadas
        en alguna de estas condiciones


        Parametros
        ------------
        consulta_sql_ast: Un objeto Expression de sqlglot. Representa un 
                          ast de una consulta SQL
        
        tabla: Una lista con el nombre de todas las tablas de la consulta

        tablas_alias: Un diccionario cuyos key son los alias de cada tabla y los 
            valores son el nombre original de la tabla 

        Retorna
        -----------

        Un diccionario cuya key son las distintas tablas utilizadas en alguna 
        condicion de un JOIN. Y sus valores son una lista con las distintas 
        columnas de dicha tabla los cuales fueron utilizados en alguna condición
        de JOIN

    """
    elementos_a_revisar = []

    if consulta_sql_ast.args.get('joins') != None:
        elementos_a_revisar = consulta_sql_ast.args['joins']
    
    condiciones_joins = []
    for elemento in elementos_a_revisar:
        if elemento.args.get('on') == None:
            raise Exception('Todo JOIN debe tener un ON')
        
        condiciones_joins.append(elemento.args['on'])
    
    proyecciones_por_tablas = {}
    
    for condicion in condiciones_joins:
        for nodo in [condicion.this, condicion.args['expression']]:
            if nodo.key != 'column':
                raise Exception(f'La condicion de JOIN {condicion} debe involucrar dos tablas')
                        
            if nodo.table == '':
                raise Exception(f'La condicion {condicion} no es valida')
            
            if (nodo.table not in tablas and 
                tablas_alias.get(nodo.table) == None):
                raise Exception(f'No existe la tabla o alias de tabla "{nodo.table}"')
            
            tabla = nodo.table

            if proyecciones_por_tablas.get(tabla) == None:
                proyecciones_por_tablas[tabla] = []
            
            if nodo not in proyecciones_por_tablas[tabla]:
                proyecciones_por_tablas[tabla].append(nodo)
                
    return proyecciones_por_tablas

In [9]:
def dividir_consulta_sql(consulta_sql: str) -> dict[str, dict[str, Any]]:
    """
        Dado una consulta SQL obtiene la informacion suficiente 
        para crear una o mas consultas con complejidad igual o menor.

        Parametros
        -----------------

         consulta_sql: Un string con la consulta SQL

         Retorna
         ------------

         Un diccionario cuya claves son el alias (o nombre) de una tabla y los valores
         son otros diccionarios cuya claves son el nombre de la informacion de esa tabla
         y los valores son la informacion necesaria.
    """
    
    consulta_sql_ast = parse_one(consulta_sql, dialect='postgres')
    
    tablas, tablas_alias = obtener_tablas(consulta_sql_ast)
    proyecciones = obtener_proyecciones(consulta_sql_ast, tablas, tablas_alias)
    condiciones_por_tablas = obtener_condiciones(consulta_sql_ast, tablas, tablas_alias)
    condiciones_joins_por_tablas = obtener_condiciones_joins(consulta_sql_ast, tablas, tablas_alias, condiciones_por_tablas)
    proyecciones_joins = obtener_proyecciones_joins(consulta_sql_ast, tablas, tablas_alias)

    aliases = tablas_alias.keys()

    datos_miniconsultas = {}
    for alias in aliases:
        datos_miniconsultas[alias] = {'tabla': tablas_alias[alias]}
        
        if proyecciones.get(alias) != None:
            datos_miniconsultas[alias]['proyecciones'] = proyecciones[alias]
        else: 
            datos_miniconsultas[alias]['proyecciones'] = []
        
        if proyecciones_joins.get(alias) != None:
            datos_miniconsultas[alias]['proyecciones'] += proyecciones_joins[alias]
        
        if condiciones_por_tablas.get(alias) != None:
            datos_miniconsultas[alias]['condiciones'] = condiciones_por_tablas[alias]
        else:
            datos_miniconsultas[alias]['condiciones'] = []

        if condiciones_joins_por_tablas.get(alias) != None:
            datos_miniconsultas[alias]['condiciones_joins'] = condiciones_joins_por_tablas[alias]
        else:
            datos_miniconsultas[alias]['condiciones_joins'] = []

    return datos_miniconsultas

In [10]:
def obtener_dependencia(tabla:str, condiciones: list[Expression]):
    """
        Dada una tabla y una lista de condiciones revisa si existe 
        alguna condicion donde se relacione a esta tabla con otra. Lo
        que quiere decir que la primera tabla depende de otra.

        Tenga en cuenta que esta funcion supone que una tabla X solo
        puede depender de otra tabla Y. No es posible (por el momento)
        que X depende de Y y de otra tabla Z al mismo tiempo.

        TODO
        -----------------
        Modificar esta funcion para que maneje el caso de que una tabla
        X pueda depender de una tabla Y y otra Z

        Parametros
        ------------
        tabla: Un string que el alias (o nombre original) de la tabla la cual 
               se quiere verificar si depende de otra.
        
        condiciones: Una lista de expresiones de sqlglot. Estas expresiones 
                     representan condiciones
        
        Retorna
        ---------

        Un string vacio si la tabla no depende de ninguna otra tabla. O Un string
        con el nombre de la tabla de la que depende.
    """
    dependencia = ''
    for condicion in condiciones:
        for nodo in [condicion.this, condicion.args['expression']]:
            if nodo.key == 'column' and nodo.table != tabla:
                dependencia = nodo.table
            
    return dependencia

In [51]:
def obtener_lista_miniconsultas(consulta_sql: str) -> list[miniconsulta_sql]:
    """
        Dado una consulta SQL lo divide en consultas mas simples de forma tal
        que despues se pueda usar la informacion de estas nuevas consultas
        mas pequeñas para hacerle preguntas a algun LLM.

        Tenga en cuenta que el resultado de esta funcion es una lista donde 
        las primeras consultas son consultas que dependen de otras, y las 
        ultimas son consultas que no depende de ninguna otra.

        Parametros
        -----------------

        consulta_sql: Un string con la consulta SQL

        Retorna
        ------------

        Una lista con las distintas consultas mas simples a realizar para devolver
        la información que requiere la consulta original.
    """
    datos_miniconsultas = dividir_consulta_sql(consulta_sql)
    datos_miniconsultas_dependientes = {}
    
    aliases = datos_miniconsultas.keys()
    
    miniconsultas_independientes = {}
    miniconsultas = []
    
    for alias in aliases:
        dependencia = obtener_dependencia(alias, datos_miniconsultas[alias]['condiciones_joins'])

        if dependencia != '':
            datos_miniconsultas_dependientes[alias] = datos_miniconsultas[alias]
            datos_miniconsultas_dependientes[alias]['dependencia'] = dependencia     

    for alias in aliases:
        if alias in datos_miniconsultas_dependientes.keys():
            continue

        miniconsultas_independientes[alias] = miniconsulta_sql(tabla = datos_miniconsultas[alias]['tabla'], 
                                                               alias = alias,
                                                               proyecciones = datos_miniconsultas[alias]['proyecciones'], 
                                                               condiciones = datos_miniconsultas[alias]['condiciones'],
                                                               condiciones_join = datos_miniconsultas[alias]['condiciones_joins'])
    
    for alias, datos in datos_miniconsultas_dependientes.items():
        miniconsultas.append(miniconsulta_sql(tabla = datos_miniconsultas_dependientes[alias]['tabla'], 
                                              alias = alias,
                                              proyecciones = datos_miniconsultas_dependientes[alias]['proyecciones'], 
                                              condiciones = datos_miniconsultas_dependientes[alias]['condiciones'],
                                              condiciones_join = datos_miniconsultas_dependientes[alias]['condiciones_joins'],
                                              dependencia = miniconsultas_independientes[datos_miniconsultas_dependientes[alias]['dependencia']]))
    return miniconsultas + list(miniconsultas_independientes.values())

In [57]:
consulta_sql = """
                Select T2.language
                from country as T1
                join CountryLanguage As T2 on T1.Code = T2.CountryCode
                where 'Beatrix' = T1.HeadOfState and T2.IsOfficial = 'T'
            """

aliases, tablas_alias = obtener_tablas(parse_one(consulta_sql, dialect='postgres'))

# obtener_proyecciones(parse_one(consulta_sql, dialect='postgres'),tablas, tablas_alias)
# obtener_condiciones(parse_one(consulta_sql, dialect='postgres'),tablas, tablas_alias)
# obtener_condiciones_joins(parse_one(consulta_sql, dialect='postgres'),tablas, tablas_alias)
# obtener_proyecciones_joins(parse_one(consulta_sql, dialect='postgres'),tablas, tablas_alias)
# dividir_consulta_sql(consulta_sql)
print(obtener_lista_miniconsultas(consulta_sql))


[SELECT T2.language, T2.CountryCode FROM CountryLanguage AS T2 WHERE T2.IsOfficial = 'T' AND T1.Code = T2.CountryCode, SELECT T1.Code FROM country AS T1 WHERE 'Beatrix' = T1.HeadOfState]
