# Introdução ao PySpark

PySpark é uma API Python para Apache SPARK que é denominado como o mecanismo de processamento analítico para aplicações de processamento de dados distribuídos em larga escala e aprendizado de máquina em tempo real, ou seja, para grandes volumes de dados, conhecido como Big Data.
O uso da biblioteca Pyspark possui diversas vantagens:
* É um mecanismo de processamento distribuído , na memória, que permite o processamento de dados de forma eficiente e de características distribuída.
* Com o uso do PySpark, é possível o processamento de dados em Hadoop (HDFS), AWS S3 e outros sistemas de arquivos.
* Possui bibliotecas de aprendizado de máquina e gráficos.
* Geralmente as aplicações criadas e executadas no PySpark são 100x mais rápidas que outras em sistemas de dados conhecidos.

Toda a execução dos scripts são realizados dentro do Apache Spark, que distribui o processamento dentro de um ambiente de cluster que são interligados aos NÓs que realizam a execução e transformação dos dados.

**Spark Workflow**<br/>
O Apache Spark segue os seguintes passos:
1. Carga de dados: Fontes que geram dados em tempo real, HDFS, NoSQL.
2. Transformação: Filtro, limpeza, join.
3. Processamento: Em memória, HDFS, NoSQL.
4. Análise interativa: Shell, SparkSQL.
5. Machine Learning: Modelos preditivos aplicados a Stream de Dados.
6. Ação: Tomada de decisão.

**Spark Driver e Workers**<br/>
Uma aplicação Spark possui 2 programas:
* *Driver Program*: É executado em uma única máquina.
* *Worker Program*: São executados em cada node do cluster. Os Dataframes são distribuídos através dos Clusters.

**Spark Contexts**<br/>
O Spark cria um programa chamado SparkContext, que é um objeto que define como e onde o Spark acessa o cluster. No shell do PySpark, o SparkContext é criado automaticamente. Já no Jupyter notebook ou em outra IDE, precisamos criar o SparkContext.
Na sequência, criamos o sqlContext, que é o objeto usado para criar os dataframes, as estruturas que vão armazenar os dados.

**Hadoop ou Apache Spark**
O Hadoop é mais recomendado quando o volume de dados for massivo na casa dos Petabytes, além disso, quando for bem estruturado.
Já o Apache Spark é mais recomendado quando o volume de dados não for tão grande ou quando os dados não são estruturados, ou seja, fluxo de dados.

<img src='assets/tabela01.png'/>

Quando o tamanho do conjunto de dados for menor que 1TB devemos utilizar o Apache Spark, já quando o tamanho do conjunto de dados foi maior que 1TB podemos utilizar o Hadoop para armazenamento e o Spark para processamento dos dados.

**MapReduce x Spark**<br/>
O Hadoop HDFS serve para o armazenamento distribuído em Clusters e o Hadoop MapReduce para processamento distribuído.
O MapReduce é um motor de computação distribuída fornecido pelo Hadoop. Enquanto que o HDFS fornece um sistema de arquivos distribuídos para armazenamento de grande conjunto de dados, o MapReduce fornece uma estrutura de computação para o processamento de grande conjunto de dados em paralelo através de um Cluster de computadores.

<img src='assets/figura01.png'/>

* O Spark suporta mais do que apenas as funções de Map e Reduce;
* Hadoop MapReduce grava os resultados intermediários em disco, enquanto o Spark grava os resultados intermediários em memória, o que é muito mais rápido.
* O Spark fornece APIs concisas e consistentes em Scala, Java e Python (e mais recentemente em R);
* O Spark oferece shell interativo para Scala, Python e R;
* O Spark pode utilizar o HDFS como uma de suas fontes de dados.

