# Introdução ao Spark e Computação Distribuída

O Apache Spark é um sistema de computação distribuída de código aberto, projetado para lidar com o processamento de grandes volumes de dados de forma eficiente.

Comparado a ferramentas tradicionais como o Pandas, o Spark é muito mais escalável para conjuntos de dados grandes que não cabem na memória de uma única máquina.

## Pandas vs Spark

O Pandas opera na memória (in-memory), o que significa que ele armazena e processa dados em uma única máquina.
Isso funciona bem para conjuntos de dados pequenos ou médios, mas pode levar a limitações de memória ao lidar com grandes volumes de dados que não cabem na memória.

O Spark, por outro lado, foi projetado para computação distribuída. Ele divide grandes conjuntos de dados em pedaços chamados partições,
e processa essas partições em paralelo em múltiplos nós de um cluster. Essa arquitetura distribuída permite que o Spark lide com grandes volumes de dados de forma eficiente.

# Visualizando as diferenças:
- Pandas --> Processamento em Memória, em uma única máquina.
- Spark   --> Processamento Paralelo, Distribuído em um Cluster.

In [0]:
from sklearn.datasets import load_iris
import pandas as pd
from pyspark.sql import SparkSession

# Carregar o conjunto de dados Iris como um DataFrame Pandas
iris = load_iris(as_frame=True)
iris_df = iris.frame

# Converter o DataFrame Pandas para um DataFrame Spark
spark_df = spark.createDataFrame(iris_df)
spark_df



### Avaliação Preguiçosa (Lazy Evaluation)
- Uma das otimizações de desempenho chave do Spark é a avaliação preguiçosa (lazy evaluation). Quando você define uma transformação em um DataFrame ou RDD do Spark, o Spark não executa imediatamente a operação.
- Em vez disso, ele constrói uma DAG de transformações e só as executa quando uma ação (por exemplo, `count()`, `collect()`, `show()`) é chamada.
- Isso permite que o Spark otimize todo o plano de execução antes de rodá-lo

In [0]:
from pyspark.sql.functions import col

# ---- Transformações preguiçosas no Spark ----
# Aplicar um filtro no Spark (nenhuma computação acontece ainda)
spark_lazy_transformation = spark_df.filter(col('sepal length (cm)') > 5.0)  # Nenhuma execução

In [0]:
# Somente quando uma ação é chamada (por exemplo, count ou show), o Spark realiza o trabalho
spark_lazy_transformation.count() # Ação que dispara a execução

Podemos observar através do método ```.explain()``` o plano de execução que é criado

In [0]:

from pyspark.sql.functions import col
# ---- Otimização no Spark ----
# Aplicar um filtro e depois agrupar, com plano de execução otimizado
processed_spark_df = (
    spark_df
    .filter(col('sepal length (cm)') > 5.0)  # Filtragem antecipada para reduzir dados
    .groupBy('target')
    .agg({"sepal length (cm)": "avg"})  # Agrupar e calcular a média
)

# Exibir o plano de execução otimizado pelo Catalyst
print("Plano de execução otimizado pelo Catalyst:")
processed_spark_df.explain(True)

%md
### Explicação do Plano de Execução no Spark

#### **1. Parsed Logical Plan**
- Este é o **plano lógico inicial**, representado antes da resolução de nomes e tipos:
  - Define um **agrupamento** por `target`.
  - Inclui uma **agregação não resolvida** para a média de `sepal length (cm)` (`unresolvedalias('avg(sepal length (cm)#32)`).
  - Mostra o **filtro** `sepal length (cm) > 5.0`.
  - A entrada é um `LocalRelation`, que contém as colunas disponíveis: `sepal length (cm)`, `sepal width (cm)`, `petal length (cm)`, `petal width (cm)`, e `target`.

#### **2. Analyzed Logical Plan**
- Após a resolução de colunas e tipos de dados:
  - `avg(sepal length (cm))` é resolvido como um alias para a agregação `avg(sepal length (cm)#32)`.
  - As colunas envolvidas são:
    - `target` (bigint).
    - `avg(sepal length (cm))` (double).
  - Mantém o **filtro** `sepal length (cm) > 5.0` antes do **agrupamento**.

#### **3. Optimized Logical Plan**
- Após as otimizações do Catalyst:
  - Remove colunas irrelevantes do plano (`sepal width (cm)`, `petal length (cm)`, `petal width (cm)`), mantendo apenas `sepal length (cm)` e `target`.
  - Aplica o **filtro** antes do agrupamento, reduzindo o volume de dados processados.
  - A entrada permanece como um `LocalRelation`, indicando que os dados são locais e já carregados na memória.

