<a href="https://colab.research.google.com/github/JessicaArauj/E2E_Automation_with_N8N_AWS_Python_and_PySpark/blob/main/transformation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install dependencies

In [None]:
!pip install pyspark==3.5.0 boto3
!pip install boto3

# Imports

In [None]:
import os
import io
import zipfile
import boto3
import glob

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, struct, explode, col, lit, split, udf
from pyspark.sql.types import IntegerType, DoubleType, StringType, LongType

from google.colab import userdata

# Variables (read from Colab secrets)

In [None]:
# AWS credentials, region and target bucket are pulled from Colab's secret
# store (google.colab.userdata) instead of being hard-coded in the notebook.
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, BUCKET_NAME = (
    userdata.get(secret_name)
    for secret_name in (
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_REGION',
        'BUCKET_NAME',
    )
)

# Data processing, transformation, and loading pipeline using PySpark and AWS S3

In [None]:
# S3 client authenticated explicitly with the credentials and region read
# from the Colab secrets.
s3 = boto3.client(
    service_name="s3",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

# getOrCreate() reuses an already-running Spark session if there is one.
spark = (
    SparkSession.builder
    .appName("S3DataTransformation")
    .getOrCreate()
)

# Month number -> Portuguese month name. Keys are the canonical string form
# of the number (no leading zero), so "01" and 1 both normalise to "1".
months_map = {
    "1": "Janeiro",
    "2": "Fevereiro",
    "3": "Março",
    "4": "Abril",
    "5": "Maio",
    "6": "Junho",
    "7": "Julho",
    "8": "Agosto",
    "9": "Setembro",
    "10": "Outubro",
    "11": "Novembro",
    "12": "Dezembro",
}


def _month_name(m):
    """Return the Portuguese month name for month number *m*.

    *m* may be an int or a numeric string such as "1" or "01". Falsy values
    (None, "", 0) yield None. Numbers outside 1-12 and non-numeric values are
    returned unchanged rather than raising.
    """
    if not m:
        return None
    try:
        key = str(int(m))
    except (TypeError, ValueError):
        # e.g. a month that is already spelled out, or "1.0" from a double
        # column cast to string: keep the value instead of raising inside the
        # Python worker, which would abort the whole Spark job.
        return m
    return months_map.get(key, m)


# Spark wrapper: produces a string column of month names.
month_udf = udf(_month_name, StringType())

# Brazilian state abbreviation (UF) -> macro-region name, built from a
# region-first table to keep each region's states together.
uf_to_region = {
    state: region
    for region, states in (
        ("Norte", ("AC", "AP", "AM", "PA", "RO", "RR", "TO")),
        ("Nordeste", ("MA", "PI", "CE", "RN", "PB", "PE", "AL", "SE", "BA")),
        ("Sudeste", ("MG", "SP", "RJ", "ES")),
        ("Sul", ("PR", "RS", "SC")),
        ("Centro-Oeste", ("MT", "MS", "GO", "DF")),
    )
    for state in states
}


def uf_to_region_func(uf):
    """Map a UF code to its region name.

    The national aggregate label "Brasil" maps to itself; any value not found
    in ``uf_to_region`` (including None) maps to "Unknown".
    """
    return "Brasil" if uf == "Brasil" else uf_to_region.get(uf, "Unknown")


# Spark wrapper around uf_to_region_func; produces a string column.
uf_to_region_udf = udf(f=uf_to_region_func, returnType=StringType())


def process_normal_csv(df):
    """Normalise a CSV that matched none of the more specific file-name rules.

    - ``Ano`` is cast to string and ``Mês`` is mapped to its Portuguese name.
    - The descriptive columns listed below are cast to string.
    - ``Acessos`` is cast to double.
    - A ``Região`` column derived from ``UF`` is appended.

    Any of these columns that is absent from *df* is skipped.
    """
    present = set(df.columns)
    text_columns = [
        name
        for name in (
            "Grupo Econômico", "Empresa", "CNPJ", "Porte da Prestadora",
            "UF", "Município", "Código IBGE Município", "Faixa de Velocidade",
            "Tecnologia", "Meio de Acesso",
        )
        if name in present
    ]

    if "Ano" in present:
        df = df.withColumn("Ano", col("Ano").cast(StringType()))
    if "Mês" in present:
        df = df.withColumn("Mês", month_udf(col("Mês")))
    for name in text_columns:
        df = df.withColumn(name, col(name).cast(StringType()))
    if "Acessos" in present:
        df = df.withColumn("Acessos", col("Acessos").cast(DoubleType()))
    if "UF" in present:
        df = df.withColumn("Região", uf_to_region_udf(col("UF")))
    return df


def process_colunas_csv(df):
    """Unpivot a wide "_colunas" CSV into one row per (record, month column).

    Every column that is not one of the identifier columns below is treated
    as a value column named "<year>-<month>". The result has:

    - the identifier columns that are present in *df*;
    - ``Acessos`` (double);
    - ``Ano`` (string) and ``Mês`` (Portuguese month name);
    - ``Região`` derived from ``UF``, when ``UF`` is present.

    Raises:
        ValueError: if *df* has no value columns to unpivot.
    """
    all_id_cols = (
        "CNPJ", "Município", "UF", "Faixa de Velocidade",
        "Tecnologia", "Empresa", "Porte da Prestadora",
        "Código IBGE Município", "Grupo Econômico", "Meio de Acesso",
    )
    # Only select identifiers the file actually has; selecting a missing one
    # fails with an AnalysisException.
    id_cols = [c for c in all_id_cols if c in df.columns]
    value_cols = [c for c in df.columns if c not in all_id_cols]
    if not value_cols:
        raise ValueError("no month columns found to unpivot in '_colunas' CSV")

    # Cast each value before packing, so every struct in the array has the
    # same type regardless of what inferSchema chose for each month column.
    monthly = explode(
        array(
            *[
                struct(
                    lit(c).alias("AnoMes"),
                    col(c).cast(DoubleType()).alias("Acessos"),
                )
                for c in value_cols
            ]
        )
    ).alias("tmp")

    exploded = df.select(*id_cols, monthly).select(
        *id_cols, "tmp.AnoMes", "tmp.Acessos"
    )

    year_month = split(col("AnoMes"), "-")
    exploded = (
        exploded
        .withColumn("Ano", year_month.getItem(0).cast(StringType()))
        .withColumn("Mês", month_udf(year_month.getItem(1)))
        .drop("AnoMes")
    )
    if "UF" in id_cols:
        exploded = exploded.withColumn("Região", uf_to_region_udf(col("UF")))
    return exploded


def process_densidade_csv(df):
    """Normalise a density CSV (file name contains "densidade").

    - The identifier columns listed below are cast to string.
    - ``Mês`` is then mapped to its Portuguese month name.
    - A ``Região`` column derived from ``UF`` is appended.
    - ``Densidade`` is cast to double.

    Any of these columns that is absent from *df* is skipped.
    """
    present = set(df.columns)

    for name in ("Ano", "Mês", "UF", "Município", "Código IBGE",
                 "Nível Geográfico Densidade"):
        if name in present:
            df = df.withColumn(name, col(name).cast(StringType()))

    if "Mês" in present:
        df = df.withColumn("Mês", month_udf(col("Mês")))
    if "UF" in present:
        df = df.withColumn("Região", uf_to_region_udf(col("UF")))
    if "Densidade" in present:
        df = df.withColumn("Densidade", col("Densidade").cast(DoubleType()))
    return df


def process_total_csv(df):
    """Normalise a "total" CSV.

    - ``Ano`` is cast to string.
    - ``Mês`` is mapped to its Portuguese month name.
    - ``Acessos`` is cast to long.

    Any of these columns that is absent from *df* is skipped.
    """
    # (column, transformation) pairs, applied in this order.
    transforms = (
        ("Ano", lambda column: column.cast(StringType())),
        ("Mês", month_udf),
        ("Acessos", lambda column: column.cast(LongType())),
    )
    for name, transform in transforms:
        if name in df.columns:
            df = df.withColumn(name, transform(col(name)))
    return df


import shutil

# Destination prefix for every uploaded output file.
PROCESSED_PREFIX = "processed_data/"


def _list_bucket_keys(bucket):
    """Return every object key in *bucket*.

    Uses the list_objects_v2 paginator, because one call returns at most
    1000 keys. Pages without a "Contents" entry contribute nothing.
    """
    paginator = s3.get_paginator("list_objects_v2")
    return [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket)
        for obj in page.get("Contents", [])
    ]


