***
## **<center>Introdução ao Spark</center>**


**<center>Processamento de Dados Massivos</center>**
***

<br>
Este notebook tem o objetivo de apresentar uma primeira interação com a ferramenta Apache Spark.

Como será utilizada a API para Python chamada Pyspark, o primeiro passo é a instalação dela.

Com a biblioteca instalada, deve-se configurar para a instanciação de uma sessão Spark:

In [1]:
import os
os.environ["JAVA_HOME"] = "C:\\jdk-22.0.2"
os.environ["SPARK_HOME"] = "C:\\spark-3.5.2-bin-hadoop3\\spark-3.5.2-bin-hadoop3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_PYTHON"] = "python"

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Introducao Spark").getOrCreate()

***Pergunte ao ChatGPT a diferença entre o SparkSession e o SparkContext...***

In [3]:
spark

In [4]:
# criação da variável para o Spark Context
sc = spark.sparkContext

## **Criação de RDD**
Vamos criar um primeiro RDD a partir de uma estrutura de dados.


In [None]:
data = [1, 2, 3, 4, 5, 8, 9]
data

In [None]:
primeiroRDD = sc.parallelize(data)

In [None]:
primeiroRDD

In [None]:
# outras estruturas de dados
kv = [('a',7), ('a', 2), ('b', 2), ('b',4), ('c',1), ('c',2), ('c',3), ('c',4)]

segundoRDD = sc.parallelize(kv)
segundoRDD.collect()

Segue a lista de métodos que podem ser executados em um RDD:

**Transformações**

* map(func): Aplica uma função a cada elemento do RDD e retorna um novo RDD.
* filter(func): Retorna um novo RDD contendo apenas os elementos que satisfazem a função especificada.
* flatMap(func): Aplica uma função a cada elemento e "achata" o resultado, retornando um novo RDD.
* mapPartitions(func): Aplica uma função a cada partição do RDD e retorna um novo RDD.
* distinct(): Retorna um novo RDD com os elementos distintos (removendo duplicatas).
* union(otherRdd): Retorna um novo RDD contendo a união dos elementos do RDD original com outro RDD.
* intersection(otherRdd): Retorna um novo RDD contendo apenas os elementos presentes em ambos os RDDs.
* subtract(otherRdd): Retorna um novo RDD contendo os elementos do RDD original que não estão no outro RDD.
* cartesian(otherRdd): Retorna o produto cartesiano do RDD original com outro RDD.
* groupByKey(): Agrupa os pares (chave, valor) com a mesma chave em uma coleção de valores.
* reduceByKey(func): Combina os valores com a mesma chave usando uma função de redução.
* sortByKey(ascending=True): Retorna um RDD ordenado pelas chaves.
join(otherRdd): Realiza um join entre dois RDDs baseados em suas chaves.
* cogroup(otherRdd): Agrupa os dados de ambos os RDDs pelo valor da chave.

**Ações**
* collect(): Retorna todos os elementos do RDD como uma lista para o programa driver.
* count(): Retorna o número de elementos no RDD.
* take(n): Retorna os primeiros n elementos do RDD.
* top(n): Retorna os n maiores elementos do RDD.
* reduce(func): Agrega os elementos do RDD usando uma função de redução.
* takeOrdered(n, key=None): Retorna os primeiros n elementos ordenados de acordo com uma função de chave.
* foreach(func): Aplica uma função a cada elemento do RDD sem retornar nenhum valor ao driver.
* countByKey(): Retorna um dicionário com o número de elementos para cada chave.
* saveAsTextFile(path): Salva o conteúdo do RDD em um arquivo de texto no caminho especificado.
* saveAsSequenceFile(path): Salva o conteúdo do RDD como um arquivo SequenceFile (usado em Hadoop).
* saveAsObjectFile(path): Serializa o RDD e salva-o como um arquivo de objeto.
* takeSample(withReplacement, num, seed=None): Retorna uma amostra aleatória de elementos do RDD.

