# Lab 2.1: Frequência de referência para ir ao Banho e Tosa

Este notebook executa o pipeline de geração de dados de referência para a frequência de banho e tosa. As células abaixo irão compilar o código MapReduce, ingerir dados do PostgreSQL com Sqoop, executar o job no Hadoop e armazenar os dados resultantes na tabela `booking_reference`.

In [None]:
import os

# Define qual implementação de MapReduce usar: 'java' ou 'python'
MAPREDUCE_LANG = 'java'  # Altere para 'python' para usar a implementação em Python
os.environ['MAPREDUCE_LANG'] = MAPREDUCE_LANG

print(f"Usando implementação de MapReduce: {os.environ['MAPREDUCE_LANG'].upper()}")

## Compilando a Aplicação MapReduce

A primeira etapa é compilar nosso código-fonte Java em um arquivo `.jar` executável.

In [None]:
%%bash
if [ "$MAPREDUCE_LANG" == "java" ]; then
    cd booking-recommendation-generate-reference
    mvn package -q
    echo "JAR compilado com sucesso em: target/booking-recommendation-generate-reference-1.0-SNAPSHOT.jar"
fi

## Consultando histórico de Agendamentos

Execute a célula abaixo para se conectar novamente e fazer uma consulta `SELECT` com os dados inseridos anteriormente.

In [None]:
import psycopg2
import os

# As credenciais e o host são baseados no arquivo docker-compose.txt
DB_HOST = "localhost" # Nome do serviço no Docker Compose
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()
    
    cur.execute("""SELECT
                b.pet_id,
                CONCAT(p.species, ';', p.animal_type, ';', p.fur_type) AS pet_profile,
                b.booking_date
            FROM booking b
            JOIN pet p ON b.pet_id = p.pet_id
            WHERE b.status = 'Realizado' AND p.nenabled = TRUE AND b.nenabled = TRUE""")
    rows = cur.fetchall()
    
    print("Agendamentos encontrados:")
    for row in rows:
        print(row)
        
