O projeto proposto será nós trabalharmos com um dataframe do SP500, índice americano que representa ativos negociados na NYSE e na NASDAQ. Nesse dataframe, temos informações a respeito da cotação de abertura, máxima, mínima, volume, fechamento e ajuste do fechamento, do ativo.

Trabalharemos um modelo de machine learning por regressão linear, com o objetivo de prevermos o valor de fechamento do ativo, com base em alguns atributos supracitadas do dataframe.

O primeiro passo é instalar o pyspark em nossa VM e importá-lo.

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5f4621db46e734eb7bf8ef517b8affa99e45fc24ec23c5dd19774d6a11d82367
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pyspark

Do módulo pyspark SQL importamos o SparkSession para criar nossa sessão.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Atribuímos a um objeto nossa sessão criada.

In [4]:
spark = SparkSession.builder.master('local').appName('sp500').getOrCreate()

Chamamos nosso dataframe pelo método read para visualizá-lo e contamos quantas linhas tem.

In [5]:
df = spark.read.csv('/content/SP500data.csv', header=True, inferSchema=True)
df.show(10, truncate=False)
df.count()

+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+
|_c0|Date      |High              |Low               |Open              |Close             |Volume    |Adj Close         |Symbol|
+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+
|0  |2010-01-04|30.64285659790039 |30.34000015258789 |30.489999771118164|30.572856903076172|1.234324E8|26.68132972717285 |AAPL  |
|1  |2010-01-04|34.099998474121094|33.400001525878906|33.599998474121094|34.0              |4.06793E7 |31.601511001586914|C     |
|2  |2010-01-04|32.709999084472656|32.23500061035156 |32.290000915527344|32.69499969482422 |5894200.0 |26.618066787719727|UNP   |
|3  |2010-01-04|59.189998626708984|57.5099983215332  |57.650001525878906|58.54999923706055 |7325600.0 |43.9952392578125  |CAT   |
|4  |2010-01-04|28.31999969482422 |27.665000915527344|28.31999969482422 |27.8700008392334 

34721

Checamos os tipos de dados de nosso dataframe.

In [6]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Symbol: string (nullable = true)



Aqui nós importamos os Vectors e o VectorAssembler, para posteriormente usar em nossa base de dados para criarmos um dataframe vetorizado.

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

Aqui nós utizamos o transformador Vector Assembler nos atributos que escolhemos para criar nossa coluna vetorizada. Através dos dados dessas colunas escolhidas, nosso modelo irá tentar prever o valor de fechamento do ativo, objetivo do nosso projeto.

In [8]:
assembler = VectorAssembler(inputCols=["Open", "High", "Low", "Volume"], outputCol="features")
print(assembler)

VectorAssembler_91f263e0964c


Abaixo nós aplicamos um transform no objeto assembler, para que ele crie nossa coluna features vetorizada.

In [9]:
df_novo = assembler.transform(df)
df_novo.show(15, truncate=False)

+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+--------------------------------------------------------------------+
|_c0|Date      |High              |Low               |Open              |Close             |Volume    |Adj Close         |Symbol|features                                                            |
+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+--------------------------------------------------------------------+
|0  |2010-01-04|30.64285659790039 |30.34000015258789 |30.489999771118164|30.572856903076172|1.234324E8|26.68132972717285 |AAPL  |[30.489999771118164,30.64285659790039,30.34000015258789,1.234324E8] |
|1  |2010-01-04|34.099998474121094|33.400001525878906|33.599998474121094|34.0              |4.06793E7 |31.601511001586914|C     |[33.599998474121094,34.099998474121094,33.400001525878906,4.06793E7]|
|2  |

Por convenção, se altera o nome da coluna que queremos prever seu valor, para label.

In [10]:
df_novo = df_novo.withColumnRenamed("Close", "label")
df_novo.show()

