In [21]:

import json
import re
import time
from json import loads
from urllib.parse import quote_plus

import joblib
import pandas as pd
import pandas as pd
from confluent_kafka import Consumer, KafkaException, KafkaError
from confluent_kafka import Producer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sqlalchemy import create_engine
from sqlalchemy import text

In [8]:

# Buffer that collects the decoded Kafka messages (one dict per message);
# a later cell turns it into a DataFrame.
happy_df = []


def consume_messages(bootstrap_servers, group_id, topic, max_idle_time=10, messages=None):
    """Consume JSON messages from a Kafka topic until the topic goes idle.

    Polls ``topic`` (1 s poll timeout) and stops once more than
    ``max_idle_time`` seconds pass without a new message, or on
    ``KeyboardInterrupt``. The consumer is always closed.

    Args:
        bootstrap_servers: Value for the Kafka ``bootstrap.servers`` setting.
        group_id: Consumer group id.
        topic: Topic to subscribe to.
        max_idle_time: Seconds without messages after which consumption stops.
        messages: List to append decoded messages to. Defaults to the
            module-level ``happy_df`` buffer.

    Returns:
        The list the decoded messages were appended to.

    Raises:
        KafkaException: For any Kafka error other than end-of-partition.
    """
    if messages is None:
        messages = happy_df

    consumer = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe([topic])

    # Monotonic clock: elapsed-time checks must not be affected by wall-clock changes.
    last_message_time = time.monotonic()

    try:
        # Runs until the topic has been idle for max_idle_time seconds,
        # a non-EOF Kafka error is raised, or the user interrupts.
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                if time.monotonic() - last_message_time > max_idle_time:
                    print("Mensajes recibidos en su totalidad, cerrando consumidor.")
                    break
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"Reached end of partition: {msg.partition()}")
                else:
                    raise KafkaException(msg.error())
                continue

            payload = msg.value()
            last_message_time = time.monotonic()  # any delivered message counts as activity
            if payload is None:
                # Message without a body (e.g. a tombstone): nothing to decode.
                continue

            message = loads(payload.decode('utf-8'))
            print(f"Received message: {message}")
            messages.append(message)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
        print("Consumidor cerrado correctamente.")

    return messages


if __name__ == "__main__":
    bootstrap_servers = 'localhost:9092'
    group_id = 'happy_test_group'
    topic = 'happy_test'

    # Stop after 10 seconds without new messages.
    consume_messages(bootstrap_servers, group_id, topic, max_idle_time=10)




   


Received message: {'economy': 1.41691517829895, 'social_support': 1.43633782863617, 'life_expectancy': 0.913475871086121, 'freedom': 0.505625545978546, 'government_corruption': 0.163760736584663, 'generosity': 0.12057276815176, 'year': 2017.0, 'continent_Asia': 1.0, 'continent_Asia/Europe': 0.0, 'continent_Europe': 0.0, 'continent_Europe/Asia': 0.0, 'continent_North America': 0.0, 'continent_Oceania': 0.0, 'continent_South America': 0.0, 'happiness_score': 5.92000007629395}
Received message: {'economy': 0.274, 'social_support': 0.757, 'life_expectancy': 0.505, 'freedom': 0.142, 'government_corruption': 0.078, 'generosity': 0.275, 'year': 2019.0, 'continent_Asia': 0.0, 'continent_Asia/Europe': 0.0, 'continent_Europe': 0.0, 'continent_Europe/Asia': 0.0, 'continent_North America': 0.0, 'continent_Oceania': 0.0, 'continent_South America': 0.0, 'happiness_score': 3.973}
Received message: {'economy': 0.191, 'social_support': 0.56, 'life_expectancy': 0.495, 'freedom': 0.443, 'government_corru

In [14]:
# The consumer cell fills `happy_df` with one dict per Kafka message. Turn it into
# a DataFrame; if the consumer cell has not run, start from an empty frame.
# Re-running this cell is safe: pd.DataFrame(DataFrame) just copies the frame.
happy_df = pd.DataFrame(globals().get('happy_df', []))

if happy_df.empty:
    print("No se encontraron mensajes recibidos.")
else:
    print("Primeros 5 mensajes recibidos:")
    print(happy_df.head())
    print("Shape del DataFrame:", happy_df.shape)


Primeros 5 mensajes recibidos:
    economy  social_support  life_expectancy   freedom  government_corruption  \
0  1.416915        1.436338         0.913476  0.505626               0.163761   
1  0.274000        0.757000         0.505000  0.142000               0.078000   
2  0.191000        0.560000         0.495000  0.443000               0.089000   
3  0.786441        1.548969         0.498273  0.658249               0.246528   
4  1.053510        1.248230         0.787230  0.449740               0.084840   

   generosity    year  continent_Asia  continent_Asia/Europe  \
0    0.120573  2017.0             1.0                    0.0   
1    0.275000  2019.0             0.0                    0.0   
2    0.218000  2019.0             0.0                    0.0   
3    0.415984  2017.0             1.0                    0.0   
4    0.114510  2015.0             0.0                    0.0   

   continent_Europe  continent_Europe/Asia  continent_North America  \
0               0.0       

In [10]:
# Pre-trained model used by the prediction cell. joblib.load unpickles the file,
# so only point this at model files from a trusted source.
MODEL_PATH = "../model/my_happiness_model.pkl"
my_happiness_model = joblib.load(MODEL_PATH)

In [15]:
# Make predictions.
# Model inputs are every column except the target. A prediction column left by an
# earlier run of this cell is also dropped; otherwise a re-run would pass it to
# the model as an extra feature it was not trained on.
feature_predictor = (
    happy_df
    .drop(columns='happiness_score')
    .drop(columns='happiness_score_prediction', errors='ignore')
)
happy_df['happiness_score_prediction'] = my_happiness_model.predict(feature_predictor)

# Show the DataFrame shape and its first rows.
print(happy_df.shape)
print(happy_df.head())

(235, 16)
    economy  social_support  life_expectancy   freedom  government_corruption  \
0  1.416915        1.436338         0.913476  0.505626               0.163761   
1  0.274000        0.757000         0.505000  0.142000               0.078000   
2  0.191000        0.560000         0.495000  0.443000               0.089000   
3  0.786441        1.548969         0.498273  0.658249               0.246528   
4  1.053510        1.248230         0.787230  0.449740               0.084840   

   generosity    year  continent_Asia  continent_Asia/Europe  \
0    0.120573  2017.0             1.0                    0.0   
1    0.275000  2019.0             0.0                    0.0   
2    0.218000  2019.0             0.0                    0.0   
3    0.415984  2017.0             1.0                    0.0   
4    0.114510  2015.0             0.0                    0.0   

   continent_Europe  continent_Europe/Asia  continent_North America  \
0               0.0                    0.0     

In [18]:
# Show each column's dtype before the frame is written to FLOAT columns in PostgreSQL.
column_dtypes = happy_df.dtypes
print(column_dtypes)


economy                       float64
social_support                float64
life_expectancy               float64
freedom                       float64
government_corruption         float64
generosity                    float64
year                          float64
continent_Asia                float64
continent_Asia/Europe         float64
continent_Europe              float64
continent_Europe/Asia         float64
continent_North America       float64
continent_Oceania             float64
continent_South America       float64
happiness_score               float64
happiness_score_prediction    float64
dtype: object


In [20]:
# Load the connection settings (this cell reads: host, user, password, port, dbname).
with open('db_config.json', 'r') as file:
    db_config = json.load(file)

# Connect to the database named in the config, which is the same database the
# insert cell writes to. quote_plus() stops credentials containing '@', ':' or '/'
# from corrupting the URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote_plus(db_config["user"])}:{quote_plus(db_config["password"])}'
    f'@{db_config["host"]}:{db_config["port"]}/{db_config["dbname"]}'
)

