## Carregando e Explorando o Conjunto de Dados

In [16]:
# Inicie uma sessão Spark no seu notebook.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ProductDataAnalysis").config("spark.executor.cores", "2").getOrCreate()

In [17]:
# Carregue os dados do arquivo CSV que já está dentro do seu projeto
file_path = "product+classification+and+clustering/pricerunner_aggregate.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [18]:
df.show(5)

+----------+--------------------+------------+-----------+--------------------+------------+---------------+
|Product ID|       Product Title| Merchant ID| Cluster ID|       Cluster Label| Category ID| Category Label|
+----------+--------------------+------------+-----------+--------------------+------------+---------------+
|         1|apple iphone 8 pl...|           1|          1|Apple iPhone 8 Pl...|        2612|  Mobile Phones|
|         2|apple iphone 8 pl...|           2|          1|Apple iPhone 8 Pl...|        2612|  Mobile Phones|
|         3|apple mq8n2b/a ip...|           3|          1|Apple iPhone 8 Pl...|        2612|  Mobile Phones|
|         4|apple iphone 8 pl...|           4|          1|Apple iPhone 8 Pl...|        2612|  Mobile Phones|
|         5|apple iphone 8 pl...|           5|          1|Apple iPhone 8 Pl...|        2612|  Mobile Phones|
+----------+--------------------+------------+-----------+--------------------+------------+---------------+
only showing top 5 

In [19]:
df.printSchema()

root
 |-- Product ID: integer (nullable = true)
 |-- Product Title: string (nullable = true)
 |--  Merchant ID: integer (nullable = true)
 |--  Cluster ID: integer (nullable = true)
 |--  Cluster Label: string (nullable = true)
 |--  Category ID: integer (nullable = true)
 |--  Category Label: string (nullable = true)



In [20]:
df.describe().show()

+-------+-----------------+--------------------+------------------+------------------+--------------------+------------------+----------------+
|summary|       Product ID|       Product Title|       Merchant ID|        Cluster ID|       Cluster Label|       Category ID|  Category Label|
+-------+-----------------+--------------------+------------------+------------------+--------------------+------------------+----------------+
|  count|            35311|               35311|             35311|             35311|               35311|             35311|           35311|
|   mean|26150.80017558268|                NULL|120.50188326583785|30110.687632749003|                NULL| 2618.142929965167|            NULL|
| stddev|13498.19122018199|                NULL|117.04555721851422|18410.265642128295|                NULL|3.6007080727956833|            NULL|
|    min|                1|10 hd portable te...|                 1|                 1|AEG A71101TSX0 St...|              2612|          

In [21]:
print(spark.sparkContext.getConf().getAll())

