In [None]:
# Notebook para tratamento dos dados
# Calcular o tamanho dos dados das colunas title e content
# Retirar os registros que são outliers, considerando o intervalo de confiança de 90%, ou seja definindo o limite inferior e superior para ambas as colunas.
# Limpar o texto retirando tags HTML, URL's e Emails
# Limpar termos inuteis
# Salvar em Parquet


In [None]:
from google.colab import drive
drive.mount('/content/drive')

file_path = '/content/drive/MyDrive/Fiap/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql.functions import col, length, udf, lower
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

import re

spark = SparkSession.builder.appName("AmazonDataSpark").getOrCreate()

In [None]:
# Carregar arquivos formato parquet para um dataframe pyspark
df = spark.read.parquet(file_path + 'amazon_data_parquet')
df.createOrReplaceTempView('df')

In [None]:
df_select = spark.sql("""
        SELECT length(title) as len_title, length(content) as len_content, title, content
        FROM df
        WHERE 1=1
    """)

df_select.select('len_title', 'len_content').summary().show()

+-------+-----------------+-----------------+
|summary|        len_title|      len_content|
+-------+-----------------+-----------------+
|  count|          1364345|          1364345|
|   mean|61.03971136332819|618.7584518578512|
| stddev|39.99977198169389|895.7781877064737|
|    min|                4|                5|
|    25%|               37|              194|
|    50%|               54|              398|
|    75%|               75|              805|
|    max|             1999|           197622|
+-------+-----------------+-----------------+



In [None]:
# 1. Definir os quantis que marcam os 90% centrais
# Queremos os limites inferior (5%) e superior (95%)
quantis = [0.05, 0.95]

# 2. Calcular os percentis para ambas as colunas de uma vez
# O terceiro parâmetro (0.001) é o erro relativo de aproximação.
# Um valor baixo como 0.001 garante alta precisão no cálculo.
limites = df_select.approxQuantile(['len_title', 'len_content'], quantis, 0.001)

# 3. Extrair os limites calculados para variáveis legíveis
limite_inferior_titulo = limites[0][0]
limite_superior_titulo = limites[0][1]

limite_inferior_conteudo = limites[1][0]
limite_superior_conteudo = limites[1][1]

print("--- Limites Calculados para os 90% Centrais ---")
print(f"Tamanho do Título: Manter registros entre {limite_inferior_titulo} e {limite_superior_titulo} caracteres.")
print(f"Tamanho da Descrição: Manter registros entre {limite_inferior_conteudo} e {limite_superior_conteudo} caracteres.")
print("-------------------------------------------------")


# 4. Aplicar o filtro para criar o DataFrame final com os 90% centrais
df_filtrado_90_central = df_select.filter(
    (col('len_title') >= limite_inferior_titulo) &
    (col('len_title') <= limite_superior_titulo) &
    (col('len_content') >= limite_inferior_conteudo) &
    (col('len_content') <= limite_superior_conteudo)
)

# 5. (Opcional) Verifique a contagem antes e depois para ver o impacto do filtro
print(f"Contagem original: {df.count()}")
print(f"Contagem após filtrar pelos 90% centrais: {df_filtrado_90_central.count()}")

# Agora 'df_filtrado_90_central' é o seu dataset de alta qualidade, pronto para a amostragem
# e formatação para o fine-tuning.

--- Limites Calculados para os 90% Centrais ---
Tamanho do Título: Manter registros entre 17.0 e 125.0 caracteres.
Tamanho da Descrição: Manter registros entre 47.0 e 1728.0 caracteres.
-------------------------------------------------
Contagem original: 1364345
Contagem após filtrar pelos 90% centrais: 1110290


In [None]:
# Crie uma função de limpeza em Python otimizada para o inglês
def clean_text_en(text):
    if text is None:
        return None

    # 1. Remover tags HTML
    text = re.sub(r'<[^>]+>', '', text)

    # 2. Remover URLs
    text = re.sub(r'https?://\S+|www\.\S+', '', text)

    # 3. Remover e-mails
    text = re.sub(r'\S+@\S+', '', text)

    # 4. ALTERAÇÃO: Remover caracteres que não são letras do alfabeto inglês, números ou pontuação comum.
    # A regex foi simplificada para não incluir 'ç' ou vogais acentuadas.
    text = re.sub(r'[^a-zA-Z0-9.,!?;:()\s]', '', text)

    # 5. Normalizar espaços em branco
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# Registre a função como uma UDF do Spark
clean_text_en_udf = udf(clean_text_en, StringType())

