# Fase 1: Conexión API

In [34]:
import requests
# Coordenadas Argentina: 34.6037° S, 58.3816° W
url = "https://www.7timer.info/bin/astro.php"
params = {
    "lon": "-58.38",
    "lat": "34.60",
    "ac": "0",
    "unit": "metric",
    "output": "xml",
    "tzshift": "0"
}
response = requests.get(url, params=params)
xml_data = response.text

# Fase 2: Parsear el XML

In [36]:
# Parsing el XML
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_data)
root

<Element 'product' at 0x0000017861055D10>

# Fase 3: Convertir a dict

In [37]:
import json
# Inicializar el diccionario para alamacenar la data extraida
data_dict = {}

# Iterar sobre cada elemento 'data'
for data_element in root.findall('.//data'):
    timepoint = data_element.get('timepoint') # Obtener el id timepoint
    data_dict[timepoint] = {}

    # Iterar osbre cada elemento hijo de data
    for child_element in data_element:
        variable = child_element.tag
        value = child_element.text
        data_dict[timepoint][variable] = value

# Convertir el diccionario en json
json_data = json.dumps(data_dict, indent=4)

# Mostrar el json
print(data_dict.keys())
data_dict["3h"]
#print(json_data.keys())

dict_keys(['3h', '6h', '9h', '12h', '15h', '18h', '21h', '24h', '27h', '30h', '33h', '36h', '39h', '42h', '45h', '48h', '51h', '54h', '57h', '60h', '63h', '66h', '69h', '72h'])


{'cloudcover': '1',
 'seeing': '6',
 'transparency': '5',
 'lifted_index': '-1',
 'rh2m': '11',
 'wind10m_direction': 'E',
 'wind10m_speed': '3',
 'temp2m': '26',
 'prec_type': 'none'}

# Fase 4: Convertir a dataframe el dict

In [44]:
# Obtener las columnas necesarios para insertar en la tabla
import pandas as pd
from datetime import date
df=pd.DataFrame.from_dict(data_dict, orient='index').reset_index().rename(columns={"index":'Pronostico'})
# Agregar la fecha del pronostico
df['Year']=date.today().year
df['Month']=date.today().month
df['Day']=date.today().day
df

Unnamed: 0,Pronostico,cloudcover,seeing,transparency,lifted_index,rh2m,wind10m_direction,wind10m_speed,temp2m,prec_type,Year,Month,Day
0,3h,1,6,5,-1,11,E,3,26,none,2023,7,16
1,6h,1,6,5,-1,11,SE,3,26,none,2023,7,16
2,9h,1,6,5,-1,11,SE,3,26,none,2023,7,16
3,12h,1,6,4,2,11,SE,3,26,none,2023,7,16
4,15h,1,6,4,-1,11,SE,3,26,rain,2023,7,16
5,18h,1,6,5,-1,11,SE,3,26,rain,2023,7,16
6,21h,1,6,4,-1,11,S,3,26,rain,2023,7,16
7,24h,1,6,4,-1,11,SW,3,26,rain,2023,7,16
8,27h,2,6,4,-1,11,S,3,26,none,2023,7,16
9,30h,1,6,4,-1,11,SW,3,26,none,2023,7,16


# Fase 5: Creacion tabla de destino en redshift

In [46]:
!pip install -q psycopg2


[notice] A new release of pip is available: 23.1.2 -> 23.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [48]:
import psycopg2

url="localhost" # #url="data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws"
data_base="FIN_MUNDO"
user="postgres"
with open("C:/Users/Windows/Downloads/pwd_postgre.txt",'r') as f:
    pwd= f.read()

try:
    conn = psycopg2.connect(
        host=url,
        dbname=data_base,
        user=user,
        password=pwd,
        port='5432'#'5439'
    )
    print("Conectado a Postgres")
    
except Exception as e:
    print("No es posible conectarse a Postgres")
    print(e)

Conectado a Postgres


# Fase 6: Funcion ETL

In [49]:
df.dtypes

Pronostico           object
cloudcover           object
seeing               object
transparency         object
lifted_index         object
rh2m                 object
wind10m_direction    object
wind10m_speed        object
temp2m               object
prec_type            object
Year                  int64
Month                 int64
Day                   int64
dtype: object

In [51]:
from psycopg2.extras import execute_values

def cargar_en_postgres(conn, table_name, dataframe):
    dtypes= dataframe.dtypes
    cols= list(dtypes.index )
    tipos= list(dtypes.values)
    type_map = {'int64': 'INT','int32': 'INT','float64': 'FLOAT','object': 'VARCHAR(50)','bool':'BOOLEAN'}
    sql_dtypes = [type_map[str(dtype)] for dtype in tipos]
    # Definir formato SQL VARIABLE TIPO_DATO
    column_defs = [f"{name} {data_type}" for name, data_type in zip(cols, sql_dtypes)]
    # Combine column definitions into the CREATE TABLE statement
    table_schema = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {', '.join(column_defs)}
        );
        """
    # Crear la tabla
    cur = conn.cursor()
    cur.execute(table_schema)
    # Generar los valores a insertar
    values = [tuple(x) for x in dataframe.to_numpy()]
    # Definir el INSERT
    insert_sql = f"INSERT INTO {table_name} ({', '.join(cols)}) VALUES %s"
    # Execute the transaction to insert the data
    cur.execute("BEGIN")
    execute_values(cur, insert_sql, values)
    cur.execute("COMMIT")
    print('Proceso terminado')

# Fase 7: Trigger el ETL

In [52]:
cargar_en_postgres(conn=conn, table_name='tabla_pronostico', dataframe=df)

Proceso terminado
