
#Introdução ao Apache Spark

Apache Spark é um framework de processamento de dados de código aberto projetado para ser rápido, fácil de usar e geral. Ele permite o processamento distribuído de grandes volumes de dados em clusters, suportando tarefas de processamento em lote e em tempo real. Spark é amplamente utilizado para análise de dados, aprendizado de máquina, processamento de streaming e muito mais.


#### História e Marcos do Apache Spark
*  2009: Desenvolvido originalmente na UC Berkeley’s AMPLab.
*  2010: Spark foi open-sourced.
*  2014: Tornou-se um projeto de alto nível da Apache Software Foundation.
*  2016: Spark 2.0 lançado, introduzindo DataFrames e Spark SQL.
*  2020: Spark 3.0 lançado, trazendo melhorias significativas em desempenho e novas APIs.
*  2024: Spark continua evoluindo, com foco em otimizações de desempenho e integração com tecnologias emergentes.

#Fundamentos do Apache Spark
#### RDDs (Resilient Distributed Datasets)

RDDs são a abstração fundamental no Spark, representando uma coleção distribuída de elementos que podem ser processados em paralelo. Eles são imutáveis e tolerantes a falhas.

##### Características:

* Imutabilidade: Uma vez criado, um RDD não pode ser alterado.
* Tolerância a falhas: Spark reconstrói automaticamente partes perdidas dos RDDs.
* Transformações e Ações: Operações que transformam os dados ou coletam resultados.

#### RDDs (Resilient Distributed Datasets)

#### **DataFrames**

DataFrames são uma abstração mais estruturada sobre RDDs, semelhantes a tabelas em bancos de dados relacionais. Eles permitem operações otimizadas e integração com Spark SQL.

**Vantagens:**

* Performance: Otimizações como o Catalyst Optimizer melhoram o desempenho.
* Facilidade de Uso: APIs mais amigáveis para manipulação de dados estruturados.
* Integração: Compatível com diversas fontes de dados e ferramentas de BI.


** Datasets Utilizados **

https://www.kaggle.com/datasets/shubhambathwal/flight-price-prediction

In [1]:
# Instalar PySpark via pip
!pip install pyspark



In [3]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("christofferms/pokemon-with-stats-and-image")

print("Path to dataset files:", path)

Path to dataset files: /root/.cache/kagglehub/datasets/christofferms/pokemon-with-stats-and-image/versions/1


In [4]:
# Iniciar uma Sessão Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExemploPySpark") \
    .getOrCreate()

**Comparando métodos do pandas x métodos do PySpark**

Leitura de Dados:

In [5]:
import pandas as pd

df_pandas = pd.read_csv("/content/pokedex.csv")

In [6]:
df_pandas.head(4)

Unnamed: 0,Image,Index,Name,Type 1,Type 2,Total,HP,Attack,Defense,SP. Atk.,SP. Def,Speed
0,images/1.png,1,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45
1,images/2.png,2,Ivysaur,Grass,Poison,405,60,62,63,80,80,60
2,images/3.png,3,Venusaur,Grass,Poison,525,80,82,83,100,100,80
3,images/4.png,3,Venusaur Mega Venusaur,Grass,Poison,625,80,100,123,122,120,80


In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExemploLeitura").getOrCreate()

# Leitura de um arquivo CSV
df_pyspark = spark.read.csv("/content/pokedex.csv", header=True, inferSchema=True)

In [8]:
#Nomes das colunas
for i in df_pandas.columns:
  print(i)

Image
Index
Name
Type 1
Type 2
Total
HP
Attack
Defense
SP. Atk.
SP. Def
Speed


Seleção de Colunas:

In [10]:
# Seleciona colunas específicas - Pandas
df_selecionado_pandas = df_pandas[['Name', 'Type 1','HP', 'Attack']]

In [11]:
# Seleciona colunas específicas - PySpark
df_selecionado_spark = df_pyspark.select('Name', 'Type 1','HP', 'Attack')

Filtragem de Dados:

In [13]:
# Filtra linhas onde HP > 5 - Pandas
df_filtrado_pandas = df_pandas[df_pandas['HP'] > 5]

In [14]:
# Filtra linhas onde HP > 5 - PySpark
df_filtrado_spark = df_pyspark.filter(df_pyspark.HP > 5)

Agregações:

In [15]:
# Calcula a média de HP agrupada por Attack - Pandas
df_agregado_pandas = df_pandas.groupby('HP')['Attack'].mean().reset_index()

In [16]:
# Calcula a média de HP agrupada por Attack - PySpark
from pyspark.sql.functions import avg

# Calcula a média de coluna1 agrupada por coluna2
df_agregado_spark = df_pyspark.groupBy("HP").agg(avg("Attack").alias("media_days_left"))


In [17]:
df_selecionado_spark.show(5)
df_filtrado_spark.show(5)
df_agregado_spark.show(5)

