<a href="https://colab.research.google.com/github/AlexandreRra/ALEXANDRE_AMORIM_DDF_TECH_022025/blob/main/dadosfera_case.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [37]:
!pip install transformers datasets torch
from google.colab import userdata
from IPython.display import display
from sklearn.preprocessing import LabelEncoder

import pandas as pd
import numpy as np
import datetime
import sqlite3
import re
import gc
import os
import zipfile
import json




#Settings


In [38]:
data_lake_root = '/content/data_lake'
landing_zone = f'{data_lake_root}/landing_zone'

os.makedirs(landing_zone, exist_ok=True)

def log_operation(message):
    print(f"[{datetime.datetime.now()}] {message}")

database_root = f'{data_lake_root}/sqlite'
os.makedirs(database_root, exist_ok=True)

# Data Ingestion

In [39]:
os.environ["KAGGLE_KEY"] = userdata.get('KAGGLE_KEY')
os.environ["KAGGLE_USERNAME"] = userdata.get('KAGGLE_USERNAME')

!kaggle datasets download -d piyushjain16/amazon-product-data

file_path = '/content/dataset/train.csv'

if not os.path.exists(file_path):
    !unzip -o "amazon-product-data.zip"
else:
    print(f"File {file_path} already exists. Skipping unzip.")

dataset_csv_path = 'dataset/train.csv'

Dataset URL: https://www.kaggle.com/datasets/piyushjain16/amazon-product-data
License(s): Attribution-NonCommercial 4.0 International (CC BY-NC 4.0)
amazon-product-data.zip: Skipping, found more recently modified local copy (use --force to force download)
File /content/dataset/train.csv already exists. Skipping unzip.


In [40]:
# df = pd.read_csv('dataset/train.csv')

# # Visualizar as primeiras linhas para confirmar o carregamento
# print("Primeiras 5 linhas do dataset:")
# display(df.head())

# ## 3. Análise Descritiva Básica
# # Informações gerais do dataset (colunas, tipos, valores nulos)
# print("\nInformações do Dataset:")
# display(df.info())

# # Estatísticas descritivas para colunas numéricas
# print("\nEstatísticas Descritivas:")
# display(df.describe())

# # Contagem de valores únicos por coluna
# print("\nValores Únicos por Coluna:")
# display(df.nunique())

# # Verificação de valores nulos
# print("\nValores Nulos por Coluna:")
# display(df.isnull().sum())

In [None]:


def get_db_connection():
    return sqlite3.connect(f'{data_lake_root}/sqlite/db')

conn = get_db_connection()
cursor = conn.cursor()

cursor.execute('DROP TABLE IF EXISTS products')
cursor.execute('''
    CREATE TABLE products (
        PRODUCT_ID INTEGER PRIMARY KEY,
        TITLE TEXT,
        BULLET_POINTS TEXT,
        DESCRIPTION TEXT,
        PRODUCT_TYPE_ID INTEGER,
        PRODUCT_LENGTH REAL
    )
''')
conn.commit()

df = pd.read_csv(dataset_csv_path)

df['PRODUCT_ID'] = df['PRODUCT_ID'].astype('int32')
df['PRODUCT_TYPE_ID'] = df['PRODUCT_TYPE_ID'].astype('int32')
df['PRODUCT_LENGTH'] = df['PRODUCT_LENGTH'].astype('float32')
df['TITLE'] = df['TITLE'].astype('category')
df['BULLET_POINTS'] = df['BULLET_POINTS'].astype('category')
df['DESCRIPTION'] = df['DESCRIPTION'].astype('category')

df.to_sql('products', conn, if_exists='replace', index=False)

# Preencher DESCRIPTION nula com BULLET_POINTS (filled_description)
query = '''
SELECT
    PRODUCT_ID,
    TITLE,
    BULLET_POINTS,
    DESCRIPTION,
    PRODUCT_TYPE_ID,
    PRODUCT_LENGTH,
    COALESCE(DESCRIPTION, BULLET_POINTS) AS filled_description
FROM products
'''
df_transformed = pd.read_sql_query(query, conn)

print("\nPrimeiros 5 registros após transformações:")
print(df_transformed.head())

conn.close()

del df, df_transformed
gc.collect()

# Catálogos

## Landing Zone

In [42]:
log_operation("Iniciando carga do dataset de produtos Amazon")

