### **Explorando com Spark**

**Objetivo: Transformação de dados (API do Twitter) com Spark**

Essa primeira etapa consiste em:

- **Instalar o PySpark e ler a base de dados brutos**;

A instalação do PySpark é simples, mas deve prestar atenção aos pré-requisitos e versões utilizadas para não ter problema ao rodar alguns comandos.

Depois de instalado, é possível ler os dados utilizando o SparkSession, onde é possível checar o tipo dos dados, que no caso é json, e a localização, que é dentro do Data Lake. A exploração fica por conta dos métodos show e printSchema.

- **Identificar os dados que devem ser extraídos e simplificados**;

Com os dados lidos e explorados, é possível identificar quais são os dados essenciais. No caso, existem os metadados que não vão ter utilidade para análise, então podem ser ignorados. Já os dados dos tweets e usuários são as informações mais importantes e precisam ser separadas, simplificando assim a leitura dos dados.

- **Extrair e salvar os dados simplificados**;

Utilizando os métodos select e explode é feita a extração dos dados de tweets e users, podendo salvá-los em um novo DataFrame. Com esses dois DataFrames a informação fica muito mais simples de ler e torna possível que a análise e decisão se vai ler ambos os dados ou não.

- **Transformar todo esse processo em um script .py**.

A última etapa é muito importante, pois é quando todas as decisões presentes nesse Jupyter Notebook são transformadas em um script que será incluso no pipeline orquestrado pelo Airflow. Para testar o Script, será utilizado a solução do SparkSubmit, que permite rodar código Spark de maneira simples e gerenciada pelo próprio Spark.

In [1]:
# Spark necessita do Java instalado na máquina para funcionar
# no terminal: java -version
# Caso não tenha instalado, no terminal: sudo apt-get install openjdk-8-jdk-headless -qq 

