## PySpark RDD – Conjunto de dados distribuído resiliente

PySpark RDD (Resilient Distributed Dataset)  é uma estrutura de dados fundamental do PySpark que é uma coleção de objetos distribuídos imutáveis ​​e tolerantes a falhas, o que significa que, uma vez que você cria um RDD, você não pode alterá-lo. Cada conjunto de dados no RDD é dividido em partições lógicas, que podem ser computadas em diferentes nós do cluster.

In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local[1]').appName('sparkByTests.com').getOrCreate()

In [4]:
rdd=spark.sparkContext.parallelize([1, 2, 3, 4, 5])

In [5]:
rdd.count()

5

#  Criação de RDD 

Para criar um RDD, primeiro você precisa criar um SparkSession que é um ponto de entrada para o aplicativo PySpark . SparkSession pode ser criado usando um builder()ou newSession()métodos do SparkSession.

A sessão do Spark cria internamente uma sparkContextvariável de SparkContext.

OBS: Você pode criar vários objetos SparkSession, mas apenas um SparkContext por JVM. Caso você queira criar outro novo SparkContext, você deve parar o Sparkcontext existente (usando  stop()) antes de criar um novo.

In [6]:
#Create RDD
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()  

master() - Se estiver executando no cluster, você precisa usar seu nome mestre como um argumento para master(). 

Use local[x]ao executar no modo autônomo. x deve ser um valor inteiro e deve ser maior que 0; isso representa quantas partições ele deve criar ao usar RDD, DataFrame e Dataset. Idealmente, o valor x deve ser o número de núcleos de CPU que você possui.

appName() – Usado para definir o nome do seu aplicativo.

getOrCreate() – Isso retorna um objeto SparkSession se já existir, cria um novo se não existir.

##  parallelize() - método é usado para criar um RDD de uma lista.

In [7]:
#Create RDD from parallelize    
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(data)

###  textFile() - método é usado para criar um texto RDD 

In [8]:
# Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

### wholeTextFiles() - criar um PairRDD com a chave sendo o caminho do arquivo e o valor sendo o conteúdo do arquivo.

In [9]:
#Reads entire file into a RDD as single record.
rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")

### emptyRDD() - cria um RDD vazio

In [10]:
# Creates empty RDD with no partition    
rdd = spark.sparkContext.emptyRDD 
# rddString = spark.sparkContext.emptyRDD[String]

## Criando RDD vazio com partição

In [11]:
#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],10) #This creates 10 partitions

### getNumPartitions() – Esta é uma função RDD que retorna várias partições em que nosso conjunto de dados foi dividido.

### repartition() - o método que embaralha dados de todos os nós também chamado full shuffle

## Operações de RDD

No PySpark RDD, você pode realizar dois tipos de operações: Transformações RDD e Ações RDD

In [12]:
#lendo um arquivo txt
rdd = spark.sparkContext.textFile('test.txt')

## Transformações RDD ## 
retornam outro RDD e as transformações são preguiçosas, o que significa que não são executadas até que você chame uma ação no RDD. Algumas transformações nos RDDs são  flatMap(),  map(),  reduceByKey(),  filter()e  sortByKey() retornam um novo RDD em vez de atualizar o atual.

In [13]:
# Imprimir RDD após coletar os dados
for element in rdd.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by

### flatMap() - a transformação nivela o RDD após aplicar a função e retorna um novo RDD.

In [14]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))
#Print rdd2 result to console
print(rdd2.collect())