#### **4. Physical Plan**
- Este é o plano executável no cluster Spark:
  - **AdaptiveSparkPlan**:
    - Indica que a execução adaptativa está habilitada, ajustando o plano físico durante a execução para otimização.
  - **HashAggregate**:
    - Divide o cálculo da média em duas etapas:
      - **Agregação Parcial**: Calcula soma (`sum`) e contagem (`count`) localmente em cada partição.
      - **Agregação Final**: Mescla os resultados das partições para calcular a média final.
  - **Exchange**:
    - Redistribui os dados por hash com base na coluna `target`, garantindo que os dados com o mesmo valor de `target` fiquem na mesma partição.
  - **LocalTableScan**:
    - Lê apenas as colunas necessárias (`sepal length (cm)` e `target`) diretamente da relação local.



- No Pandas, as operações são executadas imediatamente, enquanto no Spark, a execução é adiada até que uma ação seja chamada.

---

## Comparações Pandas e Spark DataFrames


### Importar Bibliotecas Necessárias

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Inicializar a sessão Spark
spark = SparkSession.builder.appName("PandasxSpark").getOrCreate()

### Carregar Dados diretamente com Spark

In [0]:
# Salvar o dataset Iris como um arquivo CSV temporário no Databricks
dbutils.fs.mkdirs("/tmp/iris_data")  # Criar diretório temporário no DBFS
iris_df.to_csv("/dbfs/tmp/iris_data/iris.csv", index=False)  # Salvar o CSV no DBFS


In [0]:
# Carregar o CSV diretamente como um DataFrame Spark
spark_iris_from_csv = spark.read.csv("/tmp/iris_data/iris.csv", header=True, inferSchema=True)
spark_iris_from_csv.show(5)

### Seleção de Colunas

In [0]:
# ---- Pandas ----
# Selecionar colunas 'sepal length (cm)' e 'sepal width (cm)' usando Pandas
pandas_selected = iris_df[['sepal length (cm)', 'sepal width (cm)']]
print("Pandas - Colunas Selecionadas:")
pandas_selected.head() # Exibir as primeiras 5 linhas


In [0]:
# ---- Spark ----
# Selecionar colunas 'sepal length (cm)' e 'sepal width (cm)' usando Spark
spark_selected = spark_df.select('sepal length (cm)', 'sepal width (cm)')
print("Spark - Colunas Selecionadas:")
spark_selected.show(5)  # Exibir as primeiras 5 linhas

### Filtrar Linhas

In [0]:
# Filtrar linhas onde 'sepal length (cm)' > 5.0
pandas_filtered = iris_df[iris_df['sepal length (cm)'] > 5.0]
print("Pandas - Linhas Filtradas (sepal length (cm) > 5.0):")
pandas_filtered.head()  # Exibir as primeiras 5 linhas

In [0]:
# ---- Spark ----
# Filtrar linhas onde 'sepal length (cm)' > 5.0
spark_filtered = spark_df.filter(col('sepal length (cm)') > 5.0)
print("Spark - Linhas Filtradas (sepal length (cm) > 5.0):")
spark_filtered.show(5)  # Exibir as primeiras 5 linhas

### Agrupamento e Agregação

In [0]:
# ---- Pandas ----
# Agrupar pelo target e calcular a média de 'sepal length (cm)'
pandas_grouped = iris_df.groupby('target').agg(mean_sepal_length=('sepal length (cm)', 'mean'))
pandas_grouped.head(2)

In [0]:
# ---- Spark ----
# Agrupar pelo target e calcular a média de 'sepal length (cm)'
spark_grouped = spark_df.groupBy('target').agg({"sepal length (cm)": "avg"})
spark_grouped.show(2)

### Adicionar Novas Colunas

In [0]:
from pyspark.sql.functions import col

# ---- Pandas ----
# Adicionar uma nova coluna que calcula a razão entre 'sepal length (cm)' e 'sepal width (cm)'
iris_df['sepal_ratio'] = iris_df['sepal length (cm)'] / iris_df['sepal width (cm)']
print("Pandas - Nova Coluna (sepal_ratio):")
iris_df[['sepal length (cm)', 'sepal width (cm)', 'sepal_ratio']].head(2)

In [0]:
# ---- Spark ----
# Adicionar uma nova coluna que calcula a razão entre 'sepal length (cm)' e 'sepal width (cm)'
spark_df = spark_df.withColumn('sepal_ratio', col('sepal length (cm)') / col('sepal width (cm)'))

# Mostrar que a avaliação é preguiçosa no Spark
print("Spark - Nova Coluna (sepal_ratio):")
spark_df.select('sepal length (cm)', 'sepal width (cm)', 'sepal_ratio').show(2)

### Converter Spark DataFrame para Pandas

In [0]:
# Converter DataFrame Spark para Pandas
pandas_from_spark = spark_df.toPandas()

pandas_from_spark.head(2)

### Converter Pandas DataFrame para Spark

In [0]:
# Criar DataFrame Spark
spark_from_pandas = spark.createDataFrame(iris_df)
spark_from_pandas.show(2)

- O Pandas é excelente para conjuntos de dados pequenos e manipulações rápidas.

- O Spark é essencial para operações em escala, lidando com grandes conjuntos de dados distribuídos por clusters.


---