# UTN - Data Engineering - Modulo 2 - TP2
## Procesamiento de datos y almacenamiento OLAP.

En el TP1 se extrajo de una API la temperatura, presion y velocidad del viento almacenandolos en forma cruda.
A continuacion en este TP se van a realizar una seria de transformaciones (a pedido del cliente) a estos datos.

El Cliente solicito trabajar por ciudad, tal cual fueron almacenados los datos crudos en sistema Parquet.
Los datos de cada ciudad necesitan ser trabajados para luego pode analizarlos. Las transformaciones solicitadas fueron las siguientes:

1. Eliminar los datos duplicados, dejando el ultimo valor encontrado.
2. Completar si existen los valores nulos con 99999.
3. Poner las fechas en formato time.
4. Generar nuevas columnas con el 'Año', 'Mes', 'Hora' de los datos obtenidos.
5. Renombrar las columnas a un formato standar seleccionado por el cliente.
6. Armar tres tablas de la siguiente forma:
    1. Tabla 1 - Los datos completos procesados anteriormente.
    2. Tabla 2 - Los datos que corresponden a un rango horario diurno.
    3. Tabla 3 - Los datos que corresponden a un rango harario nocturno.

Las tablas las silocita el cliente de esta forma porque necesita analizar el comportamiento del viento en superficie (como fn. de la temp y la pres) segun sea de dia o de noche. A su vez necesita analizar la data comleta para comparar resultados.


### Se instalan las librerias principales

In [None]:
#Se instalan las librerias utilizadas para la conexioncon la DB y se actualiza Pandas
!pip install psycopg2-binary
!pip install sqlalchemy
!pip install pandas

### Se importan las Librerias

Importo Pandas para el manejo de Data Frames.

Importo datetime para el manejo de las Fechas y las horas. Sobre todo en el split (%Y, %M, %H) que solicita el cliente.

Importo sqlalchemy para la coneccion con la Base de Datos y las tablas.


In [3]:
import pandas as pd
import os.path
from datetime import datetime
from sqlalchemy import create_engine, MetaData, Table, Column, String, Float, Integer


### Seccion de Funciones soporte.

In [16]:
#Creo una funcion para verificar que la ciudad exista y tenga un archivo data en bronce.
#Si el archivo existe se carga en un DataFrame.
#Si no existe se emite un msg avisando que la ciudad no existe.
def loadData(city):
    directoryPath = 'weather/' + city + '/bronce/data.parquet'

    if os.path.exists(directoryPath):
        df = pd.read_parquet(directoryPath)
        print(f"Los datos de la ciudad '{city}' fueron cargados con exito.")
    else:
        print(f"La ciudad '{city}' no existe.")
        df = pd.DataFrame()
    return df

### Seccion Recupero de la informacion extraida de la API.

En este paso se recupera la informacion guardada en formato Parquet para una ciudad en particular.

In [31]:
#En esta variable se solicita la ciudad en particular que deseamos levantar la informacion.
city = 'Pico Truncado'
#Se envia la ciudad a la Funcion de verificacion y carga de los datos
df = loadData(city)
#Se muestra en encabezado del df.
df.head()

Los datos de la ciudad 'Pico Truncado' fueron cargados con exito.


Unnamed: 0,time,temperature_2m,wind_speed_10m,surface_pressure
0,2024-01-01T00:00,22.6,28.6,976.6
1,2024-01-01T01:00,20.0,23.1,977.4
2,2024-01-01T02:00,18.2,17.0,978.3
3,2024-01-01T03:00,16.7,13.3,978.7
4,2024-01-01T04:00,15.3,9.1,979.3


### Seccion de Transformacion de la informacion

In [32]:
#Se eliminan los datos duplicados dejando el ultimo bajado.
#No se realiza un sort, porque lo que se busca es dejar el ultimo dato duplicado que se haya bajado de la API.
#Ej. un dia se bajan xx cantidad de datos hasta una fecha. Y luego otro dia despues se bajan las mismas fechas.
#Como son pronosticos del clima, el Cliente considera que es de esperar que los ultimos datos tengan mayor certeza.

df.drop_duplicates(subset=None, keep='last', inplace=True, ignore_index=False)

In [33]:
#Si existen se reemplazan los valores nulos por 99999.

df.fillna(value=99999, method=None, axis=None, inplace=True)

In [34]:
#Aca se convierte la columna FechaTHora en formato Fecha Hora para cargar en la Base de Datos.

