In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.utils import AnalysisException
import os
%run analysis

spark = SparkSession.builder.appName("CodeElevate").getOrCreate() #Não precisamos dos JARs da S3 aqui


spark.conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark.conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

In [None]:
def process_data() -> DataFrame:
    """
    Processa o arquivo de log que já está setado na função. 
    Utiliza regex para fazer o parsing dos dados dentro do arquivo de log.
    Também formata a coluna 'size' para int, que será usado para as próximas questões.
    Além disso, salva o dataframe em...{a definir}

    Args:
        None

    Returns:
        df (pyspark.sql.DataFrame): DataFrame com os dados processados.

    Raises:
        ValueError: Se o arquivo de log estiver vazio.
        FileNotFoundError: Se o arquivo de log não for encontrado.
        AnalysisException: Se houver um erro no processamento Spark.
    """
    try:
        raw_df = spark.read.text('resources/access_log.txt')
        if raw_df.count() == 0:
            raise ValueError("O arquivo de log está vazio")

        log_pattern = r'^(\S+).*\[(.*?)\].*"(\w+)\s+([^\s]+)[^"]*"\s+(\d+)\s+(\S+)'

        df = raw_df.select(
             F.regexp_extract('value', log_pattern, 1).alias('ip'),
             F.regexp_extract('value', log_pattern, 2).alias('timestamp'),
             F.regexp_extract('value', log_pattern, 3).alias('method'),
             F.regexp_extract('value', log_pattern, 4).alias('path'),
             F.regexp_extract('value', log_pattern, 5).alias('status'),
             F.regexp_extract('value', log_pattern, 6).alias('size'))

    except FileNotFoundError as e:
        print(f"Erro ao acessar arquivo: {str(e)}")
        return None
    except AnalysisException as e:
        print(f"Erro no processamento Spark: {str(e)}")
        return None

    #Trocando valores "-" por 0    
    df = df.withColumn('size', F.when(F.col('size') == '-', '0').otherwise(F.col('size')).cast("integer"))
    #Cria coluna de dat_mes_carga para particionamento. Para este case, irei simular que o 'timestamp' será o dat_ref_carga.
    df = df.withColumn('dat_mes_carga', F.date_format(F.to_timestamp(F.col('timestamp'), 'dd/MMM/yyyy:HH:mm:ss Z'),'MM/yyyy'))
    
    df.cache()

    return df


In [None]:
def save_data(df: DataFrame) -> None:
    """
    Salva o dataframe no OpenSearch.
    Args:
        df (pyspark.sql.DataFrame): DataFrame com os dados processados.
    Returns:
        None
    Raises:
        Exception: Ao falhar o save de dados
    """
    try:
        df.write\
            .format("delta") \
            .mode("overwrite") \
            .save("/mnt/delta/logs_delta") #Considerei usar saveAsTable mas é melhor deixar mais genérico

        print("Salvo com sucesso")
    except Exception as e:
        print(f"Erro ao salvar: {str(e)}")
        raise



In [None]:
def main():

    #Processing & Saving
    df = process_data()
    #Saving
    save_data(df)
    #Analysis
    analyze_data(df)

In [None]:
main()