Apache Spark é um framework para criação de programas de processamento de dados distribuídos e aprendizado de máquina em grande escala.

PySpark é uma biblioteca (API) escrita em Python para executar aplicativos usando recursos do Apache Spark.

In [None]:
from IPython.display import Image
Image(filename='images/pyspark.jpg') 

In [None]:
import findspark
findspark.init()import pyspark
sc = pyspark.SparkContext(appName="Aula 15 set")

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

SparkContext.setSystemProperty('spark.executor.memory', '2g')

spark = SparkSession \
    .builder \
    .appName("Aula CEAUT - Pypark") \
    .config("spark.driver.cores", 2) \
    .master("spark://vm:7077") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
### Referências:

# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
# https://realpython.com/pyspark-intro/
# https://sparkbyexamples.com/pyspark-tutorial/
# https://spark.apache.org/docs/latest/quick-start.html
    
from IPython.display import Image
Image(filename='images/spark-definitive.jpg') 

Spark inicialmente foi escrito em Scala e Java. Mais tarde, teve suporte de outras APIs como Pyspark, SparkR e SQL.

Py4J é uma biblioteca Java que está integrada ao PySpark e permite que o python faça interface dinâmica com objetos JVM. Portanto, para executar o PySpark, você também precisa que o Java seja instalado junto com o Python e o Apache Spark.

In [None]:
from IPython.display import Image
Image(filename='images/pyspark-features.jpg') 

## Características do Pyspark



- Processamento em memória
- Processamento distribuído usando paralelização
- Pode ser usado com vários gerenciadores de cluster (Spark, Yarn, Mesos, Kubenetes)
- Tolerante a falhas
- Imutável
- Lazy load (avaliação preguiçosa)
- Cache e persistência
- Otimização embutida ao usar DataFrames
- Suporta ANSI SQL

## Vantagens do Spark

- O suporte a processamento em batches ou real time é nativo, inclusive no contexto de aprendizado de máquina, oferecendo bibliotecas similares às existentes no desenvolvimento python tradicional.
- Executa operações 100 vezes mais rápido do que o framework Hadoop.
- Agilidade para processamento de pipelines de ingestão de dados offline e/ou streaming.
- Permite trabalhar com dados do Hadoop HDFS, AWS S3, soquet, e muitos outros sistemas de arquivos e bancos de dados.
- Plotagem de gráficos e descrição de dados.

## Arquitetura

In [None]:
from IPython.display import Image
Image(filename='images/pyspark-archtecture.jpg') 

O Apache Spark trabalha em uma arquitetura master-slave onde o mestre é chamado de "Driver" e os escravos são chamados de "Workers". 

Ao executar uma aplicação, o Driver cria um contexto que é um ponto de entrada do programa e todas as operações (transformações e ações) são executadas nos workers, e os recursos são gerenciados pelo Cluster Manager.

## Tipos de gerenciadores de cluster

Atualmente, Spark oferece suporte aos seguintes gerenciadores de cluster:

- Standalone - gerenciador de cluster simples incluído no Spark que facilita a configuração de um cluster.
- Apache Mesos - gerenciador de cluster que também pode executar aplicativos Hadoop MapReduce e PySpark.
- Hadoop YARN - o gerenciador de recursos padrão do Hadoop 2. Ele é mais usado gerenciador de cluster.
- Kubernetes - um sistema de código aberto para automatizar a implantação, dimensionamento e gerenciamento de aplicativos em contêineres.

- local - não é realmente um gerenciador de cluster... mas usamos ele para executar o Spark localmente no laptop/computador.

## Pyspark módulos e pacotes