In [None]:
primeiroRDD.collect()

Comando `collect` mostra os elementos do RDD.

Outra possibilidade é o `take`.

In [None]:
primeiroRDD.take(2)

### Pergunta de seleção

Entre o `collect` e o `take`, qual seria o mais indicado para trabalho com Big Data?

In [None]:
somar_todos = primeiroRDD.reduce(lambda x, y: x + y)

### Exercício

Testar pelo menos 5 funções que não foram exemplificadas até aqui. Use o ChatGPT se preciso...

Criando um RDD a partir de um arquivo:

In [None]:
import requests

request = requests.get('https://github.com/alexvaroz/data_science_alem_do_basico/raw/master/frases_estoicas.txt')
arquivo_base = open("frases.txt", 'w')
arquivo_base.write(str(request.text))
arquivo_base.close()

In [None]:
frases = sc.textFile("frases.txt")

In [None]:
#frases.collect()

Execução de comando sobre os elementos do RDD usando a função `map`

In [None]:
frases_lowercase = frases.map(lambda x: x.lower())

O comando `flatMap` é utilizado quando se quer agrupar os elementos de um RDD para um único. Devido ao caráter imutável do RDD, um novo RDD é gerado.

In [None]:
palavras = frases_lowercase.flatMap(lambda x: x.split(" "))

In [None]:
palavras.collect()

Comando `filter`:

In [None]:
filtered = palavras.filter(lambda x: len(x) > 2)
filtered.collect()

A partir do Spark 2, está disponibilizado o DataFrame, uma outra abstração evoluída do RDD, trazendo vantagens para o processamento e integração com outras fontes de dados.

In [None]:
# O DataFrame contemplará o nome de algumas capitais e o respectivo ano de fundação.
# Será criado um array de tuplas
dados = [('Brasília', 1960), ('Rio de Janeiro',1565), ('Vitória', 1551), ('Manaus',1669 ), ('Campo Grande',1872)]

In [None]:
df = spark.createDataFrame(dados, ['cidade','dt_fundacao'])

O DataFrame foi criado. Pode-se verificar o formato com o comando "printSchema()". Para mostrar os dados, deve-se executar a ação "show()"

In [None]:
df.printSchema()

In [None]:
#df.count()

Detalhe para a ausência de uma coluna de índice... Característica dos DataFrames do Spark.

In [None]:
# Se quiser apresentar a primeira linha
df.show(1)

Como o DataFrame é uma extensão do RDD, todos os métodos apresentados anteriormente servem, entretanto, existem uns específicos para os DataFrames.

**Transformações Exclusivas de DataFrames**

* select(*cols): Permite selecionar colunas específicas de um DataFrame, semelhante à operação SELECT em SQL. No RDD, você teria que mapear manualmente para acessar ou manipular colunas específicas.
* withColumn(colName, col): Adiciona ou substitui uma coluna no DataFrame. No RDD, adicionar ou modificar uma coluna exige mapear e criar uma nova estrutura manualmente.
* drop(*cols): Remove uma ou mais colunas de um DataFrame. No RDD, isso exigiria a remoção manual de elementos em cada registro.
* filter(expr) (usando expressões SQL ou Column): Filtra linhas com base em uma expressão SQL ou uma Column de DataFrame. No RDD, o filtro é limitado a funções lambda.
* groupBy(*cols): Agrupa os dados com base em uma ou mais colunas. No RDD, agrupamentos semelhantes exigem transformações como groupByKey ou reduceByKey, que são mais limitadas.
* agg(*exprs): Permite realizar operações de agregação, como somas, médias, contagens, etc., usando expressões SQL ou colunas. O RDD precisa de métodos mais complexos e menos expressivos para realizar agregações.
* join(other, on=None, how=None): Realiza operações de join com suporte a diferentes tipos (inner, left, right, outer, etc.), diretamente sobre DataFrames. No RDD, joins precisam ser implementados manualmente com chaves e combinações, o que é menos eficiente e mais propenso a erros.
* pivot(index, *cols): Cria uma tabela dinâmica (pivot table) no estilo SQL, permitindo agregações e rearranjos de dados. O RDD não possui suporte direto a essa operação.
* dropDuplicates(subset=None): Remove duplicatas de um DataFrame com base em todas ou em um subconjunto de colunas. No RDD, a remoção de duplicatas precisa ser feita com distinct, mas sem suporte a colunas específicas.
* orderBy(*cols, **kwargs): Ordena o DataFrame com base em uma ou mais colunas. No RDD, a ordenação é limitada ao sortBy e sortByKey, sem a capacidade de usar múltiplas colunas diretamente.

