# Apache Spark

## Features
- Fast, fault-tolerant, general-purpose cluster computing platform.
- Up to 100x faster than MapReduce for in-memory processing.
- Up to 10x faster than MapReduce for on-disk processing.
- Compatible with Hadoop.
- Open source.
- Written in Scala.
- Applications can be written in Java, Scala, Python and R.
- Libraries for SQL, streaming, machine learning and graph processing.

## Components
![Components](https://s3-sa-east-1.amazonaws.com/lcpi/89684467-39f5-4a10-9b1d-5a7788603adf.png)

## SparkContext

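- It is the entry point to Spark's core functionality (since Spark 2.0 it is wrapped by `SparkSession`, the entry point for DataFrames and SQL).
- It can be used to create RDDs, accumulators and broadcast variables on the cluster.
- In the interactive shells (spark-shell or pyspark) and in Databricks notebooks, a SparkContext object is created automatically and the variable `sc` refers to it; the variable `spark` refers to the SparkSession.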

In [0]:
sc

In [0]:
spark

## RDD
RDD stands for Resilient Distributed Dataset. It is a read-only, partitioned collection of records and Spark's fundamental data structure. It lets a programmer perform in-memory computations on large clusters in a fault-tolerant way, which speeds up processing.

RDD has been Spark's primary user-facing API since its inception. At its core, an **RDD is an immutable distributed collection** of elements of your data, partitioned across the nodes of the cluster, which can be operated on in parallel through a low-level API that offers transformations and actions.

![RDD](https://s3-sa-east-1.amazonaws.com/lcpi/85bc10e4-4dd4-4c83-b64b-50a815b77009.png)
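To make "partitioned" and "immutable" concrete, here is a plain-Python toy model (it does not use Spark). The helper names `slice_into_partitions` and `map_partitions` are hypothetical, and the contiguous, near-even slicing is only an assumption in the spirit of `sc.parallelize(data, numSlices)`; real partitioning depends on the data source and the Spark version.

```python
# Toy model only: a "partitioned, immutable collection" as a tuple of tuples.
# Assumption: contiguous, near-even slicing, similar in spirit to sc.parallelize(data, n).


def slice_into_partitions(data, num_slices):
    """Split data into num_slices contiguous chunks whose sizes differ by at most one."""
    n = len(data)
    return tuple(
        tuple(data[i * n // num_slices:(i + 1) * n // num_slices])
        for i in range(num_slices)
    )


def map_partitions(partitions, func):
    """Return a NEW partitioned collection; the input tuples are never modified."""
    return tuple(tuple(func(x) for x in part) for part in partitions)


num_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
parts = slice_into_partitions(num_list, 3)
doubled = map_partitions(parts, lambda x: x * 2)

print(parts)    # ((1, 2, 3), (4, 5, 6), (7, 8, 9, 0))
print(doubled)  # ((2, 4, 6), (8, 10, 12), (14, 16, 18, 0))
```

Tuples are used on purpose: like an RDD, a partition cannot be changed in place, so every "transformation" has to build a new collection.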

Examples of transformations:
- map() - Applies a function to each element of the RDD and returns an RDD with the results.
- filter() - Returns an RDD with only the elements that satisfy the filter condition.
- union() - Returns an RDD containing the elements of both RDDs.

Examples of actions:
- collect() - Returns all elements of the RDD to the driver.
- count() - Returns the number of elements in the RDD.
- take(10) - Returns the first 10 elements of the RDD.
- foreach(func) - Applies the given function to each element of the RDD.
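Transformations are lazy: they only record what to do, and nothing is computed until an action runs. The plain-Python toy below (a hypothetical `LazyCollection` class, not Spark's implementation) mimics that behaviour, with `map`, `filter` and `union` as transformations and `collect`, `count`, `take` and `foreach` as actions.

```python
import itertools


class LazyCollection:
    """Toy model of RDD laziness: stores a recipe, not results."""

    def __init__(self, make_iter):
        # make_iter is a zero-argument function returning a fresh iterator
        self._make_iter = make_iter

    @classmethod
    def from_list(cls, data):
        return cls(lambda: iter(data))

    # Transformations: return a new LazyCollection and compute nothing
    def map(self, func):
        return LazyCollection(lambda: map(func, self._make_iter()))

    def filter(self, pred):
        return LazyCollection(lambda: filter(pred, self._make_iter()))

    def union(self, other):
        return LazyCollection(lambda: itertools.chain(self._make_iter(), other._make_iter()))

    # Actions: run the recorded pipeline from the start
    def collect(self):
        return list(self._make_iter())

    def count(self):
        return sum(1 for _ in self._make_iter())

    def take(self, n):
        return list(itertools.islice(self._make_iter(), n))

    def foreach(self, func):
        for item in self._make_iter():
            func(item)


calls = []  # records every time square() actually runs


def square(x):
    calls.append(x)
    return x * x


nums = LazyCollection.from_list([1, 2, 3, 4, 5, 6, 7, 8, 9, 0])
evens_squared = nums.filter(lambda x: x % 2 == 0).map(square)  # transformations only

print(calls)                     # [] : nothing has been computed yet
print(evens_squared.collect())   # [4, 16, 36, 64, 0]
print(evens_squared.take(2))     # [4, 16]
print(nums.union(nums).count())  # 20
print(calls)                     # [2, 4, 6, 8, 0, 2, 4]
```

Each action re-runs the pipeline from the source (the second `calls` printout shows `square` running again for `take`), which is also what Spark does for an RDD that has not been cached with `cache()` or `persist()`.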

## Basic operations

In [0]:
num_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
numeros = sc.parallelize(num_list)

In [0]:
numeros.collect()

Out[7]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

In [0]:
numeros.sum()

Out[8]: 45

In [0]:
numeros.mean()

Out[14]: 4.5

Show the first element:

    numeros.first()

Show the 5 largest elements (in descending order):

    numeros.top(5)

Show all elements:

    numeros.collect()

Count the elements:

    numeros.count()

Mean of the numbers:

    numeros.mean()

Sum of the elements:

    numeros.sum()

Show the largest element:

    numeros.max()

Show the smallest element:

    numeros.min()

Compute the standard deviation (population, i.e. dividing by N):

    numeros.stdev()
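The expected results for `num_list` can be checked by hand. The plain-Python sketch below (standard library only, no Spark) computes the same values; it assumes, as PySpark's `RDD.stdev()` does, that the standard deviation is the population one (dividing by N), whereas `RDD.sampleStdev()` divides by N - 1.

```python
import statistics

num_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

first = num_list[0]                                # numeros.first()
top5 = sorted(num_list, reverse=True)[:5]          # numeros.top(5): largest first
count = len(num_list)                              # numeros.count()
total = sum(num_list)                              # numeros.sum()
mean = statistics.mean(num_list)                   # numeros.mean()
largest, smallest = max(num_list), min(num_list)   # numeros.max(), numeros.min()
pop_std = statistics.pstdev(num_list)              # numeros.stdev(): divides by N
sample_std = statistics.stdev(num_list)            # numeros.sampleStdev(): divides by N - 1

print(first, top5, count, total, mean)          # 1 [9, 8, 7, 6, 5] 10 45 4.5
print(largest, smallest)                        # 9 0
print(round(pop_std, 4), round(sample_std, 4))  # 2.8723 3.0277
```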

## DataFrame
Unlike an RDD, a DataFrame organizes the data into named columns, like a table in a relational database. It is an immutable distributed collection of data. DataFrames let developers impose a structure on a distributed collection of data, providing a higher-level abstraction.

- A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database.

- It is designed for structured and semi-structured data. Because the data is organized into named columns, Spark can manage the schema.

- The data source API allows processing data in different formats (AVRO, CSV, JSON) and from different storage systems (HDFS, Hive tables, MySQL), and it can both read from and write to these sources.

- Once data has been converted to a DataFrame, the original domain objects cannot be regenerated. For example, if you create testDF from testRDD (an RDD of objects of a class Test), you cannot get the original Test objects back from testDF (`testDF.rdd` returns generic `Row` objects instead).

# PySpark

## Creating DataFrames from `files`

Download the dataset from Kaggle, in CSV format, from the link below:
https://www.kaggle.com/martj42/international-football-results-from-1872-to-2017

Create the `football` directory:

In [0]:
dbutils.fs.mkdirs('FileStore/tables/football')

Out[15]: True

Upload the results.csv file to `FileStore` and copy it into the newly created directory:

In [0]:
dbutils.fs.cp('FileStore/results.csv','FileStore/tables/football')

Out[16]: True

### Create an RDD from the CSV file

In [0]:
matches = sc.textFile('FileStore/tables/football/results.csv')
matches.collect()

Out[19]: ['date,home_team,away_team,home_score,away_score,tournament,city,country,neutral',
 '1872-11-30,Scotland,England,0,0,Friendly,Glasgow,Scotland,FALSE',
 '1873-03-08,England,Scotland,4,2,Friendly,London,England,FALSE',
 '1874-03-07,Scotland,England,2,1,Friendly,Glasgow,Scotland,FALSE',
 '1875-03-06,England,Scotland,2,2,Friendly,London,England,FALSE',
 '1876-03-04,Scotland,England,3,0,Friendly,Glasgow,Scotland,FALSE',
 '1876-03-25,Scotland,Wales,4,0,Friendly,Glasgow,Scotland,FALSE',
 '1877-03-03,England,Scotland,1,3,Friendly,London,England,FALSE',
 '1877-03-05,Wales,Scotland,0,2,Friendly,Wrexham,Wales,FALSE',
 '1878-03-02,Scotland,England,7,2,Friendly,Glasgow,Scotland,FALSE',
 '1878-03-23,Scotland,Wales,9,0,Friendly,Glasgow,Scotland,FALSE',
 '1879-01-18,England,Wales,2,1,Friendly,London,England,FALSE',
 '1879-04-05,England,Scotland,5,4,Friendly,London,England,FALSE',
 '1879-04-07,Wales,Scotland,0,3,Friendly,Wrexham,Wales,FALSE',
 '1880-03-13,Scotland,England,5,4,Friendly,Glasgow,
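`sc.textFile` returns every line as a plain string, header included, so the fields still have to be split and the header skipped. The sketch below does this in plain Python with the standard `csv` module on a few of the lines shown above; in PySpark the same idea would look roughly like `header = matches.first()` followed by `matches.filter(lambda l: l != header).map(...)` (not run here).

```python
import csv

# A few of the raw lines returned by matches.collect() above
lines = [
    "date,home_team,away_team,home_score,away_score,tournament,city,country,neutral",
    "1872-11-30,Scotland,England,0,0,Friendly,Glasgow,Scotland,FALSE",
    "1873-03-08,England,Scotland,4,2,Friendly,London,England,FALSE",
    "1874-03-07,Scotland,England,2,1,Friendly,Glasgow,Scotland,FALSE",
]

header = lines[0]
# Drop the header line, similar in spirit to matches.filter(lambda l: l != header)
data_lines = [line for line in lines if line != header]

# csv.reader copes with quoted fields containing commas; a plain split(",") would not
rows = list(csv.reader(data_lines))
column_names = header.split(",")
records = [dict(zip(column_names, row)) for row in rows]

print(records[1]["home_team"], records[1]["home_score"])  # England 4
```

Every value is still a string (`'4'`, not `4`), which is one reason the DataFrame reader used next is more convenient.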

### Reading a CSV file with PySpark

In [0]:
df_football = spark.read.csv('/FileStore/tables/football/results.csv')
df_football.show()

+----------+---------+---------+----------+----------+----------+---------+--------+-------+
|       _c0|      _c1|      _c2|       _c3|       _c4|       _c5|      _c6|     _c7|    _c8|
+----------+---------+---------+----------+----------+----------+---------+--------+-------+
|      date|home_team|away_team|home_score|away_score|tournament|     city| country|neutral|
|1872-11-30| Scotland|  England|         0|         0|  Friendly|  Glasgow|Scotland|  FALSE|
|1873-03-08|  England| Scotland|         4|         2|  Friendly|   London| England|  FALSE|
|1874-03-07| Scotland|  England|         2|         1|  Friendly|  Glasgow|Scotland|  FALSE|
|1875-03-06|  England| Scotland|         2|         2|  Friendly|   London| England|  FALSE|
|1876-03-04| Scotland|  England|         3|         0|  Friendly|  Glasgow|Scotland|  FALSE|
|1876-03-25| Scotland|    Wales|         4|         0|  Friendly|  Glasgow|Scotland|  FALSE|
|1877-03-03|  England| Scotland|         1|         3|  Friendly|   Lo

In [0]:
df_football = spark.read.csv('/FileStore/tables/football/results.csv', header=True, inferSchema=True)
df_football.show()

+-------------------+----------------+---------+----------+----------+----------+---------+--------+-------+
|               date|       home_team|away_team|home_score|away_score|tournament|     city| country|neutral|
+-------------------+----------------+---------+----------+----------+----------+---------+--------+-------+
|1872-11-30 00:00:00|        Scotland|  England|         0|         0|  Friendly|  Glasgow|Scotland|  false|
|1873-03-08 00:00:00|         England| Scotland|         4|         2|  Friendly|   London| England|  false|
|1874-03-07 00:00:00|        Scotland|  England|         2|         1|  Friendly|  Glasgow|Scotland|  false|
|1875-03-06 00:00:00|         England| Scotland|         2|         2|  Friendly|   London| England|  false|
|1876-03-04 00:00:00|        Scotland|  England|         3|         0|  Friendly|  Glasgow|Scotland|  false|
|1876-03-25 00:00:00|        Scotland|    Wales|         4|         0|  Friendly|  Glasgow|Scotland|  false|
|1877-03-03 00:00:0

In [0]:
df_football.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- home_team: string (nullable = true)
 |-- away_team: string (nullable = true)
 |-- home_score: integer (nullable = true)
 |-- away_score: integer (nullable = true)
 |-- tournament: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- neutral: boolean (nullable = true)
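Without options, `spark.read.csv` names the columns `_c0` to `_c8` and reads the header line as an ordinary row, with every value as a string. With `header=True` the first line supplies the column names, and `inferSchema=True` makes Spark do an extra pass over the data to pick a type for each column. The toy function below (a hypothetical `infer_type`, far simpler than Spark's real inference) picks the narrowest of integer, boolean, timestamp or string that fits every sample value, reproducing the types that `printSchema()` reports for four of the columns.

```python
from datetime import datetime


def _fits_int(value):
    try:
        int(value)
        return True
    except ValueError:
        return False


def _fits_date(value):
    try:
        datetime.strptime(value, "%Y-%m-%d")
        return True
    except ValueError:
        return False


def infer_type(values):
    """Toy inference: the narrowest type that fits every value in the column."""
    if all(_fits_int(v) for v in values):
        return "integer"
    if all(v.lower() in ("true", "false") for v in values):
        return "boolean"
    if all(_fits_date(v) for v in values):
        return "timestamp"  # mirrors printSchema() above, which reports date as timestamp
    return "string"


header = ["date", "home_team", "home_score", "neutral"]
rows = [
    ["1872-11-30", "Scotland", "0", "FALSE"],
    ["1873-03-08", "England", "4", "FALSE"],
    ["1874-03-07", "Scotland", "2", "FALSE"],
]

schema = {name: infer_type([row[i] for row in rows]) for i, name in enumerate(header)}
print(schema)
# {'date': 'timestamp', 'home_team': 'string', 'home_score': 'integer', 'neutral': 'boolean'}
```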



## Creating DataFrames from `tables`

In [0]:
%sql
CREATE TABLE football
USING csv
LOCATION "/FileStore/tables/football"
OPTIONS (header "true", inferSchema "true")

In [0]:
%sql
DESCRIBE EXTENDED football

col_name,data_type,comment
date,timestamp,
home_team,string,
away_team,string,
home_score,int,
away_score,int,
tournament,string,
city,string,
country,string,
neutral,boolean,
,,


In [0]:
df = spark.table('football')
display(df)

date,home_team,away_team,home_score,away_score,tournament,city,country,neutral
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False
1873-03-08T00:00:00.000+0000,England,Scotland,4,2,Friendly,London,England,False
1874-03-07T00:00:00.000+0000,Scotland,England,2,1,Friendly,Glasgow,Scotland,False
1875-03-06T00:00:00.000+0000,England,Scotland,2,2,Friendly,London,England,False
1876-03-04T00:00:00.000+0000,Scotland,England,3,0,Friendly,Glasgow,Scotland,False
1876-03-25T00:00:00.000+0000,Scotland,Wales,4,0,Friendly,Glasgow,Scotland,False
1877-03-03T00:00:00.000+0000,England,Scotland,1,3,Friendly,London,England,False
1877-03-05T00:00:00.000+0000,Wales,Scotland,0,2,Friendly,Wrexham,Wales,False
1878-03-02T00:00:00.000+0000,Scotland,England,7,2,Friendly,Glasgow,Scotland,False
1878-03-23T00:00:00.000+0000,Scotland,Wales,9,0,Friendly,Glasgow,Scotland,False


# Analyses with PySpark

## select

In [0]:
df_football.select('home_team', 'home_score').show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         0|
|         England|         4|
|        Scotland|         2|
|         England|         2|
|        Scotland|         3|
|        Scotland|         4|
|         England|         1|
|           Wales|         0|
|        Scotland|         7|
|        Scotland|         9|
|         England|         2|
|         England|         5|
|           Wales|         0|
|        Scotland|         5|
|           Wales|         2|
|        Scotland|         5|
|         England|         0|
|         England|         1|
|           Wales|         1|
|Northern Ireland|         0|
+----------------+----------+
only showing top 20 rows



In [0]:
df_football[['home_team', 'home_score']].show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         0|
|         England|         4|
|        Scotland|         2|
|         England|         2|
|        Scotland|         3|
|        Scotland|         4|
|         England|         1|
|           Wales|         0|
|        Scotland|         7|
|        Scotland|         9|
|         England|         2|
|         England|         5|
|           Wales|         0|
|        Scotland|         5|
|           Wales|         2|
|        Scotland|         5|
|         England|         0|
|         England|         1|
|           Wales|         1|
|Northern Ireland|         0|
+----------------+----------+
only showing top 20 rows



## filter and/or where

In [0]:
df_football.select('home_team','home_score').where('home_score = 2').show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         2|
|         England|         2|
|         England|         2|
|           Wales|         2|
|         England|         2|
|Northern Ireland|         2|
|Northern Ireland|         2|
|         England|         2|
|Northern Ireland|         2|
|         England|         2|
|        Scotland|         2|
|         England|         2|
|Northern Ireland|         2|
|Northern Ireland|         2|
|        Scotland|         2|
|Northern Ireland|         2|
|           Wales|         2|
|        Scotland|         2|
|           Wales|         2|
|Northern Ireland|         2|
+----------------+----------+
only showing top 20 rows



In [0]:
df_football.select('home_team', 'home_score') \
        .where('home_score = 2 or home_score = 1') \
        .show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         2|
|         England|         2|
|         England|         1|
|         England|         2|
|           Wales|         2|
|         England|         1|
|           Wales|         1|
|         England|         2|
|Northern Ireland|         1|
|Northern Ireland|         1|
|        Scotland|         1|
|         England|         1|
|         England|         1|
|           Wales|         1|
|Northern Ireland|         2|
|Northern Ireland|         1|
|Northern Ireland|         2|
|        Scotland|         1|
|           Wales|         1|
|         England|         2|
+----------------+----------+
only showing top 20 rows



In [0]:
df_football.select('home_team','home_score') \
        .where(df_football.home_score == 2) \
        .show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         2|
|         England|         2|
|         England|         2|
|           Wales|         2|
|         England|         2|
|Northern Ireland|         2|
|Northern Ireland|         2|
|         England|         2|
|Northern Ireland|         2|
|         England|         2|
|        Scotland|         2|
|         England|         2|
|Northern Ireland|         2|
|Northern Ireland|         2|
|        Scotland|         2|
|Northern Ireland|         2|
|           Wales|         2|
|        Scotland|         2|
|           Wales|         2|
|Northern Ireland|         2|
+----------------+----------+
only showing top 20 rows



In [0]:
df_football.select('home_team','home_score') \
        .where((df_football.home_score == 2) | (df_football.home_score  == 1)) \
        .show()

+----------------+----------+
|       home_team|home_score|
+----------------+----------+
|        Scotland|         2|
|         England|         2|
|         England|         1|
|         England|         2|
|           Wales|         2|
|         England|         1|
|           Wales|         1|
|         England|         2|
|Northern Ireland|         1|
|Northern Ireland|         1|
|        Scotland|         1|
|         England|         1|
|         England|         1|
|           Wales|         1|
|Northern Ireland|         2|
|Northern Ireland|         1|
|Northern Ireland|         2|
|        Scotland|         1|
|           Wales|         1|
|         England|         2|
+----------------+----------+
only showing top 20 rows
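`where()` is an alias for `filter()`. In the Column API, conditions are combined with `|` (or), `&` (and) and `~` (not), and each comparison must be wrapped in parentheses because in Python `|` and `&` bind more tightly than `==`. The plain-Python snippet below shows the difference with an ordinary integer; with Spark columns the unparenthesised form typically fails with an error about converting a Column to bool. As an alternative, `df_football.home_score.isin(1, 2)` expresses the same condition.

```python
home_score = 2

# Parenthesised, as in the cell above: (2 == 2) | (2 == 1) -> True | False
with_parens = (home_score == 2) | (home_score == 1)

# Without parentheses Python reads this as the chained comparison
#   home_score == (2 | home_score) == 1
# i.e. (2 == 2) and (2 == 1), which is False
without_parens = home_score == 2 | home_score == 1

print(with_parens, without_parens)  # True False
```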



In [0]:
df_football.filter(df_football.home_score == 2).show()

+-------------------+----------------+----------------+----------+----------+--------------------+---------+--------+-------+
|               date|       home_team|       away_team|home_score|away_score|          tournament|     city| country|neutral|
+-------------------+----------------+----------------+----------+----------+--------------------+---------+--------+-------+
|1874-03-07 00:00:00|        Scotland|         England|         2|         1|            Friendly|  Glasgow|Scotland|  false|
|1875-03-06 00:00:00|         England|        Scotland|         2|         2|            Friendly|   London| England|  false|
|1879-01-18 00:00:00|         England|           Wales|         2|         1|            Friendly|   London| England|  false|
|1880-03-15 00:00:00|           Wales|         England|         2|         3|            Friendly|  Wrexham|   Wales|  false|
|1883-03-10 00:00:00|         England|        Scotland|         2|         3|            Friendly|Sheffield| England| 

## sum, min, max, mean

In [0]:
df_football.select('home_team','home_score').groupby('home_team').min('home_score').show()

+-------------+---------------+
|    home_team|min(home_score)|
+-------------+---------------+
|      Kabylia|              0|
|       Kernow|              1|
|         Chad|              0|
|     Provence|              0|
|       Russia|              0|
|     Paraguay|              0|
|     Anguilla|              0|
|        Yemen|              0|
|South Ossetia|              0|
|      Senegal|              0|
|       Madrid|              1|
|       Sweden|              0|
|        Frøya|              0|
|     Kiribati|              0|
|     Ynys Môn|              0|
|       Guyana|              0|
|  Philippines|              0|
|       Jersey|              0|
|      Eritrea|              0|
|        Tibet|              0|
+-------------+---------------+
only showing top 20 rows
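`groupby('home_team').min('home_score')` produces one row per distinct key, while `groupby()` with no key treats the whole DataFrame as a single group. The plain-Python sketch below reproduces both on the first 13 `(home_team, home_score)` pairs shown earlier. Spark does not guarantee the order of grouped rows, which is why the teams in the output above appear in no particular order.

```python
from collections import defaultdict

# (home_team, home_score) pairs from the first 13 rows of results.csv shown earlier
pairs = [
    ("Scotland", 0), ("England", 4), ("Scotland", 2), ("England", 2),
    ("Scotland", 3), ("Scotland", 4), ("England", 1), ("Wales", 0),
    ("Scotland", 7), ("Scotland", 9), ("England", 2), ("England", 5),
    ("Wales", 0),
]

# groupby('home_team').min('home_score'): one result per distinct key
mins = defaultdict(lambda: float("inf"))
for team, score in pairs:
    mins[team] = min(mins[team], score)

# groupby() with no key: a single group covering every row
overall_min = min(score for _, score in pairs)
overall_max = max(score for _, score in pairs)

print(dict(mins))                # {'Scotland': 0, 'England': 1, 'Wales': 0}
print(overall_min, overall_max)  # 0 9
```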



In [0]:
df_football.select('home_score').groupby().min().show()

+---------------+
|min(home_score)|
+---------------+
|              0|
+---------------+



In [0]:
df_football.select('home_score').groupby().max().show()

+---------------+
|max(home_score)|
+---------------+
|             31|
+---------------+



In [0]:
df_football.select('home_score').groupby().mean().show()

+-----------------+
|  avg(home_score)|
+-----------------+
|1.742544225247754|
+-----------------+



In [0]:
df_football.select('away_score').groupby().mean().show()

+-----------------+
|  avg(away_score)|
+-----------------+
|1.182666481430027|
+-----------------+



## agg

In [0]:
df_football.agg({'away_score':'mean'}).show()

+-----------------+
|  avg(away_score)|
+-----------------+
|1.182666481430027|
+-----------------+



In [0]:
df_football.agg({'home_score':'stddev'}).show()

+------------------+
|stddev(home_score)|
+------------------+
| 1.753054878995388|
+------------------+
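In Spark SQL, `stddev` is the sample standard deviation (an alias of `stddev_samp`, dividing by N - 1), whereas `stddev_pop` and the RDD method `stdev()` divide by N. The plain-Python check below contrasts the two on the 13 home scores shown earlier; it is a tiny sample, so the numbers differ from those of the full dataset.

```python
import statistics

# home_score of the first 13 matches in results.csv (sample only)
home_scores = [0, 4, 2, 2, 3, 4, 1, 0, 7, 9, 2, 5, 0]

sample_sd = statistics.stdev(home_scores)       # divides by N - 1, like stddev / stddev_samp
population_sd = statistics.pstdev(home_scores)  # divides by N, like stddev_pop or RDD.stdev()

print(round(sample_sd, 3), round(population_sd, 3))
```

With about 43,000 rows the two values are practically identical; the distinction matters for small groups.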



## [join](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html)

In [0]:
df_football2 = spark.table('football')

In [0]:
df_join = df_football2.join(df_football, on=df_football2.home_team==df_football.home_team,how='left')
display(df_join)

date,home_team,away_team,home_score,away_score,tournament,city,country,neutral,date.1,home_team.1,away_team.1,home_score.1,away_score.1,tournament.1,city.1,country.1,neutral.1
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-11-15T00:00:00.000+0000,Scotland,Denmark,2,0,FIFA World Cup qualification,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-10-09T00:00:00.000+0000,Scotland,Israel,3,2,FIFA World Cup qualification,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-09-04T00:00:00.000+0000,Scotland,Moldova,1,0,FIFA World Cup qualification,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-06-22T00:00:00.000+0000,Scotland,Croatia,1,3,UEFA Euro,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-06-14T00:00:00.000+0000,Scotland,Czech Republic,0,2,UEFA Euro,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-03-31T00:00:00.000+0000,Scotland,Faroe Islands,4,0,FIFA World Cup qualification,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2021-03-25T00:00:00.000+0000,Scotland,Austria,2,2,FIFA World Cup qualification,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2020-10-14T00:00:00.000+0000,Scotland,Czech Republic,1,0,UEFA Nations League,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2020-10-11T00:00:00.000+0000,Scotland,Slovakia,1,0,UEFA Nations League,Glasgow,Scotland,False
1872-11-30T00:00:00.000+0000,Scotland,England,0,0,Friendly,Glasgow,Scotland,False,2020-10-08T00:00:00.000+0000,Scotland,Israel,0,0,UEFA Euro qualification,Glasgow,Scotland,False
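Because `home_team` is not a unique key, this join pairs every match of a team with every match of the same team, so a team with n home matches contributes n * n rows and the result is much larger than either input. With `how='left'` the count is the same here, since every row matches at least itself. The plain-Python nested-loop join below (an illustration, not how Spark executes joins) shows the effect on the 13 sample rows shown earlier: 6 + 5 + 2 = 13 rows become 36 + 25 + 4 = 65.

```python
from collections import Counter

# home_team of the first 13 matches in results.csv (sample rows shown earlier)
home_teams = [
    "Scotland", "England", "Scotland", "England", "Scotland", "Scotland", "England",
    "Wales", "Scotland", "Scotland", "England", "England", "Wales",
]

counts = Counter(home_teams)

# Naive equi-join of the list with itself on equal home_team values
joined = [(left, right) for left in home_teams for right in home_teams if left == right]

print(len(home_teams), len(joined))         # 13 65
print(sum(n * n for n in counts.values()))  # 65
```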


In [0]:
type(df_join)

Out[66]: pyspark.sql.dataframe.DataFrame

## toPandas

In [0]:
df_pandas = df_football.toPandas()
df_pandas

Unnamed: 0,date,home_team,away_team,home_score,away_score,tournament,city,country,neutral
0,1872-11-30,Scotland,England,0,0,Friendly,Glasgow,Scotland,False
1,1873-03-08,England,Scotland,4,2,Friendly,London,England,False
2,1874-03-07,Scotland,England,2,1,Friendly,Glasgow,Scotland,False
3,1875-03-06,England,Scotland,2,2,Friendly,London,England,False
4,1876-03-04,Scotland,England,3,0,Friendly,Glasgow,Scotland,False
...,...,...,...,...,...,...,...,...,...
43183,2022-02-01,Suriname,Guyana,2,1,Friendly,Paramaribo,Suriname,False
43184,2022-02-02,Burkina Faso,Senegal,1,3,African Cup of Nations,Yaoundé,Cameroon,True
43185,2022-02-03,Cameroon,Egypt,0,0,African Cup of Nations,Yaoundé,Cameroon,False
43186,2022-02-05,Cameroon,Burkina Faso,3,3,African Cup of Nations,Yaoundé,Cameroon,False


In [0]:
type(df_pandas)

Out[69]: pandas.core.frame.DataFrame

In [0]:
import pandas as pd

In [0]:
import pyspark.pandas as ps

## References and recommended reading

https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html

## Exercises

How many records are there in the DataFrame?

In [0]:
df_football.count()

Out[71]: 43188

How many distinct home teams are there in the DataFrame?

In [0]:
df_football.select('home_team').distinct().count()

Out[72]: 306

How many times did the home team win?

In [0]:
df_football.where('home_score > away_score').count()

Out[73]: 21009

How many times did the away team win?

In [0]:
df_football.where('home_score < away_score').count()

Out[76]: 12224
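Assuming every row has both scores filled in, each match falls into exactly one of three outcomes, so draws can be derived from the counts above. The sketch below does that arithmetic and classifies the 13 sample matches shown earlier with a hypothetical `outcome` helper; to count draws directly in PySpark one could use `df_football.where('home_score = away_score').count()` (not run here).

```python
from collections import Counter

# Totals from the exercise outputs above
total_matches = 43188
home_wins = 21009
away_wins = 12224
draws = total_matches - home_wins - away_wins
print(draws)                                              # 9955
print(f"home win rate: {home_wins / total_matches:.1%}")  # home win rate: 48.6%


def outcome(home_score, away_score):
    """Classify one match from the home team's point of view."""
    if home_score > away_score:
        return "home win"
    if home_score < away_score:
        return "away win"
    return "draw"


# (home_score, away_score) of the first 13 matches in results.csv
scores = [(0, 0), (4, 2), (2, 1), (2, 2), (3, 0), (4, 0), (1, 3),
          (0, 2), (7, 2), (9, 0), (2, 1), (5, 4), (0, 3)]
outcomes = Counter(outcome(h, a) for h, a in scores)
print(outcomes["home win"], outcomes["away win"], outcomes["draw"])  # 8 3 2
```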

From a DataFrame, create a Hive table named partida_pais with the total number of matches played in each country. (In the solution below, the table is created under the name `football_df`.)

In [0]:
df_partida_pais = df_football.select('country').groupby('country').count()

In [0]:
dbutils.fs.mkdirs('FileStore/tables/football_df')

Out[88]: True

In [0]:
(df_partida_pais.write
.mode("overwrite")
.format("parquet")
.save("FileStore/tables/football_df"))

In [0]:
dbutils.fs.ls('FileStore/tables/football_df')

Out[90]: [FileInfo(path='dbfs:/FileStore/tables/football_df/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1665537001000),
 FileInfo(path='dbfs:/FileStore/tables/football_df/_committed_9221081157951080718', name='_committed_9221081157951080718', size=124, modificationTime=1665537000000),
 FileInfo(path='dbfs:/FileStore/tables/football_df/_started_9221081157951080718', name='_started_9221081157951080718', size=0, modificationTime=1665537000000),
 FileInfo(path='dbfs:/FileStore/tables/football_df/part-00000-tid-9221081157951080718-3bd9f6f3-1635-4312-89c0-77fbe566b46c-135-1-c000.snappy.parquet', name='part-00000-tid-9221081157951080718-3bd9f6f3-1635-4312-89c0-77fbe566b46c-135-1-c000.snappy.parquet', size=4570, modificationTime=1665537000000)]

In [0]:
%sql
CREATE TABLE football_df
USING parquet
LOCATION '/FileStore/tables/football_df'

Using Hive SQL and the table created above (`football_df`, playing the role of partida_pais), in which country were the most matches played?

In [0]:
%sql
SELECT * FROM football_df

country,count
Chad,38
Russia,232
Paraguay,226
Anguilla,25
Yemen,58
Senegal,215
Sweden,655
Guyana,103
Philippines,85
Burma,19


In [0]:
%sql
SELECT *
FROM football_df
ORDER BY 2 DESC
LIMIT 1

country,count
United States,1237
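`ORDER BY 2 DESC LIMIT 1` sorts by the second column (`count`) and keeps the top row; if several countries tied, `LIMIT 1` would return just one of them. The plain-Python sketch below mirrors `groupby('country').count()` plus that query with `collections.Counter` on the 13 sample matches shown earlier; the DataFrame API equivalent would be something like `df_partida_pais.orderBy('count', ascending=False).first()` (not run here).

```python
from collections import Counter

# country column of the first 13 matches in results.csv (sample only)
countries = [
    "Scotland", "England", "Scotland", "England", "Scotland", "Scotland", "England",
    "Wales", "Scotland", "Scotland", "England", "England", "Wales",
]

matches_per_country = Counter(countries)          # like groupby('country').count()
ranking = matches_per_country.most_common()       # like ORDER BY count DESC
top_country, top_count = ranking[0]               # like LIMIT 1
ties = [c for c, n in ranking if n == top_count]  # countries that LIMIT 1 could hide

print(top_country, top_count)  # Scotland 6
print(ties)                    # ['Scotland']
```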
