<a href="https://colab.research.google.com/github/gabgovar/Introducao-PySpark/blob/main/First_Spark_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Instalando dependências e iniciando o PySpark


## Como instalar o PySpark no Google Colab

Como instalar o PySpark no Google Colab é uma dúvida comum entre aqueles que estão migrando seus projetos de Data Science para ambientes na nuvem.

O termo Big Data está cada vez mais presente, e mesmo projetos pessoais podem assumir uma grande dimensionalidade devido à quantidade de dados disponíveis.

Para analisar grandes volumes de dados, Big Data, com velocidade, o Apache Spark é uma ferramenta muito utilizada, dada a sua capacidade de processamento de dados e computação paralela.

O Spark foi pensado para ser acessível, oferecendo diversas APIs e frameworks em Python, Scala, SQL e diversas outras linguagens.

## PySpark no Google Colab

PySpark é a interface alto nível que permite você conseguir acessar e usar o Spark por meio da linguagem Python. Usando o PySpark, você consegue escrever todo o seu código usando apenas o nosso estilo Python de escrever código.

Já o Google Colab é uma ferramenta incrível, poderosa e gratuita – com suporte de GPU inclusive. Uma vez que roda 100% na nuvem, você não tem a necessidade de instalar qualquer coisa na sua própria máquina.

No entanto, apesar da maioria das bibliotecas de Data Science estarem previamente instaladas no Colab, o mesmo não acontece com o PySpark. Para conseguir usar o PySpark é necessário alguns passos intermediários, que não são triviais para aqueles que estão começando.

Dessa maneira, preparei um tutorial simples e direto ensinando a instalar as dependências e a biblioteca.

## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.3.2 junto com o Hadoop 2.7.

### Instalar as dependências

In [2]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

### Configurar as variáveis de ambiente

In [3]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# Funções RDD - PySpark

## map()

In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Map")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample.txt')
rdd.collect()

In [20]:
rdd2 = rdd.map(lambda x: x.split(' '))

In [21]:
rdd2.collect()