**Ações Exclusivas de DataFrames**

* show(n=20, truncate=True): Exibe as primeiras n linhas de um DataFrame em um formato tabular, útil para inspeção rápida dos dados. Não há equivalente direto no RDD.
* describe(*cols): Gera estatísticas resumidas (como count, mean, stddev, min, max) para as colunas especificadas. No RDD, seria necessário calcular manualmente essas estatísticas.
* head(n=1): Retorna as primeiras n linhas do DataFrame. O RDD tem take, mas head é mais específico e intuitivo em DataFrames.
* countDistinct(*cols): Retorna a contagem de valores distintos para as colunas especificadas. O RDD não possui um método específico para isso.
* corr(col1, col2): Calcula a correlação entre duas colunas. No RDD, isso teria que ser implementado manualmente.
* cov(col1, col2): Calcula a covariância entre duas colunas. Novamente, no RDD, seria necessário calcular manualmente.
* crosstab(col1, col2): Cria uma tabela cruzada (cross-tabulation) para as duas colunas especificadas. Essa operação não existe em RDDs.
* approxQuantile(col, probabilities, relativeError): Calcula quantis aproximados para uma coluna com base em probabilidades especificadas. O RDD não suporta isso diretamente.
* toPandas(): Converte um DataFrame Spark em um DataFrame Pandas. No RDD, a conversão para Pandas não é tão direta.

### Exercício

Testar pelo menos 5 funções exclusivas do Dataframe que não foram exemplificadas até aqui. Mais uma vez, use o ChatGPT se preciso...

Para avançar nos comandos básicos de manipulação de um DataFrame, vamos baixar um conjunto de dados de tamanho interessante.

O conjunto de dados escolhido é o registro dos pagamentos do programa "Bolsa Família" disponibilizado em https://portaldatransparencia.gov.br/download-de-dados/novo-bolsa-familia/202405.

O arquivo traz informações dos beneficiários do programa no mês 05 do ano 2024.

Para evitar ter que baixar o arquivo (202MB) na máquina e depois fazer o upload para o drive, vamos baixar diretamente para o drive em uma pasta criada especificamente para receber os dados.



In [None]:
!wget -P '/content/drive/MyDrive/ProcessamentoDadosMassivos-02_2024/dados/bolsafamilia' https://portaldatransparencia.gov.br/download-de-dados/novo-bolsa-familia/202405

In [None]:
# Descompactar o arquivo baixado no mesmo diretório
!unzip '/content/drive/MyDrive/ProcessamentoDadosMassivos-02_2024/dados/bolsafamilia/202405' -d \
'/content/drive/MyDrive/ProcessamentoDadosMassivos-02_2024/dados/bolsafamilia/'

In [None]:
ARQUIVO = '/content/drive/MyDrive/ProcessamentoDadosMassivos-02_2024/dados/bolsafamilia/202405_NovoBolsaFamilia.csv'
df = spark.read.option("header","true")\
          .option("encoding", "latin1")\
          .option("sep",";")\
          .option('inferSchema', 'true')\
          .csv(ARQUIVO)


In [None]:
# Mostrar as 20 primeiras linhas


In [None]:
df.printSchema()