def _pick_processor(file_name):
    """Return the transformation function for a CSV, chosen by file name."""
    lower = file_name.lower()
    if lower.endswith("_colunas.csv"):
        return process_colunas_csv
    if "densidade" in lower:
        return process_densidade_csv
    if "total" in lower:
        return process_total_csv
    return process_normal_csv


def _write_single_csv(df, stem):
    """Write *df* as one CSV under /tmp/<stem>/ and return the file's path.

    The file is rewritten with a UTF-8 BOM (``utf-8-sig``), presumably so
    spreadsheet tools detect the encoding of accented text. Raises
    FileNotFoundError if Spark produced no .csv part file.
    """
    csv_local = f"/tmp/{stem}"
    (
        df.coalesce(1)
        .write.mode("overwrite")
        .option("header", True)
        .option("encoding", "UTF-8")
        .csv(csv_local)
    )
    part_files = glob.glob(f"{csv_local}/*.csv")
    if not part_files:
        raise FileNotFoundError(f"Spark produced no CSV part file in {csv_local}")
    csv_file = part_files[0]
    with open(csv_file, 'r', encoding='utf-8') as src:
        content = src.read()
    with open(csv_file, 'w', encoding='utf-8-sig') as dst:
        dst.write(content)
    return csv_file


all_keys = _list_bucket_keys(BUCKET_NAME)
if not all_keys:
    print("No files found in S3")
