# BigData Environment - Exemplo de Integra√ß√£o

Este notebook demonstra como usar os servi√ßos integrados do ambiente BigData:
- **Spark**: Processamento de dados distribu√≠do
- **MinIO**: Storage de objetos
- **Airflow**: Orquestra√ß√£o de workflows
- **Jenkins**: CI/CD

## 1. Configura√ß√£o Inicial

In [None]:
# Instalar pacotes necess√°rios se n√£o estiverem dispon√≠veis
import subprocess
import sys

required_packages = ['minio', 'boto3']
for package in required_packages:
    try:
        __import__(package)
    except ImportError:
        print(f"Instalando {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Importar bibliotecas necess√°rias
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max, min
import matplotlib.pyplot as plt
import seaborn as sns
from minio import Minio
import io
import json

# Configurar visualiza√ß√£o
plt.style.use('seaborn-v0_8')
sns.set_palette('husl')

print("Bibliotecas importadas com sucesso!")

## üîß Verifica√ß√£o do Ambiente

**Se voc√™ encontrar o erro `ModuleNotFoundError: No module named 'pyspark'`, execute a c√©lula abaixo:**

In [None]:
# üîß Verifica√ß√£o e Instala√ß√£o do PySpark (se necess√°rio)
try:
    import pyspark
    print(f"‚úÖ PySpark j√° est√° instalado! Vers√£o: {pyspark.__version__}")
except ImportError:
    print("‚ö†Ô∏è  PySpark n√£o encontrado. Instalando...")
    import subprocess
    import sys
    
    # Instalar PySpark
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark==3.5.0"])
    
    # Tentar importar novamente
    import pyspark
    print(f"‚úÖ PySpark instalado com sucesso! Vers√£o: {pyspark.__version__}")

# Testar importa√ß√µes b√°sicas
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, count, avg, max, min
    print("‚úÖ M√≥dulos do PySpark importados com sucesso!")
except Exception as e:
    print(f"‚ùå Erro ao importar m√≥dulos do PySpark: {e}")
    print("üí° Reinicie o kernel e tente novamente.")

## 2. Conex√£o com Spark

In [None]:
# Criar sess√£o Spark conectada ao cluster
spark = SparkSession.builder \
    .appName("Jupyter-Spark-Integration") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

print(f"Spark Session criada: {spark.sparkContext.appName}")
print(f"Spark UI: http://localhost:8081")
print(f"Vers√£o do Spark: {spark.version}")

## 3. Conex√£o com MinIO

In [None]:
# Configurar cliente MinIO
minio_client = Minio(
    "minio:9000",
    access_key="minioadmin",
    secret_key="minioadmin123",
    secure=False
)

# Listar buckets dispon√≠veis
try:
    buckets = minio_client.list_buckets()
    print("Buckets dispon√≠veis no MinIO:")
    for bucket in buckets:
        print(f"  - {bucket.name} (criado em: {bucket.creation_date})")
except Exception as e:
    print(f"Erro ao conectar com MinIO: {e}")

## 4. Cria√ß√£o e Processamento de Dados

In [None]:
# Criar dataset de exemplo
np.random.seed(42)
n_samples = 1000

data = {
    'id': range(1, n_samples + 1),
    'nome': [f'Usuario_{i}' for i in range(1, n_samples + 1)],
    'idade': np.random.randint(18, 70, n_samples),
    'departamento': np.random.choice(['TI', 'Vendas', 'RH', 'Marketing', 'Financeiro'], n_samples),
    'salario': np.random.normal(5000, 1500, n_samples).round(2),
    'experiencia': np.random.randint(0, 20, n_samples),
    'satisfacao': np.random.uniform(1, 5, n_samples).round(1)
}

# Criar DataFrame Pandas
df_pandas = pd.DataFrame(data)
print("Dataset criado com sucesso!")
print(f"Shape: {df_pandas.shape}")
df_pandas.head()

In [None]:
# Converter para Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)

print("Esquema do DataFrame Spark:")
df_spark.printSchema()

print("\nPrimeiras 10 linhas:")
df_spark.show(10)

## 5. An√°lise de Dados com Spark

In [None]:
# Estat√≠sticas b√°sicas
print("=== Estat√≠sticas B√°sicas ===")
df_spark.describe().show()

# An√°lise por departamento
print("\n=== An√°lise por Departamento ===")
dept_stats = df_spark.groupBy("departamento") \
    .agg(
        count("*").alias("total_funcionarios"),
        avg("idade").alias("idade_media"),
        avg("salario").alias("salario_medio"),
        avg("experiencia").alias("experiencia_media"),
        avg("satisfacao").alias("satisfacao_media")
    ) \
    .orderBy("salario_medio", ascending=False)

dept_stats.show()

## 6. Salvando Dados no MinIO

In [None]:
# Salvar dados originais no MinIO via Spark
try:
    print("Salvando dados no MinIO...")
    
    # Salvar dados originais
    df_spark.write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv("s3a://jupyter-data/datasets/funcionarios")
    
    # Salvar estat√≠sticas por departamento
    dept_stats.write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv("s3a://jupyter-data/datasets/estatisticas_departamento")
    
    print("Dados salvos com sucesso no MinIO!")
    
except Exception as e:
    print(f"Erro ao salvar no MinIO via Spark: {e}")
    print("Tentando salvar via cliente MinIO...")
    
    # Fallback: salvar via cliente MinIO
    csv_buffer = io.StringIO()
    df_pandas.to_csv(csv_buffer, index=False)
    csv_data = csv_buffer.getvalue().encode('utf-8')
    
    minio_client.put_object(
        "jupyter-data",
        "datasets/funcionarios.csv",
        io.BytesIO(csv_data),
        len(csv_data),
        content_type="text/csv"
    )
    print("Dados salvos via cliente MinIO!")

## 7. Visualiza√ß√µes

In [None]:
# Converter estat√≠sticas para Pandas para visualiza√ß√£o
dept_stats_pandas = dept_stats.toPandas()

# Criar visualiza√ß√µes
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('An√°lise de Funcion√°rios por Departamento', fontsize=16)

# Gr√°fico 1: N√∫mero de funcion√°rios por departamento
axes[0, 0].bar(dept_stats_pandas['departamento'], dept_stats_pandas['total_funcionarios'])
axes[0, 0].set_title('Funcion√°rios por Departamento')
axes[0, 0].set_xlabel('Departamento')
axes[0, 0].set_ylabel('N√∫mero de Funcion√°rios')
axes[0, 0].tick_params(axis='x', rotation=45)

# Gr√°fico 2: Sal√°rio m√©dio por departamento
axes[0, 1].bar(dept_stats_pandas['departamento'], dept_stats_pandas['salario_medio'])
axes[0, 1].set_title('Sal√°rio M√©dio por Departamento')
axes[0, 1].set_xlabel('Departamento')
axes[0, 1].set_ylabel('Sal√°rio M√©dio (R$)')
axes[0, 1].tick_params(axis='x', rotation=45)

# Gr√°fico 3: Idade m√©dia por departamento
axes[1, 0].bar(dept_stats_pandas['departamento'], dept_stats_pandas['idade_media'])
axes[1, 0].set_title('Idade M√©dia por Departamento')
axes[1, 0].set_xlabel('Departamento')
axes[1, 0].set_ylabel('Idade M√©dia (anos)')
axes[1, 0].tick_params(axis='x', rotation=45)

# Gr√°fico 4: Satisfa√ß√£o m√©dia por departamento
axes[1, 1].bar(dept_stats_pandas['departamento'], dept_stats_pandas['satisfacao_media'])
axes[1, 1].set_title('Satisfa√ß√£o M√©dia por Departamento')
axes[1, 1].set_xlabel('Departamento')
axes[1, 1].set_ylabel('Satisfa√ß√£o M√©dia (1-5)')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 8. An√°lise de Correla√ß√£o

In [None]:
# An√°lise de correla√ß√£o
numeric_columns = ['idade', 'salario', 'experiencia', 'satisfacao']
correlation_matrix = df_pandas[numeric_columns].corr()

# Heatmap de correla√ß√£o
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
            square=True, linewidths=0.5)
