# Pruebas y explicaciones de Extract

Hago aquí las cosas porque es mucho más cómodo para testear, luego lo pasaré al extract.py

La siguiente sección es la dada por el profe. Importa paquetes y se conecta a la base de datos mediante la variable `conn`

In [7]:
from pathlib import Path
import psycopg2
import pandas as pd
# https://pygrametl.org
from pygrametl.datasources import CSVSource, SQLSource
from typing import Generator

In [8]:
# Connect to the PostgreSQL source
path = Path("db_conf.txt")
if not path.is_file():
    raise FileNotFoundError(f"Database configuration file '{path.absolute()}' not found.")
try:
    parameters = {}
    # Read the database configuration from the provided txt file, line by line
    with open(path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            parameters[line.split('=', 1)[0]] = line.split('=', 1)[1].strip()
    conn = psycopg2.connect(
        dbname=parameters['dbname'],
        user=parameters['user'],
        password=parameters['password'],
        host=parameters['ip'],
        port=parameters['port']
    )
    cursor = conn.cursor()
    print("¡Conectado!")

    
except psycopg2.Error as e:
    print(e)
    raise ValueError(f"Unable to connect to the database: {parameters}")
except Exception as e:
    print(e)
    raise ValueError(f"Database configuration file '{path.absolute()}' not properly formatted (check file 'db_conf.example.txt'.")

¡Conectado!


Una vez hecho esto tenemos dos variables importantes: 
- `conn` representa la conexión a la base de datos
- `cursor` es el objeto mediante el cual hacemos las queries

Este código siguiente te dice qué tablas hay dentro del AIMS y el AMOS

In [9]:
cursor.execute( """ 
                SELECT table_schema, table_name
                FROM information_schema.tables
                WHERE table_schema IN ('AIMS', 'AMOS')
                ORDER BY table_schema, table_name
                """
)

tablas = cursor.fetchall()
    
print("Tablas que tienes:")
for tabla in tablas: print(tabla)

Tablas que tienes:
('AIMS', 'flights')
('AIMS', 'maintenance')
('AIMS', 'slots')
('AMOS', 'attachments')
('AMOS', 'forecastedorders')
('AMOS', 'maintenanceevents')
('AMOS', 'operationinterruption')
('AMOS', 'postflightreports')
('AMOS', 'technicallogbookorders')
('AMOS', 'workorders')
('AMOS', 'workpackages')


Este código siguiente sirve para extraer elementos de la tabla `AIMS.flights`. Las comillas esas raras `\"` sirven para que el programa detecte bien mayúsculas y minúsculas. 

In [5]:

cursor.execute("""SELECT * FROM \"AIMS\".\"flights\" """) #Select de todos los elementos
batch = cursor.fetchmany(10) #Función iteradora que va devolviendo elementos cada vez que la llamas
print("Primeros 10 datos extraídos")

Primeros 10 datos extraídos


In [6]:
df = pd.DataFrame.from_records(batch) #Transformarlo a pandas y printearlo bonitamente
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
0,1,XY-ACK,2023-01-01 03:23:56,2023-01-01 04:23:56,Flight,230101-TIA-GVA-4486-XY-ACK,TIA,GVA,2023-01-01 03:28:56,2023-01-01 04:28:56,False,,138,3,3
1,2,XY-ACK,2023-01-01 06:36:40,2023-01-01 08:36:40,Flight,230101-VRN-AOI-1291-XY-ACK,VRN,AOI,2023-01-01 06:38:40,2023-01-01 08:38:40,False,,132,3,2
2,3,XY-ACK,2023-01-02 04:42:43,2023-01-02 08:42:43,Flight,230102-LGW-ANR-8878-XY-ACK,LGW,ANR,2023-01-02 04:42:43,2023-01-02 08:42:43,False,,103,4,2
3,4,XY-ACK,2023-02-17 20:19:21,2023-02-17 23:19:21,Flight,230217-GYD-TBS-9247-XY-ACK,GYD,TBS,2023-02-17 20:23:21,2023-02-17 23:23:21,False,,142,4,2
4,5,XY-ACK,2023-02-18 11:31:17,2023-02-18 13:31:17,Flight,230218-CHQ-PEG-4126-XY-ACK,CHQ,PEG,2023-02-18 11:46:17,2023-02-18 13:46:17,False,,111,3,2
5,6,XY-ACK,2023-02-19 04:43:46,2023-02-19 08:43:46,Flight,230219-EMA-PDL-9646-XY-ACK,EMA,PDL,2023-02-19 06:01:46,2023-02-19 10:01:46,False,48.0,111,4,2
6,7,XY-ACK,2023-02-19 11:10:11,2023-02-19 14:10:11,Flight,230219-FCO-FAE-9628-XY-ACK,FCO,FAE,2023-02-19 14:12:11,2023-02-19 11:12:11,False,,91,4,3
7,8,XY-ACK,2023-02-19 19:41:26,2023-02-19 20:41:26,Flight,230219-KGS-BHX-9784-XY-ACK,KGS,BHX,2023-02-19 19:45:26,2023-02-19 20:45:26,False,,148,4,2
8,9,XY-ACK,2023-02-20 05:52:45,2023-02-20 08:52:45,Flight,230220-TLL-KRK-3745-XY-ACK,TLL,KRK,2023-02-20 06:07:45,2023-02-20 09:07:45,False,,113,4,3
9,10,XY-ACK,2023-02-20 19:39:03,2023-02-20 20:39:03,Flight,230220-BIO-AMS-2423-XY-ACK,BIO,AMS,2023-02-20 19:40:03,2023-02-20 20:40:03,False,,129,4,3


Ahora queremos hacer una función que devuelva un objeto iterador que vaya dando filas elemento por elemento. Esta función se la daremos al transform para que pueda ir extrayendo los datos a su ritmo, en forma de flujo, sin tener que cargarlo todo de golpe en la memoria local. 

In [10]:

def yield_rows(cursor: psycopg2.extensions.cursor, batch_size:int, esquema:str, tabla:str, columns:str = "*"):
    '''Extrae y devuelve los elementos de la tabla {esquema}.{tabla} de forma iterativa. '''

    cursor.execute( f"""SELECT {columns} FROM \"{esquema}\".\"{tabla}\" """) #Hacer el select de todo

    while True: #Ir extrayendo y devolviendo batches de elementos 
        
        batch = cursor.fetchmany(batch_size)
        if not batch: break  # No hay más datos
        yield batch


Esto está bien pero se puede mejorar. El principal problema es que no se cierra el cursor. El programa puede terminar por el break, por otra interrupción o porque el usuario deje de usar el iterador o lo elimine, pero en todo caso se tiene que cerrar. Esta implementación sí lo hace: 

In [11]:
def yield_rows(cursor: psycopg2.extensions.cursor, batch_size:int, esquema:str, tabla:str, columns:str = "*") -> Generator:
    '''Extrae y devuelve los elementos de la tabla {esquema}.{tabla} de forma iterativa. '''

    try:
        cursor.execute(f"""SELECT {columns} FROM "{esquema}"."{tabla}" """)

        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch: break  # No hay más datos
            yield batch
    
    finally:
        cursor.close()
        print(f"Cursor cerrado para {esquema}.{tabla}")


Utilización de la función anterior:

In [12]:
iterator_flights = yield_rows(cursor, 10, "AIMS", "flights")

In [18]:
#Si le das varias veces a este bloque verás que te va devolviendo datos nuevos cada vez
pd.DataFrame.from_records(next(iterator_flights))

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
0,51,XY-ACK,2023-07-16 19:29:36,2023-07-16 23:29:36,Flight,230716-AOI-GRO-6767-XY-ACK,AOI,GRO,2023-07-16 19:38:36,2023-07-16 23:38:36,False,,102,4,2
1,52,XY-ACK,2023-07-17 15:56:27,2023-07-17 16:56:27,Flight,230717-PDL-ZTH-3881-XY-ACK,PDL,ZTH,NaT,NaT,True,,118,4,3
2,53,XY-ACK,2023-07-18 04:15:32,2023-07-18 08:15:32,Flight,230718-TRD-PRG-2217-XY-ACK,TRD,PRG,2023-07-18 04:20:32,2023-07-18 08:20:32,False,,135,4,3
3,54,XY-ACK,2023-07-18 08:36:03,2023-07-18 10:36:03,Flight,230718-IAS-OVD-9782-XY-ACK,IAS,OVD,2023-07-18 08:51:03,2023-07-18 10:51:03,False,,130,4,3
4,55,XY-ACK,2023-07-18 22:28:07,2023-07-19 03:28:07,Flight,230718-OPO-TKU-9548-XY-ACK,OPO,TKU,2023-07-18 22:39:07,2023-07-19 03:39:07,False,,101,3,3
5,56,XY-ACK,2023-07-19 06:58:11,2023-07-19 07:58:11,Flight,230719-MXP-BOO-4496-XY-ACK,MXP,BOO,2023-07-19 07:08:11,2023-07-19 08:08:11,False,,121,3,3
6,57,XY-ACK,2023-07-19 11:26:11,2023-07-19 15:26:11,Flight,230719-BOD-INI-9792-XY-ACK,BOD,INI,2023-07-19 11:31:11,2023-07-19 15:31:11,False,,176,4,2
7,58,XY-ACK,2023-09-24 21:49:52,2023-09-24 22:49:52,Flight,230924-ADA-PDL-9733-XY-ACK,ADA,PDL,2023-09-24 21:53:52,2023-09-24 22:53:52,False,,162,4,2
8,59,XY-ACK,2023-09-25 02:43:09,2023-09-25 06:43:09,Flight,230925-CHQ-PRN-7492-XY-ACK,CHQ,PRN,2023-09-25 03:33:09,2023-09-25 07:33:09,False,2.0,107,4,3
9,60,XY-ACK,2023-09-25 09:34:54,2023-09-25 13:34:54,Flight,230925-FMO-RTM-4492-XY-ACK,FMO,RTM,2023-09-25 09:38:54,2023-09-25 13:38:54,False,,97,4,2


Ahora queremos hacer la función final de extract. Usaremos la función yield_rows. Tiene que hacer esto: 

- Crear el cursor para conectarse a la base de datos original de DBeaver (AIMS y AMOS). Al final con un cursor no basta porque varios iteradores no pueden tener el mismo cursor sin interferir. Entonces cada tabla tiene que tener su propio cursor. 
- Por cada tabla relevante, generará un iterador que devuelva las cosas relevantes mediante la función `yield_rows`. Se necesita una forma de configurar qué tablas y columnas son relevantes, es por esto que la funció stc guarda todas las tuplas de (Schema, Table, Column) que se consultarán. Si un cierto elemento de `stc` fuera `('AIMS', 'slots', 'schedulearrival, kind')` entonces solo se extraerían estas dos columnas de esta tabla. 
- Finalmente devolverá un diccionario con todos los iteradores. La key del diccionario será el nombre de la tabla correspondiente
- Hará algo parecido para los archivos .csv pero eso está por ver 

In [19]:


def extract() -> dict[str, Generator]:

    iterators_dict:dict[str, Generator] = {}
    batch_size:int = 100

    #Schemas, Tables and Columns
    stc:list[tuple[str, str, str]] = [  ('AIMS', 'flights', '*'),
                                        ('AIMS', 'maintenance', '*'),
                                        ('AIMS', 'slots', '*'),
                                        ('AMOS', 'attachments', '*'),
                                        ('AMOS', 'forecastedorders', '*'),
                                        ('AMOS', 'maintenanceevents', '*'),
                                        ('AMOS', 'operationinterruption', '*'),
                                        ('AMOS', 'postflightreports', '*'),
                                        ('AMOS', 'technicallogbookorders', '*'),
                                        ('AMOS', 'workorders', '*'),
                                        ('AMOS', 'workpackages', '*')  ]
    
    for schema, table, column in stc: 
        cursor = conn.cursor()
        iterators_dict[ f"{schema}.{table}" ] = yield_rows(cursor, batch_size, schema, table, column)
    
    return iterators_dict



Uso de la función anterior. Aquí obtenemos todos los iteradores y usamos uno de ellos. 

In [22]:
diccionario = extract()

diccionario

{'AIMS.flights': <generator object yield_rows at 0x000002ACDFE23400>,
 'AIMS.maintenance': <generator object yield_rows at 0x000002ACDFE232E0>,
 'AIMS.slots': <generator object yield_rows at 0x000002ACDFE23520>,
 'AMOS.attachments': <generator object yield_rows at 0x000002ACDFE23640>,
 'AMOS.forecastedorders': <generator object yield_rows at 0x000002ACDFE23760>,
 'AMOS.maintenanceevents': <generator object yield_rows at 0x000002ACDFE23880>,
 'AMOS.operationinterruption': <generator object yield_rows at 0x000002ACDFE239A0>,
 'AMOS.postflightreports': <generator object yield_rows at 0x000002ACDFE23AC0>,
 'AMOS.technicallogbookorders': <generator object yield_rows at 0x000002ACDFE23BE0>,
 'AMOS.workorders': <generator object yield_rows at 0x000002ACDFE23D00>,
 'AMOS.workpackages': <generator object yield_rows at 0x000002ACDFE23E20>}

In [23]:
next(diccionario['AMOS.attachments'])

[('018c772f-f87a-410b-96f9-0762d6adbd25', 20001),
 ('581bb3bb-b9f0-4fae-8b8c-116d1bf85c51', 20001),
 ('a84edda3-fbbb-4e45-bbb1-54c5ab8c01b4', 20004),
 ('99a57b4e-956a-4104-9024-d31969c10f5c', 20006),
 ('ffd27c78-84d7-484a-acd0-e9bbbfe3dec7', 20009),
 ('2038ca98-5888-4982-8721-6410bf570468', 20009),
 ('0cd8c42a-6c70-471b-910a-68cdc124fcb5', 20010),
 ('9217ca61-be67-4f8d-ab1f-3d826dc5133f', 20010),
 ('c5d98231-39f5-48b7-85ab-6c63eeddb9b5', 20011),
 ('32884f14-6a6a-4d24-8744-b5975d42a6df', 20011),
 ('59ff0947-d3e8-4051-922f-6d047665a211', 20015),
 ('313cadb7-86a6-43b8-a035-d031138c0952', 20015),
 ('45c633da-3449-4b58-b661-fb4f8a8ffab7', 20016),
 ('dca42a75-2415-4346-ae31-394a0ff09aa2', 20016),
 ('ae5f885f-337d-4662-a7ef-0385447d2f67', 20017),
 ('a5ce4e2b-3850-4f00-b3bc-705e6d234832', 20017),
 ('54f375e6-ad7d-4e6f-af47-a037622dccd2', 20018),
 ('347bd16b-9da1-4f31-b068-a39ffd35cb01', 20019),
 ('66f46974-909e-46dc-be5c-9e99433d85ad', 20022),
 ('f4147799-3ff0-4548-b34e-276989bd5f24', 20022),


# Nuevo extract con SQLSource

Resulta que SQLSource hace más o menos lo mismo que yield_rows, así que lo implementaremos así. 

In [61]:
def extract() -> dict[str, SQLSource]:
    
    iterators_dict: dict[str, SQLSource] = {}

    # Schemas, Tables and Columns
    stc: list[tuple[str, str, str]] = [
        ('AIMS', 'flights', '*'),
        ('AIMS', 'maintenance', '*'),
        ('AIMS', 'slots', '*'),
        ('AMOS', 'attachments', '*'),
        ('AMOS', 'forecastedorders', '*'),
        ('AMOS', 'maintenanceevents', '*'),
        ('AMOS', 'operationinterruption', '*'),
        ('AMOS', 'postflightreports', '*'),
        ('AMOS', 'technicallogbookorders', '*'),
        ('AMOS', 'workorders', '*'),
        ('AMOS', 'workpackages', '*')
    ]
    
    for schema, table, columns in stc:
        # Crear SQLSource - ¡MANEJA AUTOMÁTICAMENTE el cursor y streaming!
        query = f'SELECT {columns} FROM "{schema}"."{table}"'
        iterators_dict[f"{schema}.{table}"] = SQLSource(connection=conn, query=query)
    
    return iterators_dict

In [65]:
diccionario = extract()

diccionario

{'AIMS.flights': <pygrametl.datasources.SQLSource at 0x2ace05829d0>,
 'AIMS.maintenance': <pygrametl.datasources.SQLSource at 0x2ace03c8410>,
 'AIMS.slots': <pygrametl.datasources.SQLSource at 0x2ace060e650>,
 'AMOS.attachments': <pygrametl.datasources.SQLSource at 0x2ace0587850>,
 'AMOS.forecastedorders': <pygrametl.datasources.SQLSource at 0x2ace0586ad0>,
 'AMOS.maintenanceevents': <pygrametl.datasources.SQLSource at 0x2ace0585690>,
 'AMOS.operationinterruption': <pygrametl.datasources.SQLSource at 0x2ace05f6e50>,
 'AMOS.postflightreports': <pygrametl.datasources.SQLSource at 0x2acdfd6e3d0>,
 'AMOS.technicallogbookorders': <pygrametl.datasources.SQLSource at 0x2ace0387e50>,
 'AMOS.workorders': <pygrametl.datasources.SQLSource at 0x2ace0584bd0>,
 'AMOS.workpackages': <pygrametl.datasources.SQLSource at 0x2acc7937fd0>}

In [35]:
source = iter(diccionario['AMOS.attachments'])

In [36]:
next(source)

{'file': '018c772f-f87a-410b-96f9-0762d6adbd25', 'event': 20001}

In [76]:


# Ver primeras 5 filas
for i, fila in enumerate(source):
    if i >= 5:
        break
    print(f"Fila {i}: {fila}")

Fila 0: {'file': '630fe605-19f3-4d48-bf65-584a6d24b6e6', 'event': 20105}
Fila 1: {'file': 'c368e2ac-a9e2-45e6-af93-cc9e4345d3ab', 'event': 20108}
Fila 2: {'file': 'eb489907-e27b-4cdb-935a-87c7ea104b03', 'event': 20108}
Fila 3: {'file': '37e8c339-c2f8-4bdc-b57f-3f9f6cdcdb72', 'event': 20109}
Fila 4: {'file': '28170eaf-fdac-4d7e-9fc1-63e3f191b5b4', 'event': 20111}


In [71]:
from datetime import datetime

## Contar cantidad de vuelos, aviones y días

In [74]:
diccionario = extract()
source_a = diccionario['AIMS.flights']

count = 0
aircrafts = set()
days = set()

for fila in source_a:
    count +=1
    #print(type(fila['scheduleddeparture']))
    aircrafts.add( fila['aircraftregistration'] )
    days.add( fila['scheduleddeparture'].date() )

print(f"Hay {count} vuelos")
print(f"Hay {len(aircrafts)} aircrafts")
print(f"Hay {len(days)} días")

Hay 69095 vuelos
Hay 283 aircrafts
Hay 730 días


In [77]:
diccionario = extract()
source_a = diccionario['AIMS.flights']

count = 0
aircrafts = set()
days = set()

for fila in source_a:
    count +=1
    print(fila)
    if count>5: break

{'id': 1, 'aircraftregistration': 'XY-ACK', 'scheduleddeparture': datetime.datetime(2023, 1, 1, 3, 23, 56), 'scheduledarrival': datetime.datetime(2023, 1, 1, 4, 23, 56), 'kind': 'Flight', 'flightid': '230101-TIA-GVA-4486-XY-ACK', 'departureairport': 'TIA', 'arrivalairport': 'GVA', 'actualdeparture': datetime.datetime(2023, 1, 1, 3, 28, 56), 'actualarrival': datetime.datetime(2023, 1, 1, 4, 28, 56), 'cancelled': False, 'delaycode': None, 'passengers': 138, 'cabincrew': 3, 'flightcrew': 3}
{'id': 2, 'aircraftregistration': 'XY-ACK', 'scheduleddeparture': datetime.datetime(2023, 1, 1, 6, 36, 40), 'scheduledarrival': datetime.datetime(2023, 1, 1, 8, 36, 40), 'kind': 'Flight', 'flightid': '230101-VRN-AOI-1291-XY-ACK', 'departureairport': 'VRN', 'arrivalairport': 'AOI', 'actualdeparture': datetime.datetime(2023, 1, 1, 6, 38, 40), 'actualarrival': datetime.datetime(2023, 1, 1, 8, 38, 40), 'cancelled': False, 'delaycode': None, 'passengers': 132, 'cabincrew': 3, 'flightcrew': 2}
{'id': 3, 'air