<a href="https://colab.research.google.com/github/Rogerio-mack/data-engineering/blob/main/how_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Apache Spark

O Apache Spark é uma ferramenta de processamento de dados em grande escala para executar análises complexas de forma eficiente distribuindo o processamento em clusters de computadores.

O Spark mantém os dados para processamento em memória, mas pode se conectar a vários tipos de fontes de dados, incluindo:

* Sistemas de arquivos: HDFS, S3, local
* Bancos de dados relacionais: MySQL, PostgreSQL, Oracle
* Bancos de dados NoSQL: Cassandra, MongoDB
* Formatos de dados: CSV, JSON, Parquet, Avro
* Streaming: Kafka, Flume

Para isso o Spark utiliza conectores específicos para cada tipo de dado. Esses conectores são responsáveis por ler os dados da fonte e transformá-los em RDDs (Resilient Distributed Datasets) ou DataFrames, as estruturas de dados básicas do Spark.

Os RDDs podem ser particionados em diferentes "executers" do Spark (diferentes máquinas) que mantem um processo principal ("driver") para coordenar a execução dos diferentes nós. Essas configurações são feitas através de ferramentas como o Spark Submit ou através gerenciadores de cluster como YARN ou Mesos.



# PySpark

O PySpark é, basicamente, uma interface do Apache Spark para a linguagem Python. Ele oferece acesso a quase todas as funcionalidades do Spark, incluindo o Spark SQL, suporte a dataframes Spark e RDDs, conexão com diferentes bases de dados e configurações (interface para interagir com o YARN) etc.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=02d346793fc9668db8d028d285773af96cc0cfcc6725297f2778837887c9a293
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
!wget https://files.grouplens.org/datasets/movielens/ml-20m.zip


--2024-09-11 01:25:51--  https://files.grouplens.org/datasets/movielens/ml-20m.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 198702078 (189M) [application/zip]
Saving to: ‘ml-20m.zip’


2024-09-11 01:25:52 (142 MB/s) - ‘ml-20m.zip’ saved [198702078/198702078]



In [None]:
!unzip ml-20m.zip

Archive:  ml-20m.zip
   creating: ml-20m/
  inflating: ml-20m/genome-scores.csv  
  inflating: ml-20m/genome-tags.csv  
  inflating: ml-20m/links.csv        
  inflating: ml-20m/movies.csv       
  inflating: ml-20m/ratings.csv      
  inflating: ml-20m/README.txt       
  inflating: ml-20m/tags.csv         


In [None]:
import pandas as pd

df = pd.read_csv('/content/ml-20m/movies.csv')
df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


## Iniciando uma sessão do `Pyspark`

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# spark = SparkSession.builder.appName("MyAppName").getOrCreate()

## Lendo um `dataframe` `Pyspark`

In [None]:
movies_with_genres_df = spark.read.csv("/content/ml-20m/movies.csv", header=True, inferSchema=True)
movies_with_genres_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



## Schemas `Pyspark`

In [None]:
from pyspark.sql.types import *
#working only on movies.csv right now
movies_with_genres_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType()),
   StructField('genres',StringType())]
  )

movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
  ) #dropping

In [None]:
movies_with_genres_df = spark.read.csv("/content/ml-20m/movies.csv", header=True, schema=movies_with_genres_df_schema)
movies_with_genres_df.show(5)

+---+--------------------+--------------------+
| ID|               title|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
|  4|Waiting to Exhale...|Comedy|Drama|Romance|
|  5|Father of the Bri...|              Comedy|
+---+--------------------+--------------------+
only showing top 5 rows



In [None]:
movies_df = spark.read.csv("/content/ml-20m/movies.csv", header=True, schema=movies_df_schema)
movies_df.show(5)

+---+--------------------+
| ID|               title|
+---+--------------------+
|  1|    Toy Story (1995)|
|  2|      Jumanji (1995)|
|  3|Grumpier Old Men ...|
|  4|Waiting to Exhale...|
|  5|Father of the Bri...|
+---+--------------------+
only showing top 5 rows



