<a href="https://colab.research.google.com/github/Homura-san/pyspark/blob/main/Pyspark_ElasticSearch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Instalando o Pyspark**

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [5]:
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [6]:
# instalando a findspark
!pip install -q findspark

In [7]:
# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

In [8]:
# importando o pacote necessário para iniciar uma seção Spark
from pyspark.sql import SparkSession

# iniciando o spark context
sc = SparkSession.builder.master('local[*]').getOrCreate()

# Verificando se a sessão foi criada
sc

# **Instalando o ElasticSearch**

In [9]:
!pip install tensorflow-io
!pip install elasticsearch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-io
  Downloading tensorflow_io-0.32.0-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (28.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m28.0/28.0 MB[0m [31m44.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-io
Successfully installed tensorflow-io-0.32.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting elasticsearch
  Downloading elasticsearch-8.7.0-py3-none-any.whl (387 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m387.9/387.9 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting elastic-transport<9,>=8
  Downloading elastic_transport-8.4.0-py3-none-any.whl (59 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.5/59.5 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: el

In [10]:
import os
import time
from sklearn.model_selection import train_test_split
from elasticsearch import Elasticsearch
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio

### Validando imports do tf e tfio

In [11]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.32.0
tensorflow version: 2.12.0


## Baixando e instalando a instância do Elasticsearch

In [12]:
%%bash

wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512
tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
sudo chown -R daemon:daemon elasticsearch-7.9.2/
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512 

elasticsearch-oss-7.9.2-linux-x86_64.tar.gz: OK


In [13]:
%%bash --bg

sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch

In [14]:
# Sleep for few seconds to let the instance start.
time.sleep(20)

In [15]:
%%bash

ps -ef | grep elasticsearch

root        4083    4081  0 20:50 ?        00:00:00 sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch
daemon      4084    4083 57 20:50 ?        00:00:20 /content/elasticsearch-7.9.2/jdk/bin/java -Xshare:auto -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -XX:+ShowCodeDetailsInExceptionMessages -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dio.netty.allocator.numDirectArenas=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.locale.providers=SPI,COMPAT -Xms1g -Xmx1g -XX:+UseG1GC -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -Djava.io.tmpdir=/tmp/elasticsearch-4283411181427021249 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecou

In [16]:
%%bash

curl -sX GET "localhost:9200/"

{
  "name" : "6068509ea69f",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "vdS6XdZeRLaxOzXGKFm2PA",
  "version" : {
    "number" : "7.9.2",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e",
    "build_date" : "2020-09-23T00:45:33.626720Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}


# **Importando Arquivo para teste**

In [None]:
# Fazendo download do arquivo
!wget --verbose --show-progress --no-check-certificate https://raw.githubusercontent.com/jonates/opendata/master/receita_federal/receita_federal_arrecadacao_por_UF_2020.csv

--2023-04-04 13:46:38--  https://raw.githubusercontent.com/jonates/opendata/master/receita_federal/receita_federal_arrecadacao_por_UF_2020.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6216 (6.1K) [text/plain]
Saving to: ‘receita_federal_arrecadacao_por_UF_2020.csv’


2023-04-04 13:46:38 (54.8 MB/s) - ‘receita_federal_arrecadacao_por_UF_2020.csv’ saved [6216/6216]



In [None]:
# carregando um conjunto de dados que baixamos da internet
receitafederal = sc.read.csv(
    path = "/content/receita_federal_arrecadacao_por_UF_2020.csv", 
    inferSchema = True, 
    header = True,
    sep = ';', 
    encoding = "UTF-8")

In [None]:
# Verificando o tipo de objeto criado
type(receitafederal)

pyspark.sql.dataframe.DataFrame

In [2]:
# Espiando o dataset
receitafederal.show()

NameError: ignored

In [None]:
# Verificando o schema() deste sparkdataframe
receitafederal.printSchema()

root
 |-- uf: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- imposto_sobre_importacao: string (nullable = true)
 |-- imposto_sobre_exportacao: string (nullable = true)
 |-- ipi_total: string (nullable = true)
 |-- imposto_sobre_a_renda_total: string (nullable = true)
 |-- irpf: string (nullable = true)
 |-- irpj: string (nullable = true)
 |-- imposto_s_renda_retido_na_fonte: string (nullable = true)
 |-- imposto_s_operacoes_financeiras: string (nullable = true)
 |-- imposto_territorial_rural: string (nullable = true)
 |-- cofins: string (nullable = true)
 |-- contribuicao_para_o_pis_pasep: string (nullable = true)
 |-- csll: string (nullable = true)
 |-- cide_combustiveis: string (nullable = true)
 |-- cpsss_contrib_p_o_plano_de_segurid_social_serv_publico: string (nullable = true)
 |-- outras_receitas_administradas: string (nullable = true)



In [None]:
# importando os métodos com funções para transformações de variáveis
from pyspark.sql.functions import *


In [None]:
# Transformando o atributo irpf em numerica
receitafederal = receitafederal.withColumn(
    colName = 'irpf', 
    col = regexp_replace('irpf',',','.').cast('float')
    )

# Inspecionando o resultado
receitafederal.select('irpf').printSchema()

root
 |-- irpf: float (nullable = true)



In [None]:
# Verificando o total do irpf por Região do Brasil
receitafederal.groupBy('regiao').sum('irpf').orderBy('regiao').show()

+------------+--------------+
|      regiao|     sum(irpf)|
+------------+--------------+
|Centro-Oeste| 3.354157696E9|
|    Nordeste| 4.303029696E9|
|       Norte| 1.404179308E9|
|     Sudeste|2.496098528E10|
|         Sul| 7.380957184E9|
|       Total|4.140331008E10|
+------------+--------------+

