In [None]:
# Um Acionador do Spark que recebe um sparksession, sparkcontext e retorna um build com as dependências jars instanciadas, tamanho da memória e quantidade de núcleos usados e máximo, e também o nível de log só como "ERROR"
def spark_builder(sparksession, sparkcontext):
  # Instanciar as dependências jars
  jars = ["spark-sql-kafka-0-10_2.12-3.1.2.jar", "spark-avro_2.12-3.1.2.jar", "delta-core_2.12-1.0.0.jar"] # Exemplo de jars, pode variar de acordo com o projeto
  sparksession.conf.set("spark.jars", ",".join(jars))

  # Configurar o tamanho da memória e a quantidade de núcleos usados e máximo
  sparksession.conf.set("spark.executor.memory", "4g") # Exemplo de memória, pode variar de acordo com o projeto
  sparksession.conf.set("spark.executor.cores", "2") # Exemplo de núcleos, pode variar de acordo com o projeto
  sparksession.conf.set("spark.cores.max", "4") # Exemplo de núcleos máximos, pode variar de acordo com o projeto

  # Configurar o nível de log só como "ERROR"
  sparkcontext.setLogLevel("ERROR")

  # Retornar o build
  return sparksession.builder.getOrCreate()

# Uma função de download das dependências maven que recebe o link do repositório central maven e faz o download (usando só python)
def maven_downloader(repo_url):
  # Importar a biblioteca pysmartdl para fazer o download
  from pysmartdl import SmartDL

  # Criar uma lista de dependências maven (exemplo, pode variar de acordo com o projeto)
  dependencies = ["org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2", "org.apache.spark:spark-avro_2.12:3.1.2", "io.delta:delta-core_2.12:1.0.0"]

  # Iterar sobre as dependências e fazer o download de cada uma
  for dependency in dependencies:
    # Separar o grupo, o artefato e a versão da dependência
    group, artifact, version = dependency.split(":")
    # Construir o link do jar da dependência
    jar_url = repo_url + "/" + group.replace(".", "/") + "/" + artifact + "/" + version + "/" + artifact + "-" + version + ".jar"
    # Criar um objeto SmartDL com o link do jar
    downloader = SmartDL(jar_url)
    # Iniciar o download do jar
    downloader.start()
    # Imprimir uma mensagem de sucesso
    print(f"Downloaded {artifact}-{version}.jar from {jar_url}")

# Uma função que recebe um diretório de origem para mapear todos os arquivos avro de um repositório no s3 e criar um dataframe, retorna um dataframe delta
def avro_to_delta(source_dir):
  # Importar as bibliotecas spark, delta e boto3
  from pyspark.sql import SparkSession
  from delta import DeltaTable
  import boto3

  # Criar um objeto sparksession usando a função spark_builder
  spark = spark_builder(SparkSession.builder, SparkSession.builder.getOrCreate().sparkContext)

  # Criar um objeto boto3 para acessar o s3
  s3 = boto3.resource("s3")

  # Criar uma lista vazia para armazenar os caminhos dos arquivos avro
  avro_files = []

  # Iterar sobre os objetos do diretório de origem no s3
  for obj in s3.Bucket("my-bucket").objects.filter(Prefix=source_dir): # Exemplo de bucket, pode variar de acordo com o projeto
    # Verificar se o objeto é um arquivo avro
    if obj.key.endswith(".avro"):
      # Adicionar o caminho do arquivo avro à lista
      avro_files.append(f"s3a://{obj.bucket_name}/{obj.key}")

  # Criar um dataframe spark a partir dos arquivos avro usando o formato avro
  df = spark.read.format("avro").load(avro_files)

  # Criar um dataframe delta a partir do dataframe spark usando o formato delta
  delta_df = df.write.format("delta").saveAsTable("my_table") # Exemplo de tabela, pode variar de acordo com o projeto

  # Retornar o dataframe delta
  return delta_df

# Uma função que recebe um dataframe delta e faz os processamentos de uma camada bronze
def bronze_processing(delta_df):
  # Importar as bibliotecas spark e delta
  from pyspark.sql import SparkSession
  from delta import DeltaTable

  # Criar um objeto sparksession usando a função spark_builder
  spark = spark_builder(SparkSession.builder, SparkSession.builder.getOrCreate().sparkContext)

  # Criar um objeto deltatable a partir do dataframe delta
  delta_table = DeltaTable.forPath(spark, delta_df)

  # Fazer os processamentos de uma camada bronze (exemplo, pode variar de acordo com o projeto)
  # Adicionar uma coluna com a data e hora da ingestão dos dados
  delta_table.withColumn("ingestion_timestamp", current_timestamp())
  # Adicionar uma coluna com o hash dos dados para identificar duplicatas
  delta_table.withColumn("hash", md5(concat_ws("|", *delta_table.columns)))
  # Remover as duplicatas usando o hash
  delta_table.dropDuplicates(["hash"])
  # Retornar o objeto deltatable
  return delta_table

