In [5]:
# Lendo os dados da camada bronze
review_toys_df_silver = spark.read \
  .format("bigquery") \
  .load("pdm-class-rabelo-2024.projeto.review_toys_df")

review_toys_df_silver.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   20337513|R35M56CNG0733J|B004Z7H07K|     340753120|LeapFrog LeapPad1...|            Toys|          5|           23|         30|NULL|             NULL|Premium education...|Please check the ...| 2011-12-04|
|         US|   28319674|R1OZ5UHX1458MK|B0051OLQYY|     181355347|Yo Gabba! Foil Ba...|            Toys|          5|    

In [6]:
# Verificando duplicatas e tratando

#Contar o número total de linhas
total_rows = review_toys_df_silver.count()

#Contar o número de linhas sem duplicatas
unique_rows = review_toys_df_silver.dropDuplicates().count()

#Verificar se há duplicatas
if total_rows == unique_rows:
    print("Não há dados duplicados.")
else:
    print(f"Existiam {total_rows - unique_rows} linhas duplicadas.")

#Limpando as linhas duplicadas
review_toys_df_silver = review_toys_df_silver.dropDuplicates()



Não há dados duplicados.


                                                                                

In [8]:
# Analisando valores nulos
from pyspark.sql.functions import col, count, when, isnan

# Número total de linhas no DataFrame
total_rows = review_toys_df_silver.count()

# Contar valores nulos em cada coluna (em número absoluto)
null_count_df = review_toys_df_silver.select([
    count(when(col(c).isNull() | (col(c).cast("string").isNotNull() & isnan(col(c))), c)).alias(c)
    if dict(review_toys_df_silver.dtypes)[c] in ("float", "double")
    else count(when(col(c).isNull(), c)).alias(c)
    for c in review_toys_df_silver.columns
])

# Mostrar o número absoluto de nulos
null_count_df.show()

# Para calcular a porcentagem de nulos em cada coluna
for col_name in review_toys_df_silver.columns:
    # Verificar se a coluna é numérica (float/double)
    if dict(review_toys_df_silver.dtypes)[col_name] in ("float", "double"):
        null_count = review_toys_df_silver.filter(col(col_name).isNull() | isnan(col(col_name))).count()
    else:
        null_count = review_toys_df_silver.filter(col(col_name).isNull()).count()
    
    percentage_null = (null_count / total_rows) * 100
    print(f"Coluna: {col_name}, Nulos: {null_count}, Porcentagem: {percentage_null:.2f}%")

                                                                                

+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+-------+-----------------+---------------+-----------+-----------+
|marketplace|customer_id|review_id|product_id|product_parent|product_title|product_category|star_rating|helpful_votes|total_votes|   vine|verified_purchase|review_headline|review_body|review_date|
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+-------+-----------------+---------------+-----------+-----------+
|          0|          0|        0|         0|             0|            0|               6|          6|            6|          6|4864249|          4864249|             13|        736|        374|
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+-------+-----------------+---------------+-----------+-----------+

Coluna: market

                                                                                

Coluna: vine, Nulos: 4864249, Porcentagem: 100.00%


                                                                                

Coluna: verified_purchase, Nulos: 4864249, Porcentagem: 100.00%


                                                                                

Coluna: review_headline, Nulos: 13, Porcentagem: 0.00%


                                                                                

Coluna: review_body, Nulos: 736, Porcentagem: 0.02%




Coluna: review_date, Nulos: 374, Porcentagem: 0.01%


                                                                                

In [9]:
# Como o número de valores nulo sé basicamente nulo, vamos excluí-los uma vez que não prejudicará nosso modelo
# Remover linhas com valores nulos nas colunas especificadas
columns_to_check = ["product_category", "star_rating", "helpful_votes", "total_votes", "review_headline", "review_body", "review_date"]
cleaned_df = review_toys_df_silver.dropna(subset=columns_to_check)

# Verificar o número de registros antes e depois da limpeza
print(f"Registros antes da limpeza: {review_toys_df_silver.count()}")
print(f"Registros após a limpeza: {cleaned_df.count()}")

review_toys_df_silver = cleaned_df

                                                                                

Registros antes da limpeza: 4864249




Registros após a limpeza: 4863497


                                                                                

In [10]:
# Excluindo as colunas que estão com 100% de valores nulos uma vez que são irrelevantes para nosso caso
# Remover as colunas 'vine' e 'verified_purchase'
review_toys_df_silver = review_toys_df_silver.drop("vine", "verified_purchase")

# Exibir o esquema para confirmar a remoção
review_toys_df_silver.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: long (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: long (nullable = true)
 |-- helpful_votes: long (nullable = true)
 |-- total_votes: long (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [11]:
# Filtrar registros inconsistentes
inconsistent_df = review_toys_df_silver.filter((col("star_rating") < 1) | (col("star_rating") > 5))

# Verificar se existem avaliações fora do padrão
if inconsistent_df.count() > 0:
    print("Há registros inconsistentes:")
    inconsistent_df.show()
else:
    print("Não há registros inconsistentes.")

# Manter apenas registros consistentes
review_toys_df_silver = review_toys_df_silver.filter((col("star_rating") >= 1) & (col("star_rating") <= 5))

Não há registros inconsistentes.


In [12]:
# Criar coluna categórica de rating
review_toys_df_silver = review_toys_df_silver.withColumn(
    "rating_category",
    when(col("star_rating") >= 4, "Positive")
    .when(col("star_rating") == 3, "Neutral")
    .otherwise("Negative")
)

# Exibir resultado
review_toys_df_silver.select("star_rating", "rating_category").show(5)



+-----------+---------------+
|star_rating|rating_category|
+-----------+---------------+
|          5|       Positive|
|          5|       Positive|
|          5|       Positive|
|          5|       Positive|
|          5|       Positive|
+-----------+---------------+
only showing top 5 rows



                                                                                

In [13]:
from pyspark.sql.functions import lower, trim

# Uniformizar categorias
review_toys_df_silver = review_toys_df_silver.withColumn(
    "product_category",
    trim(lower(col("product_category")))
)

# Exibir resultado
review_toys_df_silver.select("product_category").distinct().show()



+----------------+
|product_category|
+----------------+
|            toys|
+----------------+



                                                                                

In [14]:
# Salvar 'review_toys_df_silver'
review_toys_df_silver.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save("projeto.review_toys_df_silver")

                                                                                