Lembrando que o Spark carrega uma referência na memória para os dados. Somente quando se efetua uma **ação** ele consolida. Essa é uma característica prepoderante para lidar com dados massivos.

In [None]:
# Contagem do número de linhas do dataframe


In [None]:
# Apresentar os dados da coluna "NOME FAVORECIDO" usando o comando 'select'

Como se trata de uma transformação, o resultado não é mostrado. Devendo ser acionada uma "ação", no caso a "show()"

In [None]:
#df.select('NOME FAVORECIDO').show(1,truncate=False)

In [None]:
# Para selecionar mais que uma coluna


Para filtrar registros, transformação "filter()"

In [None]:
#df.filter(df["NOME FAVORECIDO"][0:9]=="ALEXANDRE")

Novamente, para poder mostrar algum valor, é preciso realizar uma **ação**.

Selecionar registros distintos.

In [None]:
df.select("NOME FAVORECIDO").distinct()

Agrupamento de registros. Quantos registros possuem cada estado??

In [None]:
df.groupBy('UF').count().show()

Vamos agrupar pelo NIS.

Apesar da coluna "VALOR PARCELA" constar como *string* será que o spark passa a considerar como número para operações matemáticas?

Para transformar essa coluna em número, deverá ser aplicada uma função sobre a coluna que transforme a ',' em '.' e altere o tipo para "float".

Então chegamos ao conceito de UDF.


### **UDF (User Defined Function)**

A função para ser executada no Spark precisa ser registrada no contexto. Esse registro ocorre com a criação de uma UDF. Primeiro se cria a função que seria executada sobre um elemento. E em seguida a registra como uma UDF indicando o tipo de retorno.

Para isso será necessário importar o módulo "functions".


In [None]:
# Função para transformar o número expresso em string em float
to_float = lambda v:float(v.replace(',','.'))

In [None]:
# teste
to_float('9,45')

In [None]:
# O segundo passo é registrar essa função como UDF
from pyspark.sql import functions as F
udf_to_float = F.udf(to_float, pyspark.sql.types.FloatType())

Com a função registrada, pode-se aplicar sobre o dataframe e criar uma coluna nova com o valor numérico. Entretanto, dado que o DataFrame é uma estrutura imutável, não é possível alterar o tipo da coluna, logo, deverá ser criada uma coluna adicional em um novo Dataframe.

In [None]:
df = df.withColumn("VALOR_PARCELA_FLOAT", udf_to_float(df['VALOR PARCELA']))

In [None]:
df.printSchema()

In [None]:
# Agora será possível somar os valores e identificar qual estado possui o maior valor em benefícios pagos
df.groupBy("UF").sum('VALOR_PARCELA_FLOAT')

In [None]:
# Para ordenar


In [None]:
# Para mostrar mais valores descritivos por UF
df.groupBy("UF").agg(F.count('VALOR_PARCELA_FLOAT').alias('QTDE'),\
                     F.avg('VALOR_PARCELA_FLOAT').alias('VALOR_MEDIO'),\
                     F.sum('VALOR_PARCELA_FLOAT').alias('SOMA_VALORES'))\
                     .orderBy('VALOR_MEDIO', ascending = False).show(30)

In [None]:
# Será que algum beneficiário recebeu um valor fora do normal??
# Como é a divisão de valores ao longo do ano?? Qual o mês que ocorreu maior desembolso para o programa??
# Qual será o primeiro nome mais comum?????

In [None]:
# Qual será o primeiro nome mais comum?????
def get_first_name(name):
  return name.split()[0]

In [None]:
df.printSchema()

In [None]:
df.groupBy('PRIMEIRO_NOME').count().orderBy('count', ascending = True).show()

Tente fazer essa mesma análise no Pandas... Observa algum ganho???

Documentação oficial em http://spark.apache.org/

Para consulta rápida aos comandos básicos, acessar: https://intellipaat.com/mediaFiles/2019/03/PySpark-SQL-cheat-sheet.pdf.