## üì§Extra√ß√£o de Dados Vendas Celulares - Camada PRATAü•à
- Carregar a base de dados em um DataFrame
- Realizar os tratamentos, normaliza√ß√µes e convers√£o de dados
- Salvar o DataFrame tratado na Camada PRATA (Silver)


### üìöIMPORTs Bibliotecas

In [None]:
import pandas as pd
import re
from decouple import Config, RepositoryEnv
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from datetime import datetime
from pyspark.sql import DataFrame
from sqlalchemy import create_engine

### ‚öôÔ∏èCarregando Variaveis de Ambiente

In [None]:
# Config
env_path = ".env"
config = Config(repository=RepositoryEnv(env_path))

senha_banco_de_dados = config('DATA_BASE_PASSWORD')
caminho_excel = config('CAMINHO_EXCEL')
caminho_env = config('CAMINHO_ENV')
#print(senha)

### üìëCarregando [ EXCEL ]

In [None]:
# Caminho completo para o arquivo
caminho_arquivo = caminho_excel

# Carrega o arquivo Excel em um DataFrame
df_excel = pd.read_excel(caminho_arquivo)

# Mostra o DF
display(df_excel)


### üõ†Ô∏èNormalizando coluna de `Valor` --> PANDAS

In [None]:
#---> Convertando dados com PANDAS

# Fun√ß√£o para normalizar os valores
def normalizar_valor(valor):
    if pd.isna(valor):
        return None
    
    # Converte para string
    valor_str = str(valor)
    
    # Remove s√≠mbolo de moeda e espa√ßos
    valor_str = re.sub(r'[^\d,.-]', '', valor_str)
    
    # Substitui v√≠rgula por ponto, se for o caso
    if ',' in valor_str and '.' not in valor_str:
        valor_str = valor_str.replace(',', '.')
    elif ',' in valor_str and '.' in valor_str:
        # Caso raro tipo 1.234,56 ‚Üí 1234.56
        valor_str = valor_str.replace('.', '').replace(',', '.')
    
    try:
        # Converte para FLOAT
        return float(valor_str)
    except ValueError:
        return None

# Aplica ao DataFrame
df_excel['valor_normalizado'] = df_excel['Valor'].apply(normalizar_valor)

# df final => EXCEL
display(df_excel)


### üîÉConvertendo DF de PANDAS para Spark

