# Condigurando ambiente PySpark na máquina

In [None]:
# Instalações
!pip install pyspark -q

In [None]:
# Bibliotecas
from pyspark.sql import SparkSession

In [None]:
# Configurando ambiente
spark = SparkSession.builder.master("local[3]").getOrCreate() # "*" means the core quantity used
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

# Extraindo informações diárias dos últimos 30 dias(úteis), através de um script Python.

# Questão 1 - Extrair informações diárias de uma das ações abaixo dos últimos 30 dias(úteis):
1. ITUB - Itaú
2. BBD - Bradesco
3. MSFT - Microsoft
4. GOOG - Google
5. TSLA - Tesla

In [1]:
#Bibliotecas utilizadas. Como foi solicitado no desafio, foi armazenado o token de API em um arquivo chave_api.py
import requests
import pandas as pd
from chave_api import chave
from pyspark.sql.functions import *
from pyspark.sql import SparkSession



In [2]:
chave_api = chave()
key = chave_api.get_key()

In [3]:
#Listas criadas para parametrizar a captura de dados das respectivas ações: ITUB, BBD, MSFT, GOOG e TSLA
acoes = ['ITUB','BBD','MSFT','GOOG','TSLA']
lista_acoes = []

In [4]:
#Laço for para capturar o JSON das ações e concatenar em um dataframe
for acao in acoes:
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol={acao}&outputsize=compact&apikey={key}'
    r = requests.get(url)
    data = r.json()

    # acessa a chave "Time Series (Daily)" do dicionário
    time_series = data["Time Series (Daily)"]

    df = pd.DataFrame.from_dict(time_series, orient='index')
    df.index.name = 'date'
    df.columns = ['open', 'high', 'low', 'close', 'adjusted close', 'volume', 'dividend amount', 'split coefficient']
    df = df.astype(float)
    df['Symbol']= acao
    df = df.reset_index()    #Transforma o index em uma coluna
    df = df.head(30)         #Filtra os últimos 30 dias úteis
    lista_acoes.append(df)

df_acoes = pd.concat(lista_acoes, ignore_index=True)


# Código em PySpark para realizar os questionários 2, 3, 4 e 5 

In [5]:
#Criando uma sessão no PySpark
spark = SparkSession.builder \
.master("local[3]")\
.appName("myApp") \
.config("spark.jars", "\postgresql-42.5.0.jar")\
.getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [6]:
#Passando o dataframe em pandas para um dataframe PySpark
pysp_df = spark.createDataFrame(df_acoes)
pysp_df.show()

+----------+-----+------+------+-----+----------------+-----------+---------------+-----------------+------+
|      date| open|  high|   low|close|  adjusted close|     volume|dividend amount|split coefficient|Symbol|
+----------+-----+------+------+-----+----------------+-----------+---------------+-----------------+------+
|2023-04-28| 5.09| 5.165|  5.06| 5.15|            5.15|1.7423588E7|            0.0|              1.0|  ITUB|
|2023-04-27| 5.08|5.1475|  5.05| 5.14|            5.14|1.7235927E7|            0.0|              1.0|  ITUB|
|2023-04-26| 5.02|5.0582|  4.97| 4.99|            4.99|1.2525425E7|            0.0|              1.0|  ITUB|
|2023-04-25|  5.0|  5.05|  4.96| 5.02|            5.02|2.1134543E7|            0.0|              1.0|  ITUB|
|2023-04-24| 4.99| 5.055|  4.92|  5.0|             5.0|1.6361481E7|            0.0|              1.0|  ITUB|
|2023-04-21| 5.02|  5.03|  4.96| 5.03|            5.03|  5596926.0|            0.0|              1.0|  ITUB|
|2023-04-20| 4.97| 

# Questão 2 - Valores da média, o desvio padrão, o valor mínimo, os quartis da distribuição e o valor máximo dos últimos 30 dias(úteis).

In [7]:
#Média, desvio padrão, valor mínimo, quartis e valor máximo
quest2 = pysp_df.groupBy("Symbol").agg(
    round(mean("close"),4).alias("media_acao"),
    round(stddev("close"),4).alias("desvio_padrao"),
    min("close").alias("valor_min"),
    percentile_approx("close", 0.25).alias("q1"),
    percentile_approx("close", 0.5).alias("q2"),
    percentile_approx("close", 0.75).alias("q3"),
    max("close").alias("valor_max"),
)

# Exibir o resultado
quest2.show()

+------+----------+-------------+---------+------+------+------+---------+
|Symbol|media_acao|desvio_padrao|valor_min|    q1|    q2|    q3|valor_max|
+------+----------+-------------+---------+------+------+------+---------+
|  ITUB|    4.8427|       0.2878|     4.33|  4.56|  4.83|  5.07|     5.26|
|   BBD|    2.6347|       0.1272|     2.38|  2.52|  2.63|  2.76|     2.83|
|  GOOG|   105.301|       2.1796|   101.32|104.22|105.12|106.42|   109.46|
|  MSFT|  284.8203|       8.3014|   272.23|279.43|284.34|288.45|   307.26|
|  TSLA|  182.4837|       13.217|   153.75|180.13|185.06|191.81|   207.46|
+------+----------+-------------+---------+------+------+------+---------+