- PySpark RDD (pyspark.RDD) (https://spark.apache.org/docs/latest/rdd-programming-guide.html)
- PySpark DataFrame and SQL (pyspark.sql) (https://spark.apache.org/docs/latest/sql-getting-started.html)
- PySpark Streaming (pyspark.streaming)
- PySpark MLib (pyspark.ml, pyspark.mllib)
- PySpark GraphFrames (GraphFrames)
- PySpark Resource (pyspark.resource) It’s new in PySpark 3.0

- Terceiros (https://spark-packages.org/)

## Instalação

### Java 8



sudo apt install openjdk-8-jdk



### Download do Spark (necessário para uso em cluster)



https://spark.apache.org/downloads.html

### Configurações e variáveis de ambiente

 JAVA_HOME, SPARK_HOME, HADOOP_HOME

### Instalação via gerenciador de pacote

sudo apt install pyspark

## Pyspark Shell

In [None]:
from IPython.display import Image
Image(filename='images/pyspark-shell.jpg') 

### Spark Web UI

Quando iniciado, o Apache Spark fornece um conjunto de UIs disponíveis em formato Web contendo guias para visualização de jobs, estágios, tarefas, armazenamento, ambiente, executores, e monitoração do status da aplicação.

http://localhost:4040

In [None]:
from IPython.display import Image
Image(filename='images/spark-web-ui.jpg') 

<h1 style='color:blue; font-size:30px'> PySpark RDD – Resilient Distributed Dataset </h1>

O PySpark RDD (Resilient Distributed Dataset) é uma estrutura de dados fundamental do PySpark que consiste em uma coleção de objetos distribuídos de maneira imutável e tolerante a falhas. 
Após a criação de um RDD, não é possível alterá-lo. 
Cada conjunto de dados em RDD é dividido em partições lógicas, que podem ser calculadas em diferentes nós do cluster.

## Criação de um RDD

Para criar um RDD, é necessário ter uma SparkSession, que é um ponto de entrada para o aplicativo PySpark. SparkSession pode ser criado usando um método builder() ou newSession() do SparkSession.

É possível criar vários objetos SparkSession, mas apenas um SparkContext por JVM. Caso queira criar outro novo SparkContext, você deve parar o Sparkcontext existente (usando stop ()) antes de criar um novo.

In [None]:
# Referências
# https://spark.apache.org/docs/latest/configuration.html

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
#sc =SparkContext()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Aula Spark") \
    .config("spark.driver.cores", 1000) \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

## Criar um RDD com parallelize

SparkContext tem várias funções para usar com RDDs. Por exemplo, seu método parallelize () é usado para criar um RDD a partir de uma lista.

In [None]:
#Create RDD from parallelize    
dataList = [("Java", 1995), ("Python", 1992), ("Scala", 2001)]
rdd=sc.parallelize(dataList)

In [None]:
# https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
# Criar RDD a partir de um arquivo de texto
rdd2 = sc.textFile("dataset-aula/apache-spark-wiki.txt")

In [None]:
rdd.collect()

#for x in rdd.collect():
#    print (x)
    
rdd.take(2)

## Operações com RDD

In [None]:
# Referências

# https://sparkbyexamples.com/pyspark/pyspark-rdd-transformations/
# https://sparkbyexamples.com/pyspark/pyspark-rdd-actions/

- Transformations: As transformações no Spark RDD retornam outro RDD e as transformações são lentas, o que significa que não são executadas até que você chame uma ação no RDD. Algumas transformações em RDDs são flatMap(), map(), reduceByKey(), filter(), sortByKey() e retornar um novo RDD em vez de atualizar o atual.

- Actions - A operação de ação RDD retorna os valores de um RDD para um nó de driver. Em outras palavras, qualquer função RDD que retorna não RDD [T] é considerada uma ação. Algumas ações em RDDs são count(), collect(), first(), max(), reduce() e muito mais.

<h1 style='color:blue; font-size:30px'> Spark Dataframe </h1>

DataFrame é uma coleção distribuída de dados organizados em colunas nomeadas. É conceitualmente equivalente a uma tabela em um banco de dados relacional ou um quadro de dados em R / Python, porém mais otimizado. Os DataFrames podem ser construídos a partir de uma ampla variedade de fontes, como arquivos de dados estruturados, tabelas no Hive, bancos de dados externos ou RDDs existentes.

Por ser integrado ao Spark, permite execução em cluster, tornando-o mais rápido que o Pandas (python) tradicional.

In [None]:
from IPython.display import Image
Image(filename='images/spark-sql-sources.jpg') 

## Converter PySpark RDD para DataFrame

In [None]:
# Referências
# https://sparkbyexamples.com/pyspark/convert-pyspark-rdd-to-dataframe/

### Usando a função rdd.toDF()

In [None]:
dept = [("Finanças",10), 
        ("Marketing",20), 
        ("Vendas",30), 
        ("TI",40) 
      ]
rdd = spark.sparkContext.parallelize(dept)

In [None]:
df = rdd.toDF()

In [None]:
df.printSchema()

In [None]:
df.show()

In [None]:
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

### Usando createDataFrame() com StructType schema

In [None]:
# https://spark.apache.org/docs/latest/sql-ref-datatypes.html

from pyspark.sql.types import StructType,StructField, StringType, LongType
deptSchema = StructType([       
    StructField('dept_name', StringType(), True),
    StructField('dept_id', LongType(), True)
])

deptDF1 = spark.createDataFrame(data=dept, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

<h1 style='color:blue; font-size:30px'> Spark SQL </h1>

In [None]:
# Referências

# https://spark.apache.org/docs/latest/sql-getting-started.html

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
sc = spark.sparkContext

In [None]:
from pyspark.sql import Row
l = [('Ana',25),('Beto',22),('Claudia',20),('Dani',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
df = spark.createDataFrame(people)

In [None]:
df.show()

In [None]:
# print do esquema em formato de árvore

df.printSchema()

In [None]:
# selecionar apenas a coluna name

df.select("name", "age").show()

In [None]:
# selecionar a coluna name, selecionar a coluna age e acrescentar 1

df.select(df['name'], df['age'] + 1).show()

In [None]:
# selecionar registros com age > 21

df.filter(df['age'] > 21).show()

In [None]:
# contar registros por age

df.groupBy("age").count().show()

In [None]:
# Registrar o DataFrame como uma view temporária

df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

# Inferindo o esquema via programação (estudar!)

In [None]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("spark-master/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) 
          for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT * FROM people")

results.show()

<h1 style='color:blue; font-size:30px'> Exemplo prático - Dataset de servidores federais </h1>

# Necessário fazer download do dataset

In [None]:
from IPython.display import Image
Image(filename='images/portal-transparencia.jpg') 

## Procedimento para fazer download, renomear e mover os arquivos

In [None]:
# Fazer download dos dataset disponível em: http://www.portaltransparencia.gov.br/download-de-dados/servidores/202001_Servidores
# descompactar
# renomear 202001_Remuneracao.csv para remuneracao-servidores.csv
# renomear 202001_Cadastro.csv para cadastro-servidores.csv
# mover estes arquivos para o diretório 07-nov-2020/dataset-aulas

## Procedimento para converter arquivo para UTF-8

In [None]:
# Usando um terminal, navegar até o diretório dataset-aulas onde estão os arquivos csv 
# e executar os seguintes comandos

In [None]:
iconv -f ISO-8859-1 -t UTF-8 datasets/remuneracao-servidores.csv > remuneracao-servidores-utf8.csv

In [None]:
iconv -f ISO-8859-1 -t UTF-8 cadastro-servidores.csv > cadastro-servidores-utf8.csv

## Processamento do arquivo remuneracao-servidores

In [None]:
df = spark.read.csv("datasets/remuneracao-servidores-utf8.csv")
df.printSchema()

In [None]:
df = spark.read.format("com.databricks.spark.csv") \
.option("header","true") \
.option("delimiter", ";") \
.option("inferSchema", "true") \
.load("datasets/remuneracao-servidores-utf8.csv")

In [None]:
df.printSchema()

In [None]:
# 554389

In [None]:
df.count()

In [None]:
df.show()

In [None]:
df.show(1, truncate=False)

In [None]:
df.select("NOME", "REMUNERAÇÃO BÁSICA BRUTA (R$)").show(truncate=False)


In [None]:
to_value = lambda v: float(v.replace(",","."))

In [None]:
to_value("90,7")

### UDF User Defined Function

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

In [None]:
pyspark

In [None]:
# udf(função, retorno)
udf_to_value = F.udf(to_value, FloatType())

In [None]:
df2 = df.withColumn("value",udf_to_value(df["REMUNERAÇÃO BÁSICA BRUTA (R$)"]))

In [None]:
df2.printSchema()

In [None]:
df2.select("value").show(truncate=False)

In [None]:
df2.describe("REMUNERAÇÃO BÁSICA BRUTA (R$)").show()

## Processamento do arquivo cadastro-servidores

In [None]:
df = spark.read.format("com.databricks.spark.csv") \
.option("header","true") \
.option("delimiter", ";") \
.option("inferSchema", "true") \
.load("datasets/cadastro-servidores-utf8.csv")

In [None]:
df.printSchema()

In [None]:
df.count()

In [None]:
df.filter((df["DESCRICAO_CARGO"]=="DATILOGRAFO")).count()

In [None]:
df.filter((df["DESCRICAO_CARGO"]=="DATILOGRAFO") & (df["NOME"]=="NOME...")).show(truncate=False)

In [None]:
df.filter(df["ORGSUP_LOTACAO"]=="Ministério da Economia").count()

In [None]:
df.select("ORGSUP_LOTACAO").distinct().count()

In [None]:
df.groupBy("ORGSUP_LOTACAO").count().orderBy("count", ascending=False).show(truncate=False)