try:
    # Ler o arquivo com opções para lidar com possíveis problemas de encoding e delimitadores
    df = pd.read_csv(dataset_csv_path, encoding='utf-8', low_memory=False)

    # Verificar informações básicas
    log_operation(f"Dataset carregado com sucesso. Dimensões: {df.shape}")

    # Mostrar informações do dataset
    print("\nInformações do Dataset:")
    print(f"Número de registros: {df.shape[0]}")
    print(f"Número de colunas: {df.shape[1]}")
    print("\nColunas presentes no dataset:")
    print(df.columns.tolist())

    # Verificar valores nulos
    print("\nContagem de valores nulos por coluna:")
    print(df.isnull().sum())

    # Mostrar primeiras linhas para inspeção
    print("\nPrimeiras 5 linhas do dataset:")
    print(df.head())

    # Salvar o arquivo original na landing zone com timestamp
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    landing_file = f"{landing_zone}/amazon_products_raw_{timestamp}.parquet"

    # Salvar em formato parquet para melhor compressão e performance
    df.to_parquet(landing_file, index=False)
    log_operation(f"Dataset salvo na Landing Zone: {landing_file}")

    # Registrar metadados básicos
    metadata = {
        'source_file': file_path,
        'landing_file': landing_file,
        'rows': df.shape[0],
        'columns': df.shape[1],
        'column_names': df.columns.tolist(),
        'timestamp': timestamp,
        'null_counts': df.isnull().sum().to_dict()
    }

    # Salvar metadados
    with open(f"{landing_zone}/metadata_{timestamp}.json", 'w') as f:
        json.dump(metadata, f, indent=2)

    log_operation("Processo de carga na Landing Zone concluído com sucesso")

except Exception as e:
    log_operation(f"Erro durante o carregamento do dataset: {str(e)}")

[2025-02-27 15:59:42.834233] Iniciando carga do dataset de produtos Amazon
[2025-02-27 15:59:59.915907] Dataset carregado com sucesso. Dimensões: (2249698, 6)

Informações do Dataset:
Número de registros: 2249698
Número de colunas: 6

Colunas presentes no dataset:
['PRODUCT_ID', 'TITLE', 'BULLET_POINTS', 'DESCRIPTION', 'PRODUCT_TYPE_ID', 'PRODUCT_LENGTH']

Contagem de valores nulos por coluna:
PRODUCT_ID               0
TITLE                   13
BULLET_POINTS       837366
DESCRIPTION        1157382
PRODUCT_TYPE_ID          0
PRODUCT_LENGTH           0
dtype: int64

Primeiras 5 linhas do dataset:
   PRODUCT_ID                                              TITLE  \
0     1925202  ArtzFolio Tulip Flowers Blackout Curtain for D...   
1     2673191  Marks & Spencer Girls' Pyjama Sets T86_2561C_N...   
2     2765088  PRIKNIK Horn Red Electric Air Horn Compressor ...   
3     1594019  ALISHAH Women's Cotton Ankle Length Leggings C...   
4      283658  The United Empire Loyalists: A Chronicle 

## Landing Zone

In [43]:
# Notebook 3: Standardized Zone - Padronização e Limpeza dos Dados
# Objetivo: Limpar, padronizar e normalizar os dados para análise

# Montar o Google Drive
# drive.mount('/content/drive')

# # Definir diretórios de trabalho no Data Lake
# data_lake_root = '/content/drive/MyDrive/data_lake_amazon'
# masked_zone = f'{data_lake_root}/masked_zone'
# standardized_zone = f'{data_lake_root}/standardized_zone'

# # Criar diretórios se não existirem
# os.makedirs(standardized_zone, exist_ok=True)

# # Função para registrar logs
# def log_operation(message):
#     print(f"[{datetime.datetime.now()}] {message}")

# # Buscar o arquivo mais recente na masked zone
# log_operation("Buscando o arquivo mais recente na Masked Zone")

# masked_files = [f for f in os.listdir(masked_zone) if f.endswith('.parquet')]
# latest_file = sorted(masked_files)[-1]  # Pega o arquivo mais recente
# masked_file_path = f"{masked_zone}/{latest_file}"

def log_operation(message):
    print(f"[{datetime.datetime.now()}] {message}")

# Carregar o dataset
df = pd.read_csv('dataset/train.csv')
log_operation(f"Dataset carregado com sucesso. Dimensões: {df.shape}")

# 1. Padronização dos nomes das colunas
log_operation("Padronizando nomes das colunas")
df.columns = [col.lower().replace(' ', '_') for col in df.columns]

# 2. Limpeza e padronização dos dados
log_operation("Iniciando limpeza e padronização dos dados")

# Função para remover caracteres especiais e HTML
def clean_text(text):
    if pd.isna(text):
        return text

    # Converter para string
    text = str(text)

    # Remover tags HTML
    text = re.sub(r'<.*?>', '', text)

    # Remover caracteres especiais e manter apenas alfanuméricos, espaços e pontuação básica
    text = re.sub(r'[^\w\s.,;:!?-]', ' ', text)

    # Remover múltiplos espaços
    text = re.sub(r'\s+', ' ', text)

    return text.strip()

# Aplicar limpeza às colunas de texto
text_columns = df.select_dtypes(include=['object']).columns.tolist()
for col in text_columns:
    log_operation(f"Limpando coluna de texto: {col}")
    df[col] = df[col].apply(clean_text)

# 3. Tratamento de valores nulos
log_operation("Tratando valores nulos")

# Para colunas de texto, substituir NaN por string vazia
for col in text_columns:
    df[col] = df[col].fillna('')

# Para colunas numéricas, substituir NaN por 0 ou média, dependendo do contexto
numeric_columns = df.select_dtypes(include=['number']).columns.tolist()

for col in numeric_columns:
    df[col] = df[col].fillna(0)

