## DStream Exercise - Reading Data

### Solution: DStream Exercise - Reading Data

### 1. Install NetCat in the Spark container

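* `apt update`
* `apt install netcat`

On newer Debian or Ubuntu images, `netcat` may be only a virtual package. In that case, install `netcat-traditional` (which accepts the `nc -lp` syntax used later) or `netcat-openbsd`.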
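If `nc` cannot be installed in the container, any program that listens on the port and writes newline-terminated text can feed `socketTextStream`, because Spark's socket receiver connects to that port as a client and turns each line into one record. The sketch below is a hypothetical stand-in for `nc -lp 9999` that uses only Python's standard library. The function name `serve_lines` and its parameters are illustrative assumptions, not part of the original exercise.

```python
import socket
import threading
from typing import Iterable, Optional


def serve_lines(lines: Iterable[str], host: str = "localhost", port: int = 9999,
                ready: Optional[threading.Event] = None) -> None:
    """Hypothetical stand-in for `nc -lp 9999`: accept one client and send lines."""
    # Listen on host:port, as netcat does with -l -p
    with socket.create_server((host, port)) as server:
        if ready is not None:
            ready.set()  # signal that the listener is accepting connections
        conn, _addr = server.accept()  # Spark's receiver connects here as a client
        with conn:
            for line in lines:
                # socketTextStream splits input on newlines, so terminate each record
                conn.sendall((line + "\n").encode("utf-8"))
```

The call blocks until a client connects. Running it in a separate terminal or process before `ssc.start()` would therefore play the role of the `nc` session.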

### 2. Create an application that reads data from port 9999 and prints it to the console

In [1]:
```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from time import sleep
```

In [2]:
```python
conf = SparkConf().setMaster("local[*]").setAppName("Dstream Python")
sc = SparkContext.getOrCreate(conf)
# Batch interval: each micro-batch covers 5 seconds of input
ssc = StreamingContext(sc, 5)
```

In [3]:
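```python
# Connect to localhost:9999 as a client; each newline-terminated line becomes one record
dstream = ssc.socketTextStream("localhost", 9999)
```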

In [4]:
```python
# Print the first 10 records of every batch to the console
dstream.pprint()
```
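For every batch, `pprint()` prints a header containing the batch time, followed by at most `num` records (10 by default) and a `...` line when the batch holds more. The pure-Python function below is a sketch that mimics that layout, which makes the console output easier to read. `format_batch` is a hypothetical helper for illustration, not a PySpark API.

```python
from datetime import datetime
from typing import Sequence

SEPARATOR = "-" * 43  # dashed rule printed above and below the batch time


def format_batch(batch_time: datetime, records: Sequence, num: int = 10) -> str:
    """Hypothetical helper mimicking the text DStream.pprint(num) prints per batch."""
    lines = [SEPARATOR, f"Time: {batch_time}", SEPARATOR]
    lines += [str(record) for record in records[:num]]  # only the first num records
    if len(records) > num:
        lines.append("...")  # marker shown when the batch has more than num records
    lines.append("")  # pprint ends each batch with an empty line
    return "\n".join(lines)


print(format_batch(datetime(2022, 4, 19, 21, 41, 30), ["heita", "tudo certo"]))
```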

In [5]:
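```python
# Before running this cell, start a listener in a terminal first:
# root@jupyter-spark:/# nc -lp 9999
ssc.start()   # start receiving and processing batches
sleep(20)     # let the stream run for 20 seconds
ssc.stop()    # stop streaming (by default this also stops the SparkContext)
```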

```
-------------------------------------------
Time: 2022-04-19 21:41:25
-------------------------------------------
ola
tudo bem
Manoel
oi

-------------------------------------------
Time: 2022-04-19 21:41:30
-------------------------------------------
heita
tudo certo

-------------------------------------------
Time: 2022-04-19 21:41:35
-------------------------------------------
por ai
meu
amigo

-------------------------------------------
Time: 2022-04-19 21:41:40
-------------------------------------------
```
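With `StreamingContext(sc, 5)`, the lines typed into `nc` are grouped into 5-second batches, and each `Time:` header labels one batch. A batch in which nothing was typed prints only its header, as at 21:41:40. The function below is a simplified pure-Python model of that grouping. It assumes that a line arriving at second `t` belongs to the batch ending at the next multiple of the interval after `t`. It also ignores Spark's receiver block interval, so it is only meant to illustrate the idea.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_into_batches(events: Iterable[Tuple[float, str]],
                       interval: float) -> Dict[float, List[str]]:
    """Simplified model: assign each (arrival_second, line) to the batch ending after it."""
    batches: Dict[float, List[str]] = defaultdict(list)
    for arrival, line in events:
        # A line at second t falls into the interval [k*interval, (k+1)*interval),
        # which is labelled by its end time (k+1)*interval
        batch_end = (arrival // interval + 1) * interval
        batches[batch_end].append(line)
    return dict(batches)
```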



## DStream Exercise - Word Count

### 1. Create the directory “/user/rodrigo/stream” in HDFS

In [6]:
```
# -p also creates missing parent directories (e.g. /user/manoel)
!hdfs dfs -mkdir -p /user/manoel/stream
```

### 2. Create an application that counts words from port 9998 every 10 seconds and prints the result to the console for 50 seconds

In [9]:
```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from time import sleep
```

In [10]:
```python
conf = SparkConf().setMaster("local[*]").setAppName("Dstream WordCount")
sc = SparkContext.getOrCreate(conf)
# Batch interval: each micro-batch covers 10 seconds of input
ssc = StreamingContext(sc, 10)
```

In [11]:
```python
dstream = ssc.socketTextStream("localhost", 9998)
```

In [13]:
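```python
# flatMap: split each line into words
# map: pair each word with a count of 1
# reduceByKey: add up the counts of each word within the batch
# (chave1 and chave2 are two partial counts for the same word, not keys)
wordcount = dstream.flatMap(lambda linha: linha.split(" "))\
                   .map(lambda palavra: (palavra,1))\
                   .reduceByKey(lambda chave1, chave2: chave1+chave2)
wordcount.pprint()
```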
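Within each 10-second batch, `flatMap`, `map` and `reduceByKey` produce the same result as an ordinary word count over that batch's lines. The plain-Python function below is an illustrative single-batch equivalent; the name `word_count` is hypothetical. It keeps the `split(" ")` used above, which exposes a side effect worth knowing: consecutive spaces yield an empty-string "word". Calling `split()` with no argument in the DStream code would avoid that.

```python
from collections import Counter
from typing import Dict, Iterable


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Illustrative single-batch equivalent of flatMap -> map -> reduceByKey."""
    counts: Counter = Counter()
    for line in lines:                 # one record per line, as in the DStream
        for word in line.split(" "):   # flatMap: line -> words
            counts[word] += 1          # map to (word, 1) and sum per word
    return dict(counts)


print(word_count(["Manoel teste"]))  # {'Manoel': 1, 'teste': 1}
```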

In [14]:
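```python
# Requires a listener on port 9998 first, e.g. `nc -lp 9998` in a terminal
ssc.start()   # start processing 10-second batches
sleep(50)     # run for 50 seconds (about 5 batches)
ssc.stop()    # stop streaming (by default this also stops the SparkContext)
```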

```
-------------------------------------------
Time: 2022-04-19 22:43:40
-------------------------------------------

-------------------------------------------
Time: 2022-04-19 22:43:50
-------------------------------------------
('Manoel', 1)
('teste', 1)

-------------------------------------------
Time: 2022-04-19 22:44:00
-------------------------------------------
('Nascimento', 1)

-------------------------------------------
Time: 2022-04-19 22:44:10
-------------------------------------------
('d', 1)
('b', 1)
('a', 1)
('c', 1)

-------------------------------------------
Time: 2022-04-19 22:44:20
-------------------------------------------
```



### 3. Create an application that counts words from port 9998 every 10 seconds and saves the results on the namenode, in the directory “hdfs://namenode/user/rodrigo/stream/word_count”, for 50 seconds

In [15]:
```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from time import sleep
```

In [16]:
```python
conf = SparkConf().setMaster("local[*]").setAppName("Dstream WordCount")
sc = SparkContext.getOrCreate(conf)
ssc = StreamingContext(sc, 10)
```

In [17]:
```python
dstream = ssc.socketTextStream("localhost", 9998)
```

In [18]:
```python
wordcount = dstream.flatMap(lambda linha: linha.split(" "))\
                   .map(lambda palavra: (palavra,1))\
                   .reduceByKey(lambda chave1, chave2: chave1+chave2)
```

In [19]:
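```python
# Write each batch to a directory named word_count-<batch time in ms>;
# the path has no scheme, so it resolves against the default file system (HDFS here)
wordcount.saveAsTextFiles("/user/manoel/stream/word_count")
```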

In [20]:
```python
# Requires a listener on port 9998 first, e.g. `nc -lp 9998` in a terminal
ssc.start()
sleep(50)
ssc.stop()
```

In [21]:
```
!hdfs dfs -ls /user/manoel/stream
```

```
Found 5 items
drwxr-xr-x   - root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408920000
drwxr-xr-x   - root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408930000
drwxr-xr-x   - root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408940000
drwxr-xr-x   - root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408950000
drwxr-xr-x   - root supergroup          0 2022-04-19 22:56 /user/manoel/stream/word_count-1650408960000
```
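`saveAsTextFiles(prefix, suffix=None)` creates one directory per batch, named `prefix-<batch time in milliseconds since the epoch>`, with `.suffix` appended when a suffix is given. This is why the listing shows five directories for 50 seconds of 10-second batches. The sketch below builds and decodes such names; `batch_dir_name` and `batch_time_utc` are hypothetical helpers. Decoding `1650408920000` gives 2022-04-19 22:55:20 UTC, which is consistent with the 22:55 timestamps in the listing if the container clock runs in UTC.

```python
from datetime import datetime, timezone
from typing import Optional


def batch_dir_name(prefix: str, batch_time_ms: int, suffix: Optional[str] = None) -> str:
    """Hypothetical helper reproducing the prefix-TIME_IN_MS[.suffix] naming."""
    name = f"{prefix}-{batch_time_ms}"
    return name if suffix is None else f"{name}.{suffix}"


def batch_time_utc(dir_name: str) -> datetime:
    """Hypothetical helper: recover the batch time (UTC) from a directory name."""
    millis = dir_name.rsplit("-", 1)[1].split(".")[0]  # drop prefix and any suffix
    return datetime.fromtimestamp(int(millis) / 1000, tz=timezone.utc)


path = batch_dir_name("/user/manoel/stream/word_count", 1650408920000)
print(path, batch_time_utc(path))
```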


In [24]:
```
!hdfs dfs -ls /user/manoel/stream/word_count-1650408940000
```

```
Found 4 items
-rw-r--r--   2 root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408940000/_SUCCESS
-rw-r--r--   2 root supergroup          0 2022-04-19 22:55 /user/manoel/stream/word_count-1650408940000/part-00000
-rw-r--r--   2 root supergroup         22 2022-04-19 22:55 /user/manoel/stream/word_count-1650408940000/part-00001
-rw-r--r--   2 root supergroup          9 2022-04-19 22:55 /user/manoel/stream/word_count-1650408940000/part-00002
```


In [25]:
```
!hdfs dfs -cat /user/manoel/stream/word_count-1650408940000/part-00001
```

```
('teste', 3)
('b', 1)
```
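Each line of a `part-NNNNN` file is the `str()` of one `(word, count)` tuple. Each file holds one partition of that batch's result, so a 0-byte file such as `part-00000` is a partition that received no words. Because the counts are per batch, totals over the whole run must be summed across directories, and since the lines are Python literals they can be parsed back with `ast.literal_eval`. The sketch below does this in plain Python with the hypothetical helper `total_counts`; the second `('b', 2)` line in the example is illustrative input, not data from this run. In Spark itself, the directories could probably be read together with a glob such as `sc.textFile("/user/manoel/stream/word_count-*")`, although that was not run in this exercise.

```python
import ast
from collections import Counter
from typing import Dict, Iterable


def total_counts(lines: Iterable[str]) -> Dict[str, int]:
    """Hypothetical helper: sum per-batch (word, count) lines written by saveAsTextFiles."""
    totals: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:          # skip blank lines
            continue
        word, count = ast.literal_eval(line)  # "('teste', 3)" -> ('teste', 3)
        totals[word] += count
    return dict(totals)


print(total_counts(["('teste', 3)", "('b', 1)", "('b', 2)"]))
```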
