# Processamento

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Stock Price Analysis")
    .getOrCreate()
)

In [4]:
# Read the CSV data at the "StockData" path, treating the first line as the header.
stocks = spark.read.option("header", True).csv("StockData")

In [5]:
# Preview the first five rows.
stocks.show(n=5)

+------+----------+----------+-------+--------+--------+--------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|
+------+----------+----------+-------+--------+--------+--------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |
+------+----------+----------+-------+--------+--------+--------+
only showing top 5 rows



In [6]:
stocks.printSchema() # columns and their data types

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)



In [7]:
# Select only the Ticker column.
stocks.select(stocks["Ticker"]).show(n=3)

+------+
|Ticker|
+------+
| BRK-B|
| BRK-B|
| BRK-B|
+------+
only showing top 3 rows



In [8]:
# Select several columns of the DataFrame at once.
stocks.select("Ticker", "Date", "Open").show(n=5)

+------+----------+--------+
|Ticker|      Date|    Open|
+------+----------+--------+
| BRK-B|05/31/2023|$321.12 |
| BRK-B|05/30/2023|$321.86 |
| BRK-B|05/26/2023|$320.44 |
| BRK-B|05/25/2023|$320.56 |
| BRK-B|05/24/2023|$322.71 |
+------+----------+--------+
only showing top 5 rows



In [9]:
# Filtering: keep the rows for the Microsoft ticker (MSFT) on the selected date.
# Each comparison is parenthesised because & binds tighter than == in Python.
stocks.where(
    (stocks["Ticker"] == "MSFT") & (stocks["Date"] == "05/31/2023")
).show(10)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
+------+----------+----------+--------+--------+--------+--------+



In [10]:
# Rows for two different tickers on the selected date.
# The tickers are combined with | (OR): using & would require Ticker to be both
# "MSFT" and "V" at the same time, which no row can satisfy.
(
    stocks
    .where((stocks["Ticker"] == "MSFT") | (stocks["Ticker"] == "V"))
    .where(stocks["Date"] == "05/31/2023")
    .show(10)
)

+------+----------+----------+--------+--------+--------+--------+
|Ticker|      Date|Close/Last|  Volume|    Open|    High|     Low|
+------+----------+----------+--------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 |45950550|$332.29 |$335.94 |$327.33 |
|     V|05/31/2023|  $221.03 |20460620|$219.96 |$221.53 |$216.14 |
+------+----------+----------+--------+--------+--------+--------+



In [11]:
# Ticker is one of five values, on the selected date.
stocks.where(
    stocks["Ticker"].isin("MSFT", "QQQ", "SPY", "V", "TSLA")
    & (stocks["Date"] == "05/31/2023")
).show(10)

+------+----------+----------+---------+--------+--------+--------+
|Ticker|      Date|Close/Last|   Volume|    Open|    High|     Low|
+------+----------+----------+---------+--------+--------+--------+
|  MSFT|05/31/2023|  $328.39 | 45950550|$332.29 |$335.94 |$327.33 |
|  TSLA|05/31/2023|  $203.93 |150711700|$199.78 |$203.95 |$195.12 |
|     V|05/31/2023|  $221.03 | 20460620|$219.96 |$221.53 |$216.14 |
|   SPY|05/31/2023|    417.85|110811800|  418.28|  419.22|  416.22|
|   QQQ|05/31/2023|    347.99| 65105380|  348.37|   350.6|  346.51|
+------+----------+----------+---------+--------+--------+--------+



In [12]:
# Using UDFs: User Defined Functions
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime

# date_parser converts the "MM/DD/YYYY" strings of the Date column into dates.
# udf() arguments: arg1 = the Python callable (a lambda here), arg2 = the Spark type it returns.
# A null or empty input maps to None (a null date) instead of raising inside the executor,
# and .date() makes the lambda return a datetime.date, matching the declared DateType.
date_parser = udf(
    lambda date: datetime.strptime(date, "%m/%d/%Y").date() if date else None,
    DateType(),
)

In [13]:
# Apply the UDF: it runs once for every row of the Date column and the result
# is stored in a new ParsedDate column.
stocks = stocks.withColumn("ParsedDate", date_parser(stocks["Date"]))
stocks.show(n=5)

+------+----------+----------+-------+--------+--------+--------+----------+
|Ticker|      Date|Close/Last| Volume|    Open|    High|     Low|ParsedDate|
+------+----------+----------+-------+--------+--------+--------+----------+
| BRK-B|05/31/2023|  $321.08 |6175417|$321.12 |$322.41 |$319.39 |2023-05-31|
| BRK-B|05/30/2023|  $322.19 |3232461|$321.86 |$322.47 |$319.00 |2023-05-30|
| BRK-B|05/26/2023|  $320.60 |3229873|$320.44 |$322.63 |$319.67 |2023-05-26|
| BRK-B|05/25/2023|  $319.02 |4251935|$320.56 |$320.56 |$317.71 |2023-05-25|
| BRK-B|05/24/2023|  $320.20 |3075393|$322.71 |$323.00 |$319.56 |2023-05-24|
+------+----------+----------+-------+--------+--------+--------+----------+
only showing top 5 rows



