Para rodar o código será necessário na célula 3 importar as tabelas que anexamos juntamente ao código

Instalação e configuração do ambiente

 Esta célula configura o ambiente necessário para execução do Apache Spark no Google Colab.

  - Instalação do Apache Spark 3.5
  - Configuração do Java 8
  - Download do MongoDB Spark Connector

 Nesta fase inicia-se a parte prática do pipeline de dados, onde a infraestrutura distribuída é preparada para integrar
 múltiplas fontes (MongoDB + CSV) e executar operações paralelas.

In [4]:
# CÉLULA 1: Instalação e COnfiguração do Spark e MongoDB
print(" Configurando ambiente Spark...")

!pip install findspark pymongo pyspark

!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

!wget -q https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/mongo-spark-connector_2.12-3.0.1.jar

print(" Downloads concluídos!")

# Configuração do ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = """
--jars /content/mongo-spark-connector_2.12-3.0.1.jar
--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
pyspark-shell
"""

import findspark
findspark.init()

print(" Ambiente Spark configurado com sucesso!")

 Configurando ambiente Spark...
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting pymongo
  Downloading pymongo-4.15.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.8.0-py3-none-any.whl.metadata (5.7 kB)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Downloading pymongo-4.15.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m20.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.8.0-py3-none-any.whl (331 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m331.1/331.1 kB[0m [31m22.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: findspark, dnspython, pymongo
Successfully installed dnspython-2.8.0 findspark-2.0.1 pymongo-4.15.4
Get

IMPLEMENTAÇÃO / INGESTÃO DE DADOS

Carregamento de fonte complementar (CSV)

importar os arquivos sql após executar a célula


In [5]:
# CÉLULA 2: Upload do arquivos
print(" FAÇA O UPLOAD DOS ARQUIVOS CSV:")

from google.colab import files
uploaded = files.upload()

print(" Arquivos carregados:")
for filename in uploaded.keys():
    print(f"   - {filename} ({len(uploaded[filename])} bytes)")

 FAÇA O UPLOAD DOS ARQUIVOS CSV:


Saving matriculas.csv to matriculas.csv
Saving cursos.csv to cursos.csv
Saving alunos.csv to alunos.csv
 Arquivos carregados:
   - matriculas.csv (1407 bytes)
   - cursos.csv (2072 bytes)
   - alunos.csv (2402 bytes)


In [6]:
# CÉLULA 3: Configuração do MongoBD e do Spark
print(" Configurando conexão com MongoDB...")

from pyspark.sql import SparkSession
from pymongo import MongoClient

MONGODB_URI = "mongodb+srv://usuario_spark:SenhaSegura123!@cluster-academico.hotgdc0.mongodb.net/"
DATABASE_NAME = "sistema_academico"
COLLECTION_NAME = "alunos"

spark = SparkSession.builder \
    .appName("SistemaAcademico-MongoDB") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(" Spark Session criada!")

 Configurando conexão com MongoDB...
 Spark Session criada!


MIGRAÇÃO DE DADOS

A célula implementa:
  1. Leitura dos CSVs (relacional)
  2. Construção dos documentos JSON
  3. Criação da coleção "alunos" no MongoDB Atlas
  4. Inserção de todos os documentos

In [8]:
# CÉLULA 4: Inserção de dados no MongoDB
print(" Inserindo dados no MongoDB Atlas...")

!pip install pymongo
import pandas as pd
import json
from pymongo import MongoClient

try:
    client = MongoClient(MONGODB_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]

    print(" Conectado ao MongoDB Atlas!")

    print(" Carregando arquivos CSV...")
    df_alunos = pd.read_csv('alunos.csv', delimiter=';')
    df_matriculas = pd.read_csv('matriculas.csv', delimiter=';')
    df_cursos = pd.read_csv('cursos.csv', delimiter=';')

    print(f" Dados carregados: {len(df_alunos)} alunos, {len(df_matriculas)} matrículas, {len(df_cursos)} cursos")

    documentos_alunos = []

    for index, aluno in df_alunos.iterrows():
        matriculas_aluno = df_matriculas[df_matriculas['ALUNO_ID'] == aluno['ID']]

        matriculas_array = []
        for _, matricula in matriculas_aluno.iterrows():
            curso_info = df_cursos[df_cursos['ID'] == matricula['CURSO_ID']]

            if not curso_info.empty:
                curso = curso_info.iloc[0]

                nota = matricula['NOTA']
                if pd.isna(nota) or str(nota) == '\\N':
                    nota_num = None
                else:
                    try:
                        nota_num = float(nota)
                    except:
                        nota_num = None

                matricula_doc = {
                    "matricula_id": int(matricula['ID']),
                    "curso_id": int(matricula['CURSO_ID']),
                    "data_matricula": matricula['DATA_MATRICULA'],
                    "status": matricula['STATUS'],
                    "nota": nota_num,
                    "curso_info": {
                        "codigo": curso['CODIGO'],
                        "nome": curso['NOME'],
                        "descricao": curso['DESCRICAO'],
                        "creditos": int(curso['CREDITOS']),
                        "capacidade": int(curso['CAPACIDADE'])
                    }
                }
                matriculas_array.append(matricula_doc)

        aluno_doc = {
            "aluno_id": int(aluno['ID']),
            "numero_matricula": int(aluno['NUMERO_MATRICULA']),
            "nome": aluno['NOME'],
            "email": aluno['EMAIL'],
            "data_nascimento": aluno['DATA_NASCIMENTO'],
            "data_ingresso": aluno['DATA_INGRESSO'],
            "status_aluno": aluno['STATUS'],
            "matriculas": matriculas_array,
            "total_matriculas": len(matriculas_array),
            "matriculas_ativas": len([m for m in matriculas_array if m['status'] == 'matriculado']),
            "matriculas_concluidas": len([m for m in matriculas_array if m['status'] == 'concluido'])
        }
        documentos_alunos.append(aluno_doc)

    if documentos_alunos:
        collection.delete_many({})
        result = collection.insert_many(documentos_alunos)
        print(f"✅ {len(result.inserted_ids)} documentos inseridos no MongoDB!")

        # Verificar
        total_docs = collection.count_documents({})
        print(f" Total de documentos: {total_docs}")

except Exception as e:
    print(f" Erro: {e}")

 Inserindo dados no MongoDB Atlas...
 Conectado ao MongoDB Atlas!
 Carregando arquivos CSV...
 Dados carregados: 20 alunos, 20 matrículas, 20 cursos
✅ 20 documentos inseridos no MongoDB!
 Total de documentos: 20


- Coleta de documentos diretamente do MongoDB Atlas
- Conversão para Pandas e posteriormente para Spark DataFrame

In [9]:
# CÉLULA 5: Carregar dados do MongoDB para o Spark

try:
    from pymongo import MongoClient
    import pandas as pd

    client = MongoClient(MONGODB_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]

    documents = list(collection.find())

    if documents:
        df_pandas = pd.DataFrame(documents)
        if '_id' in df_pandas.columns:
            df_pandas = df_pandas.drop('_id', axis=1)

        df_mongodb = spark.createDataFrame(df_pandas)

        print(f" {df_mongodb.count()} documentos carregados do MongoDB!")
        print(" Estrutura dos dados:")
        df_mongodb.printSchema()
        df_mongodb.select("aluno_id", "nome", "status_aluno", "total_matriculas").show(5, truncate=False)

    else:
        print(" Nenhum documento encontrado")

except Exception as e:
    print(f" Erro: {e}")

 20 documentos carregados do MongoDB!
 Estrutura dos dados:
root
 |-- aluno_id: long (nullable = true)
 |-- numero_matricula: long (nullable = true)
 |-- nome: string (nullable = true)
 |-- email: string (nullable = true)
 |-- data_nascimento: string (nullable = true)
 |-- data_ingresso: string (nullable = true)
 |-- status_aluno: string (nullable = true)
 |-- matriculas: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)
 |-- total_matriculas: long (nullable = true)
 |-- matriculas_ativas: long (nullable = true)
 |-- matriculas_concluidas: long (nullable = true)

+--------+--------------+------------+----------------+
|aluno_id|nome          |status_aluno|total_matriculas|
+--------+--------------+------------+----------------+
|1       |Ana Silva     |ativo       |2               |
|2       |Bruno Oliveira|ativo       |2               |
|3       |Carla Pereira |ativo       |1           

Nesta célula, os arquivos CSV são carregados diretamente pelo Spark,
 formando DataFrames distribuídos que podem ser integrados com outras fontes.
 Essa etapa permite combinar o modelo relacional com o modelo NoSQL,

In [10]:
# CÉLULA 6: Carregar CSV para Spark

df_alunos_csv = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv("alunos.csv")
df_cursos_csv = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv("cursos.csv")
df_matriculas_csv = spark.read.option("header", "true").option("delimiter", ";").option("inferSchema", "true").csv("matriculas.csv")

print("✅ CSVs carregados:")
print(f" Alunos: {df_alunos_csv.count()} registros")
print(f" Cursos: {df_cursos_csv.count()} registros")
print(f" Matrículas: {df_matriculas_csv.count()} registros")

✅ CSVs carregados:
 Alunos: 20 registros
 Cursos: 20 registros
 Matrículas: 20 registros


Esta célula implementa as operações centrais do pipeline:

  1. JOIN entre matrículas, cursos e alunos
  2. Uso de broadcast join para otimizar performance
  3. Uso de cache() para reutilizar DataFrames sem recalcular

In [11]:
# CÉLULA 7: Aplicar Otimizações e Joins

from pyspark.sql.functions import col, broadcast

df_mongodb.cache()
df_cursos_csv.cache()

print(f" Cache aplicado: MongoDB={df_mongodb.is_cached}, Cursos={df_cursos_csv.is_cached}")

df_cursos_preparados = df_cursos_csv.select(
    col("ID").alias("CURSO_ID_JOIN"),
    col("CODIGO").alias("CODIGO_CURSO"),
    col("NOME").alias("NOME_CURSO"),
    col("CAPACIDADE")
)

df_alunos_preparados = df_alunos_csv.select(
    col("ID").alias("ALUNO_ID_JOIN"),
    col("NUMERO_MATRICULA"),
    col("NOME").alias("NOME_ALUNO"),
    col("STATUS").alias("STATUS_ALUNO")
)

df_completo = df_matriculas_csv.join(
    broadcast(df_cursos_preparados),
    df_matriculas_csv.CURSO_ID == df_cursos_preparados.CURSO_ID_JOIN
).join(
    df_alunos_preparados,
    df_matriculas_csv.ALUNO_ID == df_alunos_preparados.ALUNO_ID_JOIN
)

print(f" JOINs realizados: {df_completo.count()} registros")
print(" Colunas disponíveis após JOIN:")
for coluna in df_completo.columns:
    print(f"   - {coluna}")

 Cache aplicado: MongoDB=True, Cursos=True
 JOINs realizados: 20 registros
 Colunas disponíveis após JOIN:
   - ID
   - ALUNO_ID
   - CURSO_ID
   - DATA_MATRICULA
   - STATUS
   - NOTA
   - CRIADO_EM
   - CURSO_ID_JOIN
   - CODIGO_CURSO
   - NOME_CURSO
   - CAPACIDADE
   - ALUNO_ID_JOIN
   - NUMERO_MATRICULA
   - NOME_ALUNO
   - STATUS_ALUNO


Comparar performance entre join normal e broadcast join

   Mensurar melhoria em tempo de execução

 O Broadcast Join replica o DataFrame pequeno em todos os nós,
 eliminando shuffle e acelerando significativamente o JOIN.

In [12]:
# CÉLULA 8: Comparação de performace
print(" Comparando performance...")

import time
from pyspark.sql.functions import broadcast

inicio_tradicional = time.time()
df_tradicional = df_matriculas_csv.join(df_cursos_csv, df_matriculas_csv.CURSO_ID == df_cursos_csv.ID)
count_tradicional = df_tradicional.count()
tempo_tradicional = time.time() - inicio_tradicional

inicio_broadcast = time.time()
df_broadcast = df_matriculas_csv.join(broadcast(df_cursos_csv), df_matriculas_csv.CURSO_ID == df_cursos_csv.ID)
count_broadcast = df_broadcast.count()
tempo_broadcast = time.time() - inicio_broadcast

print(f"RESULTADOS:")
print(f"JOIN Tradicional: {tempo_tradicional:.3f} segundos")
print(f"Broadcast JOIN: {tempo_broadcast:.3f} segundos")
print(f"Melhoria: {((tempo_tradicional - tempo_broadcast) / tempo_tradicional * 100):.1f}% mais rápido")

 Comparando performance...
RESULTADOS:
JOIN Tradicional: 0.545 segundos
Broadcast JOIN: 0.469 segundos
Melhoria: 14.0% mais rápido


As células 9 a 12 realizam múltiplas análises de negócio:

  • Cursos mais populares

  • Desempenho acadêmico (médias, min, max)

  • Perfil dos alunos

  • Evolução temporal das matrículas


 Cada operação utiliza:
  - groupBy()
  - filtros condicionais
  - agregações distribuídas

In [13]:
# CÉLULA 9: Análise - Cursos mais populares

from pyspark.sql.functions import count, when, expr, desc

cursos_populares = df_completo.groupBy(
    "CODIGO_CURSO", "NOME_CURSO", "CAPACIDADE"
).agg(
    count("*").alias("total_matriculas"),
    expr("SUM(CASE WHEN STATUS = 'matriculado' THEN 1 ELSE 0 END)").alias("matriculas_ativas"),
    expr("SUM(CASE WHEN STATUS = 'concluido' THEN 1 ELSE 0 END)").alias("matriculas_concluidas")
).withColumn(
    "taxa_ocupacao", (col("total_matriculas") / col("CAPACIDADE") * 100).cast("decimal(5,2)")
).orderBy(desc("total_matriculas"))

print(" Top 10 cursos mais populares:")
cursos_populares.show(10, truncate=False)

 Top 10 cursos mais populares:
+------------+------------------------+----------+----------------+-----------------+---------------------+-------------+
|CODIGO_CURSO|NOME_CURSO              |CAPACIDADE|total_matriculas|matriculas_ativas|matriculas_concluidas|taxa_ocupacao|
+------------+------------------------+----------+----------------+-----------------+---------------------+-------------+
|PROG101     |Introdução à Programação|80        |5               |3                |2                    |6.25         |
|BD101       |Banco de Dados          |50        |2               |1                |1                    |4.00         |
|MAT101      |Matemática Básica       |60        |2               |0                |2                    |3.33         |
|MAT201      |Cálculo I               |50        |2               |0                |2                    |4.00         |
|MAT301      |Álgebra Linear          |45        |1               |0                |1                    |2.22    

In [14]:
# CÉLULA 10: Análise - desempenho acadêmico

df_completo_nota = df_completo.withColumn("NOTA_NUM",
    when(col("NOTA") == "\\N", None).otherwise(col("NOTA").cast("double")))

desempenho_cursos = df_completo_nota.filter(
    (col("STATUS") == "concluido") & (col("NOTA_NUM").isNotNull())
).groupBy("CODIGO_CURSO", "NOME_CURSO").agg(
    expr("ROUND(AVG(NOTA_NUM), 2)").alias("media_geral"),
    count("*").alias("total_avaliados"),
    expr("ROUND(MIN(NOTA_NUM), 2)").alias("nota_minima"),
    expr("ROUND(MAX(NOTA_NUM), 2)").alias("nota_maxima")
).orderBy(desc("media_geral"))

print(" Cursos com melhor desempenho:")
desempenho_cursos.show(truncate=False)

 Cursos com melhor desempenho:
+------------+------------------------+-----------+---------------+-----------+-----------+
|CODIGO_CURSO|NOME_CURSO              |media_geral|total_avaliados|nota_minima|nota_maxima|
+------------+------------------------+-----------+---------------+-----------+-----------+
|MAT201      |Cálculo I               |8.88       |2              |8.75       |9.0        |
|BD101       |Banco de Dados          |8.4        |1              |8.4        |8.4        |
|PROG101     |Introdução à Programação|8.15       |2              |7.2        |9.1        |
|MAT301      |Álgebra Linear          |8.0        |1              |8.0        |8.0        |
|EST101      |Estatística             |7.8        |1              |7.8        |7.8        |
|MAT101      |Matemática Básica       |7.0        |2              |6.5        |7.5        |
|BIO101      |Biologia Geral          |7.0        |1              |7.0        |7.0        |
|HIS101      |História Geral          |6.9       

In [15]:
# CÉLULA 11: Análise - Perfil do aluno
alunos_ativos = df_completo.groupBy(
    "NUMERO_MATRICULA", "NOME_ALUNO", "STATUS_ALUNO"
).agg(
    count("*").alias("total_matriculas"),
    expr("SUM(CASE WHEN STATUS = 'matriculado' THEN 1 ELSE 0 END)").alias("cursando_atualmente"),
    expr("SUM(CASE WHEN STATUS = 'concluido' THEN 1 ELSE 0 END)").alias("cursos_concluidos")
).filter(col("total_matriculas") > 0).orderBy(desc("total_matriculas"))

print(" Alunos com mais matrículas:")
alunos_ativos.show(10, truncate=False)

 Alunos com mais matrículas:
+----------------+---------------+------------+----------------+-------------------+-----------------+
|NUMERO_MATRICULA|NOME_ALUNO     |STATUS_ALUNO|total_matriculas|cursando_atualmente|cursos_concluidos|
+----------------+---------------+------------+----------------+-------------------+-----------------+
|20230001        |Ana Silva      |ativo       |2               |2                  |0                |
|20230002        |Bruno Oliveira |ativo       |2               |0                  |2                |
|20230011        |Kátia Fernandes|ativo       |1               |0                  |1                |
|20230015        |Olivia Castro  |ativo       |1               |1                  |0                |
|20230010        |João Martins   |ativo       |1               |0                  |1                |
|20230014        |Nicolas Freitas|ativo       |1               |0                  |1                |
|20230018        |Rafael Duarte  |ativo     

In [16]:
# CÉLULA 12: Análise - Evolução temporal
from pyspark.sql.functions import year

evolucao_temporal = df_completo.withColumn("ano", year("DATA_MATRICULA")) \
    .groupBy("ano").agg(
        count("*").alias("total_matriculas"),
        expr("SUM(CASE WHEN STATUS = 'concluido' THEN 1 ELSE 0 END)").alias("concluidas")
    ).orderBy("ano")

print(" Evolução das matrículas por ano:")
evolucao_temporal.show()

 Evolução das matrículas por ano:
+----+----------------+----------+
| ano|total_matriculas|concluidas|
+----+----------------+----------+
|2019|               1|         0|
|2020|               2|         2|
|2021|               2|         2|
|2022|               4|         4|
|2023|               5|         3|
|2024|               6|         0|
+----+----------------+----------+



Implementa a etapa final do pipeline:

  1. Salvamento dos DataFrames transformados em Parquet
  2. Exportação para CSV

In [17]:
# CÉLULA 13: Salvar resultados

!mkdir -p /content/resultados

cursos_populares.write.mode("overwrite").parquet("/content/resultados/cursos_populares")
desempenho_cursos.write.mode("overwrite").parquet("/content/resultados/desempenho_cursos")
alunos_ativos.write.mode("overwrite").parquet("/content/resultados/alunos_ativos")

cursos_populares.toPandas().to_csv('cursos_populares.csv', index=False, encoding='utf-8')
desempenho_cursos.toPandas().to_csv('desempenho_cursos.csv', index=False, encoding='utf-8')
alunos_ativos.toPandas().to_csv('alunos_ativos.csv', index=False, encoding='utf-8')

print(" Resultados salvos!")

 Resultados salvos!


Download


In [18]:
# CÉLULA 14: Download
print("Preparando download...")

from google.colab import files

files.download('cursos_populares.csv')
files.download('desempenho_cursos.csv')
files.download('alunos_ativos.csv')

print("Downloads prontos!")

Preparando download...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloads prontos!


Resumo automatizado de tudo que foi executado:
#

  • Conexão ao MongoDB

  • Quantidade de documentos e dados carregados

  • Otimizações aplicadas

  • Resultados das análises

In [19]:
# CÉLULA 15: Resumo
print("\n" + "="*70)
print(" RELATÓRIO FINAL - MONGODB ATLAS + SPARK")
print("="*70)

print(" CONEXÃO E DADOS:")
print(f"   • MongoDB Atlas: Conectado com sucesso")
print(f"   • Documentos no MongoDB: {df_mongodb.count()}")
print(f"   • Alunos no sistema: {df_alunos_csv.count()}")
print(f"   • Cursos disponíveis: {df_cursos_csv.count()}")

print(f"\n OTIMIZAÇÕES:")
print(f"   • Cache aplicado: Sim")
print(f"   • Broadcast Join: {((tempo_tradicional - tempo_broadcast) / tempo_tradicional * 100):.1f}% mais rápido")

print(f"\n ANÁLISES REALIZADAS:")
print("   • Cursos mais populares")
print("   • Desempenho acadêmico")
print("   • Perfil dos alunos")
print("   • Evolução temporal")

print(f"\n RESULTADOS:")
print("   • Dados salvos em Parquet e CSV")
print("   • Downloads disponíveis")
print("    MongoDB como fonte principal")
print("    Spark para processamento")
print("    Otimizações aplicadas")
print("    Análises completas")
print("    Resultados exportados")
print("="*70)


 RELATÓRIO FINAL - MONGODB ATLAS + SPARK
 CONEXÃO E DADOS:
   • MongoDB Atlas: Conectado com sucesso
   • Documentos no MongoDB: 20
   • Alunos no sistema: 20
   • Cursos disponíveis: 20

 OTIMIZAÇÕES:
   • Cache aplicado: Sim
   • Broadcast Join: 14.0% mais rápido

 ANÁLISES REALIZADAS:
   • Cursos mais populares
   • Desempenho acadêmico
   • Perfil dos alunos
   • Evolução temporal

 RESULTADOS:
   • Dados salvos em Parquet e CSV
   • Downloads disponíveis
    MongoDB como fonte principal
    Spark para processamento
    Otimizações aplicadas
    Análises completas
    Resultados exportados
