<img src="https://www.iscte-iul.pt/assets/images/logo_iscte_detailed.svg" style="width: 450px;margin-top:30px;" align ="center">

<div style= "font-size: 40px;  margin-top:40px; font-weight:bold; font-family: 'Avenir Next LT Pro', sans-serif;"><center>Data Joining: <strong>E-Commerce</strong></center></div>
<div style= "font-size: 35px; font-weight:bold; font-family: 'Avenir Next LT Pro', sans-serif;"><center>Merging the two CSV files into a single Parquet dataset</center></div>

<div style= "font-size: 27px;font-weight:bold;line-height: 1.1; margin-top:40px; font-family: 'Avenir Next LT Pro', sans-serif;"><center>Processamento e Modelação de Big Data 2024/2025</center></div> <br>

   <div style= "font-size: 20px;font-weight:bold; font-family: 'Avenir Next LT Pro', sans-serif;"><center> Group 7:</center></div>
   <div><center> Diogo Freitas | 104841 </center></div>
   <div><center> João Francisco Botas | 104782 </center></div>
   <div><center> Miguel Gonçalves | 105944 </center></div>
   <div><center> Ricardo Galvão | 105285 </center></div>

--- 
## Spark Session

Start the Spark session with the application name `PMBD`.

In [1]:
# Basic imports
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, LongType, DoubleType, TimestampType
)

# Create a Spark session
spark = SparkSession.builder \
    .appName("PMBD") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [2]:
print(spark.sparkContext._jsc.sc().isStopped())  # False -> the session is running; True -> Spark has been stopped

False


---
## Read Data

First we define the schema explicitly, so that Spark does not have to infer column types from the data and the read is more efficient.

NOTE: **Adjust the `data` folder path to match your local setup.**

In [3]:
# read csv files on data folder
data_dir = "../data/"
dataRaw_dir = data_dir + "raw/"

schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", LongType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])

# ec_total = spark.read.csv(data_dir, header=True, schema=schema)

ec_oct = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv(dataRaw_dir + "2019-Oct.csv", header=True, schema=schema)

ec_nov = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv(dataRaw_dir + "2019-Nov.csv", header=True, schema=schema)

In [4]:
ec_nov.show(5)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.smart...|xiaomi

In [5]:
# count rows
print(f"Number of rows in October 2019 file: {ec_oct.count()}")
print(f"Number of rows in November 2019 file: {ec_nov.count()}")

Number of rows in October 2019 file: 42448764
Number of rows in November 2019 file: 67501979



---
## Data Joining

Combine the rows of the two files (a row-wise union, not a key-based join) and then write the result to Parquet so it can be reused in later steps.

In [6]:
# merge the two datasets
ec_total = ec_oct.union(ec_nov)
print(f"Number of rows in total dataset file: {ec_total.count()}")

Number of rows in total dataset file: 109950743


In [7]:
import os

output_path = dataRaw_dir + "ec_total.parquet"
if not os.path.exists(output_path):
    ec_total.write.format("parquet").mode("overwrite").save(output_path)
else:
    print("The directory already exists. It was not overwritten.")

The directory already exists. It was not overwritten.
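
The `os.path.exists` guard above also skips the write when a previous run failed halfway and left an incomplete directory behind. A slightly stricter check is sketched below, assuming a local filesystem and Spark's default output committer, which places an empty `_SUCCESS` file in the output directory once a write job finishes. The helper name `parquet_write_completed` is hypothetical.

```python
import os
import tempfile


def parquet_write_completed(path: str) -> bool:
    """Return True only if `path` is a directory that contains a _SUCCESS marker."""
    return os.path.isdir(path) and os.path.isfile(os.path.join(path, "_SUCCESS"))


# Demonstration on throwaway directories, not on the notebook's real output path.
base = tempfile.mkdtemp()
partial = os.path.join(base, "partial.parquet")
complete = os.path.join(base, "complete.parquet")
os.makedirs(partial)
os.makedirs(complete)
open(os.path.join(complete, "_SUCCESS"), "w").close()

partial_ok = parquet_write_completed(partial)    # directory exists, marker missing
complete_ok = parquet_write_completed(complete)  # directory exists, marker present
missing_ok = parquet_write_completed(os.path.join(base, "missing.parquet"))
print(partial_ok, complete_ok, missing_ok)
```

With such a helper, the condition in the cell above could become `if not parquet_write_completed(output_path):`, so an incomplete directory would be rewritten instead of silently reused.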


In [8]:
spark.sparkContext.setLogLevel("INFO")

---
Read the Parquet file back to check that everything was written correctly.

In [9]:
# read parquet file and select 5% of the data
ec_total = spark.read.parquet(dataRaw_dir + "ec_total.parquet")
# TODO: maybe adjust this, since the row order within the 5% sample may be arbitrary
ec_5p = ec_total.sample(fraction=0.05, seed=42)
ec_5p.show(5)

+-------------------+----------+----------+-------------------+--------------------+-------------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|        brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------------+------+---------+--------------------+
|2019-11-17 08:43:00|      view|   1005253|2053013555631882655|electronics.smart...|       xiaomi|288.04|516404307|a383cb03-2673-446...|
|2019-11-17 08:43:01|      view|  60000003|2162513074060264222|                NULL|geoffanderson|  44.4|515817144|505cf403-f7ce-4ab...|
|2019-11-17 08:43:01|      view|   4700388|2053013560899928785|auto.accessories....|    prestigio| 32.18|572492652|4879a14c-58b3-43a...|
|2019-11-17 08:43:01|      cart|  28718385|2053013565228450757|       apparel.shoes|       rieker|103.99|518296473|8af4a493-9188-43a...|
|2019-11-17 08:43:01|      cart|  2650014

In [10]:
# count rows
print(f"Number of rows in the 5% sample: {ec_5p.count()}")

Number of rows in the 5% sample: 5495486
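
The sample holds 5495486 rows rather than exactly 5% of 109950743. This is expected if, as the PySpark documentation indicates, `sample` without replacement does not guarantee the exact fraction and instead keeps each row independently with probability `fraction`. Under that assumption the sample size is approximately binomial, and the short check below compares the observed count with the expected value and its standard deviation. Only the two row counts come from the notebook; the variable names are illustrative.

```python
import math

total_rows = 109_950_743   # rows in the combined dataset
observed = 5_495_486       # rows returned by sample(fraction=0.05, seed=42)
fraction = 0.05

# Binomial(n, p): mean n*p and standard deviation sqrt(n*p*(1-p)).
expected = total_rows * fraction
std_dev = math.sqrt(total_rows * fraction * (1 - fraction))

# How many standard deviations the observed count lies from the mean.
z_score = (observed - expected) / std_dev
print(f"expected ~ {expected:,.0f}, observed = {observed:,}, z ~ {z_score:.2f}")
```

A `z_score` of small magnitude indicates that the observed count is consistent with per-row random sampling.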


In [11]:
print(f"Number of rows in the parquet file: {ec_total.count()}")
print(f"Sum of the rows in the two CSV files: {42448764 + 67501979}")

Number of rows in the parquet file: 109950743
Sum of the rows in the two CSV files: 109950743


In [12]:
# show 10 rows
ec_5p.show(10)

+-------------------+----------+----------+-------------------+--------------------+-------------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|        brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------------+------+---------+--------------------+
|2019-11-17 08:43:00|      view|   1005253|2053013555631882655|electronics.smart...|       xiaomi|288.04|516404307|a383cb03-2673-446...|
|2019-11-17 08:43:01|      view|  60000003|2162513074060264222|                NULL|geoffanderson|  44.4|515817144|505cf403-f7ce-4ab...|
|2019-11-17 08:43:01|      view|   4700388|2053013560899928785|auto.accessories....|    prestigio| 32.18|572492652|4879a14c-58b3-43a...|
|2019-11-17 08:43:01|      cart|  28718385|2053013565228450757|       apparel.shoes|       rieker|103.99|518296473|8af4a493-9188-43a...|
|2019-11-17 08:43:01|      cart|  2650014

In [13]:
ec_5p.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_session: string (nullable = true)



Products with more than two categories

In [14]:
from pyspark.sql.functions import countDistinct, collect_set

# 1. Keep only products that appear with more than 2 distinct categories
produtos_com_mais_de_2_cat = ec_total.groupBy("product_id") \
    .agg(
        countDistinct("category_id").alias("num_categorias"),
        collect_set("category_id").alias("lista_category_id"),
        collect_set("category_code").alias("lista_category_code")
    ) \
    .filter("num_categorias > 2") \
    .orderBy("num_categorias", ascending=False)

# 2. Convert to pandas for display
produtos_com_mais_de_2_cat.toPandas()

Unnamed: 0,product_id,num_categorias,lista_category_id,lista_category_code
