In [1]:
from pyspark.sql import SparkSession

# Passo 1: Configurar o Spark
minio_bucket_bronze = "bronze"
minio_bucket_prata = "prata"

spark = SparkSession.builder \
    .appName("TransformacaoPrata") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.11.1026") \
    .config("fs.s3a.endpoint", "http://172.18.0.4:9000") \
    .config("fs.s3a.access.key", "minioaccesskey") \
    .config("fs.s3a.secret.key", "miniosecretkey") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

# Passo 2: Ler os dados da camada Bronze
path_matricula_aulas_bronze = f"s3a://{minio_bucket_bronze}/csv/matricula_aulas"
df_matricula_aulas_bronze = spark.read.parquet(path_matricula_aulas_bronze)
print("Schema de matricula_aulas (Bronze):")
df_matricula_aulas_bronze.printSchema()
df_matricula_aulas_bronze.show(5)

path_matricula_disciplinas_bronze = f"s3a://{minio_bucket_bronze}/csv/matricula_disciplinas"
df_matricula_disciplinas_bronze = spark.read.parquet(path_matricula_disciplinas_bronze)
print("\nSchema de matricula_disciplinas (Bronze):")
df_matricula_disciplinas_bronze.printSchema()
df_matricula_disciplinas_bronze.show(5)

path_matricula_notas_bronze = f"s3a://{minio_bucket_bronze}/csv/matricula_notas"
df_matricula_notas_bronze = spark.read.parquet(path_matricula_notas_bronze)
print("\nSchema de matricula_notas (Bronze):")
df_matricula_notas_bronze.printSchema()
df_matricula_notas_bronze.show(5)

path_turma_avaliacoes_bronze = f"s3a://{minio_bucket_bronze}/csv/turma_avaliacoes"
df_turma_avaliacoes_bronze = spark.read.parquet(path_turma_avaliacoes_bronze)
print("\nSchema de turma_avaliacoes (Bronze):")
df_turma_avaliacoes_bronze.printSchema()
df_turma_avaliacoes_bronze.show(5)

# Passo 3: Realizar transformações na camada Prata

# Transformação de matricula_aulas
df_matricula_aulas_prata = df_matricula_aulas_bronze.select(
    "id_matricula",
    "id_disciplina",
    "data_aula"
).withColumnRenamed("id_matricula", "matricula_id") \
 .withColumnRenamed("id_disciplina", "disciplina_id") \
 .withColumnRenamed("data_aula", "aula_data")

print("\nSchema de matricula_aulas (Prata):")
df_matricula_aulas_prata.printSchema()
df_matricula_aulas_prata.show(5)

# Transformação de matricula_disciplinas
df_matricula_disciplinas_prata = df_matricula_disciplinas_bronze.select(
    "id_matricula",
    "id_disciplina"
).withColumnRenamed("id_matricula", "matricula_id") \
 .withColumnRenamed("id_disciplina", "disciplina_id")

print("\nSchema de matricula_disciplinas (Prata):")
df_matricula_disciplinas_prata.printSchema()
df_matricula_disciplinas_prata.show(5)

# Join de matricula_aulas com matricula_disciplinas
df_matriculas_completa_prata = df_matricula_aulas_prata.join(
    df_matricula_disciplinas_prata,
    ["matricula_id", "disciplina_id"],
    "inner"
)

print("\nSchema de matriculas_completa (Prata):")
df_matriculas_completa_prata.printSchema()
df_matriculas_completa_prata.show(5)

# Transformação de matricula_notas (selecionando colunas relevantes e renomeando)
df_matricula_notas_prata = df_matricula_notas_bronze.select(
    "id_matricula",
    "id_disciplina",
    "nota"
).withColumnRenamed("id_matricula", "matricula_id") \
 .withColumnRenamed("id_disciplina", "disciplina_id")

print("\nSchema de matricula_notas (Prata):")
df_matricula_notas_prata.printSchema()
df_matricula_notas_prata.show(5)

