In [None]:
!pip install "numpy<2" pyspark

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

In [2]:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
print(np.__version__)

1.26.4


In [3]:
# Carregando o Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("spark") \
    .getOrCreate()

In [4]:
# Carregando o Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [5]:
# Carregando dados
df_oct_2019= spark.read.parquet('/content/gdrive/MyDrive/oct_2019.parquet')

In [6]:
# Visualizar DF
df_oct_2019.show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [62]:
df_oct_2019.count()

42448762

In [7]:
# Observar o schema de dados
df_oct_2019.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_session: string (nullable = true)



# Funções Uteis

In [56]:
from pyspark.sql.functions import col, coalesce

In [9]:
# Retorna valores nulos
def nulos(df):
  nulos= {}
  for coluna in df.columns:
    nulos[coluna]= df.filter(col(coluna).isNull()).count()
  return nulos

In [None]:
# Exemplo filtro
#df_oct_2019.filter(
#    (col("brand").isNotNull()) &
#    (col("product_id") == 17200506)
#).count()

# Tratando os dados

In [14]:
# Verificando nulos
dic_nulos= nulos(df_oct_2019)
dic_nulos

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 13515609,
 'brand': 6117080,
 'price': 0,
 'user_id': 0,
 'user_session': 2}

In [15]:
# Percentual nulos x dataset
print(f'category_code: {round((dic_nulos["category_code"]/df_oct_2019.count()) * 100,3)}%')
print(f'brand: {round((dic_nulos["brand"]/df_oct_2019.count()) * 100,3)}%')
print(f'user_session: {round((dic_nulos["user_session"]/df_oct_2019.count()) * 100,6)}%')

category_code: 31.84%
brand: 14.411%
user_session: 5e-06%


# Coluna user_session

In [16]:
# Explorando user_session nulos
df_oct_2019.filter(col("user_session").isNull()).show()

+--------------------+----------+----------+-------------------+--------------------+-------+------+---------+------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand| price|  user_id|user_session|
+--------------------+----------+----------+-------------------+--------------------+-------+------+---------+------------+
|2019-10-06 14:26:...|      cart|   1801723|2053013554415534427|electronics.video.tv|    tcl|135.65|557388939|        NULL|
|2019-10-25 10:36:...|      cart|   1004767|2053013555631882655|electronics.smart...|samsung|246.52|549825742|        NULL|
+--------------------+----------+----------+-------------------+--------------------+-------+------+---------+------------+



In [17]:
# Excluindo user_session (poucos registros; irrelevantes)
df_oct_2019= df_oct_2019.filter(col("user_session").isNotNull())
df_oct_2019.filter(col("user_session").isNull()).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



In [18]:
# Verificando nulos
dic_nulos= nulos(df_oct_2019)
dic_nulos

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 13515609,
 'brand': 6117080,
 'price': 0,
 'user_id': 0,
 'user_session': 0}

# Coluna brand

In [20]:
# Explorando brand nulos
df_oct_2019.filter(col("brand").isNull()).show()

+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...| NULL| 543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|  23100006|2053013561638126333|                NULL| NULL|357.79|513642368|17566c27-0a8f-450...|
|2019-10-01 00:00:...|      view|  34700031|2061717937420501730|                NULL| NULL|151.87|539512263|f27a45f8-fb98-459...|
|2019-10-01 00:00:...|      view|  13500046|2053013557099889147|furniture.bedroom...| NULL| 60.75|555446365|7f0062d8-ead0-4e0...|
|2019-10-01 00:00:...|      view|  31501072|2053013558031024687|                NULL| NULL

### Observações
A coluna product_id representa o id do produto e não há valores nulos para esta coluna.

Será que o product_id x não se repete tanto para casos onde a coluna brand está preenchida ou não?

Usando product_id, podemos obter parte da marca para os casos onde esta informação não está preenchida em brand.

In [28]:
# Obtendo dataset onde a marca não é preenchida
df_brand_null= df_oct_2019.filter((col("brand").isNull()))
df_brand_null.show()

+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...| NULL| 543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|  23100006|2053013561638126333|                NULL| NULL|357.79|513642368|17566c27-0a8f-450...|
|2019-10-01 00:00:...|      view|  34700031|2061717937420501730|                NULL| NULL|151.87|539512263|f27a45f8-fb98-459...|
|2019-10-01 00:00:...|      view|  13500046|2053013557099889147|furniture.bedroom...| NULL| 60.75|555446365|7f0062d8-ead0-4e0...|
|2019-10-01 00:00:...|      view|  31501072|2053013558031024687|                NULL| NULL

In [31]:
# Obtendo lista de produtos onde a marca é nula
product_id_brand_null= [row.product_id for row in df_brand_null.select("product_id").distinct().collect()]
product_id_brand_null[:5]

[4700630, 38900016, 28715827, 31501100, 21401936]

In [32]:
# Testando product_id brand null
if 17200506 in product_id_brand_null:
  print("Está na lista")
else:
  print("Não está na lista")

Está na lista


In [30]:
# Obtendo o tamanho da lista
print(f'Produtos: {len(product_id_brand_null)}')

Produtos: 51219


In [40]:
from pyspark.sql.types import StructType, StructField, IntegerType

In [43]:
# Criando dataframe product_id_brand_not_null
schema_ids = StructType([
    StructField("product_id", IntegerType(), False)
])
df_product_id_brand_not_null= spark.createDataFrame([(id,) for id in product_id_brand_null], schema=schema_ids)
df_product_id_brand_not_null.show()

+----------+
|product_id|
+----------+
|   4700630|
|  38900016|
|  28715827|
|  31501100|
|  21401936|
|  40800000|
|  28703873|
|  10100528|
|  10501017|
|  26200416|
|  14700175|
|  32701311|
|  22200179|
|  15200856|
|  28100933|
|   3200239|
|   9100370|
|  26403215|
|  19900021|
|  17200560|
+----------+
only showing top 20 rows



