# ft_pedidos

Este notebook é responsável por reunir todas as informações úteis de cada pedido do e-commerce.


## Configurações Iniciais

### Preparação do Ambiente PySpark

#### Instalando as Dependências

In [55]:
# Install the headless Java 8 JDK; -qq plus the redirect to /dev/null keeps apt quiet
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Apache Spark 3.4.1 build for Hadoop 3 from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Unpack the downloaded archive into the current working directory
!tar xf spark-3.4.1-bin-hadoop3.tgz

# Install the findspark library
!pip install -q findspark

#### Editando as Variáveis de Ambiente

In [56]:
import os

# Java home used by Spark (path of the OpenJDK 8 package installed in the setup cell).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Spark distribution unpacked by the setup cell.
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

# findspark is installed by the setup cell but was never initialised.
# init() reads SPARK_HOME and puts that distribution's Python bindings on
# sys.path, so the `pyspark` imported below is the one shipped with the
# downloaded Spark instead of whichever copy (if any) the environment provides.
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession that runs on every available core.
spark = SparkSession.builder.master("local[*]").appName("E-commerce Dataset").getOrCreate()

### Importando as Bibliotecas


In [57]:
from pyspark.sql import functions as F

### Carregando os Dados

In [None]:
# Orders table: the first line of the CSV is the header and column types are
# inferred by Spark.
df_orders = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/olist_orders_dataset.csv")
)

# Order items table (header row, inferred types). `price` is exposed as
# `net_price` from here on.
df_order_item = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/olist_order_items_dataset.csv")
    .withColumnRenamed("price", "net_price")
)

_payment_type = F.col("payment_type")

# Three payment types get an explicit Portuguese label; any other value is
# simply upper-cased.
_payment_type_label = (
    F.when(_payment_type == "not_defined", "NÃO DEFINIDO")
    .when(_payment_type == "credit_card", "CARTÃO DE CRÉDITO")
    .when(_payment_type == "debit_card", "CARTÃO DE DÉBITO")
    .otherwise(F.upper(_payment_type))
)

# Value of a single installment: total payment value split evenly across the
# number of installments.
_installment_value = F.col("payment_value") / F.col("payment_installments")

# Payments table (header row, inferred types) with the translated payment type
# and the per-installment value.
df_order_payments = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/olist_order_payments_dataset.csv")
    .withColumn("payment_type", _payment_type_label)
    .withColumn("installment_value", _installment_value)
)

# Reviews table. It carries free-text comment columns, so the CSV is read with:
#   - multiLine=True: a quoted field may span several physical lines; without
#     it Spark treats every line break as a row boundary and splits such a
#     review into several malformed rows.
#   - escape='"': inside a quoted field a literal quote is written doubled ("")
#     in RFC 4180-style CSV, whereas Spark's default escape character is a
#     backslash.
# NOTE(review): assumes the file uses RFC 4180 quoting — confirm against the data.
df_order_reviews = (
    spark.read.csv(
        "/content/olist_order_reviews_dataset.csv",
        header=True,
        inferSchema=True,
        multiLine=True,
        escape='"',
    )
    # The two review date columns are not carried into the final table.
    .drop("review_creation_date", "review_answer_timestamp")
    # A review row without an order_id cannot be joined to any order.
    .dropna(subset=["order_id"])
)

## Transformação dos Dados

In [None]:
_SECONDS_PER_HOUR = 3600
_SECONDS_PER_DAY = 3600 * 24

# Seconds elapsed between the purchase and the delivery to the customer.
# Casting to long gives epoch seconds, assuming both columns were inferred as
# timestamps when the orders CSV was read — TODO confirm.
_delivery_seconds = (
    F.col("order_delivered_customer_date").cast("long")
    - F.col("order_purchase_timestamp").cast("long")
).cast("int")

# Attach items, payments and reviews to every order. Left joins keep orders that
# have no matching row in a detail table.
# NOTE(review): when an order has several items, payments or reviews, these
# joins emit one row per combination, so monetary columns repeat across rows —
# confirm this grain is intended before aggregating.
_orders_with_details = df_orders
for _detail in (df_order_item, df_order_payments, df_order_reviews):
    _orders_with_details = _orders_with_details.join(_detail, on="order_id", how="left")

df_ft_pedidos = (
    _orders_with_details
    .withColumn("delivery_time_seconds", _delivery_seconds)
    .withColumn("delivery_time_hours", F.col("delivery_time_seconds") / _SECONDS_PER_HOUR)
    .withColumn("delivery_time_days", F.col("delivery_time_seconds") / _SECONDS_PER_DAY)
    # Freight as a fraction of the item price (not multiplied by 100).
    .withColumn("freight_percent", F.col("freight_value") / F.col("net_price"))
    # Swap double quotes for single quotes inside the free-text review comment.
    .withColumn(
        "review_comment_message",
        F.regexp_replace(F.col("review_comment_message"), '"', "'"),
    )
)

## Salvamento dos Dados

In [60]:
# Collapse to one partition so the output directory holds a single CSV part
# file, then write it with a header row, replacing any previous output.
_ft_pedidos_writer = (
    df_ft_pedidos.coalesce(1).write.mode("overwrite").option("header", "true")
)
_ft_pedidos_writer.csv("/transformed_data/ft_pedidos")