# Transformação de turma_avaliacoes (selecionando colunas relevantes e renomeando)
df_turma_avaliacoes_prata = df_turma_avaliacoes_bronze.select(
    "id_turma",
    "id_disciplina",
    "titulo"
).withColumnRenamed("id_turma", "turma_id") \
 .withColumnRenamed("id_disciplina", "disciplina_id") \
 .withColumnRenamed("titulo", "avaliacao_titulo")

print("\nSchema de turma_avaliacoes (Prata):")
df_turma_avaliacoes_prata.printSchema()
df_turma_avaliacoes_prata.show(5)

# Passo 4: Persistir os dados na camada Prata
path_matricula_aulas_prata_parquet = f"s3a://{minio_bucket_prata}/matricula_aulas"
df_matricula_aulas_prata.write.mode("overwrite").parquet(path_matricula_aulas_prata_parquet)
print(f"\nDados de matricula_aulas (Prata) escritos em: {path_matricula_aulas_prata_parquet}")

path_matricula_disciplinas_prata_parquet = f"s3a://{minio_bucket_prata}/matricula_disciplinas"
df_matricula_disciplinas_prata.write.mode("overwrite").parquet(path_matricula_disciplinas_prata_parquet)
print(f"Dados de matricula_disciplinas (Prata) escritos em: {path_matricula_disciplinas_prata_parquet}")

path_matricula_notas_prata_parquet = f"s3a://{minio_bucket_prata}/matricula_notas"
df_matricula_notas_prata.write.mode("overwrite").parquet(path_matricula_notas_prata_parquet)
print(f"Dados de matricula_notas (Prata) escritos em: {path_matricula_notas_prata_parquet}")

path_turma_avaliacoes_prata_parquet = f"s3a://{minio_bucket_prata}/turma_avaliacoes"
df_turma_avaliacoes_prata.write.mode("overwrite").parquet(path_turma_avaliacoes_prata_parquet)
print(f"Dados de turma_avaliacoes (Prata) escritos em: {path_turma_avaliacoes_prata_parquet}")

path_matriculas_completa_prata_parquet = f"s3a://{minio_bucket_prata}/matriculas_completa"
df_matriculas_completa_prata.write.mode("overwrite").parquet(path_matriculas_completa_prata_parquet)
print(f"Dados completos de matrículas (Prata) escritos em: {path_matriculas_completa_prata_parquet}")

# Passo 5: Parar a sessão Spark (opcional)
spark.stop()

Schema de matricula_aulas (Bronze):
root
 |-- id_matricula: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)
 |-- data_aula: date (nullable = true)
 |-- presente: boolean (nullable = true)

+------------+-------------+----------+--------+
|id_matricula|id_disciplina| data_aula|presente|
+------------+-------------+----------+--------+
|          25|          501|2020-06-08|    true|
|          25|          501|2020-06-15|   false|
|          25|          501|2020-06-22|    true|
|          25|          501|2020-06-29|    true|
|          25|          501|2020-07-06|    true|
+------------+-------------+----------+--------+
only showing top 5 rows


Schema de matricula_disciplinas (Bronze):
root
 |-- id_matricula: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)

+------------+-------------+
|id_matricula|id_disciplina|
+------------+-------------+
|          25|          501|
|          25|          502|
|          25|          503|
|      

In [3]:
df_matricula_aulas_bronze.printSchema()

root
 |-- id_matricula: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)
 |-- data_aula: date (nullable = true)
 |-- presente: boolean (nullable = true)



In [4]:
df_matricula_disciplinas_bronze.printSchema()

root
 |-- id_matricula: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)



In [2]:
df_matricula_notas_bronze.printSchema()

root
 |-- id_matricula: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)
 |-- data: date (nullable = true)
 |-- nota: double (nullable = true)



In [2]:
df_turma_avaliacoes_bronze.printSchema()

root
 |-- id_turma: integer (nullable = true)
 |-- id_disciplina: integer (nullable = true)
 |-- data_avaliacao: date (nullable = true)
 |-- titulo: string (nullable = true)