[('spark.app.startTime', '1735865577362'), ('spark.executor.id', 'driver'), ('spark.app.submitTime', '1735865577190'), ('spark.executor.cores', '2'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark

## Limpeza e Preparação dos Dados

In [22]:
from pyspark.sql.functions import col
df.select([col(c).isNull().alias(c) for c in df.columns]).show()

+----------+-------------+------------+-----------+--------------+------------+---------------+
|Product ID|Product Title| Merchant ID| Cluster ID| Cluster Label| Category ID| Category Label|
+----------+-------------+------------+-----------+--------------+------------+---------------+
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false|      false|         false|       false|          false|
|     false|        false|       false| 

In [23]:
# Exemplo de exclusão de linhas com valores nulos
df_clean = df.na.drop()

In [24]:
df.printSchema()

root
 |-- Product ID: integer (nullable = true)
 |-- Product Title: string (nullable = true)
 |--  Merchant ID: integer (nullable = true)
 |--  Cluster ID: integer (nullable = true)
 |--  Cluster Label: string (nullable = true)
 |--  Category ID: integer (nullable = true)
 |--  Category Label: string (nullable = true)



In [25]:
# Lista de nomes de colunas conforme o esquema com espaços
col_names = df_clean.schema.names

# Renomear colunas para remover espaços iniciais
for col_name in col_names:
    new_col_name = col_name.strip()  # strip() remove espaços do começo e do fim
    df_clean = df_clean.withColumnRenamed(col_name, new_col_name)

# Verificando o novo esquema
df_clean.printSchema()

root
 |-- Product ID: integer (nullable = true)
 |-- Product Title: string (nullable = true)
 |-- Merchant ID: integer (nullable = true)
 |-- Cluster ID: integer (nullable = true)
 |-- Cluster Label: string (nullable = true)
 |-- Category ID: integer (nullable = true)
 |-- Category Label: string (nullable = true)



In [26]:
# Exemplo de conversão de uma coluna de string para inteiro
df_clean = df_clean.withColumn("Merchant ID", df_clean["Merchant ID"].cast("integer"))

In [27]:
# NOTA: A conversão de tipos de dados é útil quando você precisa alterar o tipo de uma coluna. 
# No nosso DataFrame, a coluna 'Merchant ID' já é do tipo inteiro, conforme mostrado pelo esquema:
df_clean.printSchema()

# Portanto, a conversão para inteiro não é necessária neste caso. Se fosse uma coluna do tipo string
# que contém apenas números, a conversão seria realizada da seguinte maneira:
# df_clean = df_clean.withColumn("Merchant ID", df_clean["Merchant ID"].cast("integer"))

root
 |-- Product ID: integer (nullable = true)
 |-- Product Title: string (nullable = true)
 |-- Merchant ID: integer (nullable = true)
 |-- Cluster ID: integer (nullable = true)
 |-- Cluster Label: string (nullable = true)
 |-- Category ID: integer (nullable = true)
 |-- Category Label: string (nullable = true)



## Análise Exploratória de Dados

In [28]:
#Calculando a Distribuição de Produtos por Categoria
from pyspark.sql.functions import count

# Agrupando por categoria e contando os produtos
categoria_distribuicao = df_clean.groupBy("Category Label").agg(count("Product ID").alias("Count")).orderBy("Count", ascending=False)

# Visualizando o resultado
categoria_distribuicao.show()

+----------------+-----+
|  Category Label|Count|
+----------------+-----+
| Fridge Freezers| 5501|
|   Mobile Phones| 4081|
|Washing Machines| 4044|
|            CPUs| 3862|
|         Fridges| 3584|
|             TVs| 3564|
|     Dishwashers| 3424|
| Digital Cameras| 2697|
|      Microwaves| 2342|
|        Freezers| 2212|
+----------------+-----+



In [29]:
# Identificando os Comerciantes com Mais Ofertas
comerciantes_top = df_clean.groupBy("Merchant ID").agg(count("Product ID").alias("Total Products")).orderBy("Total Products", ascending=False)

comerciantes_top.show()

+-----------+--------------+
|Merchant ID|Total Products|
+-----------+--------------+
|          3|          2547|
|          6|          1591|
|        298|          1523|
|         31|          1350|
|        119|          1239|
|          7|          1204|
|         17|          1193|
|        293|          1177|
|        294|          1000|
|        301|           901|
|         22|           860|
|        131|           760|
|        300|           736|
|         14|           699|
|         48|           663|
|         15|           661|
|        125|           642|
|         98|           593|
|        128|           583|
|         64|           582|
+-----------+--------------+
only showing top 20 rows



In [30]:
# Importando a função necessária
from pyspark.sql.functions import countDistinct

# Contando a quantidade de títulos de produtos únicos em cada categoria
diversidade_categoria = df_clean.groupBy("Category Label").agg(countDistinct("Product Title").alias("Unique Product Titles"))

# Exibindo o resultado
diversidade_categoria.show()

+----------------+---------------------+
|  Category Label|Unique Product Titles|
+----------------+---------------------+
|      Microwaves|                 2107|
|         Fridges|                 3211|
|             TVs|                 3295|
|Washing Machines|                 3426|
|        Freezers|                 1911|
| Digital Cameras|                 2411|
|            CPUs|                 3062|
|     Dishwashers|                 3060|
| Fridge Freezers|                 4830|
|   Mobile Phones|                 3682|
+----------------+---------------------+



In [None]:
produto_iva = df_clean.withColumn('IVA', col("PRICE") * 0.2)

In [None]:
df_combinado = df_produtos.join(df_estoque, df_produtos.ProdutoID == df_estoque.ProdutoID)