else:
    # Skip our own output prefix: non-CSV archive members (which may
    # themselves be .zip files) are re-uploaded there unchanged and would
    # otherwise be picked up again on the next run.
    zip_files = [
        key for key in all_keys
        if key.endswith(".zip") and not key.startswith(PROCESSED_PREFIX)
    ]
    print("Found ZIP files in S3:", zip_files)

    for zip_key in zip_files:
        print(f"\nProcessing {zip_key}...")
        local_zip = f"/tmp/{os.path.basename(zip_key)}"
        s3.download_file(BUCKET_NAME, zip_key, local_zip)

        with zipfile.ZipFile(local_zip, "r") as z:
            for file_name in z.namelist():
                base_name = os.path.basename(file_name)
                if not base_name:
                    # Directory entries ("folder/") carry no file content.
                    continue
                print(f" - Extracting {file_name}")
                local_file = f"/tmp/{base_name}"
                # Stream to disk instead of loading the whole member in memory.
                with z.open(file_name) as src, open(local_file, "wb") as dst:
                    shutil.copyfileobj(src, dst)

                if file_name.lower().endswith(".csv"):
                    df = spark.read.csv(local_file, header=True, sep=";", inferSchema=True)
                    df = _pick_processor(file_name)(df)

                    stem = os.path.splitext(base_name)[0]
                    csv_file = _write_single_csv(df, stem)

                    s3_key = f"{PROCESSED_PREFIX}{stem}.csv"
                    s3.upload_file(csv_file, BUCKET_NAME, s3_key)
                    print(f"Processed file saved in s3://{BUCKET_NAME}/{s3_key}")
                else:
                    s3_key = f"{PROCESSED_PREFIX}{base_name}"
                    s3.upload_file(local_file, BUCKET_NAME, s3_key)
                    print(f"File uploaded without processing: s3://{BUCKET_NAME}/{s3_key}")