plt.title('Matriz de Correla√ß√£o')
plt.show()

# Scatter plot: Experi√™ncia vs Sal√°rio
plt.figure(figsize=(10, 6))
scatter = plt.scatter(df_pandas['experiencia'], df_pandas['salario'], 
                     c=df_pandas['satisfacao'], cmap='viridis', alpha=0.6)
plt.colorbar(scatter, label='Satisfa√ß√£o')
plt.xlabel('Experi√™ncia (anos)')
plt.ylabel('Sal√°rio (R$)')
plt.title('Rela√ß√£o entre Experi√™ncia, Sal√°rio e Satisfa√ß√£o')
plt.show()

## 9. Limpeza e Finaliza√ß√£o

In [None]:
# Finalizar sess√£o Spark
spark.stop()
print("Sess√£o Spark finalizada.")

print("\n=== Resumo da Execu√ß√£o ===")
print(f"‚úÖ Dados processados: {len(df_pandas)} registros")
print(f"‚úÖ Departamentos analisados: {df_pandas['departamento'].nunique()}")
print(f"‚úÖ Visualiza√ß√µes criadas: 6 gr√°ficos")
print(f"‚úÖ Dados salvos no MinIO")
print(f"\nüîó Acesse os outros servi√ßos:")
print(f"   ‚Ä¢ Airflow: http://localhost:8080 (admin/admin)")
print(f"   ‚Ä¢ Spark UI: http://localhost:8081")
print(f"   ‚Ä¢ MinIO: http://localhost:9001 (minioadmin/minioadmin123)")
print(f"   ‚Ä¢ Jenkins: http://localhost:8082 (admin/admin)")
print(f"   ‚Ä¢ Flower (Celery): http://localhost:5555")