df['time'] = pd.to_datetime(df['time'])

#Se hace un split de esa fecha en Año, Mes y hora de cada dato registrado.
#Se crean nuevas columnas.
df['year'] = df['time'].dt.strftime('%Y')
df['month'] = df['time'].dt.strftime('%m')
df['hour'] = df['time'].dt.strftime('%H').astype(int)


In [35]:
#Se renombran las columnas en un formato standar a pedido del cliente.
df.rename(columns={'temperature_2m': 'temp', 'wind_speed_10m': 'speed', 'surface_pressure': 'pres'}, inplace=True)


In [39]:
#Se crean dos nuevos DataFrames uno con los datos correspondientes al horario diurno.
#Otro con los datos correspondientes a la noche.

df_day = df.loc[df['hour'].between(8, 19)]
df_night = df.loc[~df['hour'].between(8, 19)]

In [41]:
df_day

Unnamed: 0,time,temp,speed,pres,year,month,hour
8,2024-01-01 08:00:00,11.5,4.0,980.6,2024,01,8
9,2024-01-01 09:00:00,11.2,3.8,980.7,2024,01,9
10,2024-01-01 10:00:00,12.1,4.0,981.0,2024,01,10
11,2024-01-01 11:00:00,14.5,4.3,981.4,2024,01,11
12,2024-01-01 12:00:00,16.9,5.1,981.4,2024,01,12
...,...,...,...,...,...,...,...
2080,2024-03-27 16:00:00,17.9,7.7,983.6,2024,03,16
2081,2024-03-27 17:00:00,18.7,11.5,983.3,2024,03,17
2082,2024-03-27 18:00:00,19.1,14.5,983.0,2024,03,18
2083,2024-03-27 19:00:00,19.6,15.9,982.7,2024,03,19


In [None]:

# Conecta a la base de datos PostgreSQL
#engine = create_engine('postgresql://avnadmin:AVNS_YWr_pRTqSoTZq41JlYy@pg-214ccdb6-self-01.a.aivencloud.com:27648/defaultdb?sslmode=require')
engine = create_engine('postgresql://avnadmin:AVNS_YWr_pRTqSoTZq41JlYy@pg-214ccdb6-self-01.a.aivencloud.com:27648/defaultdb')
# Define la estructura de la tabla
metadata = MetaData()
ejemplo_table = Table('ejemplo4', metadata,
    Column('time', String(50), nullable=False), #primary_key=True),
    Column('temp', Float, nullable=False),
    Column('speed', Float, nullable=False),
    Column('pres', Float, nullable=False),
)
df.to_sql('ejemplo4', engine, if_exists='replace', method='multi')
# Crea la tabla en la base de datos
metadata.create_all(engine)

print("Tabla 'ejemplo' creada exitosamente en PostgreSQL.")

In [None]:
engine = create_engine('postgresql://avnadmin:AVNS_YWr_pRTqSoTZq41JlYy@pg-214ccdb6-self-01.a.aivencloud.com:27648/defaultdb', fast_executemany=True)
df.to_sql('ejemplo4', engine, if_exists='replace', method='multi')

In [7]:
from sqlalchemy import create_engine, MetaData, Table, Column, String, Float

# Crear una instancia de motor para la base de datos
# Reemplaza 'postgresql://usuario:contraseña@host:puerto/nombre_base_de_datos' con tus propios valores
#engine = create_engine('postgresql://usuario:contraseña@host:puerto/nombre_base_de_datos')

# Conectarse a la base de datos
connection = engine.connect()

# Crear un objeto MetaData
metadata = MetaData()

# Definir la estructura de la tabla
ejemplo_table = Table('ejemplo8', metadata,
    Column('time', String(50), nullable=False),
    Column('temp', Float, nullable=False),
    Column('speed', Float, nullable=False),
    Column('pres', Float, nullable=False),
)

# Crear la tabla en la base de datos
metadata.create_all(engine)

print("La tabla 'ejemplo4' se ha creado exitosamente en la base de datos PostgreSQL.")

La tabla 'ejemplo4' se ha creado exitosamente en la base de datos PostgreSQL.


In [None]:
# Nombre de la tabla en la base de datos
nombre_tabla = 'ejemplo8'  # Reemplaza 'nombre_tabla' con el nombre de tu tabla en la base de datos

# Crear una conexión a la base de datos
conn = engine.connect()