[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

In [27]:
def foo(x):
    return x.split(' ')

rdd3 = rdd.map(foo)
rdd3.collect()

[['1', '2', '3', '4', '5'],
 ['3', '4', '5', '66', '77'],
 ['12', '43', '6', '7', '8'],
 ['12', '12', '33']]

Quiz

In [28]:
Sam = sc.textFile('/content/drive/MyDrive/PySpark/data/sample2.txt')

In [58]:
Sam2 = Sam.map(lambda x: x.split(' '))
Sam2.collect()

[['Hi', 'how', 'are', 'you?'], ['Hpe', 'you', 'are', 'doing'], ['great']]

In [54]:
def foo2(x):
    l = x.split(' ')
    l2 = []
    for s in l:
        l2.append(len(s))
    return l2

Sam3 = Sam.map(foo)
Sam3.collect()

[['Hi', 'how', 'are', 'you?'], ['Hpe', 'you', 'are', 'doing'], ['great']]

## flatMap()

In [57]:
conf = SparkConf().setAppName("FlatMap")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample.txt')
rdd.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [59]:
MappRdd = rdd.flatMap(lambda x: x.split(' '))
MappRdd.collect()

['1',
 '2',
 '3',
 '4',
 '5',
 '3',
 '4',
 '5',
 '66',
 '77',
 '12',
 '43',
 '6',
 '7',
 '8',
 '12',
 '12',
 '33']

## Filter()
* Filtros são utlizados para remover os elementos do RDD
* É criado um novo RDD
* rdd.filter(lambda x: x!=123) => se o x!= 123 é verdadeiro ele é considerado no output caso contrário não será.

In [64]:
rdd6 = rdd.filter(lambda x: x != '12 12 33')
rdd6.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8']

In [72]:
def foo(x):
    if x == '12 12 33':
        return False
    else:
        return True


rdd6 = rdd.filter(foo)
rdd6.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8']

### Exemplo

Escrever um filtro que vai remover todas as palavras que começam com a ou c do rdd

In [87]:
conf = SparkConf().setAppName("FlatMap")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample3.txt')
rdd.collect()

['this mango company animal',
 'cat dog ant mic laptop',
 'chair swich moble am charge cover',
 'amanda nay alarm ant']

In [88]:
FlatMappedRdd = rdd.flatMap(lambda x: x.split(' '))
FlatMappedRdd.collect()

['this',
 'mango',
 'company',
 'animal',
 'cat',
 'dog',
 'ant',
 'mic',
 'laptop',
 'chair',
 'swich',
 'moble',
 'am',
 'charge',
 'cover',
 'amanda',
 'nay',
 'alarm',
 'ant']

In [89]:
def filterAandC(x):
    if(x.startswith('a') or x.startswith('c')):
        return False
    else:
        return True

filteredRdd = FlatMappedRdd.filter(filterAandC)
filteredRdd.collect()

['this', 'mango', 'dog', 'mic', 'laptop', 'swich', 'moble', 'nay']

In [94]:
filterRddLambda = FlatMappedRdd.filter(lambda x: not(x.startswith('a') or x.startswith('c')) )
filterRddLambda.collect()

['this', 'mango', 'dog', 'mic', 'laptop', 'swich', 'moble', 'nay']

## distinct()

* Distinct é usado para obter elementos distintos do RDD
* Ele vai criar um novo RDD
* rdd.distinct()

In [95]:
conf = SparkConf().setAppName("Distinct")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample.txt')

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [96]:
rdd2 = rdd.distinct()
rdd2.collect()

['3 4 5 66 77', '12 43 6 7 8', '12 12 33', '1 2 3 4 5']

In [99]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.distinct()
rdd3.collect()

['1', '4', '66', '77', '12', '8', '33', '2', '3', '5', '43', '6', '7']

In [100]:
rdd.flatMap(lambda x: x.split(' ')).distinct().collect()

['1', '4', '66', '77', '12', '8', '33', '2', '3', '5', '43', '6', '7']

## groupByKey()

* GroupByKey é utilizado para criar grupos baseados em uma chave do RDD
* Para o groupByKey funcionar corretamente os dados devem estar no formato de (k,v),(k,v),(k2,v),(k2,v2)
* Exemplo: ("Apple",1),("Ball",1),("Apple",1)
* Vai criar um novo RDD
* rdd.groupByKey()
* mapValues(list) é usualmente utilizado para obter os dados do grupo

In [112]:
conf = SparkConf().setAppName("GroupByKey")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample3.txt')

In [113]:
rdd.map(lambda x: (x, 1)).collect()

[('this mango company animal', 1),
 ('cat dog ant mic laptop', 1),
 ('chair swich moble am charge cover', 1),
 ('amanda any alarm ant', 1)]

In [115]:
rdd.map(lambda x: (x, len(x.split()))).collect()

[('this mango company animal', 4),
 ('cat dog ant mic laptop', 5),
 ('chair swich moble am charge cover', 6),
 ('amanda any alarm ant', 4)]

In [121]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))
rdd2 .collect()

['this',
 'mango',
 'company',
 'animal',
 'cat',
 'dog',
 'ant',
 'mic',
 'laptop',
 'chair',
 'swich',
 'moble',
 'am',
 'charge',
 'cover',
 'amanda',
 'any',
 'alarm',
 'ant']

In [126]:
rdd3 = rdd2.map(lambda x: (x,len(x)))

In [130]:
rdd3.groupByKey().mapValues(list).collect()

[('this', [4]),
 ('mango', [5]),
 ('cat', [3]),
 ('ant', [3, 3]),
 ('laptop', [6]),
 ('chair', [5]),
 ('moble', [5]),
 ('am', [2]),
 ('charge', [6]),
 ('company', [7]),
 ('animal', [6]),
 ('dog', [3]),
 ('mic', [3]),
 ('swich', [5]),
 ('cover', [5]),
 ('amanda', [6]),
 ('any', [3]),
 ('alarm', [5])]

## reduceByKey()

* ReduceByKey é utilizado para combinar dados baseado nas chaves do RDD
* Para o groupByKey funcionar corretamente os dados devem estar no formato de (k,v),(k,v),(k2,v),(k2,v2)
* Exemplo: ("Apple",1),("Ball",1),("Apple",1)
* Vai criar um novo RDD
* rdd.reduceByKey(lambda x, y: x + y)