except Exception as e:
    print(f"Ocorreu um erro: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

## Montar recomendação ao Banho e Tosa com base na frequência média de todos os Pets

In [None]:
import psycopg2
import os
from datetime import datetime

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

pipeline_name = 'booking_reference'
start_time = datetime.now()
status = 'RUNNING'

try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()

    # Insert a new record into the execution_history table
    cur.execute(
        "INSERT INTO execution_history (target_table, start_time, status) VALUES (%s, %s, %s) RETURNING execution_id",
        (pipeline_name, start_time, status)
    )
    execution_id = cur.fetchone()[0]
    conn.commit()

    # Store the execution_id for later use
    os.environ['EXECUTION_ID'] = str(execution_id)

    print(f"Execution started for pipeline: {pipeline_name}")
    print(f"Execution ID: {execution_id}")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

## 1. Ingestão do histórico de Agendamentos com Sqoop

Importar os dados de agendamentos do PostgreSQL para o HDFS. A query já combina os dados do pet para formar o perfil.

In [None]:
%%bash
INPUT_DIR=/petshop/input_booking_reference

# Remove o diretório de entrada se ele já existir
hdfs dfs -test -d $INPUT_DIR
if [ $? -eq 0 ]; then
    echo "Removendo diretório HDFS existente: $INPUT_DIR"
    hdfs dfs -rm -r $INPUT_DIR
fi

echo "Iniciando importação com Sqoop..."
sqoop import \
    --connect jdbc:postgresql://localhost:5432/postgres \
    --username postgres \
    --password postgres \
    --query "SELECT \
                b.pet_id, \
                CONCAT(p.species, ';', p.animal_type, ';', p.fur_type) AS pet_profile, \
                b.booking_date \
            FROM booking b \
            JOIN pet p ON b.pet_id = p.pet_id \
            WHERE b.status = 'Realizado' AND p.nenabled = TRUE AND b.nenabled = TRUE AND \$CONDITIONS" \
    --target-dir $INPUT_DIR \
    --m 1 \
    --split-by b.pet_id

echo "\nImportação concluída. Verificando os dados no HDFS:"
hdfs dfs -ls $INPUT_DIR
hdfs dfs -cat $INPUT_DIR/part-m-00000 | head -n 5

## 2. Execução do Job MapReduce

Com os dados no HDFS, podemos executar nosso job MapReduce. O resultado será salvo no diretório `/petshop/output_booking_reference`.

In [None]:
%%bash
INPUT_DIR=/petshop/input_booking_reference
OUTPUT_DIR=/petshop/output_booking_reference

# Remove o diretório de saída se ele já existir
hdfs dfs -test -d $OUTPUT_DIR
if [ $? -eq 0 ]; then
    echo "Removendo diretório HDFS de saída existente: $OUTPUT_DIR"
    hdfs dfs -rm -r $OUTPUT_DIR
fi

if [ "$MAPREDUCE_LANG" == "java" ]; then
    JAR_PATH=booking-recommendation-generate-reference/target/booking-recommendation-generate-reference-1.0-SNAPSHOT.jar
   
    echo "Executando o job MapReduce (Java)..."
    hadoop jar $JAR_PATH -D log4j.logger.com.petshop.hadoop.BookingReferenceMapper=DEBUG $INPUT_DIR $OUTPUT_DIR

elif [ "$MAPREDUCE_LANG" == "python" ]; then
    MAPPER_PATH=booking-recommendation-generate-reference-python/mapper.py
    REDUCER_PATH=booking-recommendation-generate-reference-python/reducer.py
  
    echo "Executando o job MapReduce (Python)..."
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
        -file $MAPPER_PATH -mapper $MAPPER_PATH \
        -file $REDUCER_PATH -reducer $REDUCER_PATH \
        -input $INPUT_DIR \
        -output $OUTPUT_DIR
fi

echo "\nJob concluído!"

## 3. Verificando resultados

Vamos verificar o resultado processado no HDFS. O arquivo `part-r-00000` deve conter o `pet_profile` e a `frequency_days`.

In [None]:
%%bash
OUTPUT_DIR=/petshop/output_booking_reference

echo "Conteúdo do diretório de saída:"
hdfs dfs -ls $OUTPUT_DIR

echo "\nResultado do processamento:"
hdfs dfs -cat $OUTPUT_DIR/part-r-00000

## 4. Gravando resultados no PostgreSQL

Com o resultado processado e validado, a próxima etapa é armazená-lo no PostgreSQL para consumo pelo pipeline de recomendação.

In [None]:
import psycopg2
import os
from subprocess import Popen, PIPE

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

# HDFS output file
output_file = "/petshop/output_booking_reference/part-r-00000"

try:
    # Connect to the database
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()

    # Truncate the table before inserting new data
    cur.execute("TRUNCATE TABLE booking_reference;")
    print("Table booking_reference truncated.")

    # Read the HDFS file
    process = Popen(["hdfs", "dfs", "-cat", output_file], stdout=PIPE)
    (stdout, stderr) = process.communicate()
    exit_code = process.wait()

    if exit_code == 0:
        # Decode the output and split into lines
        lines = stdout.decode("utf-8").strip().split('\n')
        
        print(f"Inserting {len(lines)} records into booking_reference...")
        
        # Process each line and insert into the database
        for line in lines:
            if not line:
                continue
                
            pet_profile, frequency_days = line.split('\t')
            species, animal_type, fur_type = pet_profile.split(';')
            
            # Prepare and execute the INSERT statement
            cur.execute(
                "INSERT INTO booking_reference (species, animal_type, fur_type, frequency_days) VALUES (%s, %s, %s, %s)",
                (species, animal_type, fur_type, int(frequency_days))
            )
        
        # Commit the transaction
        conn.commit()
        print("Data successfully inserted into booking_reference.")

    else:
        print(f"Error reading HDFS file: {stderr}")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

In [None]:
import psycopg2
import os
from datetime import datetime
from subprocess import Popen, PIPE

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

# HDFS output file
output_file = "/petshop/output_booking_reference/part-r-00000"

# Get the execution_id from the environment variable
execution_id = os.environ.get('EXECUTION_ID')

if execution_id:
    try:
        # Read the HDFS file to count the number of processed records
        process = Popen(["hdfs", "dfs", "-cat", output_file], stdout=PIPE)
        (stdout, stderr) = process.communicate()
        exit_code = process.wait()

        records_processed = 0
        if exit_code == 0:
            lines = stdout.decode("utf-8").strip().split('\\n')
            records_processed = len(lines)

        end_time = datetime.now()
        status = 'COMPLETED'

        conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
        cur = conn.cursor()

        # Update the execution_history record
        cur.execute(
            "UPDATE execution_history SET end_time = %s, status = %s, records_processed = %s WHERE execution_id = %s",
            (end_time, status, records_processed, execution_id)
        )
        conn.commit()

        print(f"Execution finished for pipeline with ID: {execution_id}")
        print(f"Records processed: {records_processed}")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if 'conn' in locals() and conn is not None:
            cur.close()
            conn.close()
else:
    print("Execution ID not found. The final status could not be updated.")

## 5. Consultando dados inseridos no PostgreSQL

Execute a célula abaixo para verificar se os dados foram inseridos corretamente.

In [None]:
try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()
    
    cur.execute("SELECT * FROM booking_reference;")
    rows = cur.fetchall()
    
    print("Registros encontrados:")
    for row in rows:
        print(row)
        
except Exception as e:
    print(f"Ocorreu um erro: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()