In [1]:
# Importações
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, when, countDistinct, min, \
max, avg, round, lit, row_number, desc, asc, log1p, percent_rank, upper
from pyspark.sql.window import Window

In [14]:
# Criação do ambiente Spark
spark = SparkSession.builder \
.appName('tabelas') \
.getOrCreate()

In [16]:
# Carregamento dos datasets tratados
df1 = spark.read.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\df1_tratado.parquet")
df3 = spark.read.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\df3_tratado.parquet")
df4 = spark.read.csv(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\IDs_URLs.csv", sep=';', header=True)

In [None]:
# Avaliação Ponderada
media_avaliacoes_produtos = df1.select(avg(col("Avaliacao"))).collect()[0][0]
m = 10  # valor mínimo de confiança
df_transicao = df1.withColumn(
    'avaliacao_ponderada',
    (col('Quant_Avaliacoes') / (col('Quant_Avaliacoes') + lit(m))) * col('Avaliacao') +
    ((lit(m) / (col('Quant_Avaliacoes') + lit(m))) * lit(media_avaliacoes_produtos)))

In [None]:
# Escore de Engajamento
df_transicao = df_transicao.withColumn(
    'Escore_Engajamento',
    log1p(col('Quant_Avaliacoes') + col('Quant_Comentarios')))

In [None]:
# Escore Custo-Benefício (evita divisão por zero)
df_transicao = df_transicao.withColumn(
    'Escore_Custo_Beneficio',
    when(col('Preco') > 0, col('avaliacao_ponderada') / col('Preco')).otherwise(lit(None)))

In [None]:
# Faixa de Preço (quantis)
quantis = df_transicao.approxQuantile("Preco", [0.33, 0.66], 0.01)
baixo, medio = quantis[0], quantis[1]
df_transicao = df_transicao.withColumn(
    "Faixa_Preco",
    when(col("Preco") <= baixo, "Baixo")
    .when((col("Preco") > baixo) & (col("Preco") <= medio), "Médio")
    .otherwise("Alto"))

In [None]:
# Produto Destaque (WR ≥ 4.5 e Engajamento ≥ mediana)

# Mediana do engajamento
mediana_engajamento = df_transicao.approxQuantile("Escore_Engajamento", [0.5], 0.01)[0]

df_transicao = df_transicao.withColumn(
    "Produto_Destaque",
    when((col("avaliacao_ponderada") >= 4.5) & (col("Escore_Engajamento") >= mediana_engajamento), True).otherwise(False))

In [None]:
# Tabela Fato
fato_avaliacoes_produto = df_transicao

In [18]:
# Tabela dim_produto
dim_produto = df1.select('ID', 'Produto', 'Categoria', 'Marca')

In [19]:
# Tabela dim_caracteristicas
dim_caracteristicas = df3

In [20]:
# Tabela dim_url
dim_url = df4

In [21]:
# Tabela dim_marca
df1 = df1.withColumn("Marca", upper(col("Marca")))

# Agregações por marca
dim_marca = fato_avaliacoes_produto.groupBy("Marca").agg(
    countDistinct("ID").alias("Qtd_Produtos"),
    avg("avaliacao_ponderada").alias("Media_Avaliacao_Marca"),
    avg("Escore_Engajamento").alias("Engajamento_Marca_Medio"),
    avg("Preco").alias("Preco_Medio_Marca")
)

# Quantis para definir as faixas
quantis = dim_marca.approxQuantile("Preco_Medio_Marca", [0.33, 0.66], 0.01)
baixo, medio = quantis[0], quantis[1]

# Adiciona faixa de preço por marca
dim_marca = dim_marca.withColumn(
    "Faixa_Preco_Marca",
    when(col("Preco_Medio_Marca") <= baixo, "Baixo")
    .when((col("Preco_Medio_Marca") > baixo) & (col("Preco_Medio_Marca") <= medio), "Médio")
    .otherwise("Alto")
)


In [22]:
# Tabela dim_categoria
dim_categoria = fato_avaliacoes_produto.groupBy("Categoria").agg(
    countDistinct("ID").alias("Qtd_Produtos"),
    avg("avaliacao_ponderada").alias("Media_Avaliacao_Categoria"),
    avg("Escore_Engajamento").alias("Engajamento_Categoria_Medio"),
    avg("Preco").alias("Preco_Medio_Categoria")
)

# Definindo faixas de preço por categoria com quantis
quantis = dim_categoria.approxQuantile("Preco_Medio_Categoria", [0.33, 0.66], 0.01)
baixo, medio = quantis[0], quantis[1]

dim_categoria = dim_categoria.withColumn(
    "Faixa_Preco_Categoria",
    when(col("Preco_Medio_Categoria") <= baixo, "Baixo")
    .when((col("Preco_Medio_Categoria") > baixo) & (col("Preco_Medio_Categoria") <= medio), "Médio")
    .otherwise("Alto")
)


In [23]:
fato_avaliacoes_produto.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\fato_avaliacoes_produto")
dim_produto.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\dim_produto")
dim_caracteristicas.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\dim_caracteristicas")
dim_url.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\dim_url")
dim_marca.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\dim_marca")
dim_categoria.write.parquet(r"C:\Users\fred\Documents\CursoFullStack\Projeto\Web Scraping\Dados\modelo_estrela\dim_categoria")