In [14]:
# Print the schema again after adding the ParsedDate column.
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- ParsedDate: date (nullable = true)



In [15]:
# Function that strips the dollar sign ($) and returns the numeric value
def num_parser(value):
    """Convert a price-like value to ``float``.

    Args:
        value: A string such as ``"$456.89"`` (surrounding whitespace, a ``$``
            sign and thousands separators are tolerated), an ``int``/``float``,
            or anything else.

    Returns:
        The parsed ``float``; ``None`` for blank strings and for unsupported
        types (including ``None``).

    Raises:
        ValueError: If a non-blank string is not a valid number once cleaned.
    """
    if isinstance(value, str):
        # Trim whitespace first so "$" is removed even when padded by spaces.
        cleaned = value.strip().strip("$").strip().replace(",", "")
        return float(cleaned) if cleaned else None
    if isinstance(value, (int, float)):
        # Always hand back a float so the value matches a FloatType UDF return.
        return float(value)
    return None

# quick check
print(num_parser("$456.89"))

456.89


In [16]:
from pyspark.sql.types import FloatType

# A plain Python function can be wrapped as a UDF.
number_parser = udf(num_parser, FloatType())

# "Close/Last" is accessed with brackets because the slash rules out the
# attribute syntax (stocks.Close/Last); the parsed value goes to a new Close column.
stocks = (
    stocks
    .withColumn("Open", number_parser(stocks["Open"]))
    .withColumn("Close", number_parser(stocks["Close/Last"]))
    .withColumn("High", number_parser(stocks["High"]))
    .withColumn("Low", number_parser(stocks["Low"]))
)

stocks.show(n=10)

+------+----------+----------+-------+------+------+------+----------+------+
|Ticker|      Date|Close/Last| Volume|  Open|  High|   Low|ParsedDate| Close|
+------+----------+----------+-------+------+------+------+----------+------+
| BRK-B|05/31/2023|  $321.08 |6175417|321.12|322.41|319.39|2023-05-31|321.08|
| BRK-B|05/30/2023|  $322.19 |3232461|321.86|322.47| 319.0|2023-05-30|322.19|
| BRK-B|05/26/2023|  $320.60 |3229873|320.44|322.63|319.67|2023-05-26| 320.6|
| BRK-B|05/25/2023|  $319.02 |4251935|320.56|320.56|317.71|2023-05-25|319.02|
| BRK-B|05/24/2023|  $320.20 |3075393|322.71| 323.0|319.56|2023-05-24| 320.2|
| BRK-B|05/23/2023|  $323.11 |4031342|328.19|329.27|322.97|2023-05-23|323.11|
| BRK-B|05/22/2023|  $329.13 |2763422|330.75|331.49|328.35|2023-05-22|329.13|
| BRK-B|05/19/2023|  $330.39 |4323538| 331.0|333.94|329.12|2023-05-19|330.39|
| BRK-B|05/18/2023|  $329.76 |2808329|326.87|329.98|325.85|2023-05-18|329.76|
| BRK-B|05/17/2023|  $327.39 |3047626|325.02|328.26|324.82|2023-

In [17]:
from pyspark.sql.types import IntegerType

# UDF that converts the Volume strings to integers.
integer_parser = udf(lambda raw_volume: int(raw_volume), IntegerType())

# Schema before the Volume conversion is applied.
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [18]:
# Convert Volume to an integer column.
stocks = stocks.withColumn("Volume", integer_parser(stocks.Volume))
# printSchema() takes no row count; the stray 5 was at best read as a nesting
# level (newer PySpark) and is rejected by versions without that parameter.
stocks.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Close/Last: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- ParsedDate: date (nullable = true)
 |-- Close: float (nullable = true)



In [19]:
# Drop the raw Date and Close/Last columns by selecting only the parsed ones,
# in the order they should appear.
stocks = stocks.select("Ticker", "ParsedDate", "Close", "Volume", "Open", "High", "Low")
stocks.show(n=5)