# Iterar sobre las filas del DataFrame e insertarlas en la tabla
for index, row in df.iterrows():
    insert_query = f"INSERT INTO {nombre_tabla} (time, temp, speed, pres) VALUES ('{row['time']}', {row['temp']}, {row['speed']}, {row['pres']})"
    conn.execute(insert_query)

# Cerrar la conexión
conn.close()

print("Los datos se han insertado exitosamente en la tabla en la base de datos PostgreSQL.")

In [None]:

# Crear una conexión a la base de datos
conn = engine.connect()

df.to_sql('ejemplo4', conn, if_exists='replace', method='multi')

# Cerrar la conexión
conn.close()

In [None]:
from sqlalchemy import create_engine

engine = create_engine('postgres://avnadmin:AVNS_YWr_pRTqSoTZq41JlYy@pg-214ccdb6-self-01.a.aivencloud.com:27648/defaultdb?sslmode=require')

conn = engine.connect()

conn.execute("commit")

with engine.begin() as conn:
    conn.execute('CRATE TABLE IF NOT EXISTS ....;')

conn.close()

In [46]:
!pip install psycopg2-binary


Collecting psycopg2-binary


  Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x000001E9EE5AD6A0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2-binary/
  Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x000001E9EE5AD860>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2-binary/
  Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x000001E9EE5AD2B0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2-binary/
  Retrying (Retry(total=1, connect=None, read=None, 

In [None]:
import psycopg2

# Datos de conexión (reemplaza con tus propios valores)
db_name = "nombre_de_la_base_de_datos"
db_user = "usuario"
db_password = "contraseña"

# Conexión a la base de datos
connection = psycopg2.connect(database=db_name, user=db_user, password=db_password)
cursor = connection.cursor()

# Crear la tabla (reemplaza con tus propios nombres de columna y tipos de datos)
create_table_query = """
CREATE TABLE mi_tabla (
    id SERIAL PRIMARY KEY,
    columna1 VARCHAR(255),
    columna2 INTEGER,
    columna3 FLOAT
);
"""
cursor.execute(create_table_query)
connection.commit()

for index, row in df.iterrows():
    insert_query = """
    INSERT INTO mi_tabla (columna1, columna2, columna3)
    VALUES (%s, %s, %s);
    """
    cursor.execute(insert_query, (row['columna1'], row['columna2'], row['columna3']))

connection.commit()

connection.close()

In [None]:
import pandas as pd
import psycopg2

# Datos de conexión (reemplaza con tus propios valores)
db_name = "nombre_de_la_base_de_datos"
db_user = "usuario"
db_password = "contraseña"

# Crear un DataFrame de ejemplo (puedes reemplazar esto con tus datos)
data = {'columna1': [5, 12, 8, 15, 22, 10, 18, 4, 6, 11]}
df = pd.DataFrame(data)

# Conexión a la base de datos
connection = psycopg2.connect(database=db_name, user=db_user, password=db_password)

# Nombre de la tabla en la base de datos
table_name = "mi_tabla"

# Escribir los registros almacenados en el DataFrame en la tabla
df.to_sql(name=table_name, con=connection, if_exists='replace', index=False)

# Cerrar la conexión
connection.close()

print(f"Los datos se han cargado correctamente en la tabla '{table_name}'.")

In [44]:
!pip install psycopg2

Collecting psycopg2


  Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x0000022946DB69B0>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2/
  Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x0000022946DB6240>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2/
  Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x0000022946DA7B70>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed')': /simple/psycopg2/
  Retrying (Retry(total=1, connect=None, read=None, redirect=None, status

In [None]:
import pandas as pd
import psycopg2

# Datos de conexión (reemplaza con tus propios valores)
db_name = "nombre_de_la_base_de_datos"
db_user = "usuario"
db_password = "contraseña"

# Crear un DataFrame de ejemplo (puedes reemplazar esto con tus datos)
data = {'columna1': [5, 12, 8, 15, 22, 10, 18, 4, 6, 11]}
df = pd.DataFrame(data)

# Conexión a la base de datos
connection = psycopg2.connect(database=db_name, user=db_user, password=db_password)

# Nombre de la tabla en la base de datos
table_name = "mi_tabla"

# Escribir los registros almacenados en el DataFrame en la tabla
df.to_sql(name=table_name, con=connection, if_exists='replace', index=False)

# Cerrar la conexión
connection.close()

print(f"Los datos se han cargado correctamente en la tabla '{table_name}'.")