In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
# Criação de uma RDD simples

rdd1 = sc.parallelize([1, 2, 3])
rdd1.collect()

[1, 2, 3]

In [5]:
# Utilizando o NumPy
# Biblioteca do Python para trabalhar com Matemática

import numpy as np
A = np.array(range(100))
A

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
       51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
       68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
       85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99])

In [4]:
rdd2 = sc.parallelize(np.array(range(100)))
rdd2.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [6]:
# Verificando a paralelização
# Observe que a paralelização da RDD não ocorreu. Ela não foi dividida

rdd3 = sc.parallelize(np.array(range(100)))
print(rdd3.glom().collect())

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35], [36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47], [48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71], [72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83], [84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]


# O particionamento das RDDs é feito por padrão pelo tamanho do Cluster, mas também podemos específicar explicitamente, de preferência com o número de cores do PC.

In [7]:
# Paralelizando explicitamente 

rdd4 = sc.parallelize(np.array(range(100)), 10)

In [8]:
# Verificando a paralelização

print(rdd4.glom().collect())

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]


In [9]:
# Número de partições

print(rdd4.getNumPartitions())

10


# Paralelizando arquivos

In [16]:
# Lendo arquivo.

rddArquivo = sc.textFile("longa historia.txt")

print(rddArquivo.take(100))

['A longa duração é um conceito histórico criado pelo francês Fernand Braudel.', '', "Utilizado por ele em sua tese doutorado de 1949 intitulada La Méditerranée et le monde méditerranéen à l'époque de Philippe II, introduziu uma nova maneira de abordagem dos acontecimentos históricos: aqueles que transcorrem na longa duração.", '', 'Ao lado da história tradicional (dita fatual e condenada pela Escola dos Annales) das "oscilações breves, rápidas, nervosas"[1] e da história cíclica e conjuntural (história econômica e social) caracterizada pelas fases lentas, ele introduz a história quase imóvel que se interessa pelos fenômenos extremamente longos (evolução das paisagens, história do homem na sua relação com o meio). Em 1958, por ocasião de uma controvérsia com Claude Lévi-Strauss, ele teoriza em sua obra A Longa Duração o modelo da pluralidade dos tempos históricos (estrutural / conjuntural / fatual).']


In [17]:
print(rddArquivo.getNumPartitions())

2


In [18]:
rddArquivoPart = sc.textFile("longa historia.txt", 100)

In [19]:
print(rddArquivoPart.getNumPartitions())

105


In [20]:
print(rddArquivoPart.glom().collect())

[['A longa duração é um conceito histórico criado pelo francês Fernand Braudel.'], [], [], [], [], [], [], [], [], ['', "Utilizado por ele em sua tese doutorado de 1949 intitulada La Méditerranée et le monde méditerranéen à l'époque de Philippe II, introduziu uma nova maneira de abordagem dos acontecimentos históricos: aqueles que transcorrem na longa duração."], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], ['', 'Ao lado da história tradicional (dita fatual e condenada pela Escola dos Annales) das "oscilações breves, rápidas, nervosas"[1] e da história cíclica e conjuntural (história econômica e social) caracterizada pelas fases lentas, ele introduz a história quase imóvel que se interessa pelos fenômenos extremamente longos (evolução das paisagens, história do homem na sua relação com o meio). Em 1958, por ocasião de uma controvérsia com Claude Lévi-Strauss, ele teoriza em sua obra A Longa Duração o modelo da pluralidade d

## Leitura da Bíblia

In [None]:
# Verifique se o arquivo está em ANSI

rddANSI = sc.textFile("Biblia-ANSI.txt")

print(rddANSI.take(1000))

In [None]:
# Altere o arquivo para UTF-8

rddBiblia = sc.textFile("Biblia-UTF8.txt")

print(rddBiblia.take(1000))

# Transformações

1. São operações em um RDD que devolvem um novo RDD
2. Normalmente executam uma função anônima(lambda) sobre cada um dos elementos RDD
3. Operam sobre Lazy

# Utilizando o Intersect

In [24]:
dados1 = sc.parallelize(["A", "B", "C", "D", "E"])
dados2 = sc.parallelize(["A", "E", "I", "O", "U"])

result = dados1.intersection(dados2)
result.take(10)

['E', 'A']

In [None]:
jose = rddBiblia.filter(lambda linha: "José" in linha)

In [None]:
maria = rddBiblia.filter(lambda linha: "Maria" in linha)

In [None]:
biblia = jose.intersection(maria)

# Ações

1. Devolvem um resultado
2. Faz todas as transformações anteriores serem executadas

# Utilizando o takeSample para gerar amostras

In [28]:
# TRUE - Possibilita elementos REPETIDOS
# Numero = Indica o tamanho da amostra

# Criando um RDD com range de números
rdd5 = sc.parallelize(np.array(range(100)))

# Contando o número de elementos
contador = rdd5.count()
                      
# Imprimindo o número de elementos
print("Número de elementos do range {0}".format(contador))
                      
# Utilizando um FOR para imprimir cada elemento
# AMOSTRA ALEATÓRIA
for l in rdd5.takeSample(True, 8):
    print(l)

Número de elementos do range 100
38
21
2
51
97
63
58
60


In [27]:
type(rdd5)

pyspark.rdd.RDD

In [None]:
lines = biblia.count()

print("Número de linhas {0}".format(lines))

for l in biblia.takeSample(False, 5):
    print(l)

In [None]:
# Podendo vir elementos repetidos

for l in biblia.takeSample(True, 10):
    print(l)

# Podemos utilizar variáveis

In [None]:
rddBiblia = sc.textFile("Biblia-UTF8.txt")

A = "Jesus"
B = "Cristo"

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print("Número de linhas: " + str(lines))

for l in inter.takeSample(False, 10):
    print(l)

# Se quisermos todas as linhas?

In [None]:
rddBiblia = sc.textFile("Biblia-UTF8.txt")

A = "Jesus"
B = "Cristo"

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print("Número de linhas: " + str(lines))

inter.takeSample(False, lines)

# Podemos utilizar collect()

In [None]:
# Podemos iterar?

A = "Jesus"
B = "Cristo"

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print("Número de linhas: " + str(lines))
print("Tipo da RDD inter: " + str(type(inter)))

for l in inter.collect():
    print(l)

In [None]:
A = "Jesus"
B = "Cristo"

linhas1 = rddBiblia.filter(lambda linha: A in linha)
linhas2 = rddBiblia.filter(lambda linha: B in linha)

inter = linhas1.intersection(linhas2)

lines = inter.count()

print("Número de linhas: " + str(lines))

inter.collect()