+------+----------+------+-------+------+------+------+
|Ticker|ParsedDate| Close| Volume|  Open|  High|   Low|
+------+----------+------+-------+------+------+------+
| BRK-B|2023-05-31|321.08|6175417|321.12|322.41|319.39|
| BRK-B|2023-05-30|322.19|3232461|321.86|322.47| 319.0|
| BRK-B|2023-05-26| 320.6|3229873|320.44|322.63|319.67|
| BRK-B|2023-05-25|319.02|4251935|320.56|320.56|317.71|
| BRK-B|2023-05-24| 320.2|3075393|322.71| 323.0|319.56|
+------+----------+------+-------+------+------+------+
only showing top 5 rows



In [20]:
# Basic statistics (count, mean, stddev, min, max) for the numeric columns.
stocks.describe("Close", "Volume", "Open", "High", "Low").show()

+-------+------------------+--------------------+------------------+------------------+------------------+
|summary|             Close|              Volume|              Open|              High|               Low|
+-------+------------------+--------------------+------------------+------------------+------------------+
|  count|             15108|               15108|             15108|             15108|             15108|
|   mean| 180.1256089860054|5.1868408793685466E7|180.09656566181036| 182.1253348687101| 177.9982781513109|
| stddev|101.14891782168563| 5.496484129953464E7|101.16125813324399|101.96625521621753|100.26590135955216|
|    min|             11.93|              961133|             12.07|             12.45|              11.8|
|    max|            477.71|           914080943|            479.22|            479.98|            476.06|
+-------+------------------+--------------------+------------------+------------------+------------------+



# Análise

#### Estrutura dos Dados

- *Ticker:* abreviação usada na bolsa para representar uma empresa
- *Date:* data de registro dos dados, cada dia é um dia de negociação na bolsa
- *Close:* último preço da ação no final do dia de negociação (é o valor que vemos falar quando dizem que "A ação da empresa X fechou hoje em tanto")
- *Volume:* quantidade de ações que foram compradas e vendidas no dia
- *Open:* primeiro preço da ação na abertura do mercado daquele dia
- *High:* maior preço atingido pela ação no dia
- *Low:* menor preço atingido pela ação no dia

##### Encontrar o maior preço de ação de cada empresa na abertura de mercado

In [21]:
# Using the DataFrame grouping methods directly.
(
    stocks
    .groupBy("Ticker")
    .max("Open")
    .show(n=15)
)

+------+---------+
|Ticker|max(Open)|
+------+---------+
| BRK-B|   361.39|
|  MSFT|   344.62|
|  META|   381.68|
|  TSLA|   411.47|
|  AAPL|   182.63|
|  AMZN|    187.2|
| GOOGL|   151.25|
|  NVDA|   405.95|
|     V|   250.05|
|   TSM|   141.61|
|   SPY|   479.22|
|   QQQ|   405.57|
+------+---------+



In [22]:
# Same aggregation, renaming the generated "max(Open)" column.
(
    stocks
    .groupBy("Ticker")
    .max("Open")
    .withColumnRenamed("max(Open)", "MaxStockPrice")
    .show(n=15)
)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|  MSFT|       344.62|
|  META|       381.68|
|  TSLA|       411.47|
|  AAPL|       182.63|
|  AMZN|        187.2|
| GOOGL|       151.25|
|  NVDA|       405.95|
|     V|       250.05|
|   TSM|       141.61|
|   SPY|       479.22|
|   QQQ|       405.57|
+------+-------------+



In [23]:
import pyspark.sql.functions as func

# Another way to compute the same result, using agg (aggregation).
# agg pays off when several aggregations are needed at once: they can be listed
# together without the code becoming overly verbose. It has no significant
# impact on processing.
(
    stocks
    .groupBy("Ticker")
    .agg(func.max("Open").alias("MaxStockPrice"))
    .show(n=15)
)

+------+-------------+
|Ticker|MaxStockPrice|
+------+-------------+
| BRK-B|       361.39|
|  MSFT|       344.62|
|  META|       381.68|
|  TSLA|       411.47|
|  AAPL|       182.63|
|  AMZN|        187.2|
| GOOGL|       151.25|
|  NVDA|       405.95|
|     V|       250.05|
|   TSM|       141.61|
|   SPY|       479.22|
|   QQQ|       405.57|
+------+-------------+



##### Encontrar o volume total das ações de cada empresa junto com seu valor max

In [24]:
# Total traded volume and highest opening price per ticker, in one aggregation.
(
    stocks
    .groupBy("Ticker")
    .agg(
        func.sum(func.col("Volume")).alias("TotalVolume"),
        func.max(func.col("Open")).alias("MaxStockPrice"),
    )
    .show()
)