### RDD - Resilient Distributed Datasets
É como uma tabela de banco de dados, é a essência do funcionamento do Spark. É uma coleção de objetos distribuída e imutável, é read-only. Cada conjunto de dados no RDD é dividido em partições lógicas, que podem ser computadas em diferentes nodes do cluster.
Existem duas formas de criar o RDD:
Paralelizando uma coleção existente (função sc.parallelize);
Referenciando um dataset externo (HDFS, RDBMS, NoSQL, S3).
O Spark utiliza o conceito de RDDs para aplicar o MapReduce de maneira rápida. Por padrão, os RDDs são computados cada vez que executamos uma ação. Entretanto, podemos “persistir” o RDD na memória (ou mesmo no disco) de modo que os dados estejam disponíveis ao longo do cluster e possam ser processados de forma muito mais rápida pelas operações de análise de dados.<br/>
O RDD suporta dois tipos de operações:
<img src='assets/tabela02.png'/>
Cada transformação gera um novo RDD, pois os RDDs são imutáveis. As ações aplicam as transformações nos RDDs e retornam o resultado.

**Características dos RDDs:**
* Spark é baseado em RDDs. Criamos, transformamos e armazenamos RDDs em Spark;
* RDD representa uma coleção de elementos de dados particionados que podem ser operados em paralelo.
* RDDs são objetos imutáveis. Eles não podem ser alterados uma vez criados.
* RDDs podem ser colocados em cache e permitem persistência (mesmo objeto usado entre sessões diferentes).
* Ao aplicarmos Transformações em RDDs criamos novos RDDs.
* Ações aplicam as transformações nos RDDs e geram um resultado.

**Existem dois tipos de transformações:**
* *Narrow*: Resultado de funções como map() e filter() e os dados vem de uma única partição
* *Wide*: Resultado de funções como groupByKey() e os dados podem vir de diversas partições.


### Spark SQL
**Cloud e Cluster Computing**<br/>
Não temos como processar e armazenar Big Data em apenas um computador. É um sistema que compreende dois ou mais computadores, chamados de nodes. Os nodes funcionam como se fosse apenas um único computador.
O MapReduce é a principal tecnologia para o processamento em sistemas distribuídos.

**Linguagem SQL**<br/>
A linguagem sql surgiu por volta dos anos 70. É uma linguagem de script utilizada para controlar, manipular e selecionar dados em banco de dados.

**Spark SQL**<br/>
O Spark sql permite realizar consultas sql no spark. O Spark possui dataframes, semelhante com os dataframes do Pandas. O Spark SQL possui três principais componentes:
* Dataframe (Schema RDD);
* Spark Session;
* SQL Context.

### Spark Streaming
A proposta do Spark Stream é analisar dados em tempo real, e não esperar horas para fazer a análise e processamento. "*A vida não acontece em batches*".<br/>
Streaming de dados não é apenas para projetos altamente especializados. Computação baseada em Streaming está se tornando a regra para empresas orientadas a dados.<br/>
Uma das principais fontes de dados contínuos são os sensores, da internet das coisas.
Existem quatro areas principais que o Spark Streaming vem sendo utilizado:
* Streaming ETL;
* Detecção de anomalias;
* Enriquecimento de dados;
* Sessões complexas e aprendizado contínuo.

*Uma importante vantagem de usar o Spark para Big Data Analytics é a possibilidade de combinar processamento em batch e processamento de streaming em um único sistema*.<br/.
* **Batch**: Você inicia o processamento de um arquivo ou dataset finito, o spark processa as tarefas configuradas e conclui o trabalho.
* **Streaming**: Você processa um stream de dados contínuos; a execução não pára até que haja algum erro ou você termine a aplicação manualmente.

### DStreams: Discretized Streams
Assim como os RDDs são a base do Apache Spark, os DStreams são a base do Apache Spark Streaming.<br/>
O dstream é uma sequência de dados que são coletados ao longo do tempo. Internamente, um dstream é representado por uma sequência de RDDs coletados em cada intervalo de tempo. Pode ser criado por diversas fontes, como Kafka, Flume, Twitter, etc..<br/>
Uma vez que são criados, os dstreams oferecem dois tipos e operações:
* **Transformações**: Geram um novo dstream;
* **Ações (operações de output)**: Gravam os dados em um sistema de armazenamento ou outra fonte externa.
Os DStreams oferecem muitas das operações que podem ser realizadas com os RDDs, mais operações relacionadas ao tempo, como sliding windows.

