In [3]:
from confluent_kafka import Producer
import json
import pickle
import pandas as pd
from sklearn.model_selection import train_test_split

# Client configuration: only the broker address is set here; every other
# producer option is left at the library default.
producer_conf = {'bootstrap.servers': 'localhost:9092'}

# Producer instance shared by the send loop further down in this cell.
p = Producer(producer_conf)

def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Passed as the ``callback`` argument to ``p.produce`` in this cell.

    Args:
        err: ``None`` when delivery succeeded, otherwise the error to print.
        msg: Message object exposing ``key()`` and ``value()``; only read on
            the success path.
    """
    if err is None:
        # key() and value() are interpolated as-is, so bytes show as b'...'.
        print(f"Mensaje producido: {msg.key()}: {msg.value()}")
        return
    print(f"Error: {err}")

# Target column plus the six feature columns kept from the CSV.
columns_to_use = ['happiness_score', 'family', 'freedom', 'generosity', 'GDP', 'life_expectancy', 'government_corruption']

# Load, select the columns, and clean in one chain:
#   1. drop rows that already contain a missing value,
#   2. coerce government_corruption to numeric (unparseable entries become NaN),
#   3. drop the rows that step 2 turned into NaN.
df = (
    pd.read_csv('../features/combined_happiness_data.csv')[columns_to_use]
    .dropna()
    .assign(
        government_corruption=lambda frame: pd.to_numeric(
            frame['government_corruption'], errors='coerce'
        )
    )
    .dropna()
)

# Features are every remaining column except the target.
y = df['happiness_score']
X = df.drop(columns='happiness_score')

# Hold out 30% of the rows; the fixed random_state keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
)

# Send every held-out row to Kafka as one JSON message, keyed by its position
# in the test set. Each payload carries the feature values and the true target
# so the consumer can score its predictions.
for i in range(len(X_test)):
    data = X_test.iloc[i].to_dict()
    true_value = y_test.iloc[i]
    message = json.dumps({'index': i, 'data': data, 'true_value': true_value})
    p.produce('first_topic', key=str(i), value=message, callback=delivery_report)
    # produce() only enqueues the message. Polling on every iteration serves
    # the pending delivery callbacks and drains the local queue, instead of
    # letting everything pile up until the final flush() (which can overflow
    # the queue with a BufferError on larger inputs).
    p.poll(0)

# Block until all outstanding messages have been delivered or have failed.
p.flush()


Mensaje producido: b'0': b'{"index": 0, "data": {"family": 0.95, "freedom": 0.452, "generosity": 0.22, "GDP": 0.308, "life_expectancy": 0.391, "government_corruption": 0.146}, "true_value": 4.35}'
Mensaje producido: b'1': b'{"index": 1, "data": {"family": 1.281, "freedom": 0.519, "generosity": 0.051, "GDP": 0.874, "life_expectancy": 0.365, "government_corruption": 0.064}, "true_value": 4.441}'
Mensaje producido: b'2': b'{"index": 2, "data": {"family": 0.81, "freedom": 0.334, "generosity": 0.216, "GDP": 0.652, "life_expectancy": 0.424, "government_corruption": 0.113}, "true_value": 5.472}'
Mensaje producido: b'3': b'{"index": 3, "data": {"family": 1.31, "freedom": 0.598, "generosity": 0.262, "GDP": 1.503, "life_expectancy": 0.825, "government_corruption": 0.182}, "true_value": 6.825}'
Mensaje producido: b'4': b'{"index": 4, "data": {"family": 1.471, "freedom": 0.547, "generosity": 0.291, "GDP": 1.398, "life_expectancy": 0.819, "government_corruption": 0.133}, "true_value": 6.886}'
Mensa

0

In [4]:
from confluent_kafka import Consumer, KafkaError
import json
import pickle
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score

# Consumer settings.
consumer_conf = {
    # Broker to connect to.
    'bootstrap.servers': 'localhost:9092',
    # Consumer group under which this consumer joins.
    'group.id': 'mygroup',
    # Start from the oldest available message when the group has no
    # committed offset for a partition.
    'auto.offset.reset': 'earliest'
}

# Create the consumer and attach it to the topic.
c = Consumer(consumer_conf)
c.subscribe(['first_topic'])

# Load the pickled model object whose predict() is called on each consumed row.
# SECURITY: pickle.load can execute arbitrary code embedded in the file; only
# load a pickle that you produced yourself or otherwise fully trust.
# NOTE(review): the path is relative to the current working directory, and the
# pickle presumably needs the same library versions it was saved with - confirm.
with open('modelo_prediccion_felicidad.pkl', 'rb') as file:
    model = pickle.load(file)

predictions = []
true_values = []

# Expected number of messages (from test set)
# NOTE(review): hard-coded; must match the number of rows the producer sent.
num_expected_messages = 235
num_processed_messages = 0

# Stop after this many consecutive empty polls (1 second each). Without a
# limit the loop never ends when fewer messages than expected are available,
# e.g. when this consumer group has already committed offsets on a prior run.
max_idle_polls = 30
idle_polls = 0

try:
    while num_processed_messages < num_expected_messages:
        msg = c.poll(1.0)
        if msg is None:
            idle_polls += 1
            if idle_polls >= max_idle_polls:
                print(
                    f"No messages received in {max_idle_polls} consecutive polls; "
                    f"stopping after {num_processed_messages}/{num_expected_messages} messages."
                )
                break
            continue
        idle_polls = 0

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Informational only: keep polling for more messages.
                print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
                continue
            # Any other error ends consumption.
            print(msg.error())
            break

        try:
            # Decoding
            data = json.loads(msg.value().decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Skip payloads that are not valid UTF-8 JSON; they do not count
            # towards the processed total.
            print(f"Error decoding JSON: {e}")
            print(f"Message value: {msg.value()}")
            continue

        index = data['index']
        features = data['data']
        true_value = data['true_value']

        # One-row frame; columns follow the key order of the JSON payload.
        features_df = pd.DataFrame([features])

        # prediction
        prediction = model.predict(features_df)

        # Store
        predictions.append(prediction[0])
        true_values.append(true_value)

        num_processed_messages += 1

        print(f"Index: {index}, Prediction: {prediction[0]}, True Value: {true_value}")
finally:
    # Always close the consumer, even when the loop stops early or a message
    # raises while being processed.
    c.close()

if predictions:
    mse = mean_squared_error(true_values, predictions)
    r2 = r2_score(true_values, predictions)

    print(f"Mean Squared Error: {mse}")
    print(f"R^2 Score: {r2}")
else:
    # The metric functions raise on empty input, so report this explicitly.
    print("No messages were processed; metrics cannot be computed.")


Index: 0, Prediction: 4.3255499949264555, True Value: 4.35
Index: 1, Prediction: 4.284319990107215, True Value: 4.441
Index: 2, Prediction: 4.402369985504153, True Value: 5.472
Index: 3, Prediction: 7.04333001209259, True Value: 6.825
Index: 4, Prediction: 6.889450001754768, True Value: 6.886
Index: 5, Prediction: 5.711029989414217, True Value: 5.919
Index: 6, Prediction: 5.744890001296997, True Value: 5.33599996566772
Index: 7, Prediction: 5.832889975891112, True Value: 5.81
Index: 8, Prediction: 4.5717599943415355, True Value: 5.163
Index: 9, Prediction: 4.249919969024656, True Value: 4.559
Index: 10, Prediction: 5.825309968528748, True Value: 5.84999990463257
Index: 11, Prediction: 6.74521000267029, True Value: 7.246
Index: 12, Prediction: 5.470760000000002, True Value: 4.456
Index: 13, Prediction: 3.929510003356935, True Value: 3.587
Index: 14, Prediction: 4.290180002174381, True Value: 3.933
Index: 15, Prediction: 4.41411998931885, True Value: 3.006
Index: 16, Prediction: 6.294330