In [1]:
"""
    -BC17 - Engenharia de Dados - SOULCODE
    -Projeto_Final 03/06/2022
    -ETL (Extract, Transform, Load)
    -Prof. Igor / Adriano / Bismark
    -Aluno: Aldreks Albuquerque
    -Equipe 2: Aldreks / Carlos Bahia / Jalvo / Marco Aurélio
"""

'\n    -BC17 - Engenharia de Dados - SOULCODE\n    -Projeto_Final 03/06/2022\n    -ETL (Extract, Transform, Load)\n    -Prof. Igor / Adriano / Bismark\n    -Aluno: Aldreks Albuquerque\n    -Equipe 2: Aldreks / Carlos Bahia / Jalvo / Marco Aurélio\n'

## 0-CONFIGURAÇÕES DE AMBIENTE

#### 1-INSTALAÇÕES DE BIBLIOTECAS

In [None]:
## Instalação da Biblioteca para uso do Pandera
# !pip install pandera

In [None]:
## Instalação da Biblioteca Gerenciador de Arquivos do GCP
!pip install gcsfs

In [None]:
## Instalação da Biblioteca para uso do MongoDB Atlas
!pip install pymongo[srv]

In [5]:
## Instalação da Biblioteca pySpark
!pip install pyspark

Collecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m197.3/197.3 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py4j
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.8.1
    Uninstalling py4j-0.10.8.1:
