In [0]:
import datetime
import numpy as np
import pandas as pd
import random

from pyspark.sql.functions import col, date_format

random.seed(10)

#User IDs possible
user_ids = np.array([1, 2, 3, 4])
#Dates
start_date = datetime.datetime(2022, 1, 1)
end_date = datetime.datetime(2022, 2, 19)
time_between_dates = end_date - start_date
seconds_between_dates = time_between_dates.total_seconds()
# Product Identifiers
random_product = random.randrange(1000000, 9999999)

def generate_random_rows(n_rows: int):
    rows = []
    for i in range(n_rows):
        random_row = [
            start_date + datetime.timedelta(seconds=random.randrange(seconds_between_dates)),
            random.randrange(1, 4),
            random.randrange(1000000, 1000100)
        ]
        rows.append(random_row)
    return rows

n_rows = 1000000

data = generate_random_rows(n_rows=n_rows)

columns = ["event_time", "user_id", "product_id"]

# criar spark dataframe
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df = spark_df.withColumn("partition_date", date_format(col("event_time"), "yyyyMMdd"))



In [0]:
display(spark_df)

event_time,user_id,product_id,partition_date
2022-02-11T15:23:02.000+0000,2,1000073,20220211
2022-01-02T10:33:43.000+0000,1,1000059,20220102
2022-02-17T16:48:13.000+0000,2,1000083,20220217
2022-01-16T13:21:42.000+0000,1,1000066,20220116
2022-02-17T13:58:55.000+0000,2,1000009,20220217
2022-01-25T06:32:04.000+0000,3,1000046,20220125
2022-01-05T07:49:26.000+0000,2,1000017,20220105
2022-02-04T11:24:01.000+0000,2,1000053,20220204
2022-01-28T12:52:42.000+0000,3,1000033,20220128
2022-02-14T08:29:37.000+0000,1,1000087,20220214


In [0]:
#Escrever spark_df (spark Dataframe) em dbfs:/FileStore/clickstream e particionado por partition_date.
spark_df.write.mode("append").partitionBy("partition_date").parquet("dbfs:/FileStore/clickstream")



In [0]:
#Lista todas as partições escritas em dbfs:/FileStore/clickstream.
display(dbutils.fs.ls("dbfs:/FileStore/clickstream"))


path,name,size,modificationTime
dbfs:/FileStore/clickstream/_SUCCESS,_SUCCESS,0,1677884068000
dbfs:/FileStore/clickstream/partition_date=20220101/,partition_date=20220101/,0,0
dbfs:/FileStore/clickstream/partition_date=20220102/,partition_date=20220102/,0,0
dbfs:/FileStore/clickstream/partition_date=20220103/,partition_date=20220103/,0,0
dbfs:/FileStore/clickstream/partition_date=20220104/,partition_date=20220104/,0,0
dbfs:/FileStore/clickstream/partition_date=20220105/,partition_date=20220105/,0,0
dbfs:/FileStore/clickstream/partition_date=20220106/,partition_date=20220106/,0,0
dbfs:/FileStore/clickstream/partition_date=20220107/,partition_date=20220107/,0,0
dbfs:/FileStore/clickstream/partition_date=20220108/,partition_date=20220108/,0,0
dbfs:/FileStore/clickstream/partition_date=20220109/,partition_date=20220109/,0,0


In [0]:
#Ler ficheiros parquet escritos no exercício anterior para a partition_date = 20220202 (1) 
parquet_file = "dbfs:/FileStore/clickstream/partition_date=20220202"
df_partition = spark.read.parquet(parquet_file)

from pyspark.sql.functions import col, date_format, lit

parquet_file = "dbfs:/FileStore/clickstream/partition_date=20220202"
df_partition = spark.read.parquet(parquet_file)
df_partition = df_partition.withColumn("partition_date", lit("20220202"))



In [0]:
#e mostrar o resultado (2).
display(df_partition)


event_time,user_id,product_id,partition_date
2022-02-02T15:00:07.000+0000,2,1000015,20220202
2022-02-02T13:57:24.000+0000,1,1000062,20220202
2022-02-02T06:17:11.000+0000,3,1000014,20220202
2022-02-02T02:26:18.000+0000,2,1000048,20220202
2022-02-02T12:02:41.000+0000,1,1000017,20220202
2022-02-02T03:37:12.000+0000,2,1000098,20220202
2022-02-02T22:09:05.000+0000,3,1000019,20220202
2022-02-02T23:10:51.000+0000,1,1000077,20220202
2022-02-02T07:25:48.000+0000,3,1000027,20220202
2022-02-02T15:34:28.000+0000,3,1000097,20220202


In [0]:
# 1) calcular produto com mais clicks
# Ler os dados do diretório
parquet_dir = "dbfs:/FileStore/clickstream"
df = spark.read.parquet(parquet_dir)

# Agregar os dados por product_id e contar os cliques
click_count = df.groupBy("product_id").count()

# Ordenar em ordem decrescente de cliques e pegar o produto com mais cliques
most_clicked_product = click_count.orderBy(col("count").desc()).first()["product_id"]

print("O produto com mais cliques é:", most_clicked_product)


O produto com mais cliques é: 1000069


In [0]:
#Produto com mais cliques: 1000069 

In [0]:
# 2) calcular dia com mais clicks
from pyspark.sql.functions import desc

most_clicked_day = (
    spark_df
    .groupBy("partition_date")
    .agg({"product_id": "count"})
    .withColumnRenamed("count(product_id)", "clicks")
    .orderBy(desc("clicks"))
    .first()
)

print(f"O dia com mais cliques foi {most_clicked_day['partition_date']} com um total de {most_clicked_day['clicks']} cliques.")


O dia com mais cliques foi 20220124 com um total de 20713 cliques.


In [0]:
#O dia com mais cliques foi 20220124 com um total de 20713 cliques.