# Lab 3: Cálculo de Valor por Perfil de Pet

Este notebook executa o pipeline de cálculo de Valor por Perfil de Pet (LTV). As células abaixo irão compilar o código MapReduce, ingerir dados do PostgreSQL com Sqoop, executa o job no Hadoop e verifica os resultados.

In [None]:
import os

# Define qual implementação de MapReduce usar: 'java' ou 'python'
MAPREDUCE_LANG = 'java'  # Altere para 'python' para usar a implementação em Python
os.environ['MAPREDUCE_LANG'] = MAPREDUCE_LANG

print(f"Usando implementação de MapReduce: {os.environ['MAPREDUCE_LANG'].upper()}")

## Compilando a Aplicação MapReduce

A primeira etapa é compilar nosso código-fonte Java em um arquivo `.jar` executável. A célula abaixo usa o Maven para isso. Ela navega até o diretório do projeto e executa o `mvn package`. A flag `-q` é para uma saída mais limpa.

In [None]:
%%bash
if [ "$MAPREDUCE_LANG" == "java" ]; then
    cd ltv-by-pet-profile
    mvn package -q
    echo "JAR compilado com sucesso em: target/ltv-by-pet-profile-1.0-SNAPSHOT.jar"
fi

## Consultando custo em compras por Pet

Execute a célula abaixo para se conectar novamente e fazer uma consulta `SELECT` com os dados inseridos anteriormente.

In [None]:
import psycopg2
import os

# As credenciais e o host são baseados no arquivo docker-compose.txt
DB_HOST = "localhost" # Nome do serviço no Docker Compose
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()
    
    cur.execute("""SELECT 
            CONCAT(p.species, ';', p.animal_type, ';', p.fur_type) AS perfil_pet,
            (hc.quantity * hc.price) AS valor_compra 
            FROM purchase hc 
            JOIN pet p ON hc.tutor_id = p.tutor_id
            WHERE hc.nenabled = TRUE AND p.nenabled = TRUE""")
    rows = cur.fetchall()
    
    print("Custos encontrados:")
    for row in rows:
        print(row)
        
