#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **Spark + Python = PySpark**

#### Esse notebook introduz os conceitos básicos do Spark através de sua interface com a linguagem Python. Como aplicação inicial faremos o clássico examplo de contador de palavras . Com esse exemplo é possível entender a lógica de programação funcional para as diversas tarefas de exploração de dados distribuídos.

#### Para isso utilizaremos o livro texto [Trabalhos completos de William Shakespeare](http://www.gutenberg.org/ebooks/100) obtidos do  [Projeto Gutenberg](http://www.gutenberg.org/wiki/Main_Page).  Veremos que esse mesmo algoritmo pode ser empregado em textos de qualquer tamanho.

#### ** Esse notebook contém:  **
#### *Parte 1:* Criando uma base RDD e RDDs de tuplas
#### *Parte 2:* Manipulando RDDs de tuplas
#### *Parte 3:* Encontrando palavras únicas e calculando médias
#### *Parte 4:* Aplicar contagem de palavras em um arquivo
#### Para os exercícios é aconselhável consultar a documentação da [API do PySpark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

### ** Part 1: Criando e Manipulando RDDs **

#### Nessa parte do notebook vamos criar uma base RDD a partir de uma lista com o comando `parallelize`.

#### ** (1a) Criando uma base RDD **
#### Podemos criar uma base RDD de diversos tipos e fonte do Python com o comando `sc.parallelize(fonte, particoes)`, sendo fonte uma variável contendo os dados (ex.: uma lista) e particoes o número de partições para trabalhar em paralelo.

In [None]:
ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
palavrasRDD = sc.parallelize(ListaPalavras, 4)
print type(palavrasRDD)

#### ** (1b) Plural **

#### Vamos criar uma função que transforma uma palavra no plural adicionando uma letra 's' ao final da string. Em seguida vamos utilizar a função `map()` para aplicar a transformação em cada palavra do RDD.

#### Em Python (e muitas outras linguagens) a concatenação de strings é custosa. Uma alternativa melhor é criar uma nova string utilizando [`str.format()`](https://docs.python.org/2/library/string.html#format-string-syntax).

#### Nota: a string entre os conjuntos de três aspas representa a documentação da função. Essa documentação é exibida com o comando `help()`. Vamos utilizar a padronização de documentação sugerida para o Python, manteremos essa documentação em inglês.