[31mERROR: Could not install packages due to an OSError: [Errno 13] Permission denied: '__init__.cpython-37.pyc'
Consider using the `--user` option or check the permissions.
[0m[31m
[0m

In [6]:
## Instalação da Biblioteca parquet 
!pip install pyarrow



#### 2-CARREGAMENTO DE BIBLIOTECAS

In [7]:
# Carregamento biblioteca Pandas
import pandas as pd
import pandera as pa

#Uso em Parquet
import pyarrow

# Carregamento biblioteca GCP
import gcsfs #acessar GCP
from google.cloud import storage
import os

# Carregamento biblioteca Mongo
import pymongo
from pymongo import MongoClient

# Carregamento biblioteca (tratamento de arquivos)
from bson.json_util import dumps, loads
import csv

# Carregamento biblioteca numpy
import numpy as np


# Definindo limite de linhas e colunas no DF Pandas
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

#Fixa qtde de casas decimais nos campos float no DF Pandas
pd.options.display.float_format = '{:,.8f}'.format 

#### 3-AMBIENTE GCP

##### 3.1-Chave Conexão GCP

In [8]:
#CONFIGURAÇÃO DA CHAVE DE SEGURANÇA DO GCP (ACESSO)
serviceAccount = 'central-point-349020-90861ebe3455.json'

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

##### 3.2-Prepara Importação da Base de Dados direto do Bucket/GCP

In [58]:
#Chave de acesso GCP, JSON, já carregado na aplicação

#Cria conexão com bucket GCP
client = storage.Client()

#Define a pasta no bucket, onde estarão as bases para normalização (Cloud Storage/GCP)
folder_bucket = client.get_bucket('criptomoeda') 

#Define arquivo a ser extraído do bucket/GCP
folder_bucket.blob('Base_Tratada_SparkValorMercadoMaiorZeroF.csv')

#Cria Path do local de origem do arquivo a ser extraído do Bucket/GCP (gsutil URI)
path_tratados = 'gs://criptomoeda/ETL/Base_Tratada_SparkValorMercadoMaiorZeroF.csv'


##### 3.3-Importa Base de Dados e Cria Dataframe PARQUET, Traduzindo Rótulos das Colunas de Inglês para Pt-Br

In [11]:
    
# Preparando tradução dos rótulos das colunas do dataset de inglês para Pt-BR via Pandas
#Rotulo_Ingles = (['ticker', 'TokenName', 'Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Market Cap'])
#Rotulo_PortBR = (['Cod_Empresa', 'Empresa', 'Dt_Negociacao', 'Abertura', 'Max', 'Min', 'Fechamento', 'Volume_Negociado', 'Valor_Mercado_Empresa'])

# Importa base CSV do Bucket/GCP e cria Dataframe em Pandas
# df_pd = pd.read_csv(path_CSV_bucket, sep=',', usecols = Rotulo_Ingles) 

# Aplica a atualização da tradução dos rótulos das colunas
# df_bruto.columns = Rotulo_PortBR

print(">>> Base CSV carregada.")
df_pandas = pd.read_csv(path_tratados, sep=',') 

# Faz cópia do DF para normalização futura
df_pd = df_pandas.copy()

#Converte DF Pandas em arquivo Parquet
print(">>> Salva DF_Pandas em Parquet.")
df_pd.to_parquet('df.parquet')

print(">>> Carrega Base Parquet para DF.")
df_pd = pd.read_parquet('df.parquet')  

print(">>> Base Parquet carregada com sucesso.")

>>> Base CSV carregada.
>>> Salva DF_Pandas em Parquet.
>>> Carrega Base Parquet para DF.
>>> Base Parquet carregada com sucesso.


# 4-TECNOLOGIAS

## 5-PANDAS (DataFrame/Parquet)

##### 5.1 - Leitura e Análise dos Dados (início e fim)

In [12]:
## Verifica a qtde de Colunas e linhas (tamanho da base de dados)
df_pd.shape

(1249976, 10)

In [13]:
# Ordena o DataFrame por Data e demais colunas abaixo
df_pd.sort_values(by=['Dt_Negociacao', 'Cod_Empresa', 'Empresa', 'Volume_Negociado'], inplace=True)

In [None]:
## Checa os primeiros registros
df_pd.head(10)

In [None]:
## Checa os últimos registros
df_pd.tail(10)

##### 5.2 - Checa Duplicidade de Registros

In [None]:
# Verifica total de linhas duplicadas
df_pd.duplicated().sum()

In [None]:
# Exclui registros duplicados
df_pd = df_pd.drop_duplicates()

## Verifica tamanho da base de dados após dropagem
df_pd.shape

##### 5.3 - Analisa Estrutura do DF

In [None]:
#Verifica qtde de dados validos por coluna
df_pd.count()

In [None]:
# Obtém informações detalhadas da estrutura do DF como: 
# tipos de dados por campo, e qtos possuem dados NÂO NULOS, qtde de linhas e de colunas
df_pd.info()

# insight da análise: "Dt_Negociacao deve ser convertida de Object para DateTime"

In [None]:
# Informa a qtde de registros com campos vazios(NA) por coluna
df_pd.isnull().sum()

##### 5.4 - Checa Uniticidade dos Dados

In [None]:
## Checa se há Uniticidade de dados na coluna especificada (True/False)
df_pd.Cod_Empresa.is_unique

In [None]:
# Mostra valores unicos, e testa se há mais de um tipo de dado na mesma coluna.
sorted(pd.unique(df_pd['Volume_Negociado']))

* *** busca dados do tipo caracter especial numa coluna específica

In [None]:
#Checa coluna Empresa por dados únicos em busca de possível dado inconsistente como "*, **, ?, /, //, etc"
lista_de_valores_unicos = list(df_pd['Empresa'].fillna('').unique())

In [None]:
#Visualiza os 15 primeiros registros da lista
lista_de_valores_unicos[0:15]

In [None]:
#Define caracters especiais a serem procurados na coluna empresa
spec_chars = ["!",'"',"#","%","&","'",
              "*","+",",","-",".","/",":",";","<",
              "=",">","?","@","[","\\","]","^","_",
              "`","{","|","}","~","–"]

In [None]:
#Visualiza linhas que possuam caract especiais acima
for valor in lista_de_valores_unicos:
  for char in spec_chars:
    if char in valor:
      print(valor)

In [None]:
#Lista na horizontal os dados filtrados, únicos
# ', '.join(lista_de_valores_unicos)

##### 5.5 - Analisando Dados por colunas, e fazendo Estatística

In [None]:
# Estatística Geral
df_pd[ ["Abertura", "Max", "Min", "Fechamento", "Volume_Negociado", "Valor_Mercado_Empresa"] ].describe()

In [None]:
#Analisa coluna com valores <= 0
df_pd[ df_pd['Volume_Negociado'] <= 0 ].head()

In [None]:
# Analisa coluna com valores <= 0
df_pd[ df_pd['Valor_Mercado_Empresa'] <= 0 ].head()

In [None]:
# Analisa coluna com valores <= 0
df_pd[ df_pd['Abertura'] <= 0 ].head()

In [None]:
# Analisa coluna com valores <= 0
df_pd[ df_pd['Min'] <= 0 ].head()

In [None]:
# Analisa coluna com valores <= 0
df_pd[ df_pd['Max'] <= 0 ].head()

In [None]:
# Analisa coluna com valores <= 0
df_pd[ df_pd['Fechamento'] <= 0 ].head()

In [None]:
# Apresenta estatística no ano determinado
df_2019 = df_pd[["Abertura", "Min", "Max", "Fechamento", "Ano" ]].query('Ano == 2019 ')

df_2019[["Abertura", "Min", "Max", "Fechamento" ]].describe()

##### 5.6 - Tratando Dados Inconsistentes (Conversão de Dados, Exclusão de Coluna/Registro, Formata DATA)

In [None]:
#Converte coluna de object para DateTime (formato Y-M-D)
df_pd['Dt_Negociacao'] = pd.to_datetime(df_pd['Dt_Negociacao'], format="%Y-%m-%d", errors='coerce')

In [None]:
#Altera a ordem da coluna Dt_Negociacao
list_columns = list(df_pd.columns)
list_columns.remove('Dt_Negociacao')
new_list_columns = ['Dt_Negociacao'] + list_columns

#Refaz o DF na ordem correta de colunas
df_pd = df_pd.reindex(new_list_columns, axis=1)


In [None]:
# Visualiza DF atualizado
df_pd.head()

In [None]:
# TRATANDO CAMPO DATA QUE ESTÁ COMO TIPO STRING/OBJECT, mas, no format (Abr d/Y)
serie_testconvertdt = pd.to_datetime(df_pd['Dt_Negociacao'], format="%b %d, %Y", errors='coerce')

In [None]:
# Identica e filtra registros da coluna Data com valor diverso de uma Data
list_value_dterrors = df_pd.loc[serie_testconvertdt.isna(), 'Dt_Negociacao'].unique().tolist()

# Mostra tipos de dados diferente de Data ainda na coluna
list_value_dterrors[0:5]

In [None]:
# Usamos uma "List comprehension" para criar um dicionario de todos os registros com String para substituição por NaN
dict_value_dterros = {value_dterrors: np.nan for value_dterrors in list_value_dterrors}

# Visualiza o Dic criado com o valor adverso ao tipo Date
dict_value_dterros

In [None]:
# Substituindo valores String por NaN
df_pd['Dt_Negociacao'] = df_pd['Dt_Negociacao'].replace(dict_value_dterros)

In [None]:
# Após normalização de toda a coluna, agora Converte-a para datetime
df_pd['Dt_Negociacao'] = pd.to_datetime(df_pd['Dt_Negociacao'],format="%b %d, %Y")

In [None]:
# Ordena o DF pelas colunas especificadas
df_pd.sort_values(by=['Dt_Negociacao', 'Cod_Empresa', 'Empresa', 'Volume_Negociado'], inplace=True)

###########################  FIM DA NORMALIZAÇÃO DA COLUNA DE DATA DO FORMATO BRUTO (Abr d/Y)  ###############################

#### 5.7 - Insights da Análise do DataFrame

In [None]:
"""
*  ANALISANDO O DATA FRAME PANDAS DE CRIPTOMOEDA
  
1. Campo Data_Negociacao: Apresentava dados do tipo String/Object, após a conversão para Date ainda havia dado String. 
  Foi normalizado para DateTime com alguns dados como NaN.

2. Verificamos campos com valor "NaN":

Cod_Negociacao           23554 = 0,95%
Empresa                 144476 = 5,83%
Abertura                   405 = 0,02%
Max                        405 = 0,02%
Min                        405 = 0,02%
Fechamento                 405 = 0,02%
Volume$_Negociado          405 = 0,02%
Valor_Mercado_Empresa   315819 = 12,75%

Total de Registro do DF (DataSet) = 2.477.695

"""

#### 5.8 - Exporta DF Tratado para o GCP

In [None]:
#Exporta DataFrame Tratado para o Bucket/GCP
df_pd.to_csv(path_tratados, index=False)

print(">>>Exportação da base concluída com sucesso.")

#### 5.9 - Plotagem de Dados

###### PLOTAGEM_1 

In [None]:
# MODELO 1
#Monta Serie com as duas colunas, para plotar Valor_Mercado_Empresa por ano (X 1.000.000)
df_Valor_Mercado_Empresa = df_pd[ ["Ano", "Valor_Mercado_Empresa"] ].groupby('Ano')['Valor_Mercado_Empresa'].mean()
df_Valor_Mercado_Empresa_by_milion = df_Valor_Mercado_Empresa/(10**6)

# GRÁFICO
df_Valor_Mercado_Empresa_by_milion.plot.bar( title='ANÁLISE TOTAL VALOR MERCADO DAS EMPRESAS DE CRIPTOMOEDA DE 2013 À 2022/Abr', \
                                            figsize=(11,5), xlabel='Ano', ylabel='Valor_Mercado_Empresa (x 1 mi)', color='blue')

###### PLOTAGEM_2 

In [None]:
# MODELO 2
#Monta Serie com as duas colunas, para plotar volume negociado por ano (X 1.000.000)
df_volume_negociado = df_pd[ ["Ano", "Volume_Negociado"] ].groupby('Ano')['Volume_Negociado'].mean()
df_volume_negociado_by_milion = df_volume_negociado/(10**6)

# GRÁFICO
df_volume_negociado_by_milion.plot.bar( title='ANÁLISE VOLUME NEGOCIADO DE CRIPTOMOEDA DE 2013 À 2022/Abr', \
                                       figsize=(11,5), xlabel='Ano', ylabel='Volume_Negociado (x 1 mi)', color='blue')

###### PLOTAGEM_3

In [None]:
#MODELO 3
#Monta Serie com as duas colunas, para plotar evolução de empresas por ano
serie_ano_empresa_count = df_pd[ ["Ano", "Empresa"] ].groupby(['Ano', 'Empresa'])['Empresa'].count()
serie_ano_empresa_count = serie_ano_empresa_count.rename('count')
df_ano_empresa_count = serie_ano_empresa_count.to_frame().reset_index()
groupby_empresa_count = df_ano_empresa_count[["Ano", "Empresa"]].groupby(['Ano'])['Ano']
serie_empresa_count = groupby_empresa_count.count().rename('Qtde_Empresas')
df_empresa_count = serie_empresa_count.to_frame()
#GRAFICO
df_empresa_count.plot.bar(title='ANÁLISE DO CRESCIMENTO DE EMPRESAS DE CRIPTOMOEDA DE 2013 À 2022/Abr', \
                          figsize=(11,4), xlabel='Ano', ylabel='Qtde_Empresas', color='blue')

## 7.0 - PySPARK

##### 7.1-SPARK - Bibliotecas

In [14]:
#PYSPARK - IMPORTA AS BIBLIOTECAS NECESSÁRIAS
import pyspark.sql.functions as F
from pyspark.sql.functions import avg, round, row_number
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark import SparkConf

#Uso em Parquet
import pyarrow


##### 7.2-SPARK - CONEXÃO SPARKSESSION

In [59]:
#SEÇÃO GCP
spark = (
    SparkSession.builder
                .master ('local')
                .appName('proj_final')  #nome da session para o projeto
                .config('spark.ui.port','4050')
                .config('spark.jars','https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
                .getOrCreate()
)

#testa se a conexao com spark foi realizada com sucesso, e ver versão
spark

##### 7.3-SPARK - CRIA SCHEMA - Estrutura do DataFrame em PySpark

In [60]:
#Formando a Estrutura do DataFrame em pySpark
esquema = (
    StructType([                
        StructField('Cod_Empresa', StringType(), True),
        StructField('Empresa', StringType(), True),
        StructField('Dt_Negociacao', DateType(), False),
        StructField('Abertura', FloatType(), True),                
        StructField('Max', FloatType(), True),
        StructField('Min', FloatType(), True),
        StructField('Fechamento', FloatType(), True),
        StructField('Volume_Negociado', FloatType(), True),
        StructField('Valor_Mercado_Empresa', FloatType(), True),
        StructField('Ano', IntegerType(), True)        
    ])
)
print(">>> Schema preparado!")

>>> Schema preparado!


##### 7.4-SPARK - DOWNLOAD Base CSV do GCP/Bucket e Cria DataFrame com StructType/PySpark.
* Cria Arquivo PARQUET
* Carrega Arq PARQUET como DataFrame

In [61]:
# Cria DataFrame com StructType do pySpark, captando a base CSV do GCP/Bucket
df_spark = (
    spark.read.format('csv')
              .option('header', True)
              .option('delimiter', ',')
              .option('inferschema', True)
              .load(path_tratados, schema=esquema)
)  

In [62]:
df_spark.show(10)

+-----------+---------+-------------+--------+--------+--------+----------+----------------+---------------------+----+
|Cod_Empresa|  Empresa|Dt_Negociacao|Abertura|     Max|     Min|Fechamento|Volume_Negociado|Valor_Mercado_Empresa| Ano|
+-----------+---------+-------------+--------+--------+--------+----------+----------------+---------------------+----+
|        ANC| Anoncoin|   2013-12-27|    5.21|    5.39|    4.64|      4.81|         16203.0|            2975246.0|2013|
|        BTC|  Bitcoin|   2013-12-27|  763.28|  777.51|   713.6|    735.07|       4.68627E7|          8.9553951E9|2013|
|        DMD|  Diamond|   2013-12-27|    2.29|     2.5|    1.49|      1.75|          8360.0|             341344.0|2013|
|       DOGE| Dogecoin|   2013-12-27| 6.03E-4|6.282E-4|4.969E-4|  5.219E-4|        477422.0|            8016604.0|2013|
|        LTC| Litecoin|   2013-12-27|   24.81|   25.27|   22.26|     23.27|       3.11122E7|          5.6608806E8|2013|
|        TRC|Terracoin|   2013-12-27|  0

In [None]:
#  SPARK - CRIA Arquivo Parquet

# Converter um DataFrame Pandas, num DataFrame Spark
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# df_spark = spark.createDataFrame(df_spark)

# Salva DF Spark em Parquet
df_spark.write.parquet('df_spark.parquet')
print('Arquivo Parquet criado com sucesso.')


In [64]:
# SPARK - Carrega arquivo Parquet em DataFrame Spark
df_spark = spark.read.parquet('df_spark.parquet')

##### 7.5-SPARK - Visualiza o STRUCT TYPE e o DataFrame criado em PySpark

In [44]:
#Visualiza Estrutura do DF Spark
df_spark.dtypes

[('Cod_Empresa', 'string'),
 ('Empresa', 'string'),
 ('Dt_Negociacao', 'date'),
 ('Abertura', 'float'),
 ('Max', 'float'),
 ('Min', 'float'),
 ('Fechamento', 'float'),
 ('Volume_Negociado', 'float'),
 ('Valor_Mercado_Empresa', 'float'),
 ('Ano', 'int')]

In [45]:
#Visualiza o STRUCT TYPE
df_spark.printSchema()

root
 |-- Cod_Empresa: string (nullable = true)
 |-- Empresa: string (nullable = true)
 |-- Dt_Negociacao: date (nullable = true)
 |-- Abertura: float (nullable = true)
 |-- Max: float (nullable = true)
 |-- Min: float (nullable = true)
 |-- Fechamento: float (nullable = true)
 |-- Volume_Negociado: float (nullable = true)
 |-- Valor_Mercado_Empresa: float (nullable = true)
 |-- Ano: integer (nullable = true)



In [66]:
#Visualiza o DataFrame pySpark
df_spark.show(15)

+-----------+--------------------+-------------+--------+--------+--------+----------+----------------+---------------------+----+
|Cod_Empresa|             Empresa|Dt_Negociacao|Abertura|     Max|     Min|Fechamento|Volume_Negociado|Valor_Mercado_Empresa| Ano|
+-----------+--------------------+-------------+--------+--------+--------+----------+----------------+---------------------+----+
|        RSR|      Reserve Rights|   2021-01-04| 0.03263| 0.03521| 0.02686|   0.03066|    2.31849744E8|         2.86691712E8|2021|
|       RUFF|                Ruff|   2021-01-04|0.005884| 0.00857|0.005709|  0.007571|       3069577.0|            7423676.0|2021|
|       RUNE|           THORChain|   2021-01-04|    1.48|    1.59|    1.31|      1.59|     4.1093944E7|         2.51942752E8|2021|
|       SAFE|                Safe|   2021-01-04|  0.2123|  0.2379|  0.1805|    0.2153|        102844.0|            4484890.0|2021|
|   SALTSALT|                null|   2021-01-04|  0.3795|  0.4356|  0.3708|    0.43

In [None]:
#Mostra Quadro Estatístico do DF
df_spark.summary().show()

##### 7.6-SPARK - ALGUMAS ANÁLISES

In [68]:
# Qtde de registros no DataFrame
df_spark.count()

1249976

In [69]:
# date_format() - CONVERTE AS DATAS PARA O PADRÃO BRASILEIRO
df_spark.select(F.col('Dt_Negociacao'), \
                F.date_format(F.col('Dt_Negociacao'), 'dd-MM-yyyy') \
                .alias('Data_BR')).show(5)

+-------------+----------+
|Dt_Negociacao|   Data_BR|
+-------------+----------+
|   2021-01-04|04-01-2021|
|   2021-01-04|04-01-2021|
|   2021-01-04|04-01-2021|
|   2021-01-04|04-01-2021|
|   2021-01-04|04-01-2021|
+-------------+----------+
only showing top 5 rows



In [None]:
# Cria coluna de Ano
df_spark = df_spark.withColumn("Ano",  F.year(F.col('Dt_Negociacao')))

In [None]:
# Visualiza os 5 primeiros registros (com a nova coluna)
df_spark.show(5)

In [74]:
# Verificando o maior Volume_Negociado
df_spark.select(F.max("Volume_Negociado").alias("Maior_Volume_Negociado_US$")).show()

+--------------------------+
|Maior_Volume_Negociado_US$|
+--------------------------+
|             3.50967955E11|
+--------------------------+



In [76]:
# Verificando o maior Valor_Mercado_Empresa
df_spark.select(F.max("Valor_Mercado_Empresa").alias("Maior_Valor_Mercado_Empresa_US$")).show()

+-------------------------------+
|Maior_Valor_Mercado_Empresa_US$|
+-------------------------------+
|                   6.5313069E13|
+-------------------------------+



In [75]:
# Verificando o menor Volume$_Negociado
df_spark.select(F.min("Volume_Negociado").alias("Menor_Volume_Negociado_US$")).show()

+--------------------------+
|Menor_Volume_Negociado_US$|
+--------------------------+
|                       1.0|
+--------------------------+



In [77]:
# Mostra as Empresas TOP 10 de Valor_Mercado_Empresa, 
# em ordem de descrescente de valor, no acumulado de todos os anos
df_spark.groupBy('Empresa').sum('Valor_Mercado_Empresa') \
    .orderBy(F.col('sum(Valor_Mercado_Empresa)') \
    .desc()).show(10)



+------------+--------------------------+
|     Empresa|sum(Valor_Mercado_Empresa)|
+------------+--------------------------+
|     Bitcoin|       6.21429177312781E14|
|    Ethereum|       2.07008240644544E14|
|        null|       1.02992941407779E14|
|       Terra|          7.23411946263E13|
|    TerraKRW|        6.5344932432596E13|
|      Tether|        3.6155192978193E13|
|     Cardano|         2.559830057648E13|
|Bitcoin Cash|        1.6216749557504E13|
|    USD Coin|        1.4850736000834E13|
|    Polkadot|         1.306589232384E13|
+------------+--------------------------+
only showing top 10 rows



                                                                                

In [78]:
# Seleciona colunas especificas, ordenando Valor de Mercado do Maior pro Menor
df_spark.select("Dt_Negociacao", "Empresa", 'Volume_Negociado', 'Valor_Mercado_Empresa') \
    .orderBy(F.col('Valor_Mercado_Empresa') \
    .desc(), "Dt_Negociacao", 'Empresa' ).show(10)

+-------------+--------+----------------+---------------------+
|Dt_Negociacao| Empresa|Volume_Negociado|Valor_Mercado_Empresa|
+-------------+--------+----------------+---------------------+
|   2020-01-15|   Terra|          6092.0|         6.5313069E13|
|   2020-01-15|TerraKRW|          6092.0|         6.5313069E13|
|   2021-11-08| Bitcoin|    4.1125609E10|        1.27483144E12|
|   2021-11-09| Bitcoin|    4.2357993E10|        1.26366712E12|
|   2021-10-20| Bitcoin|    4.0788955E10|        1.24392741E12|
|   2021-11-14| Bitcoin|   2.51220931E10|        1.23557904E12|
|   2021-11-10| Bitcoin|    4.8730829E10|        1.22643153E12|
|   2021-11-11| Bitcoin|    3.5880632E10|        1.22564313E12|
|   2021-11-13| Bitcoin|   3.04742298E10|        1.21669511E12|
|   2021-10-19| Bitcoin|    4.0471196E10|        1.21124251E12|
+-------------+--------+----------------+---------------------+
only showing top 10 rows



In [None]:
# Lista registros com valor de Abertura <= Zero (0)
df_spark.select(  F.col('Cod_Empresa'), F.col('Empresa'), round(F.col('Abertura'),5), round(F.col('Fechamento'),5) ) \
    .where(F.col('Abertura') <= 0) \
    .orderBy( F.col('Abertura').desc()).show(20)

In [None]:
# Agrupa informações, conta e Ordena descrescente
df_spark.groupBy( F.col("Empresa") ).count().orderBy(F.col("Empresa").desc()).show(10);

In [None]:
# SPARK (SELECT, DISTINCT, ORDERBY, ASC, DESC, F.COL) SELECIONANDO VALORES DISTINTOS ORDENADOS ALFABETICAMENTE
df_spark.select( 'Empresa', 'Valor_Mercado_Empresa').distinct().orderBy(F.col('Valor_Mercado_Empresa').asc()).show(8)

In [79]:
# Algumas Análises sobre o Min e Max
df_spark.select( F.col('Cod_Empresa'), F.col('Empresa'), F.col('Abertura'), F.col('Max'), \
        F.col('Min'), F.col('Fechamento'), F.col('Volume_Negociado'), F.col('Valor_Mercado_Empresa') ) \
    .orderBy(F.col('Min'), F.col('Max')).show(5)

+-----------+-------+--------+-------+-------+----------+----------------+---------------------+
|Cod_Empresa|Empresa|Abertura|    Max|    Min|Fechamento|Volume_Negociado|Valor_Mercado_Empresa|
+-----------+-------+--------+-------+-------+----------+----------------+---------------------+
|      SPORE|  Spore| 2.7E-11|3.3E-11|2.5E-11|   2.9E-11|          3310.0|            1042488.0|
|      SPORE|  Spore| 3.2E-11|3.2E-11|2.7E-11|   2.7E-11|          1765.0|             967044.0|
|      SPORE|  Spore| 2.9E-11|3.1E-11|2.9E-11|   3.1E-11|          3281.0|            1097600.0|
|      SPORE|  Spore| 3.1E-11|3.2E-11|3.0E-11|   3.2E-11|          2249.0|            1126016.0|
|      SPORE|  Spore| 3.2E-11|3.3E-11|3.0E-11|   3.1E-11|          2551.0|            1087170.0|
+-----------+-------+--------+-------+-------+----------+----------------+---------------------+
only showing top 5 rows



                                                                                

In [None]:
# Seleciona colunas específicas e filtra valores de uma data determinada
data='2022-01-07'
df_spark.select("Empresa","Cod_Empresa", "Dt_Negociacao").filter(F.col("Dt_Negociacao") == data).show(5)

In [None]:
# Seleciona valores condicionais com Filter
df_spark.filter( (F.col('Abertura') > 0.0) & (F.col('Fechamento') > 0.0) & (F.col('Volume_Negociado') > 0.0) ).show(5)

In [81]:
# Seleciona por período de data_negociação e cod_empresa específicas (com SELECT, FILTER)
df_spark.select( F.col("Dt_Negociacao"), F.col("Cod_Empresa"),  F.col("Empresa") ) \
    .filter(F.col("Cod_Empresa") == 'AVA') \
    .filter(F.col("Dt_Negociacao") >= '2021-01-01').show(5)

+-------------+-----------+-----------+
|Dt_Negociacao|Cod_Empresa|    Empresa|
+-------------+-----------+-----------+
|   2021-01-05|        AVA|Travala.com|
|   2021-01-06|        AVA|Travala.com|
|   2021-01-07|        AVA|Travala.com|
|   2021-01-08|        AVA|Travala.com|
|   2021-01-09|        AVA|Travala.com|
+-------------+-----------+-----------+
only showing top 5 rows



In [85]:
# Seleciona período de data_negociação e codigo_empresa específicas com condicional (WHERE)
df_spark.select( F.col("Dt_Negociacao"), F.col("Cod_Empresa"),  F.col("Empresa"), \
                F.col("Volume_Negociado"), F.col("Valor_Mercado_Empresa") ) \
      .where( ((F.col("Dt_Negociacao") >= '2022-01-01') & (F.col("Dt_Negociacao") <= '2022-05-31' )) \
             & (F.col("Cod_Empresa") < 'BBB') ) \
      .orderBy(F.col('Dt_Negociacao').desc()).show(10)

[Stage 49:>                                                         (0 + 4) / 4]

+-------------+-----------+------------------+----------------+---------------------+
|Dt_Negociacao|Cod_Empresa|           Empresa|Volume_Negociado|Valor_Mercado_Empresa|
+-------------+-----------+------------------+----------------+---------------------+
|   2022-05-01|       AAVE|              Aave|     2.1493816E8|         2.00780134E9|
|   2022-05-01|        ADD|           Add.xyz|             2.0|             829701.0|
|   2022-05-01|        ABT|          Arcblock|        967788.0|          1.3024856E7|
|   2022-05-01|        ACA|       Acala Token|     1.5200707E7|         3.59167968E8|
|   2022-05-01|   ACENTACE|              null|       1318497.0|            5906972.0|
|   2022-05-01|        ACK|       AcknoLedger|         14140.0|             959150.0|
|   2022-05-01|        ACM|AC Milan Fan Token|     1.8003992E7|          1.8066448E7|
|   2022-05-01|        ACT|            Achain|        783204.0|            5079028.0|
|   2022-05-01|        ADA|           Cardano|     9.2

                                                                                

In [86]:
##  FUNÇÂO DE AGREGAÇÂO - AGG

#Médias dos dados numéricos
tot_gastos = (df_spark.agg(
              {'Abertura': 'avg', 
              'Min': 'avg',
              'Max': 'avg',
              'Fechamento': 'avg',
              'Volume_Negociado': 'avg',
              'Valor_Mercado_Empresa': 'avg'                                   
              }).show(truncate=False)
)

+------------------+---------------------+-----------------+----------------+-----------------+--------------------------+
|avg(Min)          |avg(Volume_Negociado)|avg(Abertura)    |avg(Max)        |avg(Fechamento)  |avg(Valor_Mercado_Empresa)|
+------------------+---------------------+-----------------+----------------+-----------------+--------------------------+
|175.84742154999952|1.522296926915117E8  |183.2379846402586|271.410636244737|183.3984840688474|1.137847612179638E9       |
+------------------+---------------------+-----------------+----------------+-----------------+--------------------------+



In [87]:
##  FUNÇÂO DE AGREGAÇÂO - AGG

#Somátório Volume_Negociado e Valor_Mercado_Empresa
tot_gastos = (df_spark.agg({'Volume_Negociado': 'sum',
                          'Valor_Mercado_Empresa': 'sum'                        
                          }).show()
)

+---------------------+--------------------------+
|sum(Volume_Negociado)|sum(Valor_Mercado_Empresa)|
+---------------------+--------------------------+
|  1.90283462351765E14|      1.422282206881855E15|
+---------------------+--------------------------+



#### 7.7-SPARK - TRATAMENTO DO DATAFRAME

##### TRATAMENTO DE DADOS - EXCLUSAO LINHAS DUPLICADAS (DROPDUPLICATES)

In [88]:
#Verif qtde de linhas duplicadas
total = (df_spark.count()) - (df_spark.dropDuplicates().count())
print(total)



0


                                                                                

In [None]:
#Remover Linhas Duplicadas
df_spark.dropDuplicates()

##### TRATAMENTO DE DADOS, ANALISE LINHAS COM DATA NULL (FILTER, SELECT, ORDER BY, COUNT, ISNULL)

In [91]:
#Identifica o início de possíveis datas nulas para posterior dropagem
df_spark.select(F.col('Dt_Negociacao'), F.col('Cod_Empresa'), F.col('Empresa'), F.col('Abertura'), \
                F.col('Fechamento'), F.col('Volume_Negociado'), F.col('Valor_Mercado_Empresa'), F.col('Ano')) \
                .orderBy(F.col('Dt_Negociacao'), F.col('Empresa')).show(5)



+-------------+-----------+--------+--------+----------+----------------+---------------------+----+
|Dt_Negociacao|Cod_Empresa| Empresa|Abertura|Fechamento|Volume_Negociado|Valor_Mercado_Empresa| Ano|
+-------------+-----------+--------+--------+----------+----------------+---------------------+----+
|   2013-12-27|        XRP|    null| 0.02443|   0.02708|        148422.0|         2.11674064E8|2013|
|   2013-12-27|        ANC|Anoncoin|    5.21|      4.81|         16203.0|            2975246.0|2013|
|   2013-12-27|        BTC| Bitcoin|  763.28|    735.07|       4.68627E7|          8.9553951E9|2013|
|   2013-12-27|        DMD| Diamond|    2.29|      1.75|          8360.0|             341344.0|2013|
|   2013-12-27|       DOGE|Dogecoin| 6.03E-4|  5.219E-4|        477422.0|            8016604.0|2013|
+-------------+-----------+--------+--------+----------+----------------+---------------------+----+
only showing top 5 rows



                                                                                

In [92]:
#Conta registros NULL nas colunas especificadas
qtde = df_spark.filter(F.col('Dt_Negociacao').isNull() | F.col('Abertura').isNull()).count()

print(qtde)

0


#####  TRATAMENTO DE DADOS:____DROPAGEM:  Elimina as linhas NULL das colunas Abaixo

In [94]:
# Elimina as linhas NULL das colunas Destacadas Abaixo, criando novo DF NotNull
df_sparkDrop2 = df_spark.where( F.col('Abertura').isNotNull() & \
                                    F.col('Fechamento').isNotNull() & \
                                    F.col('Volume_Negociado').isNotNull() & \
                                    F.col('Valor_Mercado_Empresa').isNotNull() & \
                                    F.col('Dt_Negociacao').isNotNull() )

#####  TRATAMENTO DE DADOS:___DROPAGEM: Eliminas as linhas com valores 0 (zero) e Negativos

In [None]:
# Elimina as linhas NULL das colunas Destacadas Abaixo
df_sparkDrop3 = df_sparkDrop2.where( (F.col('Abertura') > 0.0)  &  \
                (F.col('Fechamento') > 0.0)  &  (F.col('Volume_Negociado') > 0.0) )

In [None]:
# Elimina as linhas com valores negativos e zerados na coluna Min
df_sparkDrop4 = df_sparkDrop3.where( (F.col('Min') > -0.0000000001)  &  (F.col('Min') != 0.0) ) 

In [None]:
# Elimina as linhas com valores Negativos na coluna Valor_Mercado_Empresa
df_SparkValorMercadoComZero = df_sparkDrop4.where( F.col('Valor_Mercado_Empresa') > -0.00000001 ) 

In [None]:
# Elimina as linhas com valores 0.0 na coluna Valor_Mercado_Empresa
df_SparkValorMercadoMaiorZero = df_SparkValorMercadoComZero.where( F.col('Valor_Mercado_Empresa') != 0.0 ) 

#### 7.8 - SPARK - INSIGHTS (FALTA*************)

In [None]:
""" 
  FALTA
"""


#### 7.9 - SPARK - UPLOAD / DOWNLOAD DE DATASET (BUCKET/GCP)

In [None]:
#Exporta base de dados (DataFrame SPARK/PARQUET) Tratada para o Bucket/GCP  (com Valor Mercado Maior Zero)
df_SparkValorMercadoMaiorZero.toPandas().to_csv(path_tratados&'/Base_Tratada_SparkValorMercadoMaiorZeroF.csv', \
                            index=False)

print(">>> Exportação do DF Spark para Bucket/GCP, concluído com sucesso.")

##### --- SPARK - DOWNLOAD BUCKET/GCP: SPARK PARQUET TRATADO.CSV

In [None]:
#Formando a Estrutura do DataFrame em pySpark
esquema = (
    StructType([                
        StructField('Cod_Empresa', StringType(), True),
        StructField('Empresa', StringType(), True),
        StructField('Dt_Negociacao', DateType(), False),
        StructField('Abertura', FloatType(), True),                
        StructField('Max', FloatType(), True),
        StructField('Min', FloatType(), True),
        StructField('Fechamento', FloatType(), True),
        StructField('Volume_Negociado', FloatType(), True),
        StructField('Valor_Mercado_Empresa', FloatType(), True) 
    ])
)

In [None]:
# Download do DataSet CSV do GCP/Bucket, e Cria DataFrame com StructType do pySpark  (dataset com valor de mercado maior que zero)
path2 = ptah_tratados&'/Tratado_SparkValorMercadoMaiorZero_2022_06_09_Aldreks.csv'
df_spark = (
    spark.read.format('csv')
              .option('header', True)
              .option('delimiter', ',')
              .option('inferschema', True)
              .load(path2, schema=esquema))

In [None]:
#Salva DF Spark em Parquet
df_spark.write.parquet(path_parquet) 

In [None]:
#Carrega arquivo parquet em DataFrame
df_SparkSql = spark.read.parquet(path_parquet)

In [95]:
df_SparkSql = df_spark

## 8.0 - SPARK_SQL

##### 8.1-SPARK_SQL - TRATAMENTO DO DATAFRAME

In [96]:
# Visualiza o início do DF
df_SparkSql.show(5)

+-----------+--------------+-------------+--------+-------+--------+----------+----------------+---------------------+----+
|Cod_Empresa|       Empresa|Dt_Negociacao|Abertura|    Max|     Min|Fechamento|Volume_Negociado|Valor_Mercado_Empresa| Ano|
+-----------+--------------+-------------+--------+-------+--------+----------+----------------+---------------------+----+
|        RSR|Reserve Rights|   2021-01-04| 0.03263|0.03521| 0.02686|   0.03066|    2.31849744E8|         2.86691712E8|2021|
|       RUFF|          Ruff|   2021-01-04|0.005884|0.00857|0.005709|  0.007571|       3069577.0|            7423676.0|2021|
|       RUNE|     THORChain|   2021-01-04|    1.48|   1.59|    1.31|      1.59|     4.1093944E7|         2.51942752E8|2021|
|       SAFE|          Safe|   2021-01-04|  0.2123| 0.2379|  0.1805|    0.2153|        102844.0|            4484890.0|2021|
|   SALTSALT|          null|   2021-01-04|  0.3795| 0.4356|  0.3708|    0.4356|        152177.0|          3.4262344E7|2021|
+-------

In [None]:
# SPARK_SQL (withColumn, F.year) - Insere a coluna ANO no DF
# df_SparkSql = df_SparkSql.withColumn("ano",  F.year(F.col('Dt_Negociacao')))

In [None]:
# SPARK_SQL (Drop) - Excluir Colunas
# df_SparkSql = df_SparkSql.drop('Column1', 'Column2')

In [None]:
# SPARK_SQL (withColumnRenamed) - Renomear Colunas
# df_SparkSql = df_SparkSql \
#     .withColumnRenamed('Cod_Negociacao', 'Cod_Empresa') \
#     .withColumnRenamed('Volume$_Negociado', 'Volume_Negociado')

##### 8.2-SPARK_SQL (USO DE VIEW, COUNT, MIN, MAX, GROUP BY, ORDER BY, WITHCOLUMN)

In [101]:
#  SPARK_SQL (withColumn) - Incluir Colunas em df temporário

#Incluir  Coluna:  Valor Total = Valor_Marcado + Volume_Negociado)
df_tmp2 = df_SparkSql.withColumn('Vol_Negoc + Vlr_Mercado', (F.col('Volume_Negociado') + F.col('Valor_Mercado_Empresa')) )

#Visualiza o DataFrame pySpark, temporário
df_tmp2.show(5)

+-----------+--------------+-------------+--------+-------+--------+----------+----------------+---------------------+----+-----------------------+
|Cod_Empresa|       Empresa|Dt_Negociacao|Abertura|    Max|     Min|Fechamento|Volume_Negociado|Valor_Mercado_Empresa| Ano|Vol_Negoc + Vlr_Mercado|
+-----------+--------------+-------------+--------+-------+--------+----------+----------------+---------------------+----+-----------------------+
|        RSR|Reserve Rights|   2021-01-04| 0.03263|0.03521| 0.02686|   0.03066|    2.31849744E8|         2.86691712E8|2021|            5.1854144E8|
|       RUFF|          Ruff|   2021-01-04|0.005884|0.00857|0.005709|  0.007571|       3069577.0|            7423676.0|2021|            1.0493253E7|
|       RUNE|     THORChain|   2021-01-04|    1.48|   1.59|    1.31|      1.59|     4.1093944E7|         2.51942752E8|2021|           2.93036704E8|
|       SAFE|          Safe|   2021-01-04|  0.2123| 0.2379|  0.1805|    0.2153|        102844.0|            4484

In [None]:
#1.0 Cria VIEW Temporária
df_SparkSql.createGlobalTempView("View_Analise")

In [103]:
#1.1 Consulta à Visão Criada, gerando informações: Qtde, min, max, de 2 colunas da VIEW
spark.sql("SELECT Empresa, count(Volume_Negociado),  min(Volume_Negociado), max(Volume_Negociado), \
           count(Valor_Mercado_Empresa), min(Valor_Mercado_Empresa), max(Valor_Mercado_Empresa) \
            FROM global_temp.View_Analise \
            GROUP BY Empresa \
            ORDER BY Empresa ").show(5)




+---------------+-----------------------+---------------------+---------------------+----------------------------+--------------------------+--------------------------+
|        Empresa|count(Volume_Negociado)|min(Volume_Negociado)|max(Volume_Negociado)|count(Valor_Mercado_Empresa)|min(Valor_Mercado_Empresa)|max(Valor_Mercado_Empresa)|
+---------------+-----------------------+---------------------+---------------------+----------------------------+--------------------------+--------------------------+
|           null|                 102836|                  1.0|         3.6955177E10|                      102836|                    1316.0|              1.3085347E11|
|          Chain|                   2087|                218.0|         1.21326822E9|                        2087|                 1565412.0|              2.38228432E8|
|           Coin|                   1683|                  3.0|            5574248.0|                        1683|                  135716.0|              

                                                                                

##### 8.3-SPARK_SQL (GROUPBY, AGG, ORDERBY, SUM, MEAN, MAX, ROUND)

In [105]:
# MOSTRAR ALGUNS ÍNDICES DOS DADOS:  Abertura, Fechamento, Min, Max, Volume_Negociado, Valor_Mercado_Empresa
df_SparkSql.groupBy(F.col('Empresa')).agg( round(F.mean('Abertura'),3), round(F.mean('Fechamento'),3), round(F.mean('Min'),3), round(F.mean('Max'),3), \
                                                                            F.max('Volume_Negociado'), \
                                                                             F.max('Valor_Mercado_Empresa'), \
                                                                             ).orderBy(F.col('Empresa')).show(10)



+---------------+-----------------------+-------------------------+------------------+------------------+---------------------+--------------------------+
|        Empresa|round(avg(Abertura), 3)|round(avg(Fechamento), 3)|round(avg(Min), 3)|round(avg(Max), 3)|max(Volume_Negociado)|max(Valor_Mercado_Empresa)|
+---------------+-----------------------+-------------------------+------------------+------------------+---------------------+--------------------------+
|           null|                244.255|                   244.56|           228.029|           269.795|         3.6955177E10|              1.3085347E11|
|          Chain|                  0.023|                    0.023|             0.021|             0.025|         1.21326822E9|              2.38228432E8|
|           Coin|                  0.131|                    0.131|             0.119|             0.143|            5574248.0|               3.2283772E7|
|       Datalink|                  0.443|                    0.443|   

                                                                                

##### 8.4-SPARK_SQL (FUNCTIONS, WINDOW, BETWEEN)

In [106]:
# Criando uma coluna com o intervalo em ANOS, até a data corrente

#MONTHS_BETWEEN() - RETORNA A DIFERENÇA ENTRE DUAS DATAS EM MESES
#F.bround() arredonda os numeros decimais

(df_SparkSql.select( F.col('Dt_Negociacao'), F.current_date(), 
            (F.bround(F.months_between(F.current_date(), F.col('Dt_Negociacao'))/12))
             .alias('Período_em_Anos')).show(10))


+-------------+--------------+---------------+
|Dt_Negociacao|current_date()|Período_em_Anos|
+-------------+--------------+---------------+
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
|   2021-01-04|    2022-06-14|            1.0|
+-------------+--------------+---------------+
only showing top 10 rows



* criação de WINDOW.partitionBy

In [107]:
#Cria DF temporário para visualização de colunas específicas na Function Window Spark
df_tmp = df_SparkSql.select("Cod_Empresa", "Valor_Mercado_Empresa")

#prepara a janela window para visualização 
w0 = Window.partitionBy(F.col('Valor_Mercado_Empresa')).orderBy(F.col('Cod_Empresa'))

In [108]:
#ROW_NUMBER (primeira ordem) - Apresenta classificação por  Valor de Mercado para cada Empresa  
df_tmp.withColumn('row_number', F.row_number().over(w0)).show(10)



+-----------+---------------------+----------+
|Cod_Empresa|Valor_Mercado_Empresa|row_number|
+-----------+---------------------+----------+
|       BOLI|               2098.0|         1|
|     STREAM|               2098.0|         2|
|        GDR|               2679.0|         1|
|        CRW|               3238.0|         1|
|       BOLI|               3675.0|         1|
|       BOLI|               3675.0|         2|
|        MUE|               4393.0|         1|
|      RAGNA|               4393.0|         2|
|      MEDIC|               4446.0|         1|
|        PNY|               4446.0|         2|
+-----------+---------------------+----------+
only showing top 10 rows



                                                                                

In [None]:
#RANK (segunda ordem) - Apresenta classificação por Valor de Mercado para cada Empresa 
df_tmp.withColumn('rank', F.rank().over(w0)).show(35) #Visualiar com 50 linhas para melhor analises

##### 8.5-SPARK_SQL (VIEWS, SQL, SELECT, WHERE, GROUP BY, ORDER BY, DISTINCT, COUNT)

In [110]:
## Criando uma VIEW do DataSet Tratado para processamento de análises mais rápido
df = (spark
      .read
      .format("csv")
      .option("header", "true")
      .option("inferschema", "true")
      .option("delimiter", ",")
      .load(path_tratados)
      .createOrReplaceTempView("VIEW_Spark_Tratada"))

                                                                                

In [123]:
# VIEW - Filtra registros por período de Data_Negociação e ordena por Data + Cod_Empresa
spark.sql('''SELECT Ano, Cod_Empresa, Volume_Negociado, Valor_Mercado_Empresa
          FROM VIEW_Spark_Tratada
          WHERE Dt_Negociacao >= "2020-01-01" AND Dt_Negociacao <= "2020-01-31"
          ORDER BY Dt_Negociacao ASC, Cod_Empresa DESC''').show(5)



+----+-----------+----------------+---------------------+
| Ano|Cod_Empresa|Volume_Negociado|Valor_Mercado_Empresa|
+----+-----------+----------------+---------------------+
|2020|        ZSC|          5457.0|             282529.0|
|2020|        ZRX|     1.0396732E7|         1.10582024E8|
|2020|        ZPT|            30.0|             599404.0|
|2020|        ZNT|         12066.0|              14756.0|
|2020|        ZIL|       4366624.0|          4.4445668E7|
+----+-----------+----------------+---------------------+
only showing top 5 rows



                                                                                

In [121]:
# VIEW/DISTINCT - Agrupa por Ano e Empresa, apresentando o total de Volume_Negociado por ano, por empresa
spark.sql('''SELECT DISTINCT Ano, Cod_Empresa, sum(Volume_Negociado) AS Total_Volume_Negociado
    FROM VIEW_Spark_Tratada
    GROUP BY Ano, Cod_Empresa
    ORDER BY Ano, Cod_Empresa, Total_Volume_Negociado DESC
''').show(10)

                                                                                

+----+-----------+----------------------+
| Ano|Cod_Empresa|Total_Volume_Negociado|
+----+-----------+----------------------+
|2013|        ANC|               61994.0|
|2013|        BTC|            1.399848E8|
|2013|        DMD|               40793.0|
|2013|       DOGE|             1730037.0|
|2013|        LTC|            7.338657E7|
|2013|        TRC|              103521.0|
|2013|        XRP|              484853.0|
|2014|        ANC|             4247251.0|
|2014|        BCN|              915772.0|
|2014|        BTC|          9.15918112E9|
+----+-----------+----------------------+
only showing top 10 rows



In [None]:
# VIEW/DISTINCT - Agrupa por Ano, apresentando o total de empresas por ano
spark.sql('''SELECT DISTINCT Ano COUNT(Empresa) AS QTDE_EMPRESAS_POR_ANO
    FROM VIEW_Spark_Tratada
    GROUP BY Ano, Empresa
    ORDER BY Ano ASC, QTDE_EMPRESAS_POR_ANO;
''').show()

In [124]:
# VIEW/MIN/MAX - Verifica a menor e maior data do DataFrame
spark.sql('''
    SELECT MIN(Dt_Negociacao) AS Data_Inicial, MAX(Dt_Negociacao) AS Data_Final
    FROM VIEW_Spark_Tratada
''').show()



+-------------------+-------------------+
|       Data_Inicial|         Data_Final|
+-------------------+-------------------+
|2013-12-27 00:00:00|2022-05-01 00:00:00|
+-------------------+-------------------+



                                                                                

##### 8.6-SPARK_SQL - ENCERRA AMBIENTE SPARK

In [None]:
#Encerra a sessão Spark.
spark.stop()

## 9.0 - AMBIENTE MONGO_DB ATLAS - NoSQL

#### 9.1-FUNÇÕES CONECÇÃO MONGODB

In [None]:
#Carrega biblioteca MongoDb
from pymongo import MongoClient

#Classe para tratamento das transações no banco MongoDB
class Conector_mongoDB():
    def __init__(self, database = "Proj_Final", collection = "Criptomoedas"):  #mongodb+srv://ativ20:ativ20@cluster0.4kajc.mongodb.net/cluster0.ativ20
        try:
            self.cliente = MongoClient("mongodb+srv://soulcode:a1b2c3@cluster-proj-final.uj7gz.mongodb.net/db_criptomoeda.Criptomoeda_SparkSQL_Tratada") 
            self.database =  self.cliente[database]
            self.collection =self.database[collection]
        except Exception as e:
            print(str(e))
        
    def set_database(self,database):
        '''Escolhe o database'''
        try:
            self.database = database
        except Exception as e:
            print(str(e))
        
    def set_collection(self, collection):
        '''Escolhe a coleção'''
        try:
            self.collection = collection  
        except Exception as e:
            print(str(e))
                
    def get_database(self):
        '''Retorna o database'''
        try:
            return self.database
        except Exception as e:
            print(str(e))
            
    def get_collection(self):
        '''Retorna a coleção'''
        try:    
            return self.collection
        except Exception as e:
            print(str(e))
        

    # Métodos da classe:
    def insert(self,dados):
        '''Insere dados em um banco mongoDB'''
        try:
            self.collection.insert_many(dados)
        except Exception as e:
            print(str(e))
        
    def find(self):
        '''Busca dados em um banco mongoDB'''
        try:
            lista_itens = []
            itens_db = self.collection.find()
            for i in itens_db:
                lista_itens.append(i)
            return lista_itens
        except Exception as e:
            print(str(e))
                
    def delete_one(self):
        '''Deleta um dado de um banco mongoDB'''
        try:
            coluna = input("Você deseja excluir por qual dado? ")
            
            valor = input("Qual o valor desse dado do item que você deseja excluir? ")
            
            filter = {coluna: valor}
                
            self.collection.delete_one(filter)
        except Exception as e:
            print(str(e))
                
    def delete_many(self):
        '''Deleta dados de um banco mongoDB'''
        try:
            coluna = input("Você deseja excluir por qual dado? ")
            
            valor = input("Qual o valor desse dado dos itens que você deseja excluir? ")
            
            filter = {coluna: valor}
                
            self.collection.delete_many(filter)
        except Exception as e:
            print(str(e))
                
    
    def update_one(self):
        '''Atualiza um dado de um banco mongoDB'''
        try:    
            coluna_escolhida = input("Digite a coluna que você deseja realizar uma alteração: ")
            old_value = input("Digite o valor antigo desse item nessa coluna: ")
            new_value= input("Digite o novo valor para esse item: ")
            
            
            filter = {coluna_escolhida: old_value}
            newvalues = {"$set": {coluna_escolhida: new_value}}
            
            self.collection.update_one(filter,newvalues)
        except Exception as e:
            print(str(e))
                    

#### 9.2-EXPORTAR BASE TRATADA (PANDAS) PARA MONGO_DB ATLAS (site)

In [None]:
#Cria Conexão com o Servidor MongoDb_Atlas
print('Conecção Servidor MongoDb_Atlas.')
myurl = "mongodb+srv://soulcode:a1b2c3@cluster-proj-final.uj7gz.mongodb.net/db_criptomoeda.Criptomoeda_Pandas_Tratada"
client = MongoClient(myurl)

#STATUS DO SERVIDOR CLIENT
print('Status do servidor Client do MongoDb_Atlas.')
print(client.stats )

#Conectando com o Banco de Dados
print('Conecta ao banco de dados.')
db = client.db_criptomoeda 

#Converte de DF para Dicionário
print('Converte DF para Dict.')
data_dict = df_pd.to_dict(orient='records') 

#Cria coleção e insere o dicionário (json) no MongoDB
print('Cria coleção no banco MongoDB.')
db.Criptomoeda_Pandas_Tratada.insert_many(data_dict) 

print('\n>>> Dados inseridos com sucesso no banco MongoDb Atlas.')

#### 9.3-EXPORTAR BASE TRATADA (SPARK_SQL) PARA MONGO_DB ATLAS (Cloud)

In [None]:
#Cria Conexão com o Servidor MongoDb_Atlas
print('Conecção Servidor MongoDb_Atlas.')
myurl = "mongodb+srv://soulcode:a1b2c3@cluster-proj-final.uj7gz.mongodb.net/db_criptomoeda.Criptomoeda_SparkSQL_Tratada"
client = MongoClient(myurl)

#STATUS DO SERVIDOR CLIENT
print('Status do servidor Client do MongoDb_Atlas.')
print(client.stats )

#Conectando com o Banco de Dados
print('Conecta ao banco de dados.')
db = client.db_criptomoeda 

# converte de pyspark df para pandas df
print('Converte DF de pySpark para DF Pandas.')
df_pd_tratado = df_SparkSql.toPandas()

#Converte Dt_Negociacao em string
df_pd_tratado['Dt_Negociacao'] = df_pd_tratado['Dt_Negociacao'].astype('datetime64[ns]')
df_pd_tratado['Dt_Negociacao'] = df_pd_tratado['Dt_Negociacao'].dt.strftime('%Y-%m-%d')

#Converte de DF para Dicionário
print('Converte DF para Dict.')
data_dict = df_pd_tratado.to_dict(orient='records')

#Insere coleção (json) no MongoDB
print('Cria coleção no banco MongoDB.')
db.Criptomoeda_SparkSQL_Tratada.insert_many(data_dict) 

print('\n>>> Dados inseridos com sucesso no banco MongoDB Atlas.')

#### 9.4-Lista todos os Bancos de Dados do MongoDB Atlas

In [None]:
# mostrando os databases do mongodb atuais
client.list_database_names()

In [None]:
#################  JSON

# json = pd.read_json('data_ocorrencias.json') # Carrega o arquivo json
# df = pd.DataFrame(json) # poderia trabalhar com json, mas preferi Cria um dataframe
# total_series = df.columns.tolist() # Lista de colunas