# pip install pyspark==3.3.1

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hUsing legacy 'setup.py install' for pyspark, since package 'wheel' is not installed.
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
  Running setup.py install for pyspark ... [?25ldone
[?25hSuccessfully install

In [2]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession\
    .builder\
    .appName("twitter_transformation")\
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR") # reduce the amount of log output

In [5]:
import os

In [6]:
# twitter_datascience tem subfolders com diferentes datas de extração
parent_folder = "../../datalake/twitter_datascience"
subfolders = [os.path.join(parent_folder, folder) for folder in os.listdir(parent_folder) if os.path.isdir(os.path.join(parent_folder, folder))]

df = spark.read.json(subfolders)

                                                                                

In [7]:
df.show()

+--------------------+--------------------+------------------+
|                data|            includes|              meta|
+--------------------+--------------------+------------------+
|[{44, 65, 2024-06...|{[{2024-06-02T19:...|{1234567890abcdef}|
|[{71, 91, 2024-06...|{[{2024-06-02T06:...|{1234567890abcdef}|
|[{44, 48, 2024-06...|{[{2024-06-02T05:...|{1234567890abcdef}|
|[{25, 98, 2024-06...|{[{2024-06-02T16:...|{1234567890abcdef}|
|[{23, 80, 2024-06...|{[{2024-06-02T07:...|{1234567890abcdef}|
|[{82, 74, 2024-06...|{[{2024-06-02T04:...|              null|
|[{90, 84, 2024-06...|{[{2024-06-01T14:...|{1234567890abcdef}|
|[{8, 54, 2024-06-...|{[{2024-06-01T01:...|{1234567890abcdef}|
|[{46, 97, 2024-06...|{[{2024-06-01T01:...|              null|
|[{48, 67, 2024-05...|{[{2024-05-31T03:...|{1234567890abcdef}|
|[{93, 43, 2024-05...|{[{2024-05-31T01:...|              null|
|[{1, 27, 2024-05-...|{[{2024-05-29T22:...|              null|
|[{34, 3, 2024-05-...|{[{2024-05-28T00:...|            

In [13]:
df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- conversation_id: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- in_reply_to_user_id: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- public_metrics: struct (nullable = true)
 |    |    |    |-- like_count: long (nullable = true)
 |    |    |    |-- quote_count: long (nullable = true)
 |    |    |    |-- reply_count: long (nullable = true)
 |    |    |    |-- retweet_count: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- includes: struct (nullable = true)
 |    |-- users: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- c

In [8]:
# Etapa 1: Extrair informações da coluna data

from pyspark.sql import functions as f

In [9]:
# Transforma as informações da coluna data em linhas dentro do df atual

df.select(f.explode('data')).printSchema()

root
 |-- col: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- conversation_id: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- id: string (nullable = true)
 |    |-- in_reply_to_user_id: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- public_metrics: struct (nullable = true)
 |    |    |-- like_count: long (nullable = true)
 |    |    |-- quote_count: long (nullable = true)
 |    |    |-- reply_count: long (nullable = true)
 |    |    |-- retweet_count: long (nullable = true)
 |    |-- text: string (nullable = true)



In [16]:
df.select(f.explode('data')).show()

+--------------------+
|                 col|
+--------------------+
|{44, 65, 2024-06-...|
|{78, 46, 2024-06-...|
|{76, 98, 2024-06-...|
|{88, 47, 2024-06-...|
|{66, 2, 2024-06-0...|
|{78, 86, 2024-06-...|
|{37, 89, 2024-06-...|
|{62, 8, 2024-06-0...|
|{34, 29, 2024-06-...|
|{53, 45, 2024-06-...|
|{71, 91, 2024-06-...|
|{6, 47, 2024-06-0...|
|{95, 99, 2024-06-...|
|{13, 32, 2024-06-...|
|{99, 97, 2024-06-...|
|{5, 52, 2024-06-0...|
|{40, 55, 2024-06-...|
|{26, 55, 2024-06-...|
|{10, 95, 2024-06-...|
|{95, 64, 2024-06-...|
+--------------------+
only showing top 20 rows



> Ainda tem apenas uma única coluna com um json de conteúdo das rows.

In [10]:
# Transformação dos dados em colunas

df.select(f.explode('data').alias('tweets')).select("tweets.author_id", "tweets.conversation_id",
        "tweets.created_at", "tweets.id",
        "tweets.public_metrics.*", "tweets.text").show()

# public_metrics recebe * pois ainda não está com uma estrutura simplificada e isso permite acessar todos os campos que pertencem a essa métrica

+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|author_id|conversation_id|          created_at| id|like_count|quote_count|reply_count|retweet_count|                text|
+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|       44|             65|2024-06-02T21:15:...| 58|        99|         10|         25|           68|Este é um tweet f...|
|       78|             46|2024-06-02T17:40:...| 14|        57|         84|         59|           30|Tweet fictício ge...|
|       76|             98|2024-06-02T15:10:...| 16|        75|         35|         91|           40|Tweet fictício ge...|
|       88|             47|2024-06-02T09:58:...| 10|        42|         26|         42|           10|Um terceiro tweet...|
|       66|              2|2024-06-02T18:36:...|  8|        67|         96|         95|           43|Tweet fictício ge...|
|       78|     

In [11]:
df.select(f.explode("data").alias("tweets"))\
.select("tweets.author_id", "tweets.conversation_id",
        "tweets.created_at", "tweets.id",
        "tweets.public_metrics.*", "tweets.text").printSchema()

root
 |-- author_id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)



In [12]:
# salva as informações em uma variável
tweet_df = df.select(f.explode("data").alias("tweets"))\
.select("tweets.author_id", "tweets.conversation_id",
        "tweets.created_at", "tweets.id",
        "tweets.public_metrics.*", "tweets.text")

In [13]:
# json -> dataframe: mais fácil e simples para acessar os dados
tweet_df.show(5)

+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|author_id|conversation_id|          created_at| id|like_count|quote_count|reply_count|retweet_count|                text|
+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|       44|             65|2024-06-02T21:15:...| 58|        99|         10|         25|           68|Este é um tweet f...|
|       78|             46|2024-06-02T17:40:...| 14|        57|         84|         59|           30|Tweet fictício ge...|
|       76|             98|2024-06-02T15:10:...| 16|        75|         35|         91|           40|Tweet fictício ge...|
|       88|             47|2024-06-02T09:58:...| 10|        42|         26|         42|           10|Um terceiro tweet...|
|       66|              2|2024-06-02T18:36:...|  8|        67|         96|         95|           43|Tweet fictício ge...|
+---------+-----

**Criando um novo DataFrame para os dados dos usuários que fizeram esses tweets**

In [19]:
# Transformar a lista de usuários em linhas do DataFrame
df.select(f.explode("includes.users")).show()

+--------------------+
|                 col|
+--------------------+
|{2024-06-02T19:47...|
|{2024-06-02T16:33...|
|{2024-06-02T09:12...|
|{2024-06-02T05:57...|
|{2024-06-02T11:53...|
|{2024-06-02T17:45...|
|{2024-06-02T02:00...|
|{2024-06-02T19:17...|
|{2024-06-02T20:51...|
|{2024-06-02T17:41...|
|{2024-06-02T06:50...|
|{2024-06-02T16:59...|
|{2024-06-02T05:56...|
|{2024-06-02T17:41...|
|{2024-06-02T21:52...|
|{2024-06-02T21:54...|
|{2024-06-02T09:21...|
|{2024-06-02T09:26...|
|{2024-06-02T04:17...|
|{2024-06-02T18:16...|
+--------------------+
only showing top 20 rows



In [20]:
# Nomeando a nova coluna de usuário usando o alias
df.select(f.explode("includes.users").alias("users")).show()

+--------------------+
|               users|
+--------------------+
|{2024-06-02T19:47...|
|{2024-06-02T16:33...|
|{2024-06-02T09:12...|
|{2024-06-02T05:57...|
|{2024-06-02T11:53...|
|{2024-06-02T17:45...|
|{2024-06-02T02:00...|
|{2024-06-02T19:17...|
|{2024-06-02T20:51...|
|{2024-06-02T17:41...|
|{2024-06-02T06:50...|
|{2024-06-02T16:59...|
|{2024-06-02T05:56...|
|{2024-06-02T17:41...|
|{2024-06-02T21:52...|
|{2024-06-02T21:54...|
|{2024-06-02T09:21...|
|{2024-06-02T09:26...|
|{2024-06-02T04:17...|
|{2024-06-02T18:16...|
+--------------------+
only showing top 20 rows



In [16]:
# Utiliza o select e * para selecionar todos os campos de informações dos usuários
# printSchema para verificar o formato do DataFrame
df.select(f.explode("includes.users").alias("users")).select("users.*").printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)



In [17]:
# Salva em uma variável
user_df = df.select(f.explode("includes.users").alias("users")).select("users.*")

In [18]:
user_df.show(5)

+--------------------+---+------+--------+
|          created_at| id|  name|username|
+--------------------+---+------+--------+
|2024-06-02T19:47:...| 38|User 1|   user1|
|2024-06-02T16:33:...|  9|User 2|   user2|
|2024-06-02T09:12:...| 58|User 3|   user3|
|2024-06-02T05:57:...| 53|User 4|   user4|
|2024-06-02T11:53:...|  2|User 5|   user5|
+--------------------+---+------+--------+
only showing top 5 rows



In [21]:
# Salvando as estruturas criadas
tweet_df.coalesce(1).write.mode("overwrite").json('output/tweet')
user_df.coalesce(1).write.mode("overwrite").json('output/user')

                                                                                

O método `.coalesce(1)` com o parâmetro 1 é responsável por pegar as informações que o Spark quebra em pedaços (nodes) e as une permitindo que sejam salvas em um só arquivo.

> Com um grande volume de dados é preferível salvar eles em diversos arquivos para não estourar a memória do node.

### **Preparando o ambiente: instalando Spark e SparkSubmit**

O Spark precisa estar na versão 3.1.3 e a versão 3.2 do Hadoop. No terminal, pelo comando **wget** com a URL de download:

`wget https://archive.apache.org/dist/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz`

Depois de terminado o download é necessário descompactar o arquivo através do comando tar, passando por parâmetros as configurações -xvzf e o nome do arquivo.

`tar -xvzf spark-3.1.3-bin-hadoop3.2.tgz`

A partir disso já é possível gerar o script desenvolvido (transformation.py). Para isso é necessário entrar no dir descompactado (usar comando ls para verificar ele para então usar o comando cd).

`cd spark-3.1.3-bin-hadoop3.2`

`./bin/spark-submit` (executa no terminal, dentro da pasta descompactado, o caminho bin e o arquivo spark-submit)

> Esse executável recebe como parâmetro a localização do script a ser executado, que no caso é o caminho até o arquivo transformation.py.

`./bin/spark-submit …/src/spark/transformation.py`

**Também é preciso passar mais três parâmetros, _que são pré requisitos do script criado_. No caso os parâmetros são: src, destino e process-date.**

`./bin/spark-submit …/src/spark/transformation.py --src --destino --process-date`

- Para o src coloca o caminho até os dados brutos no data lake (parent folder com subdiretórios);
- Para o destino coloca dentro de uma pasta temporária de output;
- O process-date pode colocar a data em que está rodando o processo.

`./bin/spark-submit …/src/spark/transformation.py --src …/datalake/twitter_datasciencie --destino …/src/spark --process-date 2024-07-04`

### **Fazendo o Airflow se comunicar com o Spark Submit:**

- Instalar o modulo Spark;
- Utilizar o SparkSubmitOperator;
- Atualizar uma DAG;
- Salvar os dados transformados pelo Spark.

Com o ambiente virtual ativo, é preciso executar o seguinte código para instalar o operador:

`pip install apache-airflow-providers-apache-spark`

Ao final da instação é possível atualizar a DAG de interesse do Airflow com a seguinte importação:

`from airflow.providers.apache.spark.operators.spark_submit`

### **Inicializando o Airflow:**

**IMPORTANTE**: Exportar a segunda variável de ambiente:

`export SPARK_HOME=/home/pacer/Documents/Cursos-Alura/spark-3.1.3-bin-hadoop3.2` (passar onde está salvo essa pasta do Spark)

`airflow standalone`

Quando o Airflow iniciar (na porta :8080 - "localhost:8080/home").

Em **"Admin"**, na barra superior, seleciona a opção **"Connections"**, mas, ao invés de criar uma nova conexão, é possível apenas **editar** uma já existente: a do **Spark**, que foi *trazida com a instalação dos provides do Spark*.

Usa o "Ctrl + F", digite "spark" para buscar essa ocorrência na listagem e selecionar o botão "Edit record" à esquerda de "spark_default". O nome deve permanecer "spark_default"; enquanto o tipo deve ser "Spark"; e o host, "local". Após isso, selecionar salvar.