In [None]:
# EXERCICIO
def Plural(palavra):
    """Adds an 's' to `palavra`.

    Args:
        palavra (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return <COMPLETAR>

print Plural('gato')

In [None]:
help(Plural)

In [None]:
assert Plural('rato')=='ratos', 'resultado incorreto!'
print 'OK'

#### ** (1c) Aplicando a função ao RDD **
#### Transforme cada palavra do nosso RDD em plural usando [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) 

#### Em seguida, utilizaremos o comando  [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) que retorna a RDD como uma lista do Python.

In [None]:
# EXERCICIO
pluralRDD = palavrasRDD.<COMPLETAR>
print pluralRDD.collect()

In [None]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'

#### ** Nota: ** utilize o comando `collect()` apenas quando tiver certeza de que a lista caberá na memória. Para gravar os resultados de volta em arquivo texto ou base de dados utilizaremos outro comando.

#### ** (1d) Utilizando uma função `lambda` **
#### Repita a criação de um RDD de plurais, porém utilizando uma função lambda.

In [None]:
# EXERCICIO
pluralLambdaRDD = palavrasRDD.<COMPLETAR>
print pluralLambdaRDD.collect()

In [None]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'

#### ** (1e) Tamanho de cada palavra **
#### Agora use `map()` e uma função `lambda` para retornar o número de caracteres em cada palavra. Utilize `collect()` para armazenar o resultado em forma de listas na variável destino.

In [None]:
# EXERCICIO
pluralTamanho = (pluralRDD
                 <COMPLETAR>
                 )
print pluralTamanho

In [None]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print "OK"

#### ** (1f) RDDs de pares e tuplas **

#### Para contar a frequência de cada palavra de maneira distribuída, primeiro devemos atribuir um valor para cada palavra do RDD. Isso irá gerar um base de dados (chave, valor). Desse modo podemos agrupar a base através da chave, calculando a soma dos valores atribuídos. No nosso caso, vamos atribuir o valor `1` para cada palavra.

#### Um RDD contendo a estrutura de tupla chave-valor `(k,v) ` é chamada de RDD de tuplas ou *pair RDD*.

#### Vamos criar nosso RDD de pares usando a transformação  `map()` com uma função `lambda()`.

In [None]:
# EXERCICIO
palavraPar = palavrasRDD.<COMPLETAR>
print palavraPar.collect()

In [None]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print "OK"

### ** Parte 2: Manipulando RDD de tuplas **

#### Vamos manipular nossa RDD para contar as palavras do texto.

#### ** (2a) Função `groupByKey()`  **

#### A função[groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) agrupa todos os valores de um RDD através da chave (primeiro elemento da tupla) agregando os valores em uma lista.

#### Essa abordagem tem um ponto fraco pois:
  + #### A operação requer que os dados distribuídos sejam movidos em massa para que permaneçam na partição correta.
  + #### As listas podem se tornar muito grandes. Imagine contar todas as palavras do Wikipedia: termos comuns como "a", "e" formarão uma lista enorme de valores que pode não caber na memória do processo escravo.
 

In [None]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey()
for chave, valor in palavrasGrupo.collect():
    print '{0}: {1}'.format(chave, list(valor))

In [None]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) == [('elefante', [1]), ('gato',[1, 1]), ('rato',[1, 2])],
        'Valores incorretos!'
print "OK"

#### ** (2b) Calculando as contagens **
#### Após o  `groupByKey()` nossa RDD contém elementos compostos da palavra, como chave, e um iterador contendo todos os valores correspondentes aquela chave.
#### Utilizando a transformação `map()` e a função `sum()`, contrua um novo RDD que consiste de tuplas (chave, soma).

In [None]:
# EXERCICIO
contagemGroup = palavrasGrupo.<COMPLETAR>
print contagemGroup.collect()

In [None]:
assert sorted(contagemGroup.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"

#### ** (2c) `reduceByKey` **
#### Um comando mais interessante para a contagem é o  [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) que cria uma nova RDD de tuplas.

#### Essa transformação aplica a transformação `reduce()` vista na aula anterior para os valores de cada chave. Dessa forma, a função de transformação pode ser aplicada em cada partição local para depois ser enviada para redistribuição de partições, reduzindo o total de dados sendo movidos e não mantendo listas grandes na memória.

In [None]:
# EXERCICIO
contagem = palavrasGrupo.<COMPLETAR>
print contagem.collect()

In [None]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"

#### ** (2d) Agrupando os comandos **

#### A forma mais usual de realizar essa tarefa, partindo do nosso RDD palavrasRDD, é encadear os comandos map e reduceByKey em uma linha de comando.

In [None]:
# EXERCICIO
contagemFinal = (palavrasRDD
                 <COMPLETAR>
                 <COMPLETAR>
                 .collect())
print contagemFinal

In [None]:
assert sorted(contagemFinal.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"

### ** Parte 3: Encontrando as palavras únicas e calculando a média de contagem **

#### ** (3a) Palavras Únicas **

#### Calcule a quantidade de palavras únicas do RDD. Utilize comandos de RDD da API do PySpark e alguma das últimas RDDs geradas nos exercícios anteriores.

In [None]:
# EXERCICIO
palavrasUnicas = <COMPLETAR>
print palavrasUnicas

In [None]:
assert palavrasUnicas==3, 'valor incorreto!'
print "OK"

#### ** (3b) Calculando a Média de contagem de palavras **

#### Encontre a média de frequência das palavras utilizando o RDD `contagem`.

#### Note que a função do comando `reduce()` é aplicada em cada tupla do RDD. Para realizar a soma das contagens, primeiro é necessário mapear o RDD para um RDD contendo apenas os valores das frequências (sem as chaves).

In [None]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
from operator import add
total = (contagem
         <COMPLETAR>
         <COMPLETAR>
         )
media = total / float(palavrasUnicas)
print total
print round(media, 2)

In [None]:
assert round(media, 2)==1.67, 'valores incorretos!'
print "OK"

### ** Parte 4: Aplicar nosso algoritmo em um arquivo **

#### In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

#### ** (4a) Função `contaPalavras` **

#### Para podermos aplicar nosso algoritmo genéricamente em diversos RDDs, vamos primeiro criar uma função para aplicá-lo em qualquer fonte de dados. Essa função recebe de entrada um RDD contendo uma lista de chaves (palavras) e retorna um RDD de tuplas com as chaves e a contagem delas nessa RDD

In [None]:
# TODO: Replace <FILL IN> with appropriate code
def contaPalavras(chavesRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        chavesRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (chavesRDD
            <COMPLETAR>
            <COMPLETAR>
           )

print contaPalavras(palavrasRDD).collect()

In [None]:
assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"

#### ** (4b) Normalizando o texto **

#### Quando trabalhamos com dados reais, geralmente precisamos padronizar os atributos de tal forma que diferenças sutis por conta de erro de medição ou diferença de normatização, sejam desconsideradas. Para o próximo passo vamos padronizar o texto para:
  + #### Padronizar a capitalização das palavras (tudo maiúsculo ou tudo minúsculo).
  + #### Remover pontuação.
  + #### Remover espaços no início e no final da palavra.
 
#### Crie uma função `removerPontuacao` que converte todo o texto para minúscula, remove qualquer pontuação e espaços em branco no início ou final da palavra. Para isso, utilize a biblioteca [re](https://docs.python.org/2/library/re.html) para remover todo texto que não seja letra, número ou espaço, encadeando com as funções de string para remover espaços em branco e converter para minúscula (veja [Strings](https://docs.python.org/2/library/stdtypes.html?highlight=str.lower#string-methods)).

In [9]:
# EXERCICIO
import re
def removerPontuacao(texto):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        texto (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return <COMPLETAR>
print removerPontuacao('Ola, quem esta ai??!')
print removerPontuacao(' Sem espaco e_sublinhado!')

ola  quem esta ai
sem espaco e sublinhado


In [None]:
assert removerPontuacao(' O uso de virgulas, embora permitido, nao deve contar. ')=='o uso de virgulas embora permitido nao deve contar', 'string incorreta!'
print "OK"

#### ** (4c) Carregando arquivo texto  **

#### Para a próxima parte vamos utilizar o livro [Trabalhos completos de William Shakespeare](http://www.gutenberg.org/ebooks/100) do [Projeto Gutenberg](http://www.gutenberg.org/wiki/Main_Page). 

#### Para converter um texto em uma RDD, utilizamos a função `textFile()` que recebe como entrada o nome do arquivo texto que queremos utilizar e o número de partições.

#### O nome do arquivo texto pode se referir a um arquivo local ou uma URI de arquivo distribuído (ex.: hdfs://).

#### Vamos também aplicar a função `removerPontuacao()` para normalizar o texto e verificar as 15 primeiras linhas com o comando `take()`.

In [None]:
# Apenas execute a célula
import os.path
import urllib

url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt' # url do livro

arquivo = os.path.join('Data','Aula02','shakespeare.txt') # local de destino: 'Data/Aula02/shakespeare.txt'

if os.path.isfile(arquivo):     # verifica se já fizemos download do arquivo
    print 'Arquivo já existe!'
else:
    try:
        urllib.urlretrieve(url, arquivo) # salva conteúdo da url em arquivo
    except IOError:
        print 'Impossível fazer o download: {0}'.format(url)

# lê o arquivo com textFile e aplica a função removerPontuacao        
shakesRDD = (sc
             .textFile(arquivo, 8)
             .map(removerPontuacao)
             )

# zipWithIndex gera tuplas (conteudo, indice) onde indice é a posição do conteudo na lista sequencial
# Ex.: sc.parallelize(['gato','cachorro','boi']).zipWithIndex() ==> [('gato',0), ('cachorro',1), ('boi',2)]
# sep.join() junta as strings de uma lista através do separador sep. Ex.: ','.join(['a','b','c']) ==> 'a,b,c'
print '\n'.join(shakesRDD
                .zipWithIndex()
                .map(lambda (linha, num): '{0}: {1}'.format(num,linha))
                .take(15)
               )

#### ** (4d) Extraindo as palavras **
#### Antes de poder usar nossa função Before we can use the `contaPalavras()`, temos ainda que trabalhar em cima da nossa RDD:
  + #### Precisamos gerar listas de palavras ao invés de listas de sentenças.
  + #### Eliminar linhas vazias.
 
#### As strings em Python tem o método [split()](https://docs.python.org/2/library/string.html#string.split) que faz a separação de uma string por separador. No nosso caso, queremos separar as strings por espaço. 

#### Utilize a função `map()` para gerar um novo RDD como uma lista de palavras.

In [None]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.<COMPLETAR>
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.take(5)
print total

#### Conforme deve ter percebido, o uso da função `map()` gera uma lista para cada linha, criando um RDD contendo uma lista de listas.

#### Para resolver esse problema, o Spark possui uma função análoga chamada `flatMap()` que aplica a transformação do `map()`, porém *achatando* o retorno em forma de lista para uma lista unidimensional.

In [None]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.<COMPLETAR>
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.take(5)
print total

In [None]:
assert total==927631 or total == 928908, "valor incorreto de palavras!"
print "OK"
assert shakesPalavrasRDD.top(5)==[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],'lista incorreta de palavras'
print "OK"

#### ** (4e) Remover linhas vazias **

#### Para o próximo passo vamos filtrar as linhas vazias com o comando `filter()`. Uma linha vazia é uma string sem nenhum conteúdo.

In [None]:
# EXERCICIO
shakesLimpoRDD = shakesPalavrasRDD.<COMPLETAR>
total = shakesLimpoRDD.count()
print shakeWordCount

In [None]:
assert total==882996, 'valor incorreto!'
print "OK"

#### ** (4f) Contagem de palavras **
#### Agora que nossa RDD contém uma lista de palavras, podemos aplicar nossa função `contaPalavras()`.

#### Aplique a função em nossa RDD e utilize a função `takeOrdered` para imprimir as 15 palavras mais frequentes.

#### `takeOrdered()` pode receber um segundo parâmetro que instrui o Spark em como ordenar os elementos. Ex.:

#### `takeOrdered(15, key=lambda x: -x)`: ordem decrescente dos valores de x

In [None]:
# EXERCICIO
top15 = <COMPLETAR>
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15))

In [None]:
assert top15 == [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],'valores incorretos!'
print "OK"