# Identifiers are unquoted, so PostgreSQL folds them to lowercase
# (e.g. continent_Asia -> continent_asia).
create_table_sql = text("""
    CREATE TABLE IF NOT EXISTS happy_df (
        economy FLOAT,
        social_support FLOAT,
        life_expectancy FLOAT,
        freedom FLOAT,
        government_corruption FLOAT,
        generosity FLOAT,
        year FLOAT,
        continent_Asia FLOAT,
        continent_Asia_Europe FLOAT,
        continent_Europe FLOAT,
        continent_Europe_Asia FLOAT,
        continent_North_America FLOAT,
        continent_Oceania FLOAT,
        continent_South_America FLOAT,
        happiness_score FLOAT,
        happiness_score_prediction FLOAT
    )
""")

# engine.begin() commits on success, rolls back on error and always releases the
# connection. dispose() closes the pool even if the statement fails.
try:
    with engine.begin() as conn:
        conn.execute(create_table_sql)
finally:
    engine.dispose()

print("Tabla creada con éxito.")


Tabla creada con éxito.


In [23]:
# Normalise column names to the identifiers used by the happy_df table: lowercase,
# with '/' and ' ' replaced by '_' (e.g. 'continent_Asia/Europe' ->
# 'continent_asia_europe'). Deriving the names from the existing ones cannot
# mislabel columns if their order changes, and it is safe to re-run.
happy_df.columns = happy_df.columns.str.lower().str.replace(r'[/ ]', '_', regex=True)

table_columns = [
    'economy', 'social_support', 'life_expectancy', 'freedom',
    'government_corruption', 'generosity', 'year', 'continent_asia',
    'continent_asia_europe', 'continent_europe', 'continent_europe_asia',
    'continent_north_america', 'continent_oceania', 'continent_south_america',
    'happiness_score', 'happiness_score_prediction'
]
assert set(happy_df.columns) == set(table_columns), (
    f"Columns do not match the happy_df table: {sorted(set(happy_df.columns) ^ set(table_columns))}"
)

# Load the database configuration.
with open('db_config.json', 'r') as file:
    db_config = json.load(file)

# Create the database engine. quote_plus() keeps special characters in the
# credentials from breaking the URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote_plus(db_config["user"])}:{quote_plus(db_config["password"])}'
    f'@{db_config["host"]}:{db_config["port"]}/{db_config["dbname"]}'
)

# Insert the rows into the 'happy_df' table. if_exists='append' adds rows to an
# existing table, so re-running this cell inserts the same rows again.
try:
    happy_df.to_sql('happy_df', engine, if_exists='append', index=False)
    print("Todos los datos se han añadido correctamente a la base de datos.")
except Exception as e:
    print(f"Se ha producido un error al insertar los datos: {e}")
finally:
    engine.dispose()


Todos los datos se han añadido correctamente a la base de datos.
