In [1]:
#Libraries
import pandas as pd
import sqlite3
import pickle
from kafka import KafkaProducer, KafkaConsumer
import json
import random
import time

import warnings
warnings.filterwarnings("ignore")

In [2]:
# Kafka settings
bootstrap_servers = ['localhost:9092']  # Kafka server address
topic = 'predictions_topic'  # Kafka topic to send predictions

# Function to create Kafka producer
def create_kafka_producer():
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'), # Serialize data to JSON format
        api_version=(0, 10, 1)
    )

In [3]:
def load_models():
    # Load the linear regression model
    with open('happiness_model_linear.pkl', 'rb') as f:
        linear_regression_model = pickle.load(f)

    # Load the decision tree model
    with open('happiness_model_tree.pkl', 'rb') as f:
        decision_tree_model = pickle.load(f)

    return linear_regression_model, decision_tree_model

In [4]:
# Function to create SQLite database and table
def create_sqlite_db():
    conn = sqlite3.connect('predictions.db')  # Connect to SQLite database (creates the file if it doesn't exist)
    c = conn.cursor()
    c.execute('DROP TABLE IF EXISTS predictions;')
    # Create the 'predictions' table if it doesn't exist
    c.execute('''
        CREATE TABLE predictions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            GDP_per_capita REAL,
            Life_Expectancy REAL,
            Freedom REAL,
            linear_prediction REAL,
            tree_prediction REAL
        )
    ''')
    conn.commit()
    conn.close()
    print("Database and table created.")

In [5]:
def insert_prediction_to_db(features, linear_prediction, tree_prediction):
    conn = sqlite3.connect('predictions.db')
    c = conn.cursor()

    # Extraemos las columnas de los features (asumiendo que están en el formato de un diccionario)
    gdp_per_capita = features.get('GDP per Capita', None)
    life_expectancy = features.get('Life Expectancy', None)
    freedom = features.get('Freedom', None)

    # Insertamos los valores en la base de datos
    c.execute('''
        INSERT INTO predictions (GDP_per_capita, Life_Expectancy, Freedom, linear_prediction, tree_prediction)
        VALUES (?, ?, ?, ?, ?)
    ''', (gdp_per_capita, life_expectancy, freedom, linear_prediction, tree_prediction))

    # Commit y cerrar la conexión
    conn.commit()
    conn.close()

    print(f"Data inserted into database: GDP_per_capita={gdp_per_capita}, Life_Expectancy={life_expectancy}, Freedom={freedom}, Linear Prediction={linear_prediction}, Tree Prediction={tree_prediction}")


In [6]:
def send_data_to_kafka(data, linear_regression_model, decision_tree_model, producer):
    for i in range(len(data)):
        # Extraemos los features de cada fila (sin la columna 'Happiness Score')
        features = data.iloc[i].drop('Happiness Score').to_dict()

        # Hacemos las predicciones con ambos modelos
        linear_prediction = linear_regression_model.predict([list(features.values())])[0]
        tree_prediction = decision_tree_model.predict([list(features.values())])[0]

        # Convertimos los features a un JSON para enviarlos a Kafka
        features_json = json.dumps(features)

        # Creamos el mensaje para enviar a Kafka
        message = {
            'features': features_json,  # Convertimos los features a un string JSON
            'linear_prediction': linear_prediction,
            'tree_prediction': tree_prediction
        }

        # Enviamos el mensaje a Kafka
        producer.send('predictions_topic', value=message)
        producer.flush()  # Aseguramos que el mensaje se envíe inmediatamente

        print(f"Sent to Kafka: {message}")


In [18]:
# Function to consume data from Kafka and store it in SQLite
def consume_data_from_kafka_and_store():
    consumer = KafkaConsumer(
        'predictions_topic',  # The Kafka topic to consume from
        bootstrap_servers=bootstrap_servers,
        group_id='prediction_group',  # Consumer group ID
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # Deserialize data from JSON
        api_version=(0, 10, 1),
        enable_auto_commit=True,
        auto_offset_reset='earliest', 
        #consumer_timeout_ms=1000 
    )

    for message in consumer:
        data = message.value
        
        # Extract the features and predictions from the Kafka message
        features_json = data['features']
        linear_prediction = data['linear_prediction']
        tree_prediction = data['tree_prediction']

        # También insertamos los datos en la base de datos SQLite
        insert_prediction_to_db(features, linear_prediction, tree_prediction)
        
        print(f"Data consumed and stored: {data}")

In [20]:
def main():
    # Create the SQLite database and table
    create_sqlite_db()

    # Load the models
    linear_regression_model, decision_tree_model = load_models()

    # Create a Kafka producer
    producer = create_kafka_producer()

    # Assuming final_data is your DataFrame that contains the features and target
    data_test = pd.read_csv('data_test.csv')  # Load your dataset here

    # Send data to Kafka and store it in SQLite
    send_data_to_kafka(data_test, linear_regression_model, decision_tree_model, producer)

    # Start consuming data from Kafka and storing it in SQLite
    consume_data_from_kafka_and_store()

if __name__ == '__main__':
    main()

Database and table created.
Sent to Kafka: {'features': '{"GDP per Capita": 0.308, "Life Expectancy": 0.391, "Freedom": 0.452}', 'linear_prediction': 4.442772734340008, 'tree_prediction': 4.395}
Sent to Kafka: {'features': '{"GDP per Capita": 0.874, "Life Expectancy": 0.365, "Freedom": 0.519}', 'linear_prediction': 5.305658955137872, 'tree_prediction': 4.574}
Sent to Kafka: {'features': '{"GDP per Capita": 0.97306, "Life Expectancy": 0.68613, "Freedom": 0.4027}', 'linear_prediction': 5.53044459873677, 'tree_prediction': 4.788}
Sent to Kafka: {'features': '{"GDP per Capita": 1.15851, "Life Expectancy": 0.3494, "Freedom": 0.28098}', 'linear_prediction': 5.109296856005871, 'tree_prediction': 4.49700021743774}
Sent to Kafka: {'features': '{"GDP per Capita": 1.08754, "Life Expectancy": 0.61415, "Freedom": 0.40425}', 'linear_prediction': 5.601604564855813, 'tree_prediction': 5.615}
Sent to Kafka: {'features': '{"GDP per Capita": 0.31292, "Life Expectancy": 0.16347, "Freedom": 0.27544}', 'lin

NameError: name 'features' is not defined