# Spark & Jupyter | Ambiente de desenvolvimento e testes

In [None]:
!pip install unidecode

In [27]:
import os # Sistema operacional 
import sys # Funções e variáveis da shell do Python
import getopt # Criação de opções de entrada pela CLI
import pandas as pd # Pandas
import pyspark # Spark 
from pyspark import SparkContext # SparkContext
from pyspark.sql import SparkSession # SparkSession
from pyspark.sql.functions import * # Funções SQL

# SparkSession
spark = SparkSession.builder.appName("NotebookTeste").getOrCreate()
# SparkContext
#sc = SparkContext()

### Extraindo os dados

In [4]:
caminho_texto = "./README.md"
caminho_csv = "./nomes.csv"
#nomes = spark.read.format("csv").load(caminho_csv)

## Job 1 | Contagem de palavras

### Leitura do arquivo

In [5]:
def ler_arquivo(caminho):
    try:
        with open(caminho, "r") as arquivo:
            return list(enumerate(map(str.strip, arquivo), start=1))
    except FileNotFoundError:
        return []


## Transformações 

### Funções e Classes

#### WordMapper Class

- A função irá rodar uma só vez por cada linha do arquivo de texto; 
- **Input:** {"num_da_linha": "Texto da linha"}
- **Output:** {"palavra": "quantidade"}

In [6]:
# import sys

class WordMapper:
    def __init__(self):
        pass

    def map(self, line):
        words = line.split()
        for word in words:
            print(f"{word}\t1")

# Exemplo de uso
mapper = WordMapper()
for line in sys.stdin:
    mapper.map(line)

#### WordReducer Class

- A função irá rodar uma só vez por cada par de chave-valor ordenado pelo Hadoop;
- **Input:** {"palavra": "lista_de_frequencia"}
- **Output:** {"palavra": "contagem"}

In [7]:
# import sys

class WordReducer:
    def __init__(self):
        self.last_key = None
        self.count = 0

    def reduce(self, key, value):
        if self.last_key and self.last_key != key:
            print(f"{self.last_key}\t{self.count}")
            self.last_key, self.count = key, int(value)
        else:
            self.last_key, self.count = key, self.count + int(value)

    def finalize(self):
        if self.last_key:
            print(f"{self.last_key}\t{self.count}")

# Exemplo de uso
reducer = WordReducer()
for line in sys.stdin:
    key, value = line.strip().split("\t")
    reducer.reduce(key, value)

reducer.finalize()


#### Limpeza das linhas

In [8]:
def processar_linha(linha):
    mapeamento = {}
    for palavra in [
        re.sub(r'[^a-z]', '', unidecode(w.lower()))
        for w in re.findall(r'\b\w+\b', unidecode(linha.lower()))
    ]:
        if palavra:
            mapeamento[palavra] = mapeamento.get(palavra, 0) + 1
    return mapeamento

### Armazenamento de dados em um DataFrame

#### Migrando o conteúdo do arquivo de texto para um RDD

In [29]:
# Ler o arquivo | SparkContext (RDD)
linhas = sc.textFile(caminho_texto)
#linhas.collect()
# Limpeza das linhas
palavras = linhas.flatMap(lambda linha: linha.strip().lower.split())
# Contagem das palavras
contagem = palavras.map(lambda palavra: (palavra, 1))
# Redução das quantidades
reduzido = contagem.reduceByKey(lambda a,b: a + b)
dados = reduzido
colunas = ["Palavra", "Quantidade"]
df = spark.createDataFrame(dados, colunas)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 7) (0fa2e790908f executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 840, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_31136/2264323974.py", line 5, in <lambda>
AttributeError: 'builtin_function_or_method' object has no attribute 'split'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 840, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_31136/2264323974.py", line 5, in <lambda>
AttributeError: 'builtin_function_or_method' object has no attribute 'split'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [None]:
# Ler o arquivo | 
#df = pd.DataFrame(mapeamento_palavras, columns = ["Palavra","Quantidade"])
#df

#### Armazenamento de dados em um DataFrame | Pandas

In [None]:
# Criar um DataFrame do Spark
colunas = ["Palavra", "Quantidade"]
df = spark.createDataFrame(mapeamento_palavras, colunas)


## Job 2 | AWS Glue
- [] Alterar os valores da coluna `nome` para maiúsculo;
- [] Contagem das linhas presentes no DataFrame;
- [] Contagem de nomes agrupados por `ano` e `sexo`, ordenados por `ano` decrescente;
- [] Nome feminino mais registrado e o respectivo `ano`;
- [] Nome masculino mais registrado e o respectivo `ano`;
- [] Total de registros para cada `ano`. Apresentar as 10 primeiras linhas, ordenados por `ano` crescente;

### Transformações | Job 2

## Job 3 | Coleta in Batch

- [] importar a lib boto3
- [] passar as credenciais ao container `spark_jupyter`
- [] ler 2 arquivos em .csv sem filtrar os dados
- [] carregar os dados para o S3
- [] acessar a AWS e grava no S3, no bucket definido com RAW Zone

>      - no momento da gravação dos dados deve-se considerar o padrão: <nome do bucket>\<camada de armazenamento>\<origem do dado>\<formato do dado>\<especificação do dado>\<data de processamento separada por ano\mes\dia>\<arquivo>
>
>            Por exemplo:
>
>                   S3:\\data-lake-do-fulano\Raw\Local\CSV\Movies\2022\05\02\movies.csv
>
>                   S3:\\data-lake-do-fulano\Raw\Local\CSV\Series\2022\05\02\series.csv
>

- [] criar um container Docker com volume para (i) armazenar os arquivos.csv e (ii) executar o job.py

### Transformações | Job 3

### Escrevendo o resultado

In [None]:
caminho_saida = "./outputs"
resultado.write.format("csv").save(caminho_saida)

### Finalizando a sessão

In [None]:
spark.stop()