In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, monotonically_increasing_id
from pyspark.sql.types import IntegerType, DoubleType 
from datetime import datetime, timedelta
import math
import os

# Iniciando uma SparkSession com Delta Lake
spark = SparkSession.builder \
    .appName("SilverLayer") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Adicionando configurações S3 diretamente na sessão Spark
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY"))
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv("AWS_SECRET_KEY"))
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Configurações do bucket S3
silver_bucket = "s3a://engenharia-dados-satc-silver-bucket"
gold_bucket = "s3a://engenharia-dados-satc-gold-bucket"

# Carregar arquivos Delta da camada silver
datas = spark.read.format("delta").load(f'{silver_bucket}/dim_data')
pedidos = spark.read.format("delta").load(f'{silver_bucket}/pedidos')
localizacoes = spark.read.format("delta").load(f'{silver_bucket}/localizacoes')


In [0]:
# Renomear colunas e adicionar IDs sequenciais


pedidos = pedidos.withColumn('sk_pedido', monotonically_increasing_id())
localizacoes = localizacoes.withColumn('sk_loc', col('cep').cast(IntegerType()))

# Calcular volume e massa dos pedidos
pedidos = pedidos.withColumn('volume', (col('comprimento_cm') * col('altura_cm') * col('largura_cm')) / (100*100*100))
pedidos = pedidos.withColumn('massa', col('peso_g') / 1000)

In [0]:
# Selecionar colunas para transações
transacoes = pedidos.select(
    col('sk_pedido').alias('sk_pedido'),
    col('idcliente').alias('idcliente'),
    col('idvendedor').alias('idvendedor'),
    col('pagamento').alias('pagamento'),
    col('parcelas').alias('parcelas')
)

### CRIANDO ENTREGAS ###
entregas = pedidos.select(
    col('sk_pedido').alias('sk_pedido'),
    col('categoria').alias('categoria'),
    col('statuspedido').alias('statuspedido'),
    col('cepcliente').alias('cepcliente'),
    col('cepvendedor').alias('cepvendedor')
)
entregas = entregas.join(localizacoes.withColumnRenamed('sk_loc', 'destino'), entregas.cepcliente == localizacoes.cep, 'left')

entregas = entregas.select(
    col('sk_pedido').alias('sk_pedido'),
    col('categoria').alias('categoria'),
    col('statuspedido').alias('statuspedido'),
    col('destino').alias('destino'),
    col('cepvendedor').alias('cepvendedor')
)

entregas = entregas.join(localizacoes.withColumnRenamed('sk_loc', 'origem'), entregas.cepvendedor == localizacoes.cep, 'left')

entregas = entregas.select(
    col('sk_pedido').alias('sk_pedido'),
    col('categoria').alias('categoria'),
    col('statuspedido').alias('statuspedido'),
    col('origem').alias('origem'),
    col('destino').alias('destino')
)


In [0]:
### CRIANDO PEDIDOS ###

pedidos = pedidos.withColumn('datacompra', to_date(col('datacompra')))
pedidos = pedidos.withColumn('datacliente', to_date(col('datacliente')))
pedidos = pedidos.withColumn('prazo', to_date(col('prazo')))

pedidos = pedidos.withColumn('ped_diasentrega', (col('datacliente').cast('long') - col('datacompra').cast('long')) / (24*60*60))
pedidos = pedidos.withColumn('ped_diasprazo', (col('prazo').cast('long') - col('datacompra').cast('long')) / (24*60*60))

fato = pedidos.select(
    col('sk_pedido').alias('sk_pedido'),
    col('datacompra').alias('datacompra'),
    col('iditem').alias('iditem'),
    col('preco').alias('preco'),
    col('frete').alias('frete'),
    col('volume').alias('volume'),
    col('massa').alias('massa'),
    col('ped_diasentrega').alias('ped_diasentrega'),
    col('ped_diasprazo').alias('ped_diasprazo')
)

datas = datas.withColumn('dt_data', to_date(col('dt_data')))
fato = fato.join(datas, fato.datacompra == datas.dt_data, 'left').select(
    col('sk_pedido').alias('sk_pedido'),
    col('sk_data').alias('sk_data'),
    col('iditem').alias('iditem'),
    col('preco').alias('preco'),
    col('frete').alias('frete'),
    col('volume').alias('volume'),
    col('massa').alias('massa'),
    col('ped_diasentrega').alias('ped_diasentrega'),
    col('ped_diasprazo').alias('ped_diasprazo')
)

In [0]:
entregas = entregas.select(
    col('sk_pedido').alias('sk_pedido'),
    col('categoria').alias('categoria'),
    col('statuspedido').alias('statuspedido'),
    col('origem').alias('origem'),
    col('destino').alias('destino')
)

In [0]:
localizacoes = localizacoes.withColumn('latitude', col('latitude').cast('double'))
localizacoes = localizacoes.withColumn('longitude', col('longitude').cast('double'))

trans = entregas.join(localizacoes, col("origem") == col("sk_loc"), how="left")

trans = trans.select(
    col('sk_pedido').alias('sk_pedido'),
    col('latitude').alias('lat1'),
    col('longitude').alias('long1'),
    col('destino').alias('destino')
)

trans = entregas.join(localizacoes, col("destino") == col("sk_loc"), how="left")

trans = trans.select(
    col('sk_pedido').alias('pedido'),
    col('latitude').alias('lat1'),
    col('longitude').alias('long1'),
    col('latitude').alias('lat2'),
    col('longitude').alias('long2'),
)

pedidos = pedidos.join(trans, col("sk_pedido") == col("pedido"), how="left")

In [0]:
# Função Haversine
def haversine(lat1, lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    R = 6371.0  # Raio da Terra em quilômetros

    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)

    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    a = math.sin(dlat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c
    return distance

# Registrar a função como UDF
haversine_udf = udf(haversine, DoubleType())

# Aplicar a UDF ao DataFrame
pedidos = pedidos.withColumn('distancia_km', haversine_udf(pedidos['lat1'], pedidos['long1'], pedidos['lat2'], pedidos['long2']))

pedidos = pedidos.drop('lat1', 'long1', 'lat2', 'long2')

In [0]:
### SALVAR TABELAS NA CAMADA GOLD ###

datas.write.format("delta").mode('overwrite').save(f'{gold_bucket}/dim_data')
fato.write.format("delta").mode('overwrite').save(f'{gold_bucket}/fato_pedidos')
entregas.write.format("delta").mode('overwrite').save(f'{gold_bucket}/dim_entregas')
transacoes.write.format("delta").mode('overwrite').save(f'{gold_bucket}/dim_transacao')
localizacoes.write.format("delta").mode('overwrite').save(f'{gold_bucket}/dim_localizacoes')