+------+------------+-------------+
|Ticker| TotalVolume|MaxStockPrice|
+------+------------+-------------+
| BRK-B|  5862401321|       361.39|
|  MSFT| 37976660472|       344.62|
|  META| 30148848043|       381.68|
|  TSLA|171802975076|       411.47|
|  AAPL|139310061360|       182.63|
|  AMZN|104503287430|        187.2|
| GOOGL| 43956560981|       151.25|
|  NVDA| 58787218324|       405.95|
|     V| 10410997871|       250.05|
|   TSM| 12506470104|       141.61|
|   SPY|107925285300|       479.22|
|   QQQ| 60437153773|       405.57|
+------+------------+-------------+



##### Extração de dia, mês e ano da coluna ParsedDate

In [39]:
# Derive calendar fields from ParsedDate.
stocks = (
    stocks
    .withColumn("Year", func.year(stocks["ParsedDate"]))
    .withColumn("Month", func.month(stocks["ParsedDate"]))
    .withColumn("Day", func.dayofmonth(stocks["ParsedDate"]))
    .withColumn("Year_Week", func.weekofyear(stocks["ParsedDate"]))
)

# Spark evaluates lazily: the new columns are only actually computed
# when an action such as show() runs.
stocks.show(n=10)

+------+----------+------+-------+------+------+------+----+-----+---+---------+
|Ticker|ParsedDate| Close| Volume|  Open|  High|   Low|Year|Month|Day|Year_Week|
+------+----------+------+-------+------+------+------+----+-----+---+---------+
| BRK-B|2023-05-31|321.08|6175417|321.12|322.41|319.39|2023|    5| 31|       22|
| BRK-B|2023-05-30|322.19|3232461|321.86|322.47| 319.0|2023|    5| 30|       22|
| BRK-B|2023-05-26| 320.6|3229873|320.44|322.63|319.67|2023|    5| 26|       21|
| BRK-B|2023-05-25|319.02|4251935|320.56|320.56|317.71|2023|    5| 25|       21|
| BRK-B|2023-05-24| 320.2|3075393|322.71| 323.0|319.56|2023|    5| 24|       21|
| BRK-B|2023-05-23|323.11|4031342|328.19|329.27|322.97|2023|    5| 23|       21|
| BRK-B|2023-05-22|329.13|2763422|330.75|331.49|328.35|2023|    5| 22|       21|
| BRK-B|2023-05-19|330.39|4323538| 331.0|333.94|329.12|2023|    5| 19|       20|
| BRK-B|2023-05-18|329.76|2808329|326.87|329.98|325.85|2023|    5| 18|       20|
| BRK-B|2023-05-17|327.39|30

##### Valores max e min de ações de cada empresa por ano

In [26]:
# Highest and lowest opening price per ticker and year, ordered by year.
stocks_yearly = (
    stocks
    .groupBy("Ticker", "Year")
    .agg(
        func.max("Open").alias("YearlyMax"),
        func.min("Open").alias("YearlyMin"),
    )
    .orderBy("Year")
)

stocks_yearly.show(n=15)

+------+----+---------+---------+
|Ticker|Year|YearlyMax|YearlyMin|
+------+----+---------+---------+
| BRK-B|2018|    224.0|   185.43|
|  MSFT|2018|   115.42|    95.14|
|  META|2018|   215.72|    123.1|
|  TSLA|2018|     25.0|    17.02|
|  AAPL|2018|     57.7|    37.04|
|  AMZN|2018|   101.91|     67.3|
| GOOGL|2018|    64.46|    49.22|
|  NVDA|2018|    72.33|    31.62|
|     V|2018|   150.89|   122.08|
|   TSM|2018|     45.0|    35.33|
|   SPY|2018|   293.09|   235.97|
|   QQQ|2018|   186.82|   145.12|
|  META|2019|   208.67|   128.99|
|  AMZN|2019|   101.28|    73.26|
|  TSLA|2019|     29.0|    12.07|
+------+----+---------+---------+
only showing top 15 rows



##### Por mês

In [27]:
# Highest and lowest opening price per ticker, year and month, ordered chronologically.
stocks_monthly = (
    stocks
    .groupBy("Ticker", "Year", "Month")
    .agg(
        func.max("Open").alias("MonthlyMax"),
        func.min("Open").alias("MonthlyMin"),
    )
    .orderBy("Year", "Month")
)

stocks_monthly.show(n=15)

+------+----+-----+----------+----------+
|Ticker|Year|Month|MonthlyMax|MonthlyMin|
+------+----+-----+----------+----------+
|  AAPL|2018|    5|     46.81|     46.81|
|  AMZN|2018|    5|     81.15|     81.15|
|  META|2018|    5|    187.87|    187.87|
|  TSLA|2018|    5|     19.15|     19.15|
| BRK-B|2018|    5|    194.29|    194.29|
|  MSFT|2018|    5|     99.29|     99.29|
| GOOGL|2018|    5|      54.1|      54.1|
|  NVDA|2018|    5|     62.93|     62.93|
|     V|2018|    5|    130.65|    130.65|
|   TSM|2018|    5|     38.33|     38.33|
|   SPY|2018|    5|    272.15|    272.15|
|   QQQ|2018|    5|    170.13|    170.13|
|  TSLA|2018|    6|     24.34|     19.06|
|   TSM|2018|    6|     39.85|      35.5|
|  AAPL|2018|    6|     48.54|     45.75|
+------+----+-----+----------+----------+
only showing top 15 rows