except Exception as e:
    print(f"Ocorreu um erro: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

## Montar custo por Pet com base no histórico de Compras

In [None]:
import psycopg2
import os
from datetime import datetime

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

pipeline_name = 'ltv_by_pet_profile'
start_time = datetime.now()
status = 'RUNNING'

try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()

    # Insert a new record into the execution_history table
    cur.execute(
        "INSERT INTO execution_history (target_table, start_time, status) VALUES (%s, %s, %s) RETURNING execution_id",
        (pipeline_name, start_time, status)
    )
    execution_id = cur.fetchone()[0]
    conn.commit()

    # Store the execution_id for later use
    os.environ['EXECUTION_ID'] = str(execution_id)

    print(f"Execution started for pipeline: {pipeline_name}")
    print(f"Execution ID: {execution_id}")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

## 1. Ingestão de histórico de Compras com Sqoop

Importar os dados de compras do PostgreSQL para o HDFS. O comando `hdfs dfs -rm` é usado para remover o diretório de destino antes da importação, garantindo que possamos executar esta célula várias vezes sem erros.

A query Sqoop já realiza um JOIN para obter o perfil do pet e o valor da compra, simplificando o MapReduce posterior.

In [None]:
%%bash
INPUT_DIR=/petshop/input_ltv

# Remove o diretório de entrada se ele já existir
hdfs dfs -test -d $INPUT_DIR
if [ $? -eq 0 ]; then
    echo "Removendo diretório HDFS existente: $INPUT_DIR"
    hdfs dfs -rm -r $INPUT_DIR
fi

echo "Iniciando importação com Sqoop..."
sqoop import \
    --connect jdbc:postgresql://localhost:5432/postgres \
    --username postgres \
    --password postgres \
    --query "SELECT \
                CONCAT(p.species, ';', p.animal_type, ';', p.fur_type) AS perfil_pet, \
                (hc.quantity * hc.price) AS valor_compra \
            FROM purchase hc \
            JOIN pet p ON hc.tutor_id = p.tutor_id \
            WHERE hc.nenabled = TRUE AND p.nenabled = TRUE AND \$CONDITIONS" \
    --target-dir $INPUT_DIR \
    --m 1 \
    --split-by hc.purchase_id

echo "\nImportação concluída. Verificando os dados no HDFS:"
hdfs dfs -ls $INPUT_DIR
hdfs dfs -cat $INPUT_DIR/part-m-00000 | head -n 5

## 2. Execução do Job MapReduce

Com os dados no HDFS, podemos executar nosso job MapReduce. A célula abaixo submete o código Java ou Python para o Hadoop. O resultado será salvo no diretório `/petshop/output_ltv`.

In [None]:
%%bash
INPUT_DIR=/petshop/input_ltv
OUTPUT_DIR=/petshop/output_ltv

# Remove o diretório de saída se ele já existir
hdfs dfs -test -d $OUTPUT_DIR
if [ $? -eq 0 ]; then
    echo "Removendo diretório HDFS de saída existente: $OUTPUT_DIR"
    hdfs dfs -rm -r $OUTPUT_DIR
fi

if [ "$MAPREDUCE_LANG" == "java" ]; then
    JAR_PATH=ltv-by-pet-profile/target/ltv-by-pet-profile-1.0-SNAPSHOT.jar
   
    echo "Executando o job MapReduce (Java)..."
    hadoop jar $JAR_PATH -D log4j.logger.com.petshop.hadoop.LTVMapper=DEBUG $INPUT_DIR $OUTPUT_DIR

elif [ "$MAPREDUCE_LANG" == "python" ]; then
    MAPPER_PATH=ltv-by-pet-profile-python/mapper.py
    REDUCER_PATH=ltv-by-pet-profile-python/reducer.py
 
    echo "Executando o job MapReduce (Python)..."
    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
        -file $MAPPER_PATH -mapper $MAPPER_PATH \
        -file $REDUCER_PATH -reducer $REDUCER_PATH \
        -input $INPUT_DIR \
        -output $OUTPUT_DIR
fi

echo "\nJob concluído!"

## 3. Verificando resultados

Vamos verificar o resultado processado no HDFS. O arquivo `part-r-00000` deve conter o `perfil_pet` e o `valor_total`.

In [None]:
%%bash
OUTPUT_DIR=/petshop/output_ltv

echo "Conteúdo do diretório de saída:"
hdfs dfs -ls $OUTPUT_DIR

echo "\nResultado do processamento:"
hdfs dfs -cat $OUTPUT_DIR/part-r-00000

## 4. Gravando resultados no PostgreSQL

Com o resultado processado e validado, a próxima etapa é armazená-lo no PostgreSQL para consumo por outras aplicações. A célula abaixo lê o resultado do HDFS e o insere na tabela `ltv_by_pet_profile`.

In [None]:
import psycopg2
import os
from subprocess import Popen, PIPE

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

# HDFS output file
output_file = "/petshop/output_ltv/part-r-00000"

try:
    # Connect to the database
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()

    # Truncate the table before inserting new data
    cur.execute("TRUNCATE TABLE ltv_by_pet_profile;")
    print("Table ltv_by_pet_profile truncated.")

    # Read the HDFS file
    process = Popen(["hdfs", "dfs", "-cat", output_file], stdout=PIPE)
    (stdout, stderr) = process.communicate()
    exit_code = process.wait()

    if exit_code == 0:
        # Decode the output and split into lines
        lines = stdout.decode("utf-8").strip().split('\n')
        
        print(f"Inserting {len(lines)} records into ltv_by_pet_profile...")
        
        # Process each line and insert into the database
        for line in lines:
            if not line:
                continue
                
            pet_profile, total_value = line.split('\t')
            
            # Prepare and execute the INSERT statement
            cur.execute(
                "INSERT INTO ltv_by_pet_profile (pet_profile, total_value) VALUES (%s, %s)",
                (pet_profile, float(total_value))
            )
        
        # Commit the transaction
        conn.commit()
        print("Data successfully inserted into ltv_by_pet_profile.")

    else:
        print(f"Error reading HDFS file: {stderr}")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()

In [None]:
import psycopg2
import os
from datetime import datetime
from subprocess import Popen, PIPE

# Database connection details
DB_HOST = "localhost"
DB_NAME = "postgres"
DB_USER = "postgres"
DB_USER_PWD = "postgres"

# HDFS output file
output_file = "/petshop/output_ltv/part-r-00000"

# Get the execution_id from the environment variable
execution_id = os.environ.get('EXECUTION_ID')

if execution_id:
    try:
        # Read the HDFS file to count the number of processed records
        process = Popen(["hdfs", "dfs", "-cat", output_file], stdout=PIPE)
        (stdout, stderr) = process.communicate()
        exit_code = process.wait()

        records_processed = 0
        if exit_code == 0:
            lines = stdout.decode("utf-8").strip().split('\\n')
            records_processed = len(lines)

        end_time = datetime.now()
        status = 'COMPLETED'

        conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
        cur = conn.cursor()

        # Update the execution_history record
        cur.execute(
            "UPDATE execution_history SET end_time = %s, status = %s, records_processed = %s WHERE execution_id = %s",
            (end_time, status, records_processed, execution_id)
        )
        conn.commit()

        print(f"Execution finished for pipeline with ID: {execution_id}")
        print(f"Records processed: {records_processed}")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if 'conn' in locals() and conn is not None:
            cur.close()
            conn.close()
else:
    print("Execution ID not found. The final status could not be updated.")

## 5. Consultando dados inseridos no PostgreSQL

Execute a célula abaixo para se conectar novamente e fazer uma consulta `SELECT` para verificar se os dados foram inseridos corretamente na tabela `ltv_by_pet_profile`.

In [None]:
try:
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_USER_PWD)
    cur = conn.cursor()
    
    cur.execute("SELECT * FROM ltv_by_pet_profile;")
    rows = cur.fetchall()
    
    print("Registros encontrados:")
    for row in rows:
        print(row)
        
except Exception as e:
    print(f"Ocorreu um erro: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        cur.close()
        conn.close()