+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+--------------------+
|_c0|      Date|              High|               Low|              Open|             label|    Volume|         Adj Close|Symbol|            features|
+---+----------+------------------+------------------+------------------+------------------+----------+------------------+------+--------------------+
|  0|2010-01-04| 30.64285659790039| 30.34000015258789|30.489999771118164|30.572856903076172|1.234324E8| 26.68132972717285|  AAPL|[30.4899997711181...|
|  1|2010-01-04|34.099998474121094|33.400001525878906|33.599998474121094|              34.0| 4.06793E7|31.601511001586914|     C|[33.5999984741210...|
|  2|2010-01-04|32.709999084472656| 32.23500061035156|32.290000915527344| 32.69499969482422| 5894200.0|26.618066787719727|   UNP|[32.2900009155273...|
|  3|2010-01-04|59.189998626708984|  57.5099983215332|57.650001525878906| 58.54999923706055| 7

Para facilitar nossa visualização, vamos chamar apenas as colunas label e features, através do select.

In [11]:
df_novo = df_novo.select("label", "features")
df_novo.show(truncate=False)

+------------------+--------------------------------------------------------------------+
|label             |features                                                            |
+------------------+--------------------------------------------------------------------+
|30.572856903076172|[30.489999771118164,30.64285659790039,30.34000015258789,1.234324E8] |
|34.0              |[33.599998474121094,34.099998474121094,33.400001525878906,4.06793E7]|
|32.69499969482422 |[32.290000915527344,32.709999084472656,32.23500061035156,5894200.0] |
|58.54999923706055 |[57.650001525878906,59.189998626708984,57.5099983215332,7325600.0]  |
|27.8700008392334  |[28.31999969482422,28.31999969482422,27.665000915527344,4832000.0]  |
|31.530000686645508|[31.3799991607666,31.600000381469727,30.969999313354492,1.21995E7]  |
|41.42499923706055 |[41.244998931884766,41.439998626708984,40.93000030517578,4874200.0] |
|26.010000228881836|[26.200000762939453,26.610000610351562,25.889999389648438,1.03693E7]|
|8.4849996

Abaixo nós iremos dividir nosso dataframe na proporção 70/30, onde 70% será nossa base para treinamento do modelo e 30% para testá-lo. Após contaremos as linhas de cada df.

In [14]:
(trainingData, testeData) = df_novo.randomSplit([0.7, 0.3])
print(trainingData.count(), testeData.count())

24335 10386


In [13]:
trainingData.show(10, truncate=False)
testeData.show(10, truncate=False)

+-----------------+-----------------------------------------------------------------+
|label            |features                                                         |
+-----------------+-----------------------------------------------------------------+
|7.018571376800537|[7.2928571701049805,7.357142925262451,6.9314284324646,1.37914E7] |
|7.269999980926514|[7.29714298248291,7.484285831451416,7.214285850524902,9395400.0] |
|7.278571605682373|[7.24571418762207,7.407142639160156,7.2328572273254395,1.30312E7]|
|7.281428337097168|[7.142857074737549,7.311428546905518,6.992856979370117,2.33471E7]|
|7.314285755157471|[7.25,7.382857322692871,7.242856979370117,5302500.0]             |
|7.320000171661377|[7.265714168548584,7.492856979370117,7.221428394317627,9685200.0]|
|7.358571529388428|[7.652857303619385,7.657142639160156,7.258571624755859,2.37531E7]|
|7.485714435577393|[7.731428623199463,7.757143020629883,7.462857246398926,9955400.0]|
|7.604285717010498|[7.659999847412109,7.70428562164306

Aqui nós iremos importar da biblioteca, as técnicas de regressão linear.

In [15]:
from pyspark.ml.regression import LinearRegression, LinearRegressionTrainingSummary
from pyspark.ml.evaluation import RegressionEvaluator

Abaixo atribuímos ao objeto lr a técnica de Regressão Linear.

In [16]:
from pyspark.ml.regression import LinearRegression, LinearRegressionTrainingSummary
from pyspark.ml.evaluation import RegressionEvaluator

Abaixo atribuímos ao objeto lr a técnica de Regressão Linear.

In [17]:
lr = LinearRegression()

Abaixo nós iremos pegar nossa base de testes que dividimos anteriormente e treiná-la com nossa técnica de regressão linear, que atribuímos ao objeto lr acima.

In [18]:
lrModelo = lr.fit(trainingData)

Feito o treinameento acima, abaixo iremos verificar os parâmetros para avaliar nosso modelo. A técnica de Regressão Linear, nos traz os sequintes parâmetros:


- Coefficients
- Interceptor
- Mean Squared Error
- Root Mean Squared Error --> quanto menor, melhor.
- Root Squared --> quanto mais próximo de 1, melhor.

Esses parâmetros são de nossa base de treinamento.

In [19]:
resultado = lrModelo.summary
print("Coefficients: %s" % str(lrModelo.coefficients))
print("Intercept: %s" % str(lrModelo.intercept))
print(resultado.meanSquaredError)
print(resultado.rootMeanSquaredError)
print(resultado.r2)

Coefficients: [-0.5412340802981839,0.8478774027358718,0.6919976694586922,-3.0786189625971426e-10]
Intercept: 0.027450418605165294
0.23144304169797777
0.48108527487128283
0.9999274918978792


Testando nosso modelo usando nossa base de testes.

In [20]:
df_resultado = lrModelo.transform(testeData)
df_resultado.show(10, truncate=False)

+-----------------+------------------------------------------------------------------+------------------+
|label            |features                                                          |prediction        |
+-----------------+------------------------------------------------------------------+------------------+
|7.135714054107666|[6.960000038146973,7.1785712242126465,6.937142848968506,1.13435E7]|7.144003976791267 |
|7.281428337097168|[7.142857074737549,7.311428546905518,6.992856979370117,2.33471E7] |7.192540835801276 |
|7.284285545349121|[7.518571376800537,7.575714111328125,7.269999980926514,1.76855E7] |7.406798511108243 |
|7.314285755157471|[7.25,7.382857322692871,7.242856979370117,5302500.0]              |7.3736689402195035|
|7.614285945892334|[7.498571395874023,7.742856979370117,7.465714454650879,8180900.0] |7.697699907108041 |
|7.664999961853027|[7.630000114440918,7.715000152587891,7.554999828338623,3.6472E7]  |7.656022550364536 |
|7.710000038146973|[7.755000114440918,7.864999

Agora vamos avaliar se nosso modelo treinado com nossa base de testes, através da métrica RMSE, obteve bom desempenho.

Pedemos verificar que nosso modelo teve uma leve piora, quando testado com nossa base de treinamento, pois o valor encontrado utilizando a mesma métrica (RMSE) subiu um pouquinho.

In [22]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse= evaluator.evaluate(df_resultado)
print("Root mean Squared Error (RMSE) on teste data = %g" % rmse)

Root mean Squared Error (RMSE) on teste data = 0.509102


Agora iremos testar nosso modelo com dados desconhecidos.

Suponhamos que uma determinada ação, teve sua abertura com valor 20, uma máxima de 22, uma mínima de 19 e com um volume até entao de ações negociadas de 7.400,300. Vamos ver qual será o valor que nosso modelo irá prever de fechamento?

Iremos usar a técnica de vetor denso e informar os valores que mencionamos acima.

In [23]:
nova_acao = Vectors.dense(20, 22, 19, 7400300)
print(lrModelo.predict(nova_acao))

21.001749122154926


**Considerações Finais**

De uma maneira geral, e digo porque também faço operações na bolsa de valores, o resultado do modelo está parecendo satisfatório, fazendo sentido, teria que testar para conferir.

Melhorias como aumentar os números de atributos, como por exemplo, adicionar o ajuste de fechamento, tendencia do ativo e min e max semanal, valem muito a pena considerar e testar, mas de certa forma achei que as previsões de fechamento foram boas, realistas, como os parâmetros do modelo de Regressão Linear, mostraram.