In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from datetime import datetime


# Glue job bootstrap: resolve the JOB_NAME argument and create the Spark/Glue
# contexts used by the rest of the script.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# getOrCreate() reuses an already-running SparkContext (e.g. when this cell is
# re-run in a Glue notebook / interactive session) instead of raising
# "Cannot run multiple SparkContexts at once" as a bare SparkContext() does.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# Schema of the pipe-delimited movies CSV. Spark's CSV reader applies an
# explicit schema by position by default, so this order must follow the file.
# NOTE: "tituloPincipal" (sic) is deliberately left as-is: it becomes the
# column name in the Parquet output, so renaming it may break downstream readers.
movies_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("id", StringType()),
        ("tituloPincipal", StringType()),
        ("tituloOriginal", StringType()),
        ("anoLancamento", IntegerType()),
        ("tempoMinutos", IntegerType()),
        ("genero", StringType()),
        ("notaMedia", FloatType()),
        ("numeroVotos", IntegerType()),
        ("generoArtista", StringType()),
        ("personagem", StringType()),
        ("nomeArtista", StringType()),
        ("anoNascimento", IntegerType()),
        ("anoFalecimento", IntegerType()),
        ("profissao", StringType()),
        ("titulosMaisConhecidos", StringType()),
    ]
])


# Schema of the pipe-delimited series CSV: the movie columns plus "anoTermino"
# (end year). Spark applies an explicit CSV schema by position by default, so
# this order must follow the file.
# NOTE: "tituloPincipal" (sic) is deliberately left as-is: it becomes the
# column name in the Parquet output, so renaming it may break downstream readers.
series_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("id", StringType()),
        ("tituloPincipal", StringType()),
        ("tituloOriginal", StringType()),
        ("anoLancamento", IntegerType()),
        ("anoTermino", IntegerType()),
        ("tempoMinutos", IntegerType()),
        ("genero", StringType()),
        ("notaMedia", FloatType()),
        ("numeroVotos", IntegerType()),
        ("generoArtista", StringType()),
        ("personagem", StringType()),
        ("nomeArtista", StringType()),
        ("anoNascimento", IntegerType()),
        ("anoFalecimento", IntegerType()),
        ("profissao", StringType()),
        ("titulosMaisConhecidos", StringType()),
    ]
])

# S3 roots of the data lake's raw (CSV input) and trusted (Parquet output) zones.
RAW_ZONE_PATH = "s3://data-lake-mateus/Raw/Local/CSV"
TRUSTED_ZONE_PATH = "s3://data-lake-mateus/Trusted/Local/PARQUET"

# Run date formatted as "YYYY-MM-DD", used as the trusted-zone output folder.
# datetime.now() reads the driver's local clock (naive, no timezone attached).
current_date = datetime.now().date().isoformat()

# Clean one raw CSV dataset and publish it to the trusted zone as Parquet.
def process_data(entity, schema, raw_date="2024/11/28"):
    """Read a raw pipe-delimited CSV, clean it, and overwrite the trusted copy.

    Reads ``{RAW_ZONE_PATH}/{entity}/{raw_date}/{entity.lower()}.csv`` with a
    header row and the given ``schema``. The cleaning steps are:

    * turn ``\\N`` null markers into real nulls;
    * drop exact duplicate rows and rows whose columns are all null;
    * strip non-digit characters from ``id`` and cast it to int;
    * strip non-digits from each comma-separated entry of
      ``titulosMaisConhecidos``.

    The result overwrites ``{TRUSTED_ZONE_PATH}/{entity}/{current_date}/``.

    Args:
        entity: Dataset folder name in the raw/trusted zones (e.g. "Movies").
        schema: StructType applied to the CSV columns.
        raw_date: "YYYY/MM/DD" partition of the raw file to read. Defaults to
            the ingestion date that was previously hard-coded.
    """
    raw_path = f"{RAW_ZONE_PATH}/{entity}/{raw_date}/{entity.lower()}.csv"
    trusted_path = f"{TRUSTED_ZONE_PATH}/{entity}/{current_date}/"

    # nullValue makes the reader emit null for "\N" in every column, including
    # numeric ones that would otherwise fail to parse that text.
    df_raw = (
        spark.read
        .option("delimiter", "|")
        .option("nullValue", "\\N")
        .csv(raw_path, header=True, schema=schema)
    )

    # Defensive second pass: map any literal "\N" remaining in string columns
    # to null. Only string columns can hold that text; comparing numeric
    # columns against a string is pointless and may fail under ANSI mode.
    string_columns = {
        field.name
        for field in df_raw.schema.fields
        if isinstance(field.dataType, StringType)
    }
    df_nulls = df_raw.select([
        F.when(F.col(name) == "\\N", None).otherwise(F.col(name)).alias(name)
        if name in string_columns
        else F.col(name)
        for name in df_raw.columns
    ])

    df_clean = (
        df_nulls
        .dropDuplicates()
        # Must run after "\N" normalisation; otherwise rows made only of "\N"
        # markers are not seen as all-null and survive.
        .dropna(how="all")
        # e.g. "tt0000001" -> 1
        .withColumn("id", F.regexp_replace(F.col("id"), "[^0-9]", "").cast(IntegerType()))
        .withColumn(
            "titulosMaisConhecidos",
            # Guarded because concat_ws over a null array yields "" rather
            # than null.
            F.when(
                F.col("titulosMaisConhecidos").isNotNull(),
                F.concat_ws(
                    ",",
                    F.expr("transform(split(titulosMaisConhecidos, ','), x -> cast(regexp_replace(x, '[^0-9]', '') as int))")
                )
            ).otherwise(None)
        )
    )

    df_clean.write.mode("overwrite").format("parquet").save(trusted_path)
    print(f"Dados do {entity} processados e salvos em: {trusted_path}")

# Clean and publish each dataset in turn (movies first, then series), then
# commit the Glue job once both have been written.
for entity_name, entity_schema in (("Movies", movies_schema), ("Series", series_schema)):
    process_data(entity_name, entity_schema)

job.commit()