+--------------------+------+---+------+
|                Name|Type 1| HP|Attack|
+--------------------+------+---+------+
|           Bulbasaur| Grass| 45|    49|
|             Ivysaur| Grass| 60|    62|
|            Venusaur| Grass| 80|    82|
|Venusaur Mega Ven...| Grass| 80|   100|
|          Charmander|  Fire| 39|    52|
+--------------------+------+---+------+
only showing top 5 rows

+------------+-----+--------------------+------+------+-----+---+------+-------+--------+-------+-----+
|       Image|Index|                Name|Type 1|Type 2|Total| HP|Attack|Defense|SP. Atk.|SP. Def|Speed|
+------------+-----+--------------------+------+------+-----+---+------+-------+--------+-------+-----+
|images/1.png|    1|           Bulbasaur| Grass|Poison|  318| 45|    49|     49|      65|     65|   45|
|images/2.png|    2|             Ivysaur| Grass|Poison|  405| 60|    62|     63|      80|     80|   60|
|images/3.png|    3|            Venusaur| Grass|Poison|  525| 80|    82|     83|     1

Junções (Joins):

In [21]:
# Junção de dois DataFrames pandas
df1 = pd.read_csv("/content/Pokemon.csv")
df2 = pd.read_csv("/content/complete_pokemon.csv")
df2 = df2.rename(columns={'name': 'Name'}) # renomeando a coluna, pq ela tinha o nome com N minusculo.
df_juncao = pd.merge(df1, df2, on="Name", how="left")

In [22]:
# Junção de dois DataFrames PySpark
df1 = spark.read.csv("/content/Pokemon.csv", header=True, inferSchema=True)
df2 = spark.read.csv("/content/complete_pokemon.csv", header=True, inferSchema=True)
df_juncao = df1.join(df2, on="Name", how="left")

Criação de Novas Colunas:

In [26]:
# Cria uma nova coluna 'nova_coluna' como a soma de 'Name' e 'Name'
df_pandas['nova_coluna'] = df_pandas['Name'] + df_pandas['Name']

In [27]:
from pyspark.sql.functions import col

# Cria uma nova coluna 'nova_coluna' como a soma de 'coluna1' e 'coluna2'
df_pyspark = df_pyspark.withColumn("nova_coluna", col("Name") + col("Name"))


Ordenação de Dados:

In [28]:
# Ordena o DataFrame pelo 'coluna1' em ordem descendente
df_pandas_ordenado = df_pandas.sort_values(by='Name', ascending=False)

In [31]:
print(df_pandas_ordenado.head(5))

              Image  Index                    Name  Type 1  Type 2  Total  \
865  images/866.png    718  Zygarde Complete Forme  Dragon  Ground    708   
863  images/864.png    718       Zygarde 50% Forme  Dragon  Ground    600   
864  images/865.png    718       Zygarde 10% Forme  Dragon  Ground    486   
760  images/761.png    634                Zweilous    Dark  Dragon    420   
54    images/55.png     41                   Zubat  Poison  Flying    245   

      HP  Attack  Defense  SP. Atk.  SP. Def  Speed  \
865  216     100      121        91       95     85   
863  108     100      121        81       95     95   
864   54     100       71        61       85    115   
760   72      85       70        65       70     58   
54    40      45       35        30       40     55   

                                      nova_coluna  
865  Zygarde Complete FormeZygarde Complete Forme  
863            Zygarde 50% FormeZygarde 50% Forme  
864            Zygarde 10% FormeZygarde 10% Forme 

In [30]:
# Ordena o DataFrame pelo 'coluna1' em ordem descendente
df_pyspark_ordenado = df_pyspark.orderBy(df_pyspark.Name.desc())

In [32]:
print(df_pyspark_ordenado.show(5))

+--------------+-----+--------------------+------+------+-----+---+------+-------+--------+-------+-----+-----------+
|         Image|Index|                Name|Type 1|Type 2|Total| HP|Attack|Defense|SP. Atk.|SP. Def|Speed|nova_coluna|
+--------------+-----+--------------------+------+------+-----+---+------+-------+--------+-------+-----+-----------+
|images/866.png|  718|Zygarde Complete ...|Dragon|Ground|  708|216|   100|    121|      91|     95|   85|       NULL|
|images/864.png|  718|   Zygarde 50% Forme|Dragon|Ground|  600|108|   100|    121|      81|     95|   95|       NULL|
|images/865.png|  718|   Zygarde 10% Forme|Dragon|Ground|  486| 54|   100|     71|      61|     85|  115|       NULL|
|images/761.png|  634|            Zweilous|  Dark|Dragon|  420| 72|    85|     70|      65|     70|   58|       NULL|
| images/55.png|   41|               Zubat|Poison|Flying|  245| 40|    45|     35|      30|     40|   55|       NULL|
+--------------+-----+--------------------+------+------