
#Introdução ao Apache Spark

Apache Spark é um framework de processamento de dados de código aberto projetado para ser rápido, fácil de usar e geral. Ele permite o processamento distribuído de grandes volumes de dados em clusters, suportando tarefas de processamento em lote e em tempo real. Spark é amplamente utilizado para análise de dados, aprendizado de máquina, processamento de streaming e muito mais.


#### História e Marcos do Apache Spark
*  2009: Desenvolvido originalmente na UC Berkeley’s AMPLab.
*  2010: Spark foi open-sourced.
*  2014: Tornou-se um projeto de alto nível da Apache Software Foundation.
*  2016: Spark 2.0 lançado, introduzindo DataFrames e Spark SQL.
*  2020: Spark 3.0 lançado, trazendo melhorias significativas em desempenho e novas APIs.
*  2024: Spark continua evoluindo, com foco em otimizações de desempenho e integração com tecnologias emergentes.

#Fundamentos do Apache Spark
#### RDDs (Resilient Distributed Datasets)

RDDs são a abstração fundamental no Spark, representando uma coleção distribuída de elementos que podem ser processados em paralelo. Eles são imutáveis e tolerantes a falhas.

##### Características:

* Imutabilidade: Uma vez criado, um RDD não pode ser alterado.
* Tolerância a falhas: Spark reconstrói automaticamente partes perdidas dos RDDs.
* Transformações e Ações: Operações que transformam os dados ou coletam resultados.

#### RDDs (Resilient Distributed Datasets)

#### **DataFrames**

DataFrames são uma abstração mais estruturada sobre RDDs, semelhantes a tabelas em bancos de dados relacionais. Eles permitem operações otimizadas e integração com Spark SQL.

**Vantagens:**

* Performance: Otimizações como o Catalyst Optimizer melhoram o desempenho.
* Facilidade de Uso: APIs mais amigáveis para manipulação de dados estruturados.
* Integração: Compatível com diversas fontes de dados e ferramentas de BI.


** Datasets Utilizados **

https://www.kaggle.com/datasets/shubhambathwal/flight-price-prediction

In [None]:
# Instalar PySpark via pip
!pip install pyspark



In [21]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("saeedsarrafzadeh/notable-ai-models-2024")

print("Path to dataset files:", path)

path_lol = kagglehub.dataset_download("derrickdaniel/klebsiella-meropenem-amr-genomic-data")

print("Path to dataset files:", path_lol)

Downloading from https://www.kaggle.com/api/v1/datasets/download/saeedsarrafzadeh/notable-ai-models-2024?dataset_version_number=1...


100%|██████████| 520k/520k [00:00<00:00, 53.2MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/saeedsarrafzadeh/notable-ai-models-2024/versions/1





Downloading from https://www.kaggle.com/api/v1/datasets/download/derrickdaniel/klebsiella-meropenem-amr-genomic-data?dataset_version_number=1...


100%|██████████| 150k/150k [00:00<00:00, 47.5MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/derrickdaniel/klebsiella-meropenem-amr-genomic-data/versions/1





In [None]:
# Iniciar uma Sessão Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExemploPySpark") \
    .getOrCreate()

**Comparando métodos do pandas x métodos do PySpark**

Leitura de Dados:

In [27]:
import pandas as pd
df_pandas = pd.read_csv("/root/.cache/kagglehub/datasets/saeedsarrafzadeh/notable-ai-models-2024/versions/1/notable_ai_models.csv")

In [28]:
df_pandas.head(4)

Unnamed: 0,System,Domain,Organization,Organization categorization,Country (from Organization),Authors,Publication date,Reference,Link,Citations,...,Confidence,Abstract,Model accessibility,Base model,Finetune compute (FLOP),Finetune compute notes,Batch size,Batch size notes,Frontier model,Training power draw (W)
0,Qwen2.5-72B,Language,Alibaba,Industry,China,,2024-09-19,Qwen2.5: A Party of Foundation Models!,https://qwenlm.github.io/blog/qwen2.5/,,...,Confident,In the past three months since Qwen2’s release...,Open access (unrestricted),,,,,,,
1,Table Tennis Agent,Robotics,Google DeepMind,Industry,Multinational,"David B. D'Ambrosio, Saminda Abeyruwan, Laura ...",2024-08-07,Achieving Human Level Competitive Robot Table ...,https://deepmind.google/research/publications/...,,...,Likely,Achieving human-level speed and performance on...,Unreleased,,,,,,,
2,AFM-server,Language,Apple,Industry,United States of America,"Andy Narayanan, Aonan Zhang, Bowen Zhang, Chen...",2024-07-29,Apple Intelligence Foundation Language Models,https://machinelearning.apple.com/research/app...,,...,Confident,,Hosted access (no API),,,,18949750.0,Main pretraining uses sequence length of 4096 ...,,3466814.0
3,AFM-on-device,Language,Apple,Industry,United States of America,"Andy Narayanan, Aonan Zhang, Bowen Zhang, Chen...",2024-07-29,Apple Intelligence Foundation Language Models,https://machinelearning.apple.com/research/app...,,...,Confident,We present foundation language models develope...,Hosted access (no API),,,,18949750.0,Main pretraining uses sequence length of 4096 ...,,


In [29]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExemploLeitura").getOrCreate()

# Leitura de um arquivo CSV
df_pyspark = spark.read.csv("/root/.cache/kagglehub/datasets/saeedsarrafzadeh/notable-ai-models-2024/versions/1/notable_ai_models.csv", header=True, inferSchema=True)

In [30]:
#Nomes das colunas
for i in df_pandas.columns:
  print(i)

System
Domain
Organization
Organization categorization
Country (from Organization)
Authors
Publication date
Reference
Link
Citations
Notability criteria
Notability criteria notes
Parameters
Parameters notes
Training compute (FLOP)
Training compute notes
Training dataset
Training dataset notes
Training dataset size (datapoints)
Dataset size notes
Epochs
Training time (hours)
Training time notes
Training hardware
Hardware quantity
Hardware utilization
Training compute cost (2023 USD)
Compute cost notes
Confidence
Abstract
Model accessibility
Base model
Finetune compute (FLOP)
Finetune compute notes
Batch size
Batch size notes
Frontier model
Training power draw (W)


Seleção de Colunas:

In [31]:
# Seleciona colunas específicas - Pandas
df_selecionado_pandas = df_pandas[['System', 'Organization','Domain']]

In [32]:
# Seleciona colunas específicas - PySpark
df_selecionado_spark = df_pyspark.select('System', 'Organization','Domain')

Filtragem de Dados:

In [36]:
df_filtrado_pandas = df_pandas[df_pandas['Training time (hours)'] > 2]


In [37]:
# Filtra linhas onde coluna1 > 50 - PySpark
df_filtrado_spark = df_pyspark.filter(df_pyspark['Training time (hours)'] > 2)

Agregações:

In [39]:
df_agregado_pandas = df_pandas.groupby('System')['Training time (hours)'].mean().reset_index()

In [40]:
from pyspark.sql.functions import avg

df_agregado_spark = df_pyspark.groupBy("System").agg(avg("Training time (hours)").alias("media_training_time"))



In [41]:
df_selecionado_spark.show(5)
df_filtrado_spark.show(5)
df_agregado_spark.show(5)

+--------------------+--------------------+--------------------+
|              System|        Organization|              Domain|
+--------------------+--------------------+--------------------+
|         Qwen2.5-72B|             Alibaba|            Language|
|6 * 72.7 billion ...|"""In terms of Qw...|Unspecified unrel...|
|  Table Tennis Agent|     Google DeepMind|            Robotics|
|""Each policy is ...|                NULL|                NULL|
|[22] following th...|                NULL|                NULL|
+--------------------+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+--------------------+---------------------------+---------------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------------------------+--------------------+--------------------+-----------------------+----------------------+--------------------+--------------

Junções (Joins):

In [43]:
# Junção de dois DataFrames pandas
df1 = pd.read_csv("/root/.cache/kagglehub/datasets/saeedsarrafzadeh/notable-ai-models-2024/versions/1/notable_ai_models.csv")
df2 = pd.read_csv("/root/.cache/kagglehub/datasets/derrickdaniel/klebsiella-meropenem-amr-genomic-data/versions/1/Klebsiella_Meropenem_AMR_Genomics.csv")

# Convertendo para o tipo string (object) antes da junção
df1['Authors'] = df1['Authors'].astype(str)
df2['Authors'] = df2['Authors'].astype(str)

# Realiza a junção
df_juncao = pd.merge(df1, df2, on='Authors')


In [45]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("DataFrameJoin").getOrCreate()

df1 = spark.read.csv("/root/.cache/kagglehub/datasets/saeedsarrafzadeh/notable-ai-models-2024/versions/1/notable_ai_models.csv", header=True, inferSchema=True)
df2 = spark.read.csv("/root/.cache/kagglehub/datasets/derrickdaniel/klebsiella-meropenem-amr-genomic-data/versions/1/Klebsiella_Meropenem_AMR_Genomics.csv", header=True, inferSchema=True)

df1 = df1.withColumn("Authors", df1["Authors"].cast(StringType()))
df2 = df2.withColumn("Authors", df2["Authors"].cast(StringType()))


df_juncao = df1.join(df2, on="Authors", how="inner")

df_juncao.show()


+-------+------+------+------------+---------------------------+---------------------------+----------------+---------+----+---------+-------------------+-------------------------+----------+----------------+-----------------------+----------------------+----------------+----------------------+----------------------------------+------------------+------+---------------------+-------------------+-----------------+-----------------+--------------------+--------------------------------+------------------+----------+--------+-------------------+----------+-----------------------+----------------------+----------+----------------+--------------+-----------------------+---------+----------+----------+---------+---------+-----------+----------+-----------+-----------+-------------+------------+-----------+-----------+----------+----------+-----------+------------+------------+-----------+----------+----------+-----------+---------+---------+-----+------+------+-------+---------+--------+-----

Criação de Novas Colunas:

In [47]:
# Cria uma nova coluna 'nova_coluna' como a soma de 'coluna1' e 'coluna2'
df_pandas['nova_coluna'] = df_pandas['coluna1'] + df_pandas['coluna2']

KeyError: 'coluna1'

In [None]:
from pyspark.sql.functions import col

# Cria uma nova coluna 'nova_coluna' como a soma de 'coluna1' e 'coluna2'
df_pyspark = df_pyspark.withColumn("nova_coluna", col("coluna1") + col("coluna2"))


Ordenação de Dados:

In [None]:
# Ordena o DataFrame pelo 'coluna1' em ordem descendente
df_pandas_ordenado = df_pandas.sort_values(by='coluna1', ascending=False)

In [None]:
# Ordena o DataFrame pelo 'coluna1' em ordem descendente
df_pyspark_ordenado = df_pyspark.orderBy(df_pyspark.coluna1.desc())