# 4. Extração de features da descrição e bullet points
log_operation("Extraindo features de texto")

# Verificar se existe coluna de descrição
if 'description' in df.columns:
    # Criar feature de comprimento da descrição
    df['description_length'] = df['description'].apply(lambda x: len(str(x)))

    # Criar feature de contagem de palavras-chave
    keywords = ['quality', 'premium', 'best', 'new', 'improved']
    df['keyword_count'] = df['description'].apply(
        lambda x: sum(1 for keyword in keywords if keyword.lower() in str(x).lower())
    )

# Verificar se existe coluna de bullet points
if 'bullet_points' in df.columns:
    # Contar número de bullet points
    df['bullet_count'] = df['bullet_points'].apply(
        lambda x: len(str(x).split('\n')) if pd.notna(x) else 0
    )

# 6. Codificação de variáveis categóricas
log_operation("Codificando variáveis categóricas")

# Identificar colunas categóricas (com poucos valores únicos)
categorical_columns = []
for col in text_columns:
    if df[col].nunique() < 50:  # Limite arbitrário, ajustar conforme necessário
        categorical_columns.append(col)

# Codificar variáveis categóricas
label_encoders = {}
for col in categorical_columns:
    le = LabelEncoder()
    not_null = df[col].notna()
    if not_null.any():
        df.loc[not_null, f'{col}_encoded'] = le.fit_transform(df.loc[not_null, col])
        label_encoders[col] = {label: idx for idx, label in enumerate(le.classes_)}
    else:
        df[f'{col}_encoded'] = np.nan

# 7. Salvar o dataset padronizado
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
standardized_file = f"{standardized_zone}/amazon_products_standardized_{timestamp}.parquet"
df.to_parquet(standardized_file, index=False)

log_operation(f"Dataset padronizado salvo em: {standardized_file}")

# Criação do dicionário de dados
log_operation("Gerando dicionário de dados")

# Estrutura básica para o dicionário de dados
data_dict = []
for col in df.columns:
    col_info = {
        'column_name': col,
        'data_type': str(df[col].dtype),
        'description': '',  # A ser preenchido manualmente
        'sample_values': str(df[col].head(3).tolist()),
        'null_count': int(df[col].isna().sum()),
        'unique_values': int(df[col].nunique()),
    }

    # Adicionar informações específicas para colunas importantes
    if 'price' in col:
        col_info['description'] = 'Preço do produto em unidade monetária (provavelmente USD)'
    elif 'description' == col:
        col_info['description'] = 'Descrição completa do produto'
    elif 'name' in col:
        col_info['description'] = 'Nome do produto'
    elif 'dimension' in col:
        col_info['description'] = 'Dimensões físicas do produto'
    elif 'bullet_points' in col:
        col_info['description'] = 'Características principais do produto em formato de lista'

    data_dict.append(col_info)

# Salvar dicionário de dados
import json
with open(f"{standardized_zone}/data_dictionary_{timestamp}.json", 'w') as f:
    json.dump(data_dict, f, indent=2)

log_operation("Processo de padronização concluído com sucesso")

# Exibir informações sobre o dataset padronizado
print("\nInformações do Dataset Padronizado:")
print(f"Número de registros: {df.shape[0]}")
print(f"Número de colunas: {df.shape[1]}")
print("\nNovas colunas criadas:")
original_cols = set(pd.read_parquet(masked_file_path).columns)
new_cols = set(df.columns) - original_cols
print(list(new_cols))

[2025-02-27 16:00:56.096281] Dataset carregado com sucesso. Dimensões: (2249698, 6)
[2025-02-27 16:00:56.096492] Padronizando nomes das colunas
[2025-02-27 16:00:56.096895] Iniciando limpeza e padronização dos dados
[2025-02-27 16:00:56.235060] Limpando coluna de texto: title
[2025-02-27 16:01:12.380797] Limpando coluna de texto: bullet_points
[2025-02-27 16:01:54.106415] Limpando coluna de texto: description
[2025-02-27 16:02:34.256469] Tratando valores nulos
[2025-02-27 16:02:35.385383] Extraindo features de texto
[2025-02-27 16:02:45.419108] Codificando variáveis categóricas


NameError: name 'standardized_zone' is not defined

## Standardized Zone

In [None]:
df_standardized = pd.read_csv('dataset/train.csv')

# Handle missing values by reassigning to the columns
df_standardized['TITLE'] = df_standardized['TITLE'].fillna('not provided')
df_standardized['BULLET_POINTS'] = df_standardized['BULLET_POINTS'].fillna('not provided')
df_standardized['DESCRIPTION'] = df_standardized['DESCRIPTION'].fillna('not provided')

# Basic text cleaning for Description and Bullet Points (remove special characters)
def clean_text(text):
    if isinstance(text, str):
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
        return text.lower().strip()
    return ''

# Apply text cleaning function
df_standardized['DESCRIPTION'] = df_standardized['DESCRIPTION'].apply(clean_text)
df_standardized['BULLET_POINTS'] = df_standardized['BULLET_POINTS'].apply(clean_text)

# Display cleaned data
print("Standardized Data:")
display(df_standardized.head())