# PySpark - Aula 1

Assim como vimos nas aulas anteriores, o Spark Core consege trabalhar com 4 linguagens de programação que possuem APIs próprias que são Scala, Java, R e Python. 

Já sabemos como realizar algumas transformações, e ações de processing no Scala e a ideia dessa aula é realizarmos algumas das mesmas operações usando o Python. Porém vamos elencar aqui duas diferenças fundamentais da API de Python que são:
- O Python é uma linguagem de programação de alto nível, então os RDDs podem armazenar objetos de múltiplos tipos.[Uma discussão importante sobre isso, está nessa discussão no Stack Overflow.](http://stackoverflow.com/questions/1517582/what-is-the-difference-between-statically-typed-and-dynamically-typed-languages). Resumindo: Linguagem de alto nível: O tipo da variável é associado em _run-time_ em linguagem de baixo nível, ela é alocada no tempo de compilação. 

- PySpark não tem suporte para algumas chamadas de API, como lookup, e input files que não sejam texto. 


In [59]:
import os
import sys

spark_path = "/Users/flavio.clesio/Documents/spark-2.1.0" 

os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip") # Must be the same version of your Spark Version

Para o uso do PySpark recomenda-se uma versão do Python acima da 2.6 (Atenção: Evitem utilizar qualquer tipo de versão abaixo da 3.0+, dado que uma míriade de aplicações e pacotes estão migrando para versões mais atualiadas do Python).

## Configuração de Ambiente

Para instanciar o seu ambiente, basta usar o comando `bin/pyspark` dentro do diretório do Spark da maquina `MORTAR`.

Ambiente instanciado, vamos primeiramente inicializar o contexto do Spark executando o comando abaixo no [REPL](https://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop) do PySpark.

Para fazer isso, vamos realizar alguns imports do contexto:

In [None]:
from pyspark import SparkContext
from pyspark import SparkConf

Após a realização do import das classes que vão instanciar o PySpark, vamos definir o nosso contexto, que neste caso será chamado de `sc`.

In [None]:
sc = SparkContext("local"
                  ,"Aula com PySpark")

Neste caso abrimos um contexto `local` (isto é, será executado na máquina que vocês estão utilizando) e o nome da aplicação será *Aula com PySpark*.

Vamos checar se o nosso contexto do Spark foi instanciado executando o comando abaixo:

In [None]:
sc

Com o nosso contexto inicializado, vamos ver qual é a versão do Spark que está em uso. 

In [None]:
sc.version 

Agora que sabemos a versão do Spark, vamos checar a nossa versão do Python. 

In [None]:
sc.pythonVer 

Para identificar qual é a máquina ou o IP de onde está instanciado a sessão do Spark, basta executar o comando `sc.master`.

In [None]:
sc.master 

Para buscar o caminho dos nós que estão parametrizados como workers, o comando `sparkHome` ajuda a identificar esses nós. 

In [None]:
str(sc.sparkHome) 

Para verificar qual é o usuário que está rodando o contexto do Spark, utilize o comando `sparkUser`.

In [None]:
str(sc.sparkUser()) 

Pasa saber o nome da aplicação que está sendo executada pelo Spark, use o comando `appName`. (Nota do Professor: Estamos falando de processamento em Big Data, o que pode indicar que vocês terão ambientes distribuiídos executando centenas de jobs em outras centenas de nós dentro do cluster. Eu recomendo *fortemente* o uso dos nomes nas aplicações, pois do contrário o ambiente pode ficar praticamente ingerenciável. Mais explicações em sala.)

In [None]:
sc.appName 

Para gerenciar as aplicações pelo id, basta buscar no contexto qual é o número da aplicação.

In [None]:
sc.applicationId 

Para identificar o grau de paralelismo em que o cluster está parametrizado, basta executar o comando abaixo:

In [None]:
sc.defaultParallelism 

## Carga de dados

Vamos realizar agora a carga de alguns dados, para exemplificar como realizar algumas operações com o PySpark.

In [None]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])