##### Diferença entre max e min mensal

In [28]:
# Spread = monthly maximum minus monthly minimum of the opening price.
(
    stocks_monthly
    .withColumn("Spread", func.col("MonthlyMax") - func.col("MonthlyMin"))
    .show(n=20)
)

+------+----+-----+----------+----------+---------+
|Ticker|Year|Month|MonthlyMax|MonthlyMin|   Spread|
+------+----+-----+----------+----------+---------+
| BRK-B|2018|    5|    194.29|    194.29|      0.0|
|  MSFT|2018|    5|     99.29|     99.29|      0.0|
|  META|2018|    5|    187.87|    187.87|      0.0|
|  TSLA|2018|    5|     19.15|     19.15|      0.0|
|  AAPL|2018|    5|     46.81|     46.81|      0.0|
|  AMZN|2018|    5|     81.15|     81.15|      0.0|
| GOOGL|2018|    5|      54.1|      54.1|      0.0|
|  NVDA|2018|    5|     62.93|     62.93|      0.0|
|     V|2018|    5|    130.65|    130.65|      0.0|
|   TSM|2018|    5|     38.33|     38.33|      0.0|
|   SPY|2018|    5|    272.15|    272.15|      0.0|
|   QQQ|2018|    5|    170.13|    170.13|      0.0|
| BRK-B|2018|    6|     196.3|    185.43| 10.87001|
|  MSFT|2018|    6|    102.65|     97.38|5.2700043|
|  META|2018|    6|    202.76|    187.53|15.229996|
|  AMZN|2018|    6|      88.0|     81.85|6.1500015|
|  TSLA|2018

#### Joins

In [29]:
# Inner join of the daily `stocks` rows with the per-(Ticker, Year) aggregates in
# `stocks_yearly`, matching on Ticker and Year.
# The join result carries Ticker and Year from both sides, so one copy of each is
# dropped; the drop is expressed through the `stocks_yearly` column references.
# NOTE(review): `stocks_yearly` is derived from `stocks`, and the displayed output
# shows the surviving Ticker/Year sitting after Year_Week, which suggests the
# left-hand copies were the ones removed — confirm how these references resolve.
historic_stocks = stocks.join(stocks_yearly,
                              (stocks.Ticker == stocks_yearly.Ticker) & (stocks.Year == stocks_yearly.Year),
                              "inner").drop(stocks_yearly.Year, stocks_yearly.Ticker)

historic_stocks.show()

+----------+------+-------+------+------+------+-----+---+---------+------+----+---------+---------+
|ParsedDate| Close| Volume|  Open|  High|   Low|Month|Day|Year_Week|Ticker|Year|YearlyMax|YearlyMin|
+----------+------+-------+------+------+------+-----+---+---------+------+----+---------+---------+
|2023-05-31|321.08|6175417|321.12|322.41|319.39|    5| 31|       22| BRK-B|2023|    331.0|   294.68|
|2023-05-30|322.19|3232461|321.86|322.47| 319.0|    5| 30|       22| BRK-B|2023|    331.0|   294.68|
|2023-05-26| 320.6|3229873|320.44|322.63|319.67|    5| 26|       21| BRK-B|2023|    331.0|   294.68|
|2023-05-25|319.02|4251935|320.56|320.56|317.71|    5| 25|       21| BRK-B|2023|    331.0|   294.68|
|2023-05-24| 320.2|3075393|322.71| 323.0|319.56|    5| 24|       21| BRK-B|2023|    331.0|   294.68|
|2023-05-23|323.11|4031342|328.19|329.27|322.97|    5| 23|       21| BRK-B|2023|    331.0|   294.68|
|2023-05-22|329.13|2763422|330.75|331.49|328.35|    5| 22|       21| BRK-B|2023|    331.0| 

In [30]:
# Join with stocks_monthly, matching on Ticker, Year and Month.
condition = [(historic_stocks.Ticker == stocks_monthly.Ticker) & (historic_stocks.Year == stocks_monthly.Year) & (historic_stocks.Month == stocks_monthly.Month)]

