In [0]:
import pandas as pd
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
import requests

In [0]:
# Notebook parameter: GitHub access token, sent in the Authorization header
# by the download cells of this notebook.
# NOTE(review): widget values are visible in the notebook UI; consider a
# secret scope for the token instead — confirm with the workspace policy.
dbutils.widgets.text("token", "", "token")
# Strip whitespace/newlines that come along when the token is pasted: they
# would end up inside the header value, and a whitespace-only value would be
# treated as a real token by the `if token` check further down.
token = dbutils.widgets.get("token").strip()

In [0]:
# Spark session: getOrCreate() returns the existing session when there is one
# and only builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

In [0]:
# URL of the CSV (the path embeds a commit hash rather than a branch name)
url_raw = "https://raw.githubusercontent.com/RafaelaSantos92/projeto-final/2d91c9a99aae72fa73f779d77e2f49fbf6a85c5d/raw-data/Populacao%20-%20Censo%202022.csv"

# Send the token only when one was provided; otherwise no extra headers
headers = {'Authorization': f'token {token}'} if token else None

# Download the file. requests applies no timeout unless told to, so a stalled
# connection would block this cell forever; (connect, read) limits in seconds.
response = requests.get(url_raw, headers=headers, timeout=(10, 60))
# Fail loudly on 4xx/5xx instead of parsing an error page as CSV
response.raise_for_status()

# Load the CSV into pandas
pdf = pd.read_csv(BytesIO(response.content))

# Normalize the column names for Delta: trim, lowercase, map spaces and
# hyphens to underscores, delete , ( ) . and finally discard every non-ASCII
# character (an accented letter is removed entirely, not reduced to its base
# letter).
pdf.columns = [
    name.strip()
        .lower()
        .translate(str.maketrans(" -", "__", ",()."))
        .encode('ascii', errors='ignore')
        .decode()
    for name in pdf.columns
]

# Convert to a Spark DataFrame and attach the Bronze-layer lineage columns:
# the source URL and the ingestion timestamp.
df = (
    spark.createDataFrame(pdf)
    .withColumn("source_file", lit(url_raw))
    .withColumn("ingestion_time", current_timestamp())
)

# Show 5 rows as a quick sanity check
df.show(5)

# Save as a managed Delta table in the Bronze schema, replacing prior contents
(
    df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("projeto_final_bronze.bronze_ibge_censo_2022")
)


In [0]:
import requests
from io import BytesIO
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
import re

url = "https://api.github.com/repos/RafaelaSantos92/projeto-final/contents/raw-data/panorama-da-eja-no-brasil.xlsx?ref=main"

# Ask the contents endpoint for the raw file body (the bytes are fed straight
# into read_excel below).
headers = {"Accept": "application/vnd.github.v3.raw"}
# Authenticate only when a token was supplied. With an empty token the header
# would be the bare string "token ", which is not a valid credential; the CSV
# download cell applies the same guard.
if token:
    headers["Authorization"] = f"token {token}"

# requests applies no timeout unless told to; (connect, read) limits in
# seconds keep a stalled connection from blocking the cell forever.
response = requests.get(url, headers=headers, timeout=(10, 60))
# Fail loudly on 4xx/5xx instead of handing an error payload to read_excel
response.raise_for_status()

df = pd.read_excel(BytesIO(response.content))

# Drop columns with no values at all, then drop every column whose name
# starts with "Unnamed" (case-insensitive).
df = (
    df.dropna(axis="columns", how="all")
      .loc[:, lambda frame: ~frame.columns.str.contains('^Unnamed', case=False)]
)
# NORMALIZAÇÃO SEGURA PARA DELTA
def normalize_column_name(col, *, transliterate=False):
    """Turn a column header into a lowercase ASCII identifier.

    The result contains only lowercase ASCII letters, digits and single
    underscores, with no leading or trailing underscore. It can be an empty
    string when the header has no ASCII letter or digit.

    Args:
        col: The original header. Non-string headers (for example numbers)
            are converted with ``str()`` first.
        transliterate: When False (default) every non-ASCII character is
            dropped outright, so an accented letter disappears completely.
            When True the text is NFKD-decomposed first, so the accent is
            dropped but the base letter is kept.

    Returns:
        The normalized column name.
    """
    text = str(col)
    if transliterate:
        # Local import: unicodedata is not imported at the top of the notebook
        import unicodedata

        # Split accented letters into base letter + combining mark so that
        # the ASCII filter below only discards the mark.
        text = unicodedata.normalize("NFKD", text)
    # Discard whatever is not representable in ASCII
    text = text.encode('ascii', errors='ignore').decode()
    # Anything that is not a letter, digit or underscore becomes an underscore
    text = re.sub(r'[^0-9a-zA-Z_]', '_', text)
    # Collapse runs of underscores into one
    text = re.sub(r'_+', '_', text)
    # Trim underscores at both ends and force lowercase
    return text.strip('_').lower()

# Apply the normalization to every column name
df.columns = list(map(normalize_column_name, df.columns))

# Cast every column to string.
# NOTE(review): astype(str) also stringifies missing values (NaN becomes the
# text "nan") — confirm that is what the Bronze table should hold.
df = df.astype(str)

spark = SparkSession.builder.getOrCreate()

# Convert to Spark and attach the lineage columns: source file name and
# ingestion timestamp.
df_spark = (
    spark.createDataFrame(df)
    .withColumn("source_file", lit("panorama-da-eja-no-brasil.xlsx"))
    .withColumn("ingestion_time", current_timestamp())
)

# Show a few rows
df_spark.show(5)

# Save as a Delta table in the Bronze schema, replacing prior contents
(
    df_spark.write.format("delta")
    .mode("overwrite")
    .saveAsTable("projeto_final_bronze.bronze_panorama_eja")
)