<img src="assets/figura02.png"/>

**O que pode ser feito com DStreams:**
* Map;
* FlatMap;
* Filter;
* ReduceByKey;
* Join;
* Window;
* Manter o controle de estado dos dados (Stateful Data)

**Windowing**
Computação em uma janela de tempo. A cada janela de tempo, um RDD é criado no DStream, podemos querer ver o que acontece em um determinado intervalo de tempo.
* Window length: Duração da window;
* Sliding interval: Intervalo entre as windows.

Exemplo de uso:
> ssc = StreamingContext(sc, INTERVALO_BATCH);<br/>
window(windowDuration: Duration, slideDuration: Duration): DStream[T]<br/>

Windowing permite computar os resultados ao longo de períodos de tempo maiores que o batch interval.

São três os intervalos de tempo que devemos considerar ao trabalhar com streaming:
* Batch interval: Frequência com que os dados são capturados em um DStream;
* Frequência com que uma window é aplicada;
* Intervalo de tepo capturado para computação e geração de resultados;

**Principais mecanismos de tolerância a falhas:**
* Todos os dados são replicados para no mínimo 2 worker nodes;
* *ssc.checkpoint()*: Um diretório de checkpoint pode ser usado para armazenar o estado do streaming de dados, no caso em que é necessário reiniciar o streaming.
* *Falha no Receiver*: Alguns receivers são melhores que outros. Receiver como Twitter, Kafka e Flume não permitem recuperação de dados. Se o receiver falha, os dados do streaming são perdidos. Outros garantem a recuperação dos dados em caso de falhas: HDFS, Directly-consumed Kafka, Pull-based Flume.
* *Falha no Driver Context*: Embora os dados sejam replicados para os worker nodes, o DriverContext é executado no node master e este pode ser um ponto único de falha. Podemos usar o checkpoint() para recuperar dados em caso de falhas e usamos a função streamingContext.getOrCreate() para continuar o processamento de onde ele foi interrompido em caso de falha.
Em caso de falha no script sendo executado no DriverContext, podemos reiniciar automaticamente o processo de streaming, usando o Zookeeper (no modo supervise). O zookeeper é um cluster manager usado pelo spark.

**Integração com outros sistemas: Kafka, Flume e Kinesis.**<br/>
É um sistema para gerenciamento de fluxos de dados em tempo real. O Linkedin é o maior ambiente Kafka atualmente. Foi desenvolvido com o propósito de servir como repositório de dados gerados em tempo real. Foi desenvolvido com dois objetivos em mente:
* Transportar dados entre diversos sistemas de dados;
* Enriquecer a análise de dados.

Kafka com spark: Instalar o pacote **spark-streaming-kafka**

**Apache Flume**<br/>
É parecido com o Kafka, mas é específico para logs em servidores, e pode ser muito útil para o processo de análise de dados.
Instalação:
* *Spark-streaming-flume*: Push-based;
* *Spark-streaming-flume-sink*: Pull-Based.

**Amazon Kinesis**<br/>
É similar ao kafka, mas pode ser integrado direto com a nuvem da amazon.
* *Spark-streaming-kinesis-asl*: Requer a licença da Amazon.



### Para acompanhar a execução dos jobs basta acessar o seguinte endereço:
Acesse http://localhost:4040 sempre que quiser acompanhar a execução dos jobs

## Introdução ao PySpark

In [2]:
import sys
print(sys.version)
print(sc)
print(sc.version)

3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)]
<SparkContext master=local[*] appName=PySparkShell>
3.0.3


In [3]:
# Testando o Spark e criando uma RDD
lst = [25, 90, 81, 37, 776, 3320]
testData = sc.parallelize(lst)

In [4]:
?sc.parallelize

In [5]:
type(testData)

pyspark.rdd.RDD

In [6]:
testData.count()

6

In [7]:
testData.collect()