# Questão 3 - Fazer uma ordenação dos dados e seleção dos 'n' maiores e menores volumes dos últimos 30 dias(úteis)

In [9]:
#Ordena o dataframe em ordem crescente com base no volume
quest3 = pysp_df.orderBy("volume")

#Selecionando os 2 maiores(df_maior) e menores(df_menor) volumes
n = 2
df_maior = quest3.orderBy(desc("volume")).limit(n)
df_menor = quest3.limit(n)

In [10]:
#Menores volumes comparando todas ações
df_menor.show(n)

+----------+----+----+-----+-----+--------------+---------+---------------+-----------------+------+
|      date|open|high|  low|close|adjusted close|   volume|dividend amount|split coefficient|Symbol|
+----------+----+----+-----+-----+--------------+---------+---------------+-----------------+------+
|2023-04-21|5.02|5.03| 4.96| 5.03|          5.03|5596926.0|            0.0|              1.0|  ITUB|
|2023-04-21|2.68|2.69|2.655| 2.69|          2.69|8210718.0|            0.0|              1.0|   BBD|
+----------+----+----+-----+-----+--------------+---------+---------------+-----------------+------+



In [11]:
#Maiores volumes comparando todas ações
df_maior.show(n)

+----------+-------+------+------+------+--------------+------------+---------------+-----------------+------+
|      date|   open|  high|   low| close|adjusted close|      volume|dividend amount|split coefficient|Symbol|
+----------+-------+------+------+------+--------------+------------+---------------+-----------------+------+
|2023-04-20|166.165| 169.7|160.56|162.99|        162.99|2.10970819E8|            0.0|              1.0|  TSLA|
|2023-03-31| 197.53|207.79| 197.2|207.46|        207.46|1.70222118E8|            0.0|              1.0|  TSLA|
+----------+-------+------+------+------+--------------+------------+---------------+-----------------+------+



# Questão 4 - Faça a soma dos volumes de ITUB e BBD dos últimos 30 dias(úteis)

In [54]:
#Filtra o dataframe para mostrar somente as ações ITUB e BBD
quest4 = pysp_df.filter(pysp_df.Symbol.isin('BBD','ITUB'))

#Realiza a soma total dos volumes de ITUB e BBD
soma_volumes = (quest4.select(sum('volume')).collect()[0][0])

print(f'Soma do volume BBD e ITUB: {soma_volumes}')
#Resultado da Soma do Volume:

Soma do volume BBD e ITUB: 1591330553.0


In [55]:
#Filtra o dataframe da ação ITUB
soma_itub = pysp_df.filter(pysp_df.Symbol.isin('ITUB'))
soma_volumes_itub = (soma_itub.select(sum('volume')).collect()[0][0])

print(f'Soma do volume ITUB: {soma_volumes_itub}')
#Resultado da soma do volume da ação ITUB

Soma do volume ITUB: 811756433.0


In [56]:
#Filtra o dataframe da ação BBD
soma_bbd = pysp_df.filter(pysp_df.Symbol.isin('BBD'))
soma_volumes_bbd = (soma_bbd.select(sum('volume')).collect()[0][0])

print(f'Soma do volume BBD: {soma_volumes_bbd}')
#Resultado da soma do volume da ação BBD

Soma do volume BBD: 779574120.0


# Questão 5 - Faça a exportação do arquivo .csv pela API, crie uma tabela e a importe o .csv das ações de TSLA - Tesla em um banco PostsgreSQL.

In [57]:
#Salva o CSV das ações da TSLA na pasta do arquivo
url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&symbol=TSLA&apikey={key}&datatype=csv'

response = requests.get(url)

with open(r'TSLA.csv', 'w') as file:
    file.write(response.text)


 Após isso, é criado dentro do banco de dados a tabela "tsla" com a seguinte query SQL
- CREATE TABLE tsla(
-    timestamp Timestamp, 
-    open float, 
-    high float, 
-    low float, 
-    close float, 
-    adjusted_close float, 
-    volume float, 
-    dividend_amount float, 
-    split_coefficient float) 

In [58]:
#Transformando o CSV exportado da API em um dataframe e realizando a carga de dados ao BD Postgre de destino.

df = spark.read.csv("TSLA.csv", header=True, inferSchema=True)

#Configurações de conexão
postgres_url = "jdbc:postgresql://192.168.1.12:5432/datarisk_desafio"
postgres_user = "admin"
postgres_password = "admin"
postgres_table = "tsla"

df.write \
    .format("jdbc") \
    .option("url", postgres_url) \
    .option("dbtable", postgres_table) \
    .option("user", postgres_user) \
    .option("password", postgres_password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()