In [None]:
# üîß For√ßa o Spark a usar o Python do ambiente virtual
os.environ["PYSPARK_PYTHON"] = caminho_env
os.environ["PYSPARK_DRIVER_PYTHON"] = caminho_env
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# üî• Cria a sess√£o Spark
spark = (
    SparkSession.builder
    .appName("ExcelToSpark")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

df_spark = spark.createDataFrame(df_excel)
df_spark.show(10, truncate=False)


### üîÉConvertendo Coluna de `Data da Venda`

In [None]:


# fun√ß√£o Python para converter padr√£o de data
def normalizar_data(valor):
    if not valor:
        return None
    valor = str(valor).strip().replace("/", "-")

    formatos = ["%d-%m-%Y", "%Y-%m-%d", "%m-%d-%Y"]
    for f in formatos:
        try:
            return datetime.strptime(valor, f).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None  # se nenhum formato bater

# registra a UDF
normalizar_data_udf = F.udf(normalizar_data, StringType())

# aplica no DataFrame Spark
df_spark = df_spark.withColumn(
    "data_venda_normalizada",
    normalizar_data_udf(F.col("Data da Venda"))
)

# converte para tipo date
df_spark = df_spark.withColumn(
    "data_venda_normalizada",
    F.to_date("data_venda_normalizada", "yyyy-MM-dd")
)

# mostra o resultado final
df_spark.select("Data da Venda", "data_venda_normalizada").show(20, truncate=False)


### üõ†Ô∏èNormalizando dados da coluna Nome

In [None]:
def normalizar_nome(nome: str) -> str:
    if not nome:
        return None
    
    nome = nome.strip().lower()

    # Corrige casos comuns de nomes juntos
    nome = re.sub(r'(\d+)', r' \1', nome)  # separa n√∫meros (ex: iphone15pro ‚Üí iphone 15pro)
    nome = re.sub(r'([a-z])([A-Z])', r'\1 \2', nome)  # separa letras coladas
    
    # Aplica capitaliza√ß√£o inicial
    nome_formatado = " ".join(word.capitalize() for word in nome.split())

    # Mant√©m siglas espec√≠ficas em mai√∫sculo
    siglas = {"VI", "GT", "PRO", "ULTRA", "ROG"}
    palavras = []
    for word in nome_formatado.split():
        if word.upper() in siglas:
            palavras.append(word.upper())
        else:
            palavras.append(word)
    
    # Corrige "Iphone" para "iPhone"
    resultado = " ".join(palavras)
    resultado = re.sub(r'\bIphone\b', 'iPhone', resultado)

    return resultado.strip()

# Registrar como fun√ß√£o UDF no Spark
udf_normalizar_nome = F.udf(normalizar_nome, StringType())

# Aplicar a fun√ß√£o √† coluna "Nome"
df_spark = df_spark.withColumn("Nome_Normalizado", udf_normalizar_nome(F.col("Nome")))

# Exibir resultado
df_spark.select("Nome", "Nome_Normalizado").show(20, truncate=False)


In [None]:
df_spark.show(10, truncate=False)

### üßπTirando colunas Indesejadas

In [None]:
def remover_colunas(df: DataFrame, colunas_para_remover: list) -> DataFrame:
    colunas_existentes = [col for col in colunas_para_remover if col in df.columns]

    if not colunas_existentes:
        print("‚ö†Ô∏è Nenhuma das colunas informadas existe no DataFrame.")
        return df

    print(f"üßπ Removendo colunas: {colunas_existentes}")
    return df.drop(*colunas_existentes)

# Lista de colunas para excluir
colunas_excluir = [
    "Nome", 
    "Valor", 
    "Data da Venda"
]

df_spark = remover_colunas(df_spark, colunas_excluir)

df_spark.show(10, truncate=False)

### üîÑRenomeando nome das colunas

In [None]:
def renomear_colunas(df: DataFrame, mapeamento: dict) -> DataFrame:
    df_renomeado = df
    colunas_existentes = df.columns

    for antiga, nova in mapeamento.items():
        if antiga in colunas_existentes:
            df_renomeado = df_renomeado.withColumnRenamed(antiga, nova)
            print(f"üîÅ Coluna renomeada: '{antiga}' ‚Üí '{nova}'")
        else:
            print(f"‚ö†Ô∏è Coluna '{antiga}' n√£o encontrada ‚Äî ignorada.")

    return df_renomeado

# Chamando fun√ß√£o para Renomear
mapeamento_colunas = {
    "Quantidade Vendida": "quantity_sold",
    "valor_normalizado": "price",
    "data_venda_normalizada": "sale_date",
    "Nome_Normalizado": "name"
}

df_spark = renomear_colunas(df_spark, mapeamento_colunas)

df_spark.show(10, truncate=False)

### üìåAplicando nova ordem das colunas

In [None]:
def reordenar_colunas(df: DataFrame, nova_ordem: list) -> DataFrame:
    colunas_existentes = df.columns

    # Garante que s√≥ use colunas que realmente existem no df
    colunas_validas = [c for c in nova_ordem if c in colunas_existentes]

    # Adiciona ao final as colunas que n√£o estavam na lista (para n√£o perder nenhuma)
    colunas_restantes = [c for c in colunas_existentes if c not in colunas_validas]

    ordem_final = colunas_validas + colunas_restantes

    print("üìã Nova ordem de colunas:")
    print(ordem_final)

    return df.select(ordem_final)

nova_ordem = [
    "name",
    "price",
    "quantity_sold",
    "sale_date",
]

df_spark = reordenar_colunas(df_spark, nova_ordem)

df_spark.show(10, truncate=False)

### üíæSalvando DF na camada [ PRATA ] --> Silver

In [None]:
# Para n√£o sobrecarregar o SPARK
df_sample = df_spark.limit(1000).toPandas()

In [None]:
df_pandas = df_sample

# cria a engine de conex√£o com o PostgreSQL
engine = create_engine(
    'postgresql+psycopg2://kevinsoffa:'+senha_banco_de_dados+'@localhost:5432/sales_smart_phones_silver'
)

# salva o DataFrame no PostgreSQL
df_pandas.to_sql(
    'sales_smart_phones_silver',  # nome da tabela
    engine,
    if_exists='replace',           # substitui se j√° existir
    index=False                     # n√£o salva o √≠ndice
)