['Project', 'Gutenberg’s', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Project', 'Gutenberg’s', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Alice’s', 'Adventures', 'in', 'Wonderland', 'by', 'Lewis', 'Carroll', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'This', 'eBook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with', 'Project', 'Gutenberg

### map() - a transformação é usada para aplicar qualquer operação complexa, como adicionar uma coluna, atualizar uma coluna, etc., a saída das transformações do mapa sempre teria o mesmo número de registros que a entrada.

In [15]:
rdd3 = rdd2.map(lambda x: (x,1))
#Print rdd3 result to console
print(rdd3.collect())

[('Project', 1), ('Gutenberg’s', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of', 1), ('anyone', 1), ('anywhere', 1), ('at', 1), ('no', 1), ('cost', 1), ('and', 1), ('with', 1), ('Project', 1), ('Gutenberg’s', 1), ('Alice’s', 1), ('Adventures', 1), ('in', 1), ('Wonderland', 1), ('by', 1), ('Lewis', 1), ('Carroll', 1), ('This', 1), ('eBook', 1), ('is', 1), ('for', 1), ('the', 1), ('use', 1), ('of

### reduceByKey() - mescla os valores de cada chave com a função especificada. 

In [16]:
rdd4 = rdd3.reduceByKey(lambda a,b: a + b)
#Print rdd4 result to console
print(rdd4.collect())

[('Project', 9), ('Gutenberg’s', 9), ('Alice’s', 18), ('Adventures', 18), ('in', 18), ('Wonderland', 18), ('by', 18), ('Lewis', 18), ('Carroll', 18), ('This', 27), ('eBook', 27), ('is', 27), ('for', 27), ('the', 27), ('use', 27), ('of', 27), ('anyone', 27), ('anywhere', 27), ('at', 27), ('no', 27), ('cost', 27), ('and', 27), ('with', 27)]


### sortByKey() - a transformação é usada para classificar elementos RDD na chave. 

In [17]:
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()
#Print rdd5 result to console
print(rdd5.collect())

[(9, 'Project'), (9, 'Gutenberg’s'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with')]


### filter() -  a transformação é usada para filtrar os registros em um RDD. Em nosso exemplo, estamos filtrando todas as palavras que começam com “a”.

In [18]:
rdd6 = rdd5.filter(lambda x : 'a' in x[1])
print(rdd6.collect())

[(18, 'Wonderland'), (18, 'Carroll'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'and')]


## Ações de RDD
retorna os valores de um RDD para um nó de driver. Em outras palavras, qualquer função RDD que retorne não RDD[T] é considerada uma ação. 

Algumas ações em RDDs são count(), collect(), first(), max()e reduce()muito mais.

### count () – Retorna o número de registros em um RDD

In [19]:
# Action - count
print("Count : "+str(rdd6.count()))

Count : 6


### first () – Retorna o primeiro registro.

In [20]:
# Action - first
firstRec = rdd6.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])

First Record : 18,Wonderland


### max () – Retorna o registro máximo.

In [21]:
# Action - max
datMax = rdd6.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])

Max Record : 27,at


### reduce () – Reduz os registros para single, podemos usar isso para contar ou somar.

In [22]:
# Action - reduce
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))

dataReduce Record : 144


### take () – Retorna o registro especificado como argumento.

In [23]:
# Action - take
data3 = rdd6.take(3)
for f in data3:
    print("data3 Key:"+ str(f[0]) +", Value:"+f[1])

data3 Key:18, Value:Wonderland
data3 Key:18, Value:Carroll
data3 Key:27, Value:anyone


### collect () – Retorna todos os dados do RDD como um array. Tenha cuidado ao usar essa ação quando estiver trabalhando com RDD enorme com milhões e bilhões de dados, pois pode ficar sem memória no driver

In [24]:
# Action - collect
data = rdd6.collect()
for f in data:
    print("Key:"+ str(f[0]) +", Value:"+f[1])

Key:18, Value:Wonderland
Key:18, Value:Carroll
Key:27, Value:anyone
Key:27, Value:anywhere
Key:27, Value:at
Key:27, Value:and


### saveAsTextFile () – Usando a ação saveAsTestFile, podemos escrever o RDD em um arquivo de texto.

In [25]:
rdd6.saveAsTextFile("/tmp/wordCount")