# Spark #

In Spark, we express our computation through operations on distributed collections that
are automatically parallelized across the cluster. These collections are called Resilient
Distributed Datasets, or RDDs. RDDs are Spark’s fundamental abstraction for distributed
data and computation.
Before we say more about RDDs, let’s create one in the shell from a local text file and
do some very simple ad-hoc analysis by following the example below.

In [2]:
# Create an RDD called lines
lines = sc.textFile("dadosmalariaCEA15P14.csv")

In [3]:
# Count the number of items in this RDD
lines.count()

601

In [4]:
# First item in this RDD, i.e. the first line (the header row) of dadosmalariaCEA15P14.csv
lines.first() 

'id;grupo;data;datnasc;idade;natural;bairro;cidade;profissa;estciv;escola;raca;locresid;tempresid;qntcomod;qntpesshab;banheiro;luz;gerador;radio;tv;telfixo;cel;bicicleta;moto;carro;canoa;barcomot;geladeira;agua;piso;telhado;janela;telajan;alimcons;verdhort;leitderiv;carneovo;feijao;arrozmilh;frutas;extrativ;cacapesc;outro;auxil;renda;pressao;fuma;fumagest;qntfuma;remedio;qualremed;drogas;alcool;alcoolgest;qntalcool;antecpess;antecespe;qntgest;paridade;nascvivo;nascmort;abortprov;partocesar;partonorm;anormnasc;qualanor;termgest;amament;pesomenor;nascmaior;ig1;movbb;tommed;sulfferro;acfolico;vitprenat;complb;complgest;sangra;igsangra;anemia;pressalt;diabete;transfsang;cortinado;dormeemb;mosqimpreg;grupo_cortin;borrifa;preventmal;medmal;qualmedmal;frequsomed;malvida;qtsmal;malgestant;qtsmalant;malgestatu;primoinfec;qtsmalatua;n1qualgest;n1qntsmal;n2qualgest;n2qntsmal;n3qualgest;n3qntsmal;n4qualgest;n4qntsmal;n5qualgest;n5qntsmal;n6qualgest;n6qntsmal;viagemrec;ondeviag;objetviag;outrobjet;

The driver program, or main, accesses Spark through a SparkContext object, which represents a connection to a computing cluster. In the pyspark shell that we opened here, this object is created automatically under the name sc.

In [5]:
sc

<pyspark.context.SparkContext at 0x7f0d800b5240>

In [6]:
GUAJARALines = lines.filter(lambda line: "GUAJARA" in line)

In [7]:
GUAJARALines.first()

'2;1;07/12/2012;03/09/1994;18;GUAJARA;;GUAJARA;DO LAR;1;3;1;1;5;4;5;1;1;0;1;1;0;0;1;0;0;0;0;1;5;2;2;2;0;1;0;0;1;0;0;0;0;0;0;1;1;0;0;;;0;;0;0;;;0;;1;1;;;;;;;;;;;;17;0;0;;;;;0;;;;;;0;1;0;0;2- Tem, n�o dorme, n�o impreg;0;0;0;;;1;1;0;;1;3- Sim;2;1;2;;;;;;;;;;;1;SANTA LUZIA;2;;1;4;1;37,2;86;22;110;60;53,5;62;159;21,16;24,52;17;09/08/2012;16/05/2013;0;1;0;0;1;132;;1;1;3;17;1;15854;07/12/2012;1;06/12/2012;1;1;0;1;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;1;PARACETAMOL;0;;1;;;1;02/01/2013;1;1;;0;;;;;;;;;;;0;;1;1;4;23;1;995;15/01/2013;1;13/01/2013;1;1;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;0;1;PARACETAMOL;0;;1;;;1;30/01/2013;1;1;;0;;;;;;;;;;;0;;0;;;9999;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;9999;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;9999;;;;;;;;;;;;;;;;;;;;;;;;;;;;1;02/01/2013;21;54,5;120;70;80;21;0;18;128;1;;1;13/02/2013;27;56,2;110;60;86;20;0;22;148;1;35,5;1;25/04/2013;37;62;120;60;88;18;2;32;144;1;36,1;0;;;;;;;;;;;;;1;1;;0;9;9;9;0;9;;1;18/04/2013;;12;35,2;88,3;30,

Spark’s RDDs are by default recomputed each time you run an action on
them. If you would like to reuse an RDD in multiple actions, you can ask Spark to
persist it using RDD.persist().

In practice, you will often use persist() to load a subset of your data into memory
and query it repeatedly.
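
To see why persisting helps, here is a rough plain-Python analogy (not Spark code). A hypothetical `LazyLines` class recomputes its filter on every action unless `persist()` has been called, and a counter records how often the source is scanned. The class, its methods, and the sample data are assumptions made purely for illustration.

```python
class LazyLines:
    """Toy stand-in for an RDD: recomputed on every action unless persisted."""

    def __init__(self, source, predicate):
        self.source = source          # the "external" data
        self.predicate = predicate    # the filter, applied lazily
        self.scans = 0                # how many times the source was scanned
        self._persist = False
        self._cache = None

    def persist(self):
        # Only marks the dataset; nothing is computed until the first action
        self._persist = True
        return self

    def _compute(self):
        if self._cache is not None:
            return self._cache
        self.scans += 1
        result = [x for x in self.source if self.predicate(x)]
        if self._persist:
            self._cache = result
        return result

    def count(self):
        return len(self._compute())

    def first(self):
        return self._compute()[0]


data = ["1;GUAJARA", "2;SABOEIRO", "3;GUAJARA"]

plain = LazyLines(data, lambda line: "GUAJARA" in line)
plain.count()
plain.first()
print(plain.scans)    # 2: each action rescanned the source

cached = LazyLines(data, lambda line: "GUAJARA" in line).persist()
cached.count()
cached.first()
print(cached.scans)   # 1: the second action reused the cached result
```

As in this sketch, calling persist() on an RDD does not compute anything by itself; the data is materialized by the first action that runs afterwards.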

In [8]:
GUAJARALines.persist()

PythonRDD[7] at RDD at PythonRDD.scala:43

In [9]:
GUAJARALines.count()

25

# Creating RDDs #

Spark provides two ways to create RDDs: loading an external dataset and parallelizing
a collection in your driver program.

In [10]:
lines_p = sc.parallelize(["pandas", "i like pandas"])

In [11]:
lines_p.count()

2

# Transformations #

Transformations are operations on RDDs that return a new RDD.

In [12]:
SABOEIROLines = lines.filter(lambda line: "SABOEIRO" in line)  # New RDD

In [13]:
SABOEIROLines.count()

9

In [14]:
# union() combines two RDDs of lines; join() is meant for RDDs of key/value pairs
SABOEIRO_GUAJARA = SABOEIROLines.union(GUAJARALines)

# Actions #

Actions are the operations that return a final value to the driver
program or write data to an external storage system.

In [17]:
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "ERROR" in x)
warningsRDD = inputRDD.filter(lambda x: "WARN" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

In [19]:
print("Input had ", badLinesRDD.count(), " concerning lines")
print("Here are 10 examples:")
# take(10) returns at most 10 elements; fewer if the RDD is smaller
for line in badLinesRDD.take(10):
    print(line)

Input had  5  concerning lines
Here are 10 examples:
ERROR	php: dying for unknown reasons
ERROR	did mysql just barf?
ERROR	mysql cluster: replace with spark cluster
WARN	dave, are you angry at me?
WARN	xylons approaching


# Passing Functions to Spark

In [20]:
def containsError(s):
    # The substring test is case-sensitive; log.txt marks errors as "ERROR"
    return "ERROR" in s

In [21]:
rdd = sc.textFile("log.txt")
word = rdd.filter(containsError)
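
Substring tests with `in` are case-sensitive, so a predicate passed to filter() has to match the casing used in the data. The following plain-Python sketch uses the built-in filter() as a stand-in for rdd.filter() and two of the sample log lines shown earlier; the helper `contains_error_any_case` is hypothetical and not part of the original notebook.

```python
log_lines = [
    "ERROR\tphp: dying for unknown reasons",
    "WARN\tdave, are you angry at me?",
]

def contains_error_any_case(s):
    # Hypothetical helper: lower-case the line before testing for "error"
    return "error" in s.lower()

# Built-in filter() stands in for rdd.filter() in this sketch
exact = list(filter(lambda s: "error" in s, log_lines))
any_case = list(filter(contains_error_any_case, log_lines))

print(len(exact))     # 0: "error" does not match "ERROR"
print(len(any_case))  # 1
```

Both a lambda and a named function can be passed in the same way; a named function is usually easier to read once the test grows beyond a single expression.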

# Element-wise transformations #

The two most common transformations you will likely be using are map() and filter().

<img src='map.png'>

Let’s look at a basic example of map() that squares all of the numbers in an RDD:

In [22]:
nums = sc.parallelize([1, 2, 3, 4])

In [23]:
def quadrado(x):
    return x*x

In [24]:
numeros_ao_quadrado = nums.map(quadrado).collect()

In [26]:
for numero in numeros_ao_quadrado:
    print(numero)

1
4
9
16
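
filter() is the counterpart of map(): instead of transforming each element, it keeps only the elements for which the given function returns a true value. A plain-Python analogy using the built-in map() and filter() on a list (not Spark code; the helper `is_even` is hypothetical):

```python
nums = [1, 2, 3, 4]

def quadrado(x):
    return x * x

def is_even(x):
    # Hypothetical predicate used only for this illustration
    return x % 2 == 0

squared = list(map(quadrado, nums))    # one output element per input element
evens = list(filter(is_even, nums))    # only the elements that pass the test

print(squared)  # [1, 4, 9, 16]
print(evens)    # [2, 4]
```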


Sometimes we want to produce multiple output elements for each input element. The
operation to do this is called flatMap().

<img src='img2.png'>

In [27]:
lines = sc.parallelize(["coffe panda", "happy panda", "happiest pandas party"])

In [28]:
words = lines.flatMap(lambda line: line.split(" "))

In [29]:
words.first()

'coffe'

In [31]:
for line in words.take(7):
    print(line)

coffe
panda
happy
panda
happiest
pandas
party


In [32]:
words2 = lines.map(lambda line: line.split())

In [34]:
for line in words2.take(3):
    print(line)

['coffe', 'panda']
['happy', 'panda']
['happiest', 'pandas', 'party']
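
The difference between the two results above (seven words from flatMap(), three lists from map()) can be reproduced in plain Python. This is only an analogy with ordinary lists, assuming `itertools.chain.from_iterable` as the flattening step; it is not how Spark implements flatMap().

```python
from itertools import chain

lines = ["coffe panda", "happy panda", "happiest pandas party"]

# map-like: one list of words per input line
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(len(mapped))       # 3
print(len(flat_mapped))  # 7
```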


# Pseudo set operations

RDDs support many of the operations of mathematical sets, such as union and intersection,
even when the RDDs themselves are not properly sets. Four operations are
shown in Figure 3-4. It’s important to note that all of these operations require that
the RDDs being operated on are of the same type.

<img src='set.png'>

In [35]:
RDD1 = sc.parallelize(['coffe', 'coffe', 'panda', 'monkey', 'tea'])

In [36]:
RDD2 = sc.parallelize(['coffe', 'monkey', 'kitty'])

In [41]:
distintos = RDD1.distinct()
for i in distintos.take(5):
    print(i)

coffe
panda
tea
monkey


In [43]:
uniao = RDD1.union(RDD2)
for i in uniao.take(10):
    print(i)

coffe
coffe
panda
monkey
tea
coffe
monkey
kitty


In [44]:
inter = RDD1.intersection(RDD2)
for i in inter.take(10):
    print(i)

coffe
monkey


In [46]:
sub = RDD1.subtract(RDD2)
for i in sub.take(10):
    print(i)

panda
tea
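
The duplicate handling seen in the outputs above (union() keeps repeated elements, while distinct() and intersection() drop them) can be mimicked with plain Python lists. The four helper functions below are hypothetical stand-ins written for this sketch, not Spark's implementation. Spark does not promise any particular element order after these operations, so the order printed here may differ from the RDD outputs.

```python
rdd1 = ["coffe", "coffe", "panda", "monkey", "tea"]
rdd2 = ["coffe", "monkey", "kitty"]

def distinct(xs):
    # Drop repeated elements, keeping first occurrences
    return list(dict.fromkeys(xs))

def union(a, b):
    # Like the union() output above: duplicates are kept
    return a + b

def intersection(a, b):
    # Elements present in both inputs, with duplicates removed
    other = set(b)
    return distinct(x for x in a if x in other)

def subtract(a, b):
    # Elements of a that do not appear in b
    other = set(b)
    return [x for x in a if x not in other]

print(distinct(rdd1))            # ['coffe', 'panda', 'monkey', 'tea']
print(len(union(rdd1, rdd2)))    # 8
print(intersection(rdd1, rdd2))  # ['coffe', 'monkey']
print(subtract(rdd1, rdd2))      # ['panda', 'tea']
```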


# Cartesian product

We can also compute a Cartesian product between two RDDs, as shown in
Figure 3-5. The cartesian(other) transformation returns all possible pairs of (a,
b) where a is in the source RDD and b is in the other RDD. The Cartesian product
can be useful when we wish to consider the similarity between all possible pairs, such
as computing every user’s expected interest in each offer. We can also take the Cartesian
product of an RDD with itself, which can be useful for tasks like user similarity.
Be warned, however, that the Cartesian product is very expensive for large RDDs.

<img src='cartesiano.png'>

In [47]:
RDD1 = sc.parallelize('Paus Ouros Copas Espadas'.split())

In [59]:
RDD2 = sc.parallelize([str(n) for n in range(2, 11)] + list('AKQJ'))

In [60]:
produto_cartesiano = RDD1.cartesian(RDD2)

In [61]:
for i in produto_cartesiano.take(60):
    print(i)

('Paus', '2')
('Paus', '3')
('Paus', '4')
('Paus', '5')
('Paus', '6')
('Paus', '7')
('Paus', '8')
('Paus', '9')
('Paus', '10')
('Paus', 'A')
('Paus', 'K')
('Paus', 'Q')
('Paus', 'J')
('Ouros', '2')
('Ouros', '3')
('Ouros', '4')
('Ouros', '5')
('Ouros', '6')
('Ouros', '7')
('Ouros', '8')
('Ouros', '9')
('Ouros', '10')
('Ouros', 'A')
('Ouros', 'K')
('Ouros', 'Q')
('Ouros', 'J')
('Copas', '2')
('Copas', '3')
('Copas', '4')
('Copas', '5')
('Copas', '6')
('Copas', '7')
('Copas', '8')
('Copas', '9')
('Copas', '10')
('Copas', 'A')
('Copas', 'K')
('Copas', 'Q')
('Copas', 'J')
('Espadas', '2')
('Espadas', '3')
('Espadas', '4')
('Espadas', '5')
('Espadas', '6')
('Espadas', '7')
('Espadas', '8')
('Espadas', '9')
('Espadas', '10')
('Espadas', 'A')
('Espadas', 'K')
('Espadas', 'Q')
('Espadas', 'J')
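
The listing above contains 52 pairs even though take(60) asked for up to 60: the product of a collection of 4 suits with a collection of 13 ranks has 4 × 13 = 52 elements, and take() returns fewer elements when fewer exist. A quick plain-Python check with `itertools.product` (an analogy on ordinary lists, not Spark code; the names `suits`, `ranks`, and `pairs` are chosen for this sketch):

```python
from itertools import product

suits = "Paus Ouros Copas Espadas".split()
ranks = [str(n) for n in range(2, 11)] + list("AKQJ")

# Every (suit, rank) pair, analogous to RDD1.cartesian(RDD2)
pairs = list(product(suits, ranks))

print(len(suits), len(ranks), len(pairs))  # 4 13 52
print(pairs[0])                            # ('Paus', '2')
```

The output size grows with the product of the two input sizes, which is why the Cartesian product becomes expensive for large RDDs.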