[25, 90, 81, 37, 776, 3320]

RDD's são coleções distribuídas de itens. RDD's podem ser criadas a partir do Hadoop (arquivos no HDFS), através da transformação de outras RDD's, a partir de bancos de dados (relacionais e não-relacionais) ou a partir de arquivos locais.

In [8]:
# Criando uma RDD a partir de um arquivo csv
sentimentoRDD = sc.textFile("data/sentimentos.csv")

In [9]:
type(sentimentoRDD)

pyspark.rdd.RDD

In [10]:
# Ação - Contando o número de registros
sentimentoRDD.count()

100

In [11]:
# Listando os 5 primeiros registros
sentimentoRDD.take(5)

['positivo,Esse livro é incrível.',
 'positivo,Um dos melhores livros que eu já li.',
 'positivo,um dos melhores livros que eu já li',
 'positivo,Acho que ele tem um conteúdo que vai além do que está em sua descrição.',
 'positivo,O Sol é para todos é profundo e emocionante']

In [12]:
# Transformando os dados - transformação para letras maiúsculas
transfRDD = sentimentoRDD.map(lambda x : x.upper())
transfRDD.take(5)

['POSITIVO,ESSE LIVRO É INCRÍVEL.',
 'POSITIVO,UM DOS MELHORES LIVROS QUE EU JÁ LI.',
 'POSITIVO,UM DOS MELHORES LIVROS QUE EU JÁ LI',
 'POSITIVO,ACHO QUE ELE TEM UM CONTEÚDO QUE VAI ALÉM DO QUE ESTÁ EM SUA DESCRIÇÃO.',
 'POSITIVO,O SOL É PARA TODOS É PROFUNDO E EMOCIONANTE']

In [13]:
sentimentoRDD.take(5)

['positivo,Esse livro é incrível.',
 'positivo,Um dos melhores livros que eu já li.',
 'positivo,um dos melhores livros que eu já li',
 'positivo,Acho que ele tem um conteúdo que vai além do que está em sua descrição.',
 'positivo,O Sol é para todos é profundo e emocionante']

In [15]:
arquivo = sc.textFile("data/sentimentos.csv")
type(arquivo)

pyspark.rdd.RDD

In [16]:
arquivo.count()

100

In [17]:
arquivo.first()

'positivo,Esse livro é incrível.'

In [18]:
linhasComSol = arquivo.filter(lambda line: "Sol" in line)

In [19]:
type(linhasComSol)

pyspark.rdd.PipelinedRDD

In [20]:
linhasComSol.count()

3

Primeiro a função map() determina o comprimento de cada linha do arquivo, criando uma RDD. A função reduce() é chamada para encontrar a linha com maior número de caracteres. O argumento para as funções map() e reduce() são funções anônimas criadas com lambda (da linguagem Python).

In [21]:
arquivo.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)

27

Esta linha pode ser reescrita da seguinte forma:

In [23]:
def max(a, b):
    if a > b:
        return a
    else:
        return b

arquivo.map(lambda line: len(line.split())).reduce(max)

27

## Operação de MapReduce

As operações de MapReduce foram popularizadas pelo Hadoop e podem ser feitas com Spark até 100x mais rápido.

In [24]:
contaPalavras = arquivo.flatMap(lambda line: line.split()).map(lambda palavra: (palavra, 1)).reduceByKey(lambda a, b: a+b)

In [25]:
contaPalavras.collect()