# The join condition can be stored in a variable and passed to join().
# After the join, one copy of each duplicated key column (Month, Year, Ticker) is dropped.
# NOTE(review): both frames descend from `stocks`; the displayed column order suggests
# the dropped copies do not all come from the side named in drop() — confirm.
historic_stocks = (historic_stocks.join(stocks_monthly, condition, "inner")
                   .drop(stocks_monthly.Month, stocks_monthly.Year, historic_stocks.Ticker))

historic_stocks.show()

+----------+------+-------+------+------+------+---+---------+----+---------+---------+------+-----+----------+----------+
|ParsedDate| Close| Volume|  Open|  High|   Low|Day|Year_Week|Year|YearlyMax|YearlyMin|Ticker|Month|MonthlyMax|MonthlyMin|
+----------+------+-------+------+------+------+---+---------+----+---------+---------+------+-----+----------+----------+
|2023-05-31|321.08|6175417|321.12|322.41|319.39| 31|       22|2023|    331.0|   294.68| BRK-B|    5|     331.0|    320.44|
|2023-05-30|322.19|3232461|321.86|322.47| 319.0| 30|       22|2023|    331.0|   294.68| BRK-B|    5|     331.0|    320.44|
|2023-05-26| 320.6|3229873|320.44|322.63|319.67| 26|       21|2023|    331.0|   294.68| BRK-B|    5|     331.0|    320.44|
|2023-05-25|319.02|4251935|320.56|320.56|317.71| 25|       21|2023|    331.0|   294.68| BRK-B|    5|     331.0|    320.44|
|2023-05-24| 320.2|3075393|322.71| 323.0|319.56| 24|       21|2023|    331.0|   294.68| BRK-B|    5|     331.0|    320.44|
|2023-05-23|323.

In [31]:
# List the column names of the joined DataFrame, in their current order.
historic_stocks.columns

['ParsedDate',
 'Close',
 'Volume',
 'Open',
 'High',
 'Low',
 'Day',
 'Year_Week',
 'Year',
 'YearlyMax',
 'YearlyMin',
 'Ticker',
 'Month',
 'MonthlyMax',
 'MonthlyMin']

In [32]:
# Reorder the columns (ParsedDate and Year_Week are left out of the final frame).
final_stocks = historic_stocks.select(
    'Ticker', 'Year', 'Month', 'Day',
    'Volume', 'Open', 'Low', 'High', 'Close',
    'YearlyMax', 'YearlyMin', 'MonthlyMax', 'MonthlyMin',
)

final_stocks.show()

+------+----+-----+---+-------+------+------+------+------+---------+---------+----------+----------+
|Ticker|Year|Month|Day| Volume|  Open|   Low|  High| Close|YearlyMax|YearlyMin|MonthlyMax|MonthlyMin|
+------+----+-----+---+-------+------+------+------+------+---------+---------+----------+----------+
| BRK-B|2023|    5| 31|6175417|321.12|319.39|322.41|321.08|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 30|3232461|321.86| 319.0|322.47|322.19|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 26|3229873|320.44|319.67|322.63| 320.6|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 25|4251935|320.56|317.71|320.56|319.02|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 24|3075393|322.71|319.56| 323.0| 320.2|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 23|4031342|328.19|322.97|329.27|323.11|    331.0|   294.68|     331.0|    320.44|
| BRK-B|2023|    5| 22|2763422|330.75|328.35|331.49|329.13|    331.0|   294.68|   

#### Query com spark SQL

In [33]:
# spark.sql() queries tables/views by name, not DataFrame variables directly,
# so a temporary view is created from the DataFrame we want to query.
final_stocks.createOrReplaceTempView('stockData')

In [34]:
# Year is an integer column (produced by func.year), so it is compared with a
# numeric literal rather than the string '2023', avoiding an implicit cast.
spark.sql("select * from stockData where Ticker = 'MSFT' and Year = 2023").show()

+------+----+-----+---+--------+------+------+------+------+---------+---------+----------+----------+
|Ticker|Year|Month|Day|  Volume|  Open|   Low|  High| Close|YearlyMax|YearlyMin|MonthlyMax|MonthlyMin|
+------+----+-----+---+--------+------+------+------+------+---------+---------+----------+----------+
|  MSFT|2023|    5| 31|45950550|332.29|327.33|335.94|328.39|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 30|29503070|335.23|330.52|335.74|331.21|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 26|36630630|324.02|323.88| 333.4|332.89|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 25|43301740|323.24| 320.0| 326.9|325.92|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 24|23384890|314.73|312.61| 316.5|313.85|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 23|30797170|320.03|315.25|322.72|315.26|   335.23|    223.0|    335.23|    305.72|
|  MSFT|2023|    5| 22|24115660| 318.6|318.01|322.59|321.18|   335.23|   

#### Análises com Window Functions