# Aplique a UDF nas colunas de título e descrição
# (Usando o DataFrame da etapa anterior, 'df_filtrado_95_central')
df_cleaned_en = df_filtrado_90_central.withColumn("title_clear", clean_text_en_udf(col("title"))) \
                                      .withColumn("content_clear", clean_text_en_udf(col("content")))

print("Etapa 1: Limpeza com Regex para Inglês concluída.")
df_cleaned_en.select("title", "title_clear", "content", "content_clear").show(5, truncate=False)

Etapa 1: Limpeza com Regex para Inglês concluída.
+--------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# ALTERAÇÃO: Lista de termos inúteis atualizada para o inglês.
useless_terms_en = [
    "click here",
    "read more",
    "learn more",
    "coming soon",
    "to be updated",
    "lorem ipsum",
    "for illustrative purposes only",
    "image for display only",
    "price on request",
    "contact us for price",
    "all rights reserved",
    "subscribe to our newsletter"
]

# A lógica do filtro permanece a mesma, apenas usando a nova lista
filtro_termos_inuteis_en = " AND ".join([f"NOT lower(content_clear) LIKE '%{termo}%'" for termo in useless_terms_en])

df_sem_boilerplate_en = df_cleaned_en.filter(filtro_termos_inuteis_en)

print("\nEtapa 2: Remoção de texto padrão em Inglês concluída.")
print(f"Registros antes da remoção de boilerplate: {df_cleaned_en.count()}")
print(f"Registros após a remoção de boilerplate: {df_sem_boilerplate_en.count()}")


Etapa 2: Remoção de texto padrão em Inglês concluída.
Registros antes da remoção de boilerplate: 1110290
Registros após a remoção de boilerplate: 1084833


In [None]:
# Manter os registros com a maior descrição
# seja o DataFrame com os dados limpos, mas ainda com títulos duplicados.

# Vamos chamar nossa tabela de "produtos" para a query.
df_sem_boilerplate_en.createOrReplaceTempView("produtos")


# Passo 2: Escrever a query SQL usando uma Common Table Expression (CTE) para clareza
# Esta query faz exatamente o mesmo que o código PySpark anterior.
query_sql = """
WITH RankedDescriptions AS (
    -- Primeiro, selecionamos todas as colunas e adicionamos um ranking
    SELECT
        title, title_clear,
        content, content_clear,
        -- A função ROW_NUMBER() cria um número de linha para cada grupo
        ROW_NUMBER() OVER (
            -- PARTITION BY define o grupo (todos os registros com o mesmo título)
            PARTITION BY title_clear
            -- ORDER BY define a ordem dentro do grupo (pelo tamanho da descrição, da maior para a menor)
            ORDER BY LENGTH(content_clear) DESC
        ) as rank
    FROM
        produtos
)
-- Finalmente, selecionamos apenas as colunas que queremos da CTE...
SELECT
    title_clear,
    content_clear
FROM
    RankedDescriptions
-- ...onde o rank é 1 (ou seja, a descrição mais longa de cada grupo)
WHERE
    rank = 1
"""

# Passo 3: Executar a query SQL e armazenar o resultado em um novo DataFrame
df_final_sql = spark.sql(query_sql)



In [None]:
df_final_sql.show(1)

+--------------------+--------------------+
|         title_clear|       content_clear|
+--------------------+--------------------+
|(1 Bottle) Dexc20...|Slimaluma is an e...|
+--------------------+--------------------+
only showing top 1 row



In [None]:
# Salvar o DataFrame df_filtered em formato Parquet no Google Drive
output_path_parquet = '/content/drive/MyDrive/Fiap/database_limpo'

# Spark pode escrever DataFrames em vários formatos, incluindo Parquet.
# Usaremos o modo 'overwrite' para substituir o arquivo/diretório se já existir.
try:
    df_final_sql.write.mode('overwrite').parquet(output_path_parquet)
    print(f"DataFrame df_sem_boilerplate_en salvo com sucesso em '{output_path_parquet}' em formato Parquet.")

except Exception as e:
    print(f"Ocorreu um erro ao salvar o DataFrame em formato Parquet: {e}")

DataFrame df_sem_boilerplate_en salvo com sucesso em '/content/drive/MyDrive/Fiap/database_limpo' em formato Parquet.