In [47]:
# Testando df_product_id_brand_not_null
linha= df_product_id_brand_not_null.filter(df_product_id_brand_not_null.product_id== 17200506).first()
linha["product_id"]

17200506

In [49]:
# JOIN usando Spark
resultados_df = df_oct_2019.join(
    df_product_id_brand_not_null,
    "product_id", # Coluna usada para o join
    "inner"       # Tipo do join
  ).where(
    col("brand").isNotNull() # Filtra para remover marcas nulas
  ).select(
    "product_id", "brand" # Seleciona apenas as colunas que nos interessam
  ).distinct() # Adicionado para garantir resultados únicos caso hajam duplicatas

resultados_df.show()

+----------+---------+
|product_id|    brand|
+----------+---------+
|   4502656|  samsung|
|  12720442| cordiant|
|  10200661|   barbie|
|  10503794|   mattel|
|   5600405|    braun|
|  12719511|   viatti|
|   2800042|  kristal|
|  12202775|    stark|
|   1307139|       hp|
|  53000007|     nike|
|  15800219|   berkut|
|   3200302|  redmond|
|  14100591|   madina|
|  51200059|   norfin|
|  25001609|  lidonet|
|   6601076|      amd|
|  35200094|milavitsa|
|  31200822|     ikea|
|  28401184|    karya|
|  35200054|milavitsa|
+----------+---------+
only showing top 20 rows



In [55]:
# Obtendo o tamanho da lista
print(f'Produtos: {resultados_df.count()}')

Produtos: 7648


In [57]:
# Renomeia a coluna no DataFrame de lookup para evitar ambiguidade
resultados_renomeado_df = resultados_df.withColumnRenamed("brand", "brand_nova")

# LEFT JOIN para trazer as novas marcas para junto dos produtos originais
df_com_join = df_oct_2019.join(
    resultados_renomeado_df,
    on="product_id",
    how="left"
)

colunas_originais = df_oct_2019.columns

df_oct_2019_2= df_com_join.withColumn(
    "brand", # Sobrescreve a coluna 'brand' original
    coalesce(df_oct_2019.brand, col("brand_nova"))
).select(colunas_originais) # Seleciona apenas as colunas originais para manter o schema

In [58]:
df_oct_2019_2.show()

+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|   1003306|2053013555631882655|electronics.smart...|  apple| 588.77|555446831|6ec635da-ea15-4a5...|
|2019-10-29 02:11:...|      view|   1003726|2053013555631882655|electronics.smart...|   sony| 128.39|564625734|b3200271-6139-458...|
|2019-10-04 10:59:...|      view|   1004240|2053013555631882655|electronics.smart...|  apple|1062.68|553616201|2188d4a6-10da-4ef...|
|2019-10-17 16:34:...|      view|   1004246|2053013555631882655|electronics.smart...|  apple| 763.96|553690312|12a36156-f10d-40c...|
|2019-10-12 19:40:...|      view|   1004321|2053013555631882655|elect

In [60]:
# Explorando brand nulos
df_oct_2019_2.filter(col("brand").isNull()).show()

+--------------------+----------+----------+-------------------+--------------------+-----+-----+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand|price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+-----+---------+--------------------+
|2019-10-29 06:38:...|      view|   1005267|2053013555631882655|electronics.smart...| NULL|  0.0|519697924|67e99dad-32a9-4d8...|
|2019-10-29 06:46:...|      view|   1005267|2053013555631882655|electronics.smart...| NULL|  0.0|543816058|3e0499c0-92ab-46d...|
|2019-10-29 06:47:...|      view|   1005267|2053013555631882655|electronics.smart...| NULL|  0.0|543816058|3e0499c0-92ab-46d...|
|2019-10-29 06:48:...|      view|   1005267|2053013555631882655|electronics.smart...| NULL|  0.0|513856670|3b8c5ffb-6670-4e9...|
|2019-10-29 06:49:...|      view|   1005267|2053013555631882655|electronics.smart...| NULL|  0.0|

In [64]:
# Verificando nulos dataset novo
df_oct_2019_2.filter(col("brand").isNull()).count()

5944657

In [65]:
# Verificando nulos (df original)
dic_nulos= nulos(df_oct_2019)
dic_nulos

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 13515609,
 'brand': 6117080,
 'price': 0,
 'user_id': 0,
 'user_session': 0}

In [66]:
# Percentual nulos x dataset
print(f'category_code: {round((dic_nulos["category_code"]/df_oct_2019.count()) * 100,3)}%')
print(f'brand: {round((dic_nulos["brand"]/df_oct_2019.count()) * 100,3)}%')
print(f'user_session: {round((dic_nulos["user_session"]/df_oct_2019.count()) * 100,6)}%')

category_code: 31.84%
brand: 14.411%
user_session: 0.0%


In [67]:
# Verificando nulos (df novo)
dic_nulos= nulos(df_oct_2019_2)
dic_nulos

{'event_time': 0,
 'event_type': 0,
 'product_id': 0,
 'category_id': 0,
 'category_code': 13522425,
 'brand': 5944657,
 'price': 0,
 'user_id': 0,
 'user_session': 0}

In [68]:
# Percentual nulos x dataset novo
print(f'category_code: {round((dic_nulos["category_code"]/df_oct_2019_2.count()) * 100,3)}%')
print(f'brand: {round((dic_nulos["brand"]/df_oct_2019_2.count()) * 100,3)}%')
print(f'user_session: {round((dic_nulos["user_session"]/df_oct_2019_2.count()) * 100,6)}%')

category_code: 31.851%
brand: 14.002%
user_session: 0.0%