In [41]:
# Narrow the data to the few columns needed for the window-function analysis.
snapshot = historic_stocks.select('Ticker', 'ParsedDate', 'Open')

In [42]:
# Preview the snapshot (show() prints the first 20 rows by default).
snapshot.show()

+------+----------+------+
|Ticker|ParsedDate|  Open|
+------+----------+------+
| BRK-B|2023-05-31|321.12|
| BRK-B|2023-05-30|321.86|
| BRK-B|2023-05-26|320.44|
| BRK-B|2023-05-25|320.56|
| BRK-B|2023-05-24|322.71|
| BRK-B|2023-05-23|328.19|
| BRK-B|2023-05-22|330.75|
| BRK-B|2023-05-19| 331.0|
| BRK-B|2023-05-18|326.87|
| BRK-B|2023-05-17|325.02|
| BRK-B|2023-05-16|322.46|
| BRK-B|2023-05-15|322.89|
| BRK-B|2023-05-12|323.82|
| BRK-B|2023-05-11| 321.0|
| BRK-B|2023-05-10|326.08|
| BRK-B|2023-05-09|324.87|
| BRK-B|2023-05-08|328.26|
| BRK-B|2023-05-05|323.36|
| BRK-B|2023-05-04|323.44|
| BRK-B|2023-05-03|327.13|
+------+----------+------+
only showing top 20 rows



In [43]:
# Window Functions
from pyspark.sql.window import Window

# Define the data windows: one partition per Ticker,
# each partition sorted by date in ascending order.
lag1Day = (
    Window
    .partitionBy('Ticker')
    .orderBy('ParsedDate')
)

In [44]:
# PREVIOUS-DAY LAG

# lag() reads Open from the row one position earlier inside each window (its
# default offset is 1) and stores it in PreviousOpen; the first row of every
# Ticker has no predecessor and therefore gets NULL.
(
    snapshot
    .withColumn('PreviousOpen', func.lag("Open").over(lag1Day))
    .show()
)

+------+----------+-----+------------+
|Ticker|ParsedDate| Open|PreviousOpen|
+------+----------+-----+------------+
|  AAPL|2018-05-31|46.81|        NULL|
|  AAPL|2018-06-01| 47.0|       46.81|
|  AAPL|2018-06-04|47.91|        47.0|
|  AAPL|2018-06-05|48.27|       47.91|
|  AAPL|2018-06-06|48.41|       48.27|
|  AAPL|2018-06-07|48.54|       48.41|
|  AAPL|2018-06-08|47.79|       48.54|
|  AAPL|2018-06-11|47.84|       47.79|
|  AAPL|2018-06-12|47.85|       47.84|
|  AAPL|2018-06-13|48.11|       47.85|
|  AAPL|2018-06-14|47.89|       48.11|
|  AAPL|2018-06-15|47.51|       47.89|
|  AAPL|2018-06-18|46.97|       47.51|
|  AAPL|2018-06-19|46.29|       46.97|
|  AAPL|2018-06-20|46.59|       46.29|
|  AAPL|2018-06-21|46.81|       46.59|
|  AAPL|2018-06-22|46.53|       46.81|
|  AAPL|2018-06-25|45.85|       46.53|
|  AAPL|2018-06-26|45.75|       45.85|
|  AAPL|2018-06-27|46.31|       45.75|
+------+----------+-----+------------+
only showing top 20 rows



In [47]:
# MOVING AVERAGE OVER THE PREVIOUS ROWS

# A 50-row window that includes the current row spans the 49 preceding rows plus
# the current one; rowsBetween(-50, 0) would cover 51 rows.
moving_average = (
    Window.partitionBy('Ticker')
    .orderBy('ParsedDate')
    .rowsBetween(-49, Window.currentRow)
)

# Average Open over that window and store it, rounded to 2 decimals, in MA50.
# Rows with fewer than 49 predecessors are averaged over whatever rows exist.
(snapshot.withColumn('MA50', func.avg('Open').over(moving_average))
        .withColumn('MA50', func.round('MA50', 2)).show())