[('livro', 5),
 ('que', 13),
 ('li.', 4),
 ('positivo,um', 3),
 ('li', 1),
 ('positivo,Acho', 1),
 ('tem', 1),
 ('um', 3),
 ('vai', 1),
 ('do', 2),
 ('em', 1),
 ('descrição.', 1),
 ('positivo,O', 2),
 ('para', 5),
 ('todos', 4),
 ('positivo,Me', 1),
 ('este', 1),
 ('livro,', 1),
 ('antigo', 1),
 ('uma', 4),
 ('história', 1),
 ('antiga', 1),
 ('positivo,The', 6),
 ('Da', 38),
 ('Vinci', 45),
 ('Code', 24),
 ('is', 17),
 ('good', 3),
 ('movie...', 1),
 ('thought', 2),
 ('was', 4),
 ('pretty', 1),
 ('book.', 4),
 ('realmente', 1),
 ('deveria', 1),
 ('todas', 1),
 ('as', 1),
 ('pessoas.', 1),
 ('an', 6),
 ('*', 2),
 ('book', 2),
 ('turn', 1),
 ('positivo,Harper', 1),
 ('aborda', 1),
 ('muito', 3),
 ('polêmicos,', 1),
 ('como', 1),
 ('Bullying,', 1),
 ('olhos', 1),
 ('inocentes', 1),
 ('positivo,i', 4),
 ('love', 6),
 ('da', 13),
 ('code....', 1),
 ('loved', 5),
 ('code..', 2),
 ('VINCI', 4),
 ('BEAUTIFUL', 1),
 ('positivo,THE', 1),
 ('slash', 1),
 ('race.', 1),
 ('positivo,Hey', 1),
 ('The

# Transformações

In [1]:
# Criando uma lista em Python
lista1 = [124, 901, 652, 102, 397]

In [2]:
type(lista1)

list

In [3]:
# Carregando dados de uma coleção
lstRDD = sc.parallelize(lista1)

In [4]:
type(lstRDD)

pyspark.rdd.RDD

In [5]:
lstRDD.collect()

[124, 901, 652, 102, 397]

In [6]:
lstRDD.count()

5

In [7]:
# Carregando um arquivo e criando um RDD. 
autoDataRDD = sc.textFile("carros.csv")

In [8]:
type(autoDataRDD)

pyspark.rdd.RDD

In [9]:
# Operação de Ação. 
autoDataRDD.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [10]:
autoDataRDD.take(5)

['MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118',
 'chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151',
 'mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348']

In [11]:
# Cada ação gera um novo processo de computação dos dados. 
# Mas podemos persistir os dados em cache para que ele possa ser usado por outras ações, sem a necessidade 
# de nova computação.
autoDataRDD.cache()

carros.csv MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [12]:
for line in autoDataRDD.collect():
    print(line)

MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE
subaru,gas,std,two,hatchback,fwd,four,69,4900,31,36,5118
chevrolet,gas,std,two,hatchback,fwd,three,48,5100,47,53,5151
mazda,gas,std,two,hatchback,fwd,four,68,5000,30,31,5195
toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,37,41,5389
honda,gas,std,two,hatchback,fwd,four,60,5500,38,42,5399
nissan,gas,std,two,sedan,fwd,four,69,5200,31,37,5499
dodge,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
plymouth,gas,std,two,hatchback,fwd,four,68,5500,37,41,5572
mazda,gas,std,two,hatchback,fwd,four,68,5000,31,38,6095
mitsubishi,gas,std,two,hatchback,fwd,four,68,5500,31,38,6189
dodge,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
plymouth,gas,std,four,hatchback,fwd,four,68,5500,31,38,6229
chevrolet,gas,std,two,hatchback,fwd,four,70,5400,38,43,6295
toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338
dodge,gas,std,two,hatchback,fwd,four,68,5500,31,38,6377

In [13]:
# Map() e criação de um novo RDD - Transformação - Lazy Evaluation
tsvData = autoDataRDD.map(lambda x : x.replace(",","\t"))
tsvData.take(5)

['MAKE\tFUELTYPE\tASPIRE\tDOORS\tBODY\tDRIVE\tCYLINDERS\tHP\tRPM\tMPG-CITY\tMPG-HWY\tPRICE',
 'subaru\tgas\tstd\ttwo\thatchback\tfwd\tfour\t69\t4900\t31\t36\t5118',
 'chevrolet\tgas\tstd\ttwo\thatchback\tfwd\tthree\t48\t5100\t47\t53\t5151',
 'mazda\tgas\tstd\ttwo\thatchback\tfwd\tfour\t68\t5000\t30\t31\t5195',
 'toyota\tgas\tstd\ttwo\thatchback\tfwd\tfour\t62\t4800\t35\t39\t5348']

In [14]:
autoDataRDD.first()

'MAKE,FUELTYPE,ASPIRE,DOORS,BODY,DRIVE,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE'

In [15]:
# Filter() e criação de um novo RDD - Transformação - Lazy Evaluation
toyotaData = autoDataRDD.filter(lambda x: "toyota" in x)

In [16]:
# Ação
toyotaData.count()

32

In [17]:
# Ação
toyotaData.take(20)

['toyota,gas,std,two,hatchback,fwd,four,62,4800,35,39,5348',
 'toyota,gas,std,two,hatchback,fwd,four,62,4800,31,38,6338',
 'toyota,gas,std,four,hatchback,fwd,four,62,4800,31,38,6488',
 'toyota,gas,std,four,wagon,fwd,four,62,4800,31,37,6918',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,30,37,6938',
 'toyota,gas,std,four,hatchback,fwd,four,70,4800,30,37,7198',
 'toyota,gas,std,four,sedan,fwd,four,70,4800,38,47,7738',
 'toyota,diesel,std,four,hatchback,fwd,four,56,4500,38,47,7788',
 'toyota,gas,std,four,wagon,4wd,four,62,4800,27,32,7898',
 'toyota,diesel,std,four,sedan,fwd,four,56,4500,34,36,7898',
 'toyota,gas,std,two,sedan,rwd,four,70,4800,29,34,8058',
 'toyota,gas,std,two,hatchback,rwd,four,70,4800,29,34,8238',
 'toyota,gas,std,four,hatchback,fwd,four,70,4800,28,34,8358',
 'toyota,gas,std,two,hardtop,rwd,four,116,4800,24,30,8449',
 'toyota,gas,std,four,wagon,4wd,four,62,4800,27,32,8778',
 'toyota,gas,std,four,sedan,fwd,four,92,4200,29,34,8948',
 'toyota,gas,std,four,sedan,fwd,four,70,

In [18]:
# Pode salvar o conjunto de dados, o RDD. 
# Nesse caso, o Spark solicita os dados ao processo Master e então gera um arquivo de saída.
savedRDD = open("carros_v2.csv","w")
savedRDD.write("\n".join(autoDataRDD.collect()))
savedRDD.close()

## Operações Set

In [19]:
# Set operations
palavras1 = sc.parallelize(["Big Data","Data Science","Analytics","Visualization"])
palavras2 = sc.parallelize(["Big Data","R","Python","Scala"])

In [20]:
# União
for unions in palavras1.union(palavras2).distinct().collect():
    print(unions)

Data Science
R
Visualization
Scala
Big Data
Python
Analytics


In [21]:
# Interseção
for intersects in palavras1.intersection(palavras2).collect():
    print(intersects)

Big Data


In [22]:
rdd01 = sc.parallelize(range(1,10))
rdd02 = sc.parallelize(range(10,21))
rdd01.union(rdd02).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [23]:
rdd01 = sc.parallelize(range(1,10))
rdd02 = sc.parallelize(range(5,15))
rdd01.intersection(rdd02).collect()

[8, 9, 5, 6, 7]

## Left/Right Outer Join

In [24]:
names1 = sc.parallelize(("banana", "uva", "laranja")).map(lambda a: (a, 1))
names2 = sc.parallelize(("laranja", "abacaxi", "manga")).map(lambda a: (a, 1))
names1.join(names2).collect()

[('laranja', (1, 1))]

In [25]:
names1.leftOuterJoin(names2).collect()

[('uva', (1, None)), ('banana', (1, None)), ('laranja', (1, 1))]

In [26]:
names1.rightOuterJoin(names2).collect()

[('manga', (None, 1)), ('laranja', (1, 1)), ('abacaxi', (None, 1))]

## Distinct

In [27]:
# Distinct
lista1 = [124, 901, 652, 102, 397, 124, 901, 652]
lstRDD = sc.parallelize(lista1)
for numbData in lstRDD.distinct().collect():
    print(numbData)

124
652
901
397
102


## Transformação e Limpeza

In [28]:
# Transformação e Limpeza
def LimpaRDD(autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList = autoStr.split(",")
    
    # Converte o número de portas para um num
    if attList[3] == "two" :
         attList[3] = "2"
    else:
         attList[3] = "4"
    
    # Convert o modelo do carro para uppercase
    attList[5] = attList[4].upper()
    return ",".join(attList)

In [29]:
RDD_limpo = autoDataRDD.map(LimpaRDD)
RDD_limpo.collect()

['MAKE,FUELTYPE,ASPIRE,4,BODY,BODY,CYLINDERS,HP,RPM,MPG-CITY,MPG-HWY,PRICE',
 'subaru,gas,std,2,hatchback,HATCHBACK,four,69,4900,31,36,5118',
 'chevrolet,gas,std,2,hatchback,HATCHBACK,three,48,5100,47,53,5151',
 'mazda,gas,std,2,hatchback,HATCHBACK,four,68,5000,30,31,5195',
 'toyota,gas,std,2,hatchback,HATCHBACK,four,62,4800,35,39,5348',
 'mitsubishi,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5389',
 'honda,gas,std,2,hatchback,HATCHBACK,four,60,5500,38,42,5399',
 'nissan,gas,std,2,sedan,SEDAN,four,69,5200,31,37,5499',
 'dodge,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5572',
 'plymouth,gas,std,2,hatchback,HATCHBACK,four,68,5500,37,41,5572',
 'mazda,gas,std,2,hatchback,HATCHBACK,four,68,5000,31,38,6095',
 'mitsubishi,gas,std,2,hatchback,HATCHBACK,four,68,5500,31,38,6189',
 'dodge,gas,std,4,hatchback,HATCHBACK,four,68,5500,31,38,6229',
 'plymouth,gas,std,4,hatchback,HATCHBACK,four,68,5500,31,38,6229',
 'chevrolet,gas,std,2,hatchback,HATCHBACK,four,70,5400,38,43,6295',
 'toyot

## Ações

In [31]:
# reduce() - soma de valores
lista2 = [124, 901, 652, 102, 397, 124, 901, 652]
lstRDD = sc.parallelize(lista2)
lstRDD.collect()
lstRDD.reduce(lambda x,y: x + y)

3853

In [32]:
# Encontrando a linha com menor número de caracteres
autoDataRDD.reduce(lambda x,y: x if len(x) < len(y) else y)

'bmw,gas,std,two,sedan,rwd,six,182,5400,16,22,41315'

In [33]:
# Criando uma função para redução
def getMPG( autoStr) :
    if isinstance(autoStr, int) :
        return autoStr
    attList = autoStr.split(",")
    if attList[9].isdigit() :
        return int(attList[9])
    else:
        return 0

In [34]:
# Encontrando a média de MPG para todos os carros
autoDataRDD.reduce(lambda x,y : getMPG(x) + getMPG(y)) / (autoDataRDD.count() -1)

25.15228426395939

In [38]:
teams = sc.parallelize(("Flamengo", "Vasco", "Botafogo", "Fluminense", "Palmeiras", "Bahia"))
teams.takeSample(True, 3)

['Vasco', 'Flamengo', 'Flamengo']

In [39]:
teams = sc.parallelize(("Flamengo", "Vasco", "Botafogo", "Fluminense", "Palmeiras", "Bahia", "Bahia", "Vasco"))
teams.map(lambda k: (k,1)).countByKey().items()

dict_items([('Flamengo', 1), ('Vasco', 2), ('Botafogo', 1), ('Fluminense', 1), ('Palmeiras', 1), ('Bahia', 2)])

In [40]:
autoDataRDD.saveAsTextFile("autoDataRDD.txt")

# Fim

### Obrigado - Data Science Academy - <a href=http://facebook.com/dsacademy>facebook.com/dsacademybr</a>