# Aula 1 - Spark Fundamentos

## Configuração de ambiente

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q pyspark

[K    100% |████████████████████████████████| 215.7MB 75kB/s 
[K    100% |████████████████████████████████| 204kB 27.3MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h

In [0]:
#Variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

## Let's code!

Depois de tudo instalado, finalmente está na hora da diversão! Irei executar alguns comandos no Google Colab. Para entender todas as possibilidades que você tem, basta checar a documentação da API do PySpark em [6]. Primeiro, é necessário importar a API que será utilizada. Para esse exemplo, irei importar tudo da API pyspark.sql.

from pyspark.sql import *

Com esse simples comando, todas as funções do pyspark.sql estão disponíveis para uso. Iremos utilizar uma base de exemplos disponíveis dentro do próprio Google Colab. Para ler a base, basta executar o comando abaixo. Para mais informações referente a base, consulte [7].

In [0]:
#Iniciando uma sessão no Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Após a leitura, executei também o comando dataset.columns, que irá apresentar as colunas disponíveis no dataframe. 

In [4]:
#Importando um arquivo json
dataset = spark.read.format("json").option("multiLine",True).load("sample_data/anscombe.json")
dataset.columns

['Series', 'X', 'Y']

É possível você usar o comando show(), para verificar os valores. Conforme abaixo

In [5]:
#Vendo os 10 primeiros itens do nosso dataset
dataset.show(10)

+------+----+-----+
|Series|   X|    Y|
+------+----+-----+
|     I|10.0| 8.04|
|     I| 8.0| 6.95|
|     I|13.0| 7.58|
|     I| 9.0| 8.81|
|     I|11.0| 8.33|
|     I|14.0| 9.96|
|     I| 6.0| 7.24|
|     I| 4.0| 4.26|
|     I|12.0|10.84|
|     I| 7.0| 4.81|
+------+----+-----+
only showing top 10 rows



Agora que carregamos nossa base, verificamos que X e Y são possivelmente valores contínuos. Series é um campo categórico. Para isso, iremos usar o comando dtypes, para verificar as colunas.

In [6]:
#Verificando o Schema 
dataset.printSchema()

root
 |-- Series: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)



Ótimo, agora temos certeza que X e Y são valores contínuos e que Series é categórico. Por fim, vamos realizar um agrupamento na coluna Series e fazer a média dos valores de X e Y. Conforme descrição da base, ela contém 4 diferentes datasets com basicamente os mesmos valores de estatística descritiva. Será que conseguimos encontrar isso?

In [7]:

from pyspark.sql import functions as F
dataset_agrupado = dataset.groupBy("Series") \
                          .agg(F.avg("X").alias("X_agrupado"), F.avg("Y").alias("Y_agrupado"))  \
                          .orderBy("Series")

dataset_agrupado.show()

+------+----------+-----------------+
|Series|X_agrupado|       Y_agrupado|
+------+----------+-----------------+
|     I|       9.0|              7.5|
|    II|       9.0|7.500909090909091|
|   III|       9.0|7.500000000000001|
|    IV|       9.0| 7.50090909090909|
+------+----------+-----------------+



Excelente! O resultado é exatamente o esperado. Agora, vamos dar uma olhada nesse código mais a fundo. Na primeira linha eu importo novamente a API de funções com o alias F. Essa é uma das formas de se realizar o procedimento. Eu poderia simplesmente ter escrito “functions.avg()” no lugar de “F.avg()”, porém, prefiro usar o alias “F” nos meus códigos.

Na segunda linha é definido todo o processamento em Spark para como se obter o valor agrupado de X e Y. Primeiro eu realizo um “groupBy”, que é uma função agregadora. Basicamente o que essa função faz é pegar os valores distintos da coluna “Séries” e usar como agrupador para os próximos comandos. Na segunda linha eu acho o comando .agg() é utilizado para realizar operação de agregação. F.avg é a função utilizada para calcular a média de um determinado campo e, por fim, .alias() é utilizado para definir o nome do campo.

Se você executar esse comando sem o .show(), verá que ele é instantâneo. Isso não significa que o Spark já processou tudo e tem tudo calculado na memória, na verdade, isso apenas significa que ele montou o pipeline de execução para obter o valor que você deseja visualizar. Para que ele efetivamente processe o comando, você precisa usar uma ação, no nosso caso, um .show().

Para dar uma pequena espiada em como o Spark funciona, execute o comando abaixo.

In [8]:
dataset_agrupado.explain()

== Physical Plan ==
*(3) Sort [Series#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Series#0 ASC NULLS FIRST, 200)
   +- *(2) HashAggregate(keys=[Series#0], functions=[avg(X#1), avg(Y#2)])
      +- Exchange hashpartitioning(Series#0, 200)
         +- *(1) HashAggregate(keys=[Series#0], functions=[partial_avg(X#1), partial_avg(Y#2)])
            +- *(1) FileScan json [Series#0,X#1,Y#2] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/content/sample_data/anscombe.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Series:string,X:double,Y:double>