+------+----------+-----+-----+
|Ticker|ParsedDate| Open| MA50|
+------+----------+-----+-----+
|  AAPL|2018-05-31|46.81|46.81|
|  AAPL|2018-06-01| 47.0|46.91|
|  AAPL|2018-06-04|47.91|47.24|
|  AAPL|2018-06-05|48.27| 47.5|
|  AAPL|2018-06-06|48.41|47.68|
|  AAPL|2018-06-07|48.54|47.82|
|  AAPL|2018-06-08|47.79|47.82|
|  AAPL|2018-06-11|47.84|47.82|
|  AAPL|2018-06-12|47.85|47.82|
|  AAPL|2018-06-13|48.11|47.85|
|  AAPL|2018-06-14|47.89|47.86|
|  AAPL|2018-06-15|47.51|47.83|
|  AAPL|2018-06-18|46.97|47.76|
|  AAPL|2018-06-19|46.29|47.66|
|  AAPL|2018-06-20|46.59|47.59|
|  AAPL|2018-06-21|46.81|47.54|
|  AAPL|2018-06-22|46.53|47.48|
|  AAPL|2018-06-25|45.85|47.39|
|  AAPL|2018-06-26|45.75| 47.3|
|  AAPL|2018-06-27|46.31|47.25|
+------+----------+-----+-----+
only showing top 20 rows



In [50]:
# HIGHEST OPEN VALUES

# One partition per Ticker, each sorted by Open in descending order.
maximum_stock = (
    Window
    .partitionBy('Ticker')
    .orderBy(func.col('Open').desc())
)

In [53]:
# row_number() numbers the rows of each window. Because every Ticker window is
# sorted by Open descending, row 1 is that ticker's highest Open, row 2 the
# second highest, and so on.
(
    snapshot
    .withColumn('MaxOpen', func.row_number().over(maximum_stock))
    .show()
)

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AAPL|2021-12-29|179.33|      6|
|  AAPL|2021-12-16|179.28|      7|
|  AAPL|2022-03-30|178.55|      8|
|  AAPL|2021-12-31|178.09|      9|
|  AAPL|2022-03-31|177.84|     10|
|  AAPL|2022-01-03|177.83|     11|
|  AAPL|2022-04-05| 177.5|     12|
|  AAPL|2023-05-31|177.33|     13|
|  AAPL|2021-12-27|177.09|     14|
|  AAPL|2023-05-30|176.96|     15|
|  AAPL|2022-03-29|176.69|     16|
|  AAPL|2023-05-19|176.39|     17|
|  AAPL|2022-01-12|176.12|     18|
|  AAPL|2022-02-09|176.05|     19|
|  AAPL|2021-12-23|175.85|     20|
+------+----------+------+-------+
only showing top 20 rows



In [63]:
# Same ranking, keeping only the 5 highest Open values of each ticker.
result = (
    snapshot
    .withColumn('MaxOpen', func.row_number().over(maximum_stock))
    .filter(func.col('MaxOpen') <= 5)
)
result.show()

+------+----------+------+-------+
|Ticker|ParsedDate|  Open|MaxOpen|
+------+----------+------+-------+
|  AAPL|2022-01-04|182.63|      1|
|  AAPL|2021-12-13|181.12|      2|
|  AAPL|2021-12-28|180.16|      3|
|  AAPL|2022-01-05|179.61|      4|
|  AAPL|2021-12-30|179.47|      5|
|  AMZN|2021-07-12| 187.2|      1|
|  AMZN|2021-07-09|186.13|      2|
|  AMZN|2021-07-07|185.87|      3|
|  AMZN|2021-11-19|185.63|      4|
|  AMZN|2021-07-14|185.44|      5|
| BRK-B|2022-03-29|361.39|      1|
| BRK-B|2022-03-28|360.59|      2|
| BRK-B|2022-03-31| 359.0|      3|
| BRK-B|2022-03-30|354.66|      4|
| BRK-B|2022-03-25| 353.9|      5|
| GOOGL|2022-02-02|151.25|      1|
| GOOGL|2021-11-19|149.98|      2|
| GOOGL|2021-11-08|149.83|      3|
| GOOGL|2021-11-22|149.33|      4|
| GOOGL|2021-11-09|149.23|      5|
+------+----------+------+-------+
only showing top 20 rows



### Armazenamento

In [64]:
# CSV vs PARQUET vs AVRO

# CSV is neither columnar nor schema-carrying, but it is compressible and human-readable.
# Avro is compressible and carries a schema, but it is neither columnar nor human-readable.
# Parquet is columnar, compressible and carries a schema, but it is not human-readable.

# Other formats exist as well, such as Text, ORC and JSON.

# Save the DataFrame as PARQUET.
# No 'header' option here: it is a CSV setting, and Parquet stores column names in its schema.
# NOTE(review): partitioning by ParsedDate as well creates one directory per
# (Ticker, date) pair — confirm that granularity is wanted for such a small result.
(result.write
       .partitionBy('Ticker', 'ParsedDate')
       .mode('overwrite')
       .parquet('result_parquet')  # folder where the .parquet files are written
)

In [65]:
# Save the DataFrame as CSV: one sub-folder per Ticker, replacing any previous
# output, with a header line in every file.
(
    result.write
    .partitionBy('Ticker')
    .mode('overwrite')
    .csv('result_csv', header=True)
)