# Uma função que recebe um dataframe delta da camada anterior e faz os processamentos da camada silver
def silver_processing(bronze_delta_df):
  # Importar as bibliotecas spark e delta
  from pyspark.sql import SparkSession
  from delta import DeltaTable

  # Criar um objeto sparksession usando a função spark_builder
  spark = spark_builder(SparkSession.builder, SparkSession.builder.getOrCreate().sparkContext)

  # Criar um objeto deltatable a partir do dataframe delta da camada bronze
  bronze_delta_table = DeltaTable.forPath(spark, bronze_delta_df)

  # Fazer os processamentos da camada silver (exemplo, pode variar de acordo com o projeto)
  # Criar uma visão temporária a partir do deltatable da camada bronze
  bronze_delta_table.createOrReplaceTempView("bronze_view")
  # Criar um dataframe spark a partir de uma consulta SQL que aplica as transformações desejadas
  silver_df = spark.sql("""
  SELECT
    movie_id, -- Exemplo de coluna, pode variar de acordo com o projeto
    movie_title, -- Exemplo de coluna, pode variar de acordo com o projeto
    movie_genre, -- Exemplo de coluna, pode variar de acordo com o projeto
    user_id, -- Exemplo de coluna, pode variar de acordo com o projeto
    user_rating, -- Exemplo de coluna, pode variar de acordo com o projeto
    ingestion_timestamp, -- Coluna da camada bronze
    hash -- Coluna da camada bronze
  FROM bronze_view
  WHERE user_rating IS NOT NULL -- Exemplo de filtro, pode variar de acordo com o projeto
  """)
  # Criar um dataframe delta a partir do dataframe spark usando o formato delta
  silver_delta_df = silver_df.write.format("delta").saveAsTable("silver_table") # Exemplo de tabela, pode variar de acordo com o projeto

  # Retornar o dataframe delta da camada silver
  return silver_delta_df

# Uma função que recebe um ou mais diretórios da camada silver delta lake e faz os processamentos da camada gold
def gold_processing(silver_delta_dirs):
  # Importar as bibliotecas spark e delta
  from pyspark.sql import SparkSession
  from delta import DeltaTable

  # Criar um objeto sparksession usando a função spark_builder
  spark = spark_builder(SparkSession.builder, SparkSession.builder.getOrCreate().sparkContext)

  # Criar uma lista vazia para armazenar os objetos deltatable da camada silver
  silver_delta_tables = []

  # Iterar sobre os diretórios da camada silver
  for silver_delta_dir in silver_delta_dirs:
    # Criar um objeto deltatable a partir do diretório
    silver_delta_table = DeltaTable.forPath(spark, silver_delta_dir)
    # Adicionar o objeto deltatable à lista
    silver_delta_tables.append(silver_delta_table)

  # Fazer os processamentos da camada gold (exemplo, pode variar de acordo com o projeto)
  # Criar um dataframe spark vazio para armazenar os dados da camada gold
  gold_df = spark.createDataFrame([], schema=None)
  # Iterar sobre os objetos deltatable da camada silver
  for silver_delta_table in silver_delta_tables:
    # Criar uma visão temporária a partir do deltatable da camada silver
    silver_delta_table.createOrReplaceTempView("silver_view")
    # Criar um dataframe spark a partir de uma consulta SQL que aplica as agregações desejadas
    gold_df_temp = spark.sql("""
    SELECT
      movie_id, -- Exemplo de coluna, pode variar de acordo com o projeto
      movie_title, -- Exemplo de coluna, pode variar de acordo com o projeto
      movie_genre, -- Exemplo de coluna, pode variar de acordo com o projeto
      AVG(user_rating) AS average_rating, -- Exemplo de agregação, pode variar de acordo com o projeto
      COUNT(user_id) AS number_of_users -- Exemplo de agregação, pode variar de acordo com o projeto
    FROM silver_view
    GROUP BY movie_id, movie_title, movie_genre -- Exemplo de agrupamento, pode variar de acordo com o projeto
    """)
    # Unir o dataframe spark temporário com o dataframe spark da camada gold
    gold_df = gold_df.union(gold_df_temp)

  # Criar um dataframe delta a partir do dataframe spark da camada gold usando o formato delta
  gold_delta_df = gold_df.write.format("delta").saveAsTable("gold_table") # Exemplo de tabela, pode variar de acordo com o projeto

  # Retornar o dataframe delta da camada gold
  return gold_delta_df
