## Download file

In [4]:
import gdown
import os

# Destination folder for the raw (bronze-layer) files
path = "datalake/bronze"

# Create the folder; exist_ok=True makes this a no-op when it is already there.
# os.makedirs raises OSError on failure, so no separate existence check is needed
# (the previous bare `os.path.exists(path)` expression discarded its result).
os.makedirs(path, exist_ok=True)

# Google Drive file ID (taken from the share link)
file_id = "13rvnyK5PJADJQgYe-VbdXb7PpLPj7lPr"
# Direct-download URL built from the file ID
download_url = f"https://drive.google.com/uc?id={file_id}"

# Local file name for the archive, derived from `path` so the two cannot drift apart
output_file = os.path.join(path, "challenge-webmedia-e-globo-2023.zip")

# The archive is large (the recorded run shows 672M), so skip the download when
# the file is already on disk; delete the file to force a fresh download.
if os.path.exists(output_file):
    print(f"Arquivo já existe, download ignorado: {output_file}")
# NOTE(review): some gdown releases return None on failure instead of raising —
# treat that as an error rather than continuing with a missing archive.
elif gdown.download(download_url, output_file, quiet=False) is None:
    raise RuntimeError(f"Falha no download de {download_url}")

# Last expression of the cell: display the archive path, as the original cell did
output_file


Downloading...
From (original): https://drive.google.com/uc?id=13rvnyK5PJADJQgYe-VbdXb7PpLPj7lPr
From (redirected): https://drive.google.com/uc?id=13rvnyK5PJADJQgYe-VbdXb7PpLPj7lPr&confirm=t&uuid=092c99e0-642d-4322-b91b-380d40416284
To: /home/santili/fiap/projetos/1MLET_FASE5_DATATHON/datalake/bronze/challenge-webmedia-e-globo-2023.zip
100%|██████████| 672M/672M [00:17<00:00, 38.8MB/s] 


'datalake/bronze/challenge-webmedia-e-globo-2023.zip'

## UNZIP DATA

In [8]:
import zipfile

def unzip_file(file_path, extract_to):
    """Extract every member of a zip archive into a directory.

    Args:
        file_path: Path of the zip archive to read.
        extract_to: Directory that receives the extracted members.

    Returns:
        None. A confirmation message is printed after extraction.
    """
    with zipfile.ZipFile(file_path, mode='r') as archive:
        archive.extractall(path=extract_to)
        # Reported while the archive is still open, right after extraction finishes
        message = f"Arquivo descompactado em: {extract_to}"
        print(message)
        
# Archive to unpack and the folder that receives its contents
extract_to = "datalake/bronze"
destination = "datalake/bronze/challenge-webmedia-e-globo-2023.zip"

# Unpack the archive into the bronze folder
unzip_file(file_path=destination, extract_to=extract_to)

Arquivo descompactado em: datalake/bronze


## START SPARK SESSION

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, arrays_zip, from_unixtime, year, month, dayofmonth, to_timestamp, regexp_replace

# Obtain the Spark session (named "Spark Init") that the ETL cells below read and write through
spark = (
    SparkSession.builder
    .appName("Spark Init")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
25/02/05 21:32:37 WARN Utils: Your hostname, ProfessorX resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/02/05 21:32:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/05 21:32:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## ETL_TREINO_BRONZE_TO_SILVER

In [12]:
# Read the raw training CSV files (first line of each file is the header).
# No schema inference is requested, so the history columns arrive as plain strings.
file_path = "datalake/bronze/files/treino/"
df = spark.read.option("header", "true").csv(file_path)

# Columns holding parallel comma-separated lists that must be split into arrays
cols_to_split = [
    "history", "timestampHistory", "numberOfClicksHistory",
    "timeOnPageHistory", "scrollPercentageHistory",
    "pageVisitsCountHistory"
]

# Convert each list column from string to array; the pattern also swallows
# any whitespace that follows a comma
for col_name in cols_to_split:
    df = df.withColumn(col_name, split(col(col_name), ",\\s*"))

# Zip the arrays position by position into a single array of structs
df = df.withColumn("zipped", arrays_zip(*[col(c) for c in cols_to_split]))

# Explode the zipped array so that each struct becomes its own row
df_exploded = df.withColumn("exploded", explode(col("zipped")))

# Pull the struct fields back out as typed top-level columns
df_normalized = df_exploded.select(
    col("userId"),
    col("userType"),
    col("exploded.history").alias("history"),
    # The raw value is divided by 1000 before from_unixtime (which expects seconds),
    # i.e. it is treated as epoch milliseconds. The result is a formatted string.
    # NOTE(review): from_unixtime renders in the Spark session time zone, so the
    # year/month/day partitions below depend on that setting — confirm it is intended.
    from_unixtime(col("exploded.timestampHistory").cast("long") / 1000, "yyyy-MM-dd HH:mm:ss").alias("timestampHistory"),
    col("exploded.numberOfClicksHistory").cast("int").alias("numberOfClicksHistory"),
    col("exploded.timeOnPageHistory").cast("int").alias("timeOnPageHistory"),
    col("exploded.scrollPercentageHistory").cast("float").alias("scrollPercentageHistory"),
    # Cast like the sibling numeric columns; previously this was the only history
    # metric written to the silver layer as a string.
    col("exploded.pageVisitsCountHistory").cast("int").alias("pageVisitsCountHistory")
)

# Derive year, month and day from the event timestamp for partitioning
df_partitioned = (
    df_normalized
    .withColumn("year", year(col("timestampHistory")))
    .withColumn("month", month(col("timestampHistory")))
    .withColumn("day", dayofmonth(col("timestampHistory")))
)

# Write snappy-compressed parquet partitioned by year/month/day,
# replacing whatever is already at the output path
output_path = "datalake/silver/treino/"
(
    df_partitioned.write
    .mode("overwrite")
    .option("compression", "snappy")
    .partitionBy("year", "month", "day")
    .parquet(output_path)
)

print("Arquivos parquet salvos com sucesso!")


25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/05 21:34:42 WARN MemoryManager: Total allocation exceeds 95.0

Arquivos parquet salvos com sucesso!


## ETL_ITENS_BRONZE_TO_SILVER

In [None]:
# Read the items CSV files. Double quotes are both the quote and the escape
# character, quoted fields may span several lines, and column types are inferred.
file_path = "datalake/bronze/itens/itens/"

df = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("multiLine", "true")
    .option("inferSchema", "true")
    .csv(file_path)
)

# For each date column: strip the "+00:00" offset suffix, then parse the
# remainder as a timestamp.
# NOTE(review): once the offset is removed the text is parsed without zone
# information, so the result presumably depends on the Spark session time zone —
# confirm it is UTC.
for ts_col in ("issued", "modified"):
    without_offset = regexp_replace(col(ts_col), r"\+00:00", "")
    df = df.withColumn(ts_col, to_timestamp(without_offset, "yyyy-MM-dd HH:mm:ss"))

# The 'url' column is not carried over to the silver layer
df = df.drop("url")

# Partition columns (year, month, day) derived from 'issued'
issued = col("issued")
df = (
    df
    .withColumn("year", year(issued))
    .withColumn("month", month(issued))
    .withColumn("day", dayofmonth(issued))
)

# Output location for the parquet files
output_path = "datalake/silver/itens/"

# Write snappy-compressed parquet partitioned by year/month/day,
# replacing whatever is already at the output path
writer = df.write.mode("overwrite").option("compression", "snappy")
writer.partitionBy("year", "month", "day").parquet(output_path)

print("Arquivos parquet salvos com sucesso!")


[Stage 9:>                                                          (0 + 3) / 3]