In [147]:
conf = SparkConf().setAppName("ReduceByKey")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample.txt')
rdd.collect()

['1 2 3 4 5', '3 4 5 66 77', '12 43 6 7 8', '12 12 33']

In [148]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))

In [149]:
rdd3 = rdd2.map(lambda x: (x,1))

In [152]:
rdd3.reduceByKey(lambda x, y : x + y).collect()

[('1', 1),
 ('4', 2),
 ('66', 1),
 ('77', 1),
 ('12', 3),
 ('8', 1),
 ('33', 1),
 ('2', 1),
 ('3', 2),
 ('5', 2),
 ('43', 1),
 ('6', 1),
 ('7', 1)]

### Exemplo

Escreva um fluxo de transformação que retornará a contagem de palavras de cada palavra presente no arquivo como par (valor-chave)

In [157]:
conf = SparkConf().setAppName("Exemplo-Map-Reduce")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample4.txt')
rdd.collect()

['this mango company',
 'cat mango ant animal laptop',
 'chair switch mango am charge cover',
 'animalany mango ant laptop laptop',
 'this']

In [158]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))

In [159]:
rdd3 = rdd2.map(lambda x: (x,1))

In [160]:
rdd3.reduceByKey(lambda x, y : x + y).collect()

[('this', 2),
 ('mango', 4),
 ('cat', 1),
 ('ant', 2),
 ('laptop', 3),
 ('chair', 1),
 ('switch', 1),
 ('am', 1),
 ('charge', 1),
 ('company', 1),
 ('animal', 1),
 ('cover', 1),
 ('animalany', 1)]

In [165]:
rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x ,1)).reduceByKey(lambda x ,y : x + y).collect()

[('this', 2),
 ('mango', 4),
 ('cat', 1),
 ('ant', 2),
 ('laptop', 3),
 ('chair', 1),
 ('switch', 1),
 ('am', 1),
 ('charge', 1),
 ('company', 1),
 ('animal', 1),
 ('cover', 1),
 ('animalany', 1)]

## count()

* count retorna o número de elementos no RDD
* count é uma ação
* rdd.count()

In [166]:
conf = SparkConf().setAppName("count")
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/content/drive/MyDrive/PySpark/data/sample4.txt')
rdd.collect()

['this mango company',
 'cat mango ant animal laptop',
 'chair switch mango am charge cover',
 'animalany mango ant laptop laptop',
 'this']

In [170]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))

In [171]:
rdd2.count()

20

## coutByValue()

* contByValue fornece quantas vezes cada valor ocorre no RDD
* count é uma ação
* rdd.countByValue()

In [174]:
rdd.countByValue()

defaultdict(int,
            {'animalany mango ant laptop laptop': 1,
             'cat mango ant animal laptop': 1,
             'chair switch mango am charge cover': 1,
             'this': 1,
             'this mango company': 1})

In [177]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))
rdd2.collect()

['this',
 'mango',
 'company',
 'cat',
 'mango',
 'ant',
 'animal',
 'laptop',
 'chair',
 'switch',
 'mango',
 'am',
 'charge',
 'cover',
 'animalany',
 'mango',
 'ant',
 'laptop',
 'laptop',
 'this']

In [178]:
rdd2.countByValue()

defaultdict(int,
            {'am': 1,
             'animal': 1,
             'animalany': 1,
             'ant': 2,
             'cat': 1,
             'chair': 1,
             'charge': 1,
             'company': 1,
             'cover': 1,
             'laptop': 3,
             'mango': 4,
             'switch': 1,
             'this': 2})

In [179]:
rdd.flatMap(lambda x: x.split(' ')).countByValue()

defaultdict(int,
            {'am': 1,
             'animal': 1,
             'animalany': 1,
             'ant': 2,
             'cat': 1,
             'chair': 1,
             'charge': 1,
             'company': 1,
             'cover': 1,
             'laptop': 3,
             'mango': 4,
             'switch': 1,
             'this': 2})