In [None]:
rdd.collect()

In [None]:
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])

In [None]:
rdd2.collect()

In [None]:
rdd3 = sc.parallelize(range(100))

In [None]:
rdd3.take(10)

In [None]:
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p", "r"])])

In [None]:
rdd4.collect()

In [None]:
textFile = sc.textFile("https://raw.githubusercontent.com/fclesio/mack-processing-g/master/Scala%20Code/Dados/thegodfather_script.txt") 

## Operações com RDD

Como vimos nas aulas com o Scala, vamos executar algumas operações com os RDDs usando Python. 

In [None]:
rdd.getNumPartitions() 

In [None]:
rdd.count()

In [None]:
rdd.countByKey() 

In [None]:
rdd.countByValue() 

## Operações de sumarização

Agora, vamos executar alguns comandos de sumarização sobre o nosso RDD para verificar alguns valores relativos à estatística descritiva básica.

In [None]:
rdd3.max() 

In [None]:
rdd3.min()

In [None]:
rdd3.mean()

In [None]:
rdd3.stdev() 

In [None]:
rdd3.variance() 

In [None]:
rdd3.histogram(3)

In [None]:
rdd3.stats() 

## Aplicando Funções

In [None]:
rdd.map(lambda x: (x[1],x[0])).collect()

In [None]:
rdd5 = rdd.flatMap(lambda x: (x[1],x[0]))

In [None]:
rdd5.collect()

## Manipulação de Dados

### Seleção

In [None]:
rdd.collect() 

In [None]:
rdd.take(2)

In [None]:
rdd.first()

In [None]:
rdd.top(2) 

### Filtragem

In [None]:
rdd.filter(lambda x: "b" in x).collect()

In [None]:
rdd5.distinct().collect()

In [None]:
rdd.keys().collect() 

### Modificação

In [None]:
rdd.reduceByKey(lambda x,y : x+y).collect()

In [None]:
rdd.reduce(lambda a, b: a + b)

In [None]:
rdd3.groupBy(lambda x: x % 2).mapValues(list).take(10)

In [None]:
rdd.groupByKey().mapValues(list).collect()

In [None]:
seqOp = (lambda x,y: (x[0]+y,x[1]+1))

In [None]:
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))

In [None]:
rdd3.aggregate((5,0),seqOp,combOp)

In [None]:
rdd.aggregateByKey((0,0),seqOp,combOp).collect()

### Operações Matemáticas

In [60]:
x = sc.parallelize([('a',1),('b',4),('b',5),('a',3)])
y = sc.parallelize([('a', 3), ('c', None)])
sorted(x.subtract(y).collect()) 

[('a', 1), ('b', 4), ('b', 5)]

In [61]:
rdd2.subtractByKey(rdd).collect() 

[('d', 1)]

In [62]:
rdd.cartesian(rdd2).collect()

[(('a', 7), ('a', 2)),
 (('a', 7), ('d', 1)),
 (('a', 7), ('b', 1)),
 (('a', 2), ('a', 2)),
 (('a', 2), ('d', 1)),
 (('a', 2), ('b', 1)),
 (('b', 2), ('a', 2)),
 (('b', 2), ('d', 1)),
 (('b', 2), ('b', 1))]

In [63]:
rdd.collect()

[('a', 7), ('a', 2), ('b', 2)]

In [64]:
rdd2.collect()

[('a', 2), ('d', 1), ('b', 1)]

In [65]:
rdd2.sortBy(lambda x: x[1]).collect()

[('d', 1), ('b', 1), ('a', 2)]

In [66]:
rdd2.sortByKey().collect() 

[('a', 2), ('b', 1), ('d', 1)]

In [67]:
rdd.repartition(4)

MapPartitionsRDD[104] at coalesce at NativeMethodAccessorImpl.java:0

## Salvando arquivos no disco e encerrando o contexto do Spark.

In [69]:
rdd.saveAsTextFile("rdd.txt")

In [70]:
sc.stop()