In [None]:
movies_df.printSchema()
movies_with_genres_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- title: string (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



# De Para: `Pandas` 2 `PySpark`

In [None]:
movies_with_genres_df.columns

['ID', 'title', 'genres']

In [None]:
movies_with_genres_df.count()

(27278, 1, StorageLevel(False, False, False, False, 1))

 | Operação                      | Pandas                          | PySpark                                                                   | Observações                                                               |
|-------------------------------|---------------------------------|---------------------------------------------------------------------------|---------------------------------------------------------------------------|
| Criação                       | pd.DataFrame(data)              | spark.createDataFrame(data)                                               | data pode ser uma lista de listas, um dicionário ou um Pandas DataFrame.  |
| Verificar shape               | df.shape                        | df.count(), len(df.columns)                                               | Retorna (número de linhas, número de colunas).                            |
| Verificar valores ausentes    | df.isnull().sum()               | df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show() | Conta valores ausentes por coluna.                                        |
| Seleção de Colunas            | df['coluna']                    | df['coluna'] ou df.col('coluna')                                          | Sintaxe similar, mas PySpark DataFrame é imutável.                        |
| Seleção de Linhas por rótulo  | df.loc[rótulo]                  | Não diretamente análogo. Use filter ou where para condições.              | PySpark não tem um índice como o Pandas.                                  |
| Seleção de Linhas por posição | df.iloc[índice]                 | Não diretamente análogo. Use take para selecionar linhas por posição.     |                                                                           |
| Filtragem                     | df[df['coluna'] > valor]        | df.filter(df['coluna'] > valor)                                           | A filtragem em PySpark é feita com base em condições.                     |
| Agrupamento                   | df.groupby('coluna').mean()     | df.groupBy('coluna').mean()                                               | Sintaxe similar, mas o PySpark DataFrame é distribuído.                   |
| Agregação                     | df.sum(), df.mean(), df.count() | df.agg({'coluna': 'sum'}), df.agg({'coluna': 'mean'}), df.count()         | O PySpark oferece mais flexibilidade para combinar diferentes agregações. |

In [None]:
# Selecionando apenas as colunas "ID" e "title"
df_selected = movies_with_year_df.select("ID", "title")

df_selected.show(5)

+---+--------------------+
| ID|               title|
+---+--------------------+
|  1|    Toy Story (1995)|
|  2|      Jumanji (1995)|
|  3|Grumpier Old Men ...|
|  4|Waiting to Exhale...|
|  5|Father of the Bri...|
+---+--------------------+
only showing top 5 rows



In [None]:
df_filtered = movies_with_year_df.filter(movies_with_year_df.year == 1990)

df_filtered.show(5)

+---+--------------------+----+
| ID|               title|year|
+---+--------------------+----+
|586|   Home Alone (1990)|1990|
|587|        Ghost (1990)|1990|
|590|Dances with Wolve...|1990|
|597| Pretty Woman (1990)|1990|
|625|  Asfour Stah (1990)|1990|
+---+--------------------+----+
only showing top 5 rows



In [None]:
df_filtered = movies_with_year_df.filter(movies_with_year_df.year == 1990).select("ID", "title")

df_filtered.show(5)

+---+--------------------+
| ID|               title|
+---+--------------------+
|586|   Home Alone (1990)|
|587|        Ghost (1990)|
|590|Dances with Wolve...|
|597| Pretty Woman (1990)|
|625|  Asfour Stah (1990)|
+---+--------------------+
only showing top 5 rows



In [None]:

df_filtered = movies_with_year_df.where(movies_with_year_df.ID > 500)

df_filtered.show(5)

+---+--------------------+----+
| ID|               title|year|
+---+--------------------+----+
|501|        Naked (1993)|1993|
|502|Next Karate Kid, ...|1994|
|503| New Age, The (1994)|1994|
|504|    No Escape (1994)|1994|
|505|        North (1994)|1994|
+---+--------------------+----+
only showing top 5 rows



# RDD

**RDD (Resilient Distributed Dataset)**  é a unidade fundamental de dados no Spark. Essencialmente, um RDD é uma coleção imutável e distribuída de elementos que podem ser processados em paralelo em um cluster. Pense nele como uma lista gigante que pode ser dividida em pedaços e processados em diferentes máquinas.

In [None]:
movies_with_genres_df.rdd.getNumPartitions(), movies_with_genres_df.rdd.getStorageLevel()

(1, StorageLevel(False, False, False, False, 1))

A API do DataFrame é mais fácil de empregar, mas não oferece todo o potencial do Spark. Ao converter um DataFrame em um RDD, você ganha acesso a um conjunto mais amplo de operações que podem não estar diretamente disponíveis na API de DataFrames. Essas operações geralmente envolvem transformações mais complexas funções de hash, operações com pares chave-valor (ReduceKey e CombineByKey), operações eficientes de conjuntos (distinct, union) etc. No entanto, é importante lembrar que as modificações feitas em um afetarão o outro, pois ambos referenciam o mesmo conjunto de dados.

In [None]:
# Criando um DataFrame
df = spark.createDataFrame([(1, 5), (2, 7)], ["id", "value"])
df.show()

# Convertendo para RDD
rdd = df.rdd

# Aplicando uma transformação no RDD
rdd_squared = rdd.map(lambda x: (x[0], x[1] * x[1]))

# Convertendo o RDD de volta para DataFrame
df_squared = rdd_squared.toDF(["id", "value_squared"])

df_squared.show()

+---+-----+
| id|value|
+---+-----+
|  1|    5|
|  2|    7|
+---+-----+

+---+-------------+
| id|value_squared|
+---+-------------+
|  1|           25|
|  2|           49|
+---+-------------+



## Persistência

In [None]:
from pyspark import StorageLevel

df = spark.range(0, 10)

# Persistir o DataFrame em memória
df.cache()

# Contando o número de linhas
count = df.count()
print(count)

# Persistir com um nível de persistência específico
df.persist(StorageLevel.DISK_ONLY)

10


DataFrame[id: bigint]

Embora o pyspark.StorageLevel.DISK_ONLY indique que os dados serão armazenados em disco, a localização exata desse arquivo não é fixa e pode variar. O mesmo pode ser aplicado aos rdds.

In [None]:
df.unpersist()

# Particionamento

In [None]:
# DataFrame de exemplo
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Salvando o DataFrame em um bucket do Google Cloud Storage, particionado pela coluna "age"
# df.write.partitionBy("age").parquet("gs://meu-bucket/dados_particionados")

# Salvando local
df.write.partitionBy("age").parquet("/content/meu-bucket/dados_particionados")

In [None]:
!ls /content/meu-bucket/dados_particionados

'age=25'  'age=30'  'age=35'   _SUCCESS


O Parquet é um formato de arquivo de dados colunares! Mas poderíamos ter empregado .csv, .json ou mesmo outros sistemas de arquivos.

In [None]:
# Salvando local
movies_with_year_df.write.partitionBy("year").parquet("/content/meu-bucket/movies")

In [None]:
!ls /content/meu-bucket/movies

 _SUCCESS    'year=1917'  'year=1938'  'year=1959'  'year=1980'  'year=2001'
'year=06'    'year=1918'  'year=1939'  'year=1960'  'year=1981'  'year=2002'
'year=1891'  'year=1919'  'year=1940'  'year=1961'  'year=1982'  'year=2003'
'year=1893'  'year=1920'  'year=1941'  'year=1962'  'year=1983'  'year=2004'
'year=1894'  'year=1921'  'year=1942'  'year=1963'  'year=1984'  'year=2005'
'year=1895'  'year=1922'  'year=1943'  'year=1964'  'year=1985'  'year=2006'
'year=1896'  'year=1923'  'year=1944'  'year=1965'  'year=1986'  'year=2007'
'year=1898'  'year=1924'  'year=1945'  'year=1966'  'year=1987'  'year=2008'
'year=1899'  'year=1925'  'year=1946'  'year=1967'  'year=1988'  'year=2009'
'year=1900'  'year=1926'  'year=1947'  'year=1968'  'year=1989'  'year=2010'
'year=1901'  'year=1927'  'year=1948'  'year=1969'  'year=1990'  'year=2011'
'year=1902'  'year=1928'  'year=1949'  'year=1970'  'year=1991'  'year=2012'
'year=1903'  'year=1929'  'year=1950'  'year=1971'  'year=1992'  'year=2013'

In [None]:
spark.conf.set("spark.default.parallelism", 3)

In [None]:
from pyspark.sql.functions import split, regexp_extract

movies_with_year_df = movies_df.select('ID','title',regexp_extract('title',r'\((\d+)\)',1).alias('year'))
movies_with_year_df.show(5)

+---+--------------------+----+
| ID|               title|year|
+---+--------------------+----+
|  1|    Toy Story (1995)|1995|
|  2|      Jumanji (1995)|1995|
|  3|Grumpier Old Men ...|1995|
|  4|Waiting to Exhale...|1995|
|  5|Father of the Bri...|1995|
+---+--------------------+----+
only showing top 5 rows



Comandos comuns de RDD:

* `map`: Aplica uma função a cada elemento do RDD.
* `filter`: Filtra elementos com base em uma condição.
* `flatMap`: Aplica uma função a cada elemento e achata o resultado.
* `reduceByKey`: Agrupa elementos por chave e aplica uma função de redução.
* `join`: Junta dois RDDs com base em uma chave comum.
* `sortByKey`: Ordena um RDD por chave.
* `collect`: Traz todos os dados de volta para o driver.

In [None]:
# Extraindo a coluna title e convertendo para RDD
titles_rdd = movies_with_genres_df.select("title").rdd.flatMap(lambda x: x[0].split(" "))

# Contando as palavras
word_counts = titles_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Mostrando os resultados
word_counts.collect()

[('Toy', 11),
 ('Story', 195),
 ('(1995)', 474),
 ('Jumanji', 1),
 ('Grumpier', 1),
 ('Old', 46),
 ('Men', 104),
 ('Waiting', 13),
 ('to', 642),
 ('Exhale', 1),
 ('Father', 33),
 ('of', 3152),
 ('the', 2982),
 ('Bride', 33),
 ('Part', 73),
 ('II', 94),
 ('Heat', 15),
 ('Sabrina', 2),
 ('Tom', 23),
 ('and', 1050),
 ('Huck', 2),
 ('Sudden', 5),
 ('Death', 130),
 ('GoldenEye', 1),
 ('American', 132),
 ('President,', 4),
 ('The', 6301),
 ('Dracula:', 3),
 ('Dead', 175),
 ('Loving', 8),
 ('It', 133),
 ('Balto', 1),
 ('Nixon', 4),
 ('Cutthroat', 1),
 ('Island', 57),
 ('Casino', 6),
 ('Sense', 7),
 ('Sensibility', 2),
 ('Four', 52),
 ('Rooms', 4),
 ('Ace', 9),
 ('Ventura:', 2),
 ('When', 59),
 ('Nature', 9),
 ('Calls', 5),
 ('Money', 43),
 ('Train', 36),
 ('Get', 56),
 ('Shorty', 1),
 ('Copycat', 1),
 ('Assassins', 8),
 ('Powder', 5),
 ('Leaving', 4),
 ('Las', 10),
 ('Vegas', 12),
 ('Othello', 4),
 ('Now', 36),
 ('Then', 11),
 ('Persuasion', 4),
 ('City', 119),
 ('Lost', 97),
 ('Children,', 5

# SQL, Hive e HiveQL

O Hive é um data warehouse construído sobre o Hadoop, que possui uma interface SQL para consultar e manipular dados armazenados em sistemas de arquivos distribuídos. Ele utiliza o HiveQL, uma linguagem semelhante ao SQL.

O Spark possui uma integração nativa com o Hive. Isso significa que o Spark pode ser utilizado como motor de execução para as consultas HiveQL.

> **nota**. Inicialmente, o Hive utiliza(va?) o MapReduce como motor de execução. O MapReduce, embora seja um paradigma poderoso para processamento de grandes volumes de dados, pode ser lento para consultas iterativas e complexas. O Spark, com seu modelo de programação em memória, oferece assim um desempenho superior.

Tabelas Hive podem ser criadas diretamente através do Spark.

In [None]:
# Criar um DataFrame
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, ["name", "age"])

# Criar o esquema (se necessário)
spark.sql("CREATE SCHEMA my_database")

# Salvar o DataFrame na tabela
df.write.format("parquet").mode("overwrite").saveAsTable("my_database.my_table")

In [None]:
!ls /content/spark-warehouse/my_database.db/my_table

part-00000-e732b080-bb4d-4e44-8777-e5577b8de0cb-c000.snappy.parquet  _SUCCESS
part-00001-e732b080-bb4d-4e44-8777-e5577b8de0cb-c000.snappy.parquet


In [None]:
result = spark.sql("SELECT * FROM my_database.my_table WHERE age == 25")

result.show(5)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+



## SQL em dataframes e RDDs

In [None]:
movies_with_year_df.createOrReplaceTempView("movies")

# Executando uma consulta SQL
result = spark.sql("SELECT * FROM movies WHERE year == 1990")

result.show(5)

+---+--------------------+----+
| ID|               title|year|
+---+--------------------+----+
|586|   Home Alone (1990)|1990|
|587|        Ghost (1990)|1990|
|590|Dances with Wolve...|1990|
|597| Pretty Woman (1990)|1990|
|625|  Asfour Stah (1990)|1990|
+---+--------------------+----+
only showing top 5 rows

