# Instalación de librerias y lectura del Dataset

## Instalacion de librerias necesarias y montaje de Drive.



Se montan las librerias que seran utilizadas para el desarrollo del poyecto en cuestión. Entre estas se encuentran algunas de las libreriasbásicas de manejo de datos como pandas y numpy; adicionalmente sre instalan algunas instancias iniciales de Pyspark para el manejo de datos. Adicionalmente se monta la lectura de datos desde Google Drive donde será ingestado el Dataset previamente mencionado.

En partes posteriores del codigo se instalaran las demás librerias que vayan siendo encesarias

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=c24077dc0fe89e87dd9e97c33d8a0bb7601f0d77a7daec40dbf066569a548771
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark
%matplotlib inline

import pyarrow.parquet as pq
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Se inicia sedión de spark para lectura de los datos con un nombre genérico de aplicación, esta sirve para leer los datos originales bien sea en formato parquet o csv. Para el desarrollo del proyecto se selecciona un dataset curado que se ingesta como .csv

In [None]:
conf = SparkConf().set("spark.ui.port", "4050")

# creamos el contexto y la sesión
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.appName("Leer Archivo gzip").getOrCreate()

## Lectura de datos

Se cuenta con distintas instancias para el origen de data, esto debido a los diferentes equipos desde los que se trabajó el proyecto. Se dea activo aquel que corresponde a la últiuma instancia utilizada

In [None]:
path = '/content/drive/MyDrive/MCDA/Semestre 3/Mineria datos/Proyecto - Mineria de datos 2023/Data/SEN.csv'
# path = '/content/drive/MyDrive/Shared with me/Proyecto - Mineria de datos 2023/Data/SEN.csv'
# path = '/content/drive/MyDrive/Big Data/Proyecto/SEN.csv'

In [None]:
df = spark.read.format('csv').option('header','True').load(path)

In [None]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- FECHA CIERRE: string (nullable = true)
 |-- HORA DE CIERRE: string (nullable = true)
 |-- SESION/RUEDA: string (nullable = true)
 |-- INSTRUMENTO: string (nullable = true)
 |-- TASA/ PRECIO: string (nullable = true)
 |-- TASA/ PRECIO EQUIV.: string (nullable = true)
 |-- VR. NOMINAL: string (nullable = true)
 |-- CONTRAVALOR: string (nullable = true)



In [None]:
num_rows = df.count()
num_cols = len(df.columns)
print("Shape of DataFrame: ({}, {})".format(num_rows, num_cols))

Shape of DataFrame: (2055124, 9)


El Dataset ingestado a este notebook consta de 2.055.124 registros con 9 columnas y corresponde a la sintetización de la data disponible mes a mes en la página https://www.banrep.gov.co/es/sen-puntuales-cierres, esta primera etapa de transformación es realizada en un código aparte adjunto

# Filtros y transformaciones al Dataset

## Eliminación de columnas no deseadas, concatencaión y modificación de nombres de columnas 

Eliminamos columnas que no utilizaremos para el procesamiento

- 'SESION/RUEDA', pues solo trae como dato el string 'CONH', nombre de la rueda de contado.
- 'TASA/ PRECIO', puesto que no buscamos hacer predicciones sobre el precio, sino por la Tasa al Vencmiento, que está en otra columna. Esas dos columnas están relacionadas inversamente, entonces no aporta valor a la predicción el mantener los precios en el data set.
- 'VR. NOMINAL', pues no interesa el tamaño de la transacción, únicamente la tasa al vencimiento.
- 'CONTRAVALOR', pues es una columna que solo trae datos cuando se trata de la rueda de negociación de simultáneas.

In [None]:
df2 = df.drop('_c0', 'SESION/RUEDA', 'TASA/ PRECIO', 'VR. NOMINAL', 'CONTRAVALOR')

In [None]:
df2.printSchema()

root
 |-- FECHA CIERRE: string (nullable = true)
 |-- HORA DE CIERRE: string (nullable = true)
 |-- INSTRUMENTO: string (nullable = true)
 |-- TASA/ PRECIO EQUIV.: string (nullable = true)



In [None]:
df2.show()

+------------+--------------+------------+-------------------+
|FECHA CIERRE|HORA DE CIERRE| INSTRUMENTO|TASA/ PRECIO EQUIV.|
+------------+--------------+------------+-------------------+
|      110601|         81318|TFIT07150616|              7.102|
|      110601|         81318|TFIT07150616|              7.102|
|      110601|         81318|TFIT07150616|              7.102|
|      110601|         81318|TFIT07150616|              7.102|
|      110601|         81318|TFIT07150616|              7.102|
|      110601|         81318|TFIT07150616|                7.1|
|      110601|         81318|TFIT07150616|                7.1|
|      110601|         81318|TFIT07150616|                7.1|
|      110601|         81326|TFIT07150616|               7.08|
|      110601|         81557|TFIT16240724|              8.102|
|      110601|         81601|TFIT16240724|              8.102|
|      110601|         82422|TFIT16240724|              8.089|
|      110601|         82422|TFIT16240724|             

A continuación realizamos una corrección de formato a la colúmna Hora de Cierre, esta inicialmente se encuentra como string, esto puede generar incompatibilidades al momento de unir las columnas "HORA DE CIERRE" y "FECHA DE CIERRE". Por esto, aquellas horas con menos de 6 caracteres (antes de las 10 am) son completadas con un "0" a su izquierda

In [None]:
from pyspark.sql.functions import col, expr, from_unixtime, minute, last, lpad

df2 = df2.withColumn("HORA DE CIERRE", lpad(col("HORA DE CIERRE"), 6, "0"))

In [None]:
df2.show()

+------------+--------------+------------+-------------------+
|FECHA CIERRE|HORA DE CIERRE| INSTRUMENTO|TASA/ PRECIO EQUIV.|
+------------+--------------+------------+-------------------+
|      110601|        081318|TFIT07150616|              7.102|
|      110601|        081318|TFIT07150616|              7.102|
|      110601|        081318|TFIT07150616|              7.102|
|      110601|        081318|TFIT07150616|              7.102|
|      110601|        081318|TFIT07150616|              7.102|
|      110601|        081318|TFIT07150616|                7.1|
|      110601|        081318|TFIT07150616|                7.1|
|      110601|        081318|TFIT07150616|                7.1|
|      110601|        081326|TFIT07150616|               7.08|
|      110601|        081557|TFIT16240724|              8.102|
|      110601|        081601|TFIT16240724|              8.102|
|      110601|        082422|TFIT16240724|              8.089|
|      110601|        082422|TFIT16240724|             

Se concatenan  las colúmnas de FECHA y HORA en una sola columna en formato *datetime* para posteriores etapas de procesamiento 

In [None]:
# Assume df is your DataFrame with columns "date" and "time"
from pyspark.sql.functions import concat, col, to_timestamp

df_with_datetime = df2.withColumn(
    "datetime",
    to_timestamp(concat(col("FECHA CIERRE"), col("HORA DE CIERRE")), "yyMMddHHmmss")
)

# The "datetime" column will now contain a timestamp type
df_with_datetime.show()

+------------+--------------+------------+-------------------+-------------------+
|FECHA CIERRE|HORA DE CIERRE| INSTRUMENTO|TASA/ PRECIO EQUIV.|           datetime|
+------------+--------------+------------+-------------------+-------------------+
|      110601|        081318|TFIT07150616|              7.102|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|              7.102|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|              7.102|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|              7.102|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|              7.102|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|                7.1|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|                7.1|2011-06-01 08:13:18|
|      110601|        081318|TFIT07150616|                7.1|2011-06-01 08:13:18|
|      110601|        081326|TFIT07150616|               7.08|2011-06-01 08:13:26|
|   

Se prueba de exsitencia de nulos en la columna *datetime* generada

In [None]:
df_null = df_with_datetime.filter(df_with_datetime['datetime'].isNull())
df_null.show()

+------------+--------------+-----------+-------------------+--------+
|FECHA CIERRE|HORA DE CIERRE|INSTRUMENTO|TASA/ PRECIO EQUIV.|datetime|
+------------+--------------+-----------+-------------------+--------+
+------------+--------------+-----------+-------------------+--------+



Drop de columnas que no se consideran y modificación del titulo de columna "TASA/ PRECIO EQUIV." a "YTM" para facilitar la nomenclatura y codigo posterior

In [None]:
df3 = df_with_datetime.select('*').drop('FECHA CIERRE', 'HORA DE CIERRE')
df3.show()

+------------+-------------------+-------------------+
| INSTRUMENTO|TASA/ PRECIO EQUIV.|           datetime|
+------------+-------------------+-------------------+
|TFIT07150616|              7.102|2011-06-01 08:13:18|
|TFIT07150616|              7.102|2011-06-01 08:13:18|
|TFIT07150616|              7.102|2011-06-01 08:13:18|
|TFIT07150616|              7.102|2011-06-01 08:13:18|
|TFIT07150616|              7.102|2011-06-01 08:13:18|
|TFIT07150616|                7.1|2011-06-01 08:13:18|
|TFIT07150616|                7.1|2011-06-01 08:13:18|
|TFIT07150616|                7.1|2011-06-01 08:13:18|
|TFIT07150616|               7.08|2011-06-01 08:13:26|
|TFIT16240724|              8.102|2011-06-01 08:15:57|
|TFIT16240724|              8.102|2011-06-01 08:16:01|
|TFIT16240724|              8.089|2011-06-01 08:24:22|
|TFIT16240724|              8.088|2011-06-01 08:24:22|
|TFIT16240724|              8.088|2011-06-01 08:24:22|
|TFIT16240724|              8.088|2011-06-01 08:24:22|
|TFIT06141

In [None]:
column_index = df3.columns[1]

In [None]:
column_index = df3.columns[1]
df3 = df3.withColumnRenamed(column_index, "YTM")
df3.show()

+------------+-----+-------------------+
| INSTRUMENTO|  YTM|           datetime|
+------------+-----+-------------------+
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616| 7.08|2011-06-01 08:13:26|
|TFIT16240724|8.102|2011-06-01 08:15:57|
|TFIT16240724|8.102|2011-06-01 08:16:01|
|TFIT16240724|8.089|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT06141113|6.175|2011-06-01 08:29:10|
|TFIT06140514|6.507|2011-06-01 08:33:38|
|TFIT06140514|6.507|2011-06-01 08:34:08|
|TFIT06141113| 6.13|2011-06-01 08:35:25|
|TFIT07150616| 7.09|2011-06-01 08:36:46|
+------------+-----+-------------------+
only showing top

Filtrado del Dataset para incluir unicamente INSTRUMENTOS del tipo TFIT

In [None]:
df3 = df3.filter(col('INSTRUMENTO').startswith('TFIT'))
df3.show()

+------------+-----+-------------------+
| INSTRUMENTO|  YTM|           datetime|
+------------+-----+-------------------+
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|7.102|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616|  7.1|2011-06-01 08:13:18|
|TFIT07150616| 7.08|2011-06-01 08:13:26|
|TFIT16240724|8.102|2011-06-01 08:15:57|
|TFIT16240724|8.102|2011-06-01 08:16:01|
|TFIT16240724|8.089|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT16240724|8.088|2011-06-01 08:24:22|
|TFIT06141113|6.175|2011-06-01 08:29:10|
|TFIT06140514|6.507|2011-06-01 08:33:38|
|TFIT06140514|6.507|2011-06-01 08:34:08|
|TFIT06141113| 6.13|2011-06-01 08:35:25|
|TFIT07150616| 7.09|2011-06-01 08:36:46|
+------------+-----+-------------------+
only showing top

## Modificación de columnas temporales para intervalos definidos en el análisis

Se importan librerias necesarias y se modifica la temporalidad de los datos enviandolos a la hora inferior en cada registro.

El objetivo es trabajar las predicciones en intervalos de una hora, logrando agrupar los datos y representarlos con el dato de cierre de dicha hora, siguiendo las prácticas comunes del sector financiero.

In [None]:
from pyspark.sql.functions import window, last
from pyspark.sql import functions as F

In [None]:
df3 = df3.withColumn('hourly_datetime', F.date_trunc('hour', df3['datetime']))
df3.show()

+------------+-----+-------------------+-------------------+
| INSTRUMENTO|  YTM|           datetime|    hourly_datetime|
+------------+-----+-------------------+-------------------+
|TFIT07150616|7.102|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|7.102|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|7.102|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|7.102|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|7.102|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|  7.1|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|  7.1|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616|  7.1|2011-06-01 08:13:18|2011-06-01 08:00:00|
|TFIT07150616| 7.08|2011-06-01 08:13:26|2011-06-01 08:00:00|
|TFIT16240724|8.102|2011-06-01 08:15:57|2011-06-01 08:00:00|
|TFIT16240724|8.102|2011-06-01 08:16:01|2011-06-01 08:00:00|
|TFIT16240724|8.089|2011-06-01 08:24:22|2011-06-01 08:00:00|
|TFIT16240724|8.088|2011-06-01 08:24:22|2011-06-01 08:00:00|
|TFIT16240724|8.088|2011

Se ordena la data por la columna original *datetime*

In [None]:
df3 = df3.orderBy("datetime")
df3.show(30)

+------------+-----+-------------------+-------------------+
| INSTRUMENTO|  YTM|           datetime|    hourly_datetime|
+------------+-----+-------------------+-------------------+
|TFIT04180511| 3.12|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511|3.115|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511|3.113|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511|3.113|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511| 3.11|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511| 3.11|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511| 3.11|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511| 3.11|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511| 3.11|2011-01-03 08:25:08|2011-01-03 08:00:00|
|TFIT04180511|3.118|2011-01-03 08:25:24|2011-01-03 08:00:00|
|TFIT04180511|3.118|2011-01-03 08:25:46|2011-01-03 08:00:00|
|TFIT04180511|3.102|2011-01-03 08:26:25|2011-01-03 08:00:00|
|TFIT04180511|3.102|2011-01-03 08:26:25|2011-01-03 08:00:00|
|TFIT04150812|4.724|2011

## Generación de nueva data nuevos datos para los intervalos de tiempo en una ventana horaria definida

Se genera una ventana horaria para la data, además se genera la nueva data "last_yield" correspondiente al último valor de *YTM* en cada espacio horario por INSTRUMENTO en el Dataset

In [None]:
window = Window.partitionBy('hourly_datetime', 'INSTRUMENTO').orderBy(df3['datetime'].desc())

In [None]:
df4 = df3.withColumn('last_yield', F.first('YTM').over(window))
df4 = df4.orderBy("datetime")
df4 = df4.dropDuplicates(['INSTRUMENTO', 'hourly_datetime'])
df4 = df4.orderBy('hourly_datetime')
df4.show()

+------------+-----+-------------------+-------------------+----------+
| INSTRUMENTO|  YTM|           datetime|    hourly_datetime|last_yield|
+------------+-----+-------------------+-------------------+----------+
|TFIT04180511| 3.12|2011-01-03 08:25:08|2011-01-03 08:00:00|       3.1|
|TFIT04150812|4.724|2011-01-03 08:29:54|2011-01-03 08:00:00|     4.724|
|TFIT16240724|  8.2|2011-01-03 08:56:33|2011-01-03 08:00:00|       8.2|
|TFIT04170413| 5.85|2011-01-03 09:03:33|2011-01-03 09:00:00|     5.843|
|TFIT04180511|3.102|2011-01-03 09:16:46|2011-01-03 09:00:00|     3.128|
|TFIT04150812| 4.66|2011-01-03 09:00:08|2011-01-03 09:00:00|     4.706|
|TFIT06140514|6.498|2011-01-03 09:19:37|2011-01-03 09:00:00|     6.498|
|TFIT06140514|  6.5|2011-01-03 10:04:47|2011-01-03 10:00:00|      6.53|
|TFIT04180511| 3.13|2011-01-03 10:03:08|2011-01-03 10:00:00|     3.151|
|TFIT07150616| 7.16|2011-01-03 10:34:11|2011-01-03 10:00:00|     7.188|
|TFIT04150812|4.699|2011-01-03 10:27:56|2011-01-03 10:00:00|    

In [None]:
df4 = df4.drop("YTM","datetime")
df4.show()

+------------+-------------------+----------+
| INSTRUMENTO|    hourly_datetime|last_yield|
+------------+-------------------+----------+
|TFIT04150812|2011-01-03 08:00:00|     4.724|
|TFIT04180511|2011-01-03 08:00:00|       3.1|
|TFIT16240724|2011-01-03 08:00:00|       8.2|
|TFIT04170413|2011-01-03 09:00:00|     5.843|
|TFIT04150812|2011-01-03 09:00:00|     4.706|
|TFIT04180511|2011-01-03 09:00:00|     3.128|
|TFIT06140514|2011-01-03 09:00:00|     6.498|
|TFIT04170413|2011-01-03 10:00:00|     5.859|
|TFIT04180511|2011-01-03 10:00:00|     3.151|
|TFIT04150812|2011-01-03 10:00:00|     4.743|
|TFIT15240720|2011-01-03 10:00:00|     7.734|
|TFIT06140514|2011-01-03 10:00:00|      6.53|
|TFIT07150616|2011-01-03 10:00:00|     7.188|
|TFIT16240724|2011-01-03 10:00:00|     8.192|
|TFIT04180511|2011-01-03 11:00:00|     3.138|
|TFIT04150812|2011-01-03 11:00:00|     4.747|
|TFIT15240720|2011-01-03 11:00:00|     7.738|
|TFIT04150812|2011-01-03 12:00:00|     4.758|
|TFIT04180511|2011-01-03 12:00:00|

##Filtrado de titulos relevantes

Se filtran titulos relevantes para el análisis, en este caso los titulos definidos corresponden a titulos de alta liquidez y con tiempos de transacción similares. Los demás títulos no son reconocidos como de alta liquidez, lo que implica que pueden llegar a generar bastantes problemas en los modelos de machine learning por la alta prevalencia de valores Null y por su poca profundidad en el mercado de valores.

In [None]:
df_4 = df4.filter(df4['INSTRUMENTO'].isin(['TFIT15260826', 'TFIT16240724', 'TFIT16280428']))

In [None]:
df_4.show()

+------------+-------------------+----------+
| INSTRUMENTO|    hourly_datetime|last_yield|
+------------+-------------------+----------+
|TFIT16240724|2011-01-03 08:00:00|       8.2|
|TFIT16240724|2011-01-03 10:00:00|     8.192|
|TFIT16240724|2011-01-04 09:00:00|     8.255|
|TFIT16240724|2011-01-04 10:00:00|     8.255|
|TFIT16240724|2011-01-04 12:00:00|     8.253|
|TFIT16240724|2011-01-05 09:00:00|      8.33|
|TFIT16240724|2011-01-05 10:00:00|      8.29|
|TFIT16240724|2011-01-05 12:00:00|      8.28|
|TFIT16240724|2011-01-06 09:00:00|     8.363|
|TFIT16240724|2011-01-06 10:00:00|      8.32|
|TFIT16240724|2011-01-06 11:00:00|     8.321|
|TFIT16240724|2011-01-06 12:00:00|      8.27|
|TFIT16240724|2011-01-07 08:00:00|     8.239|
|TFIT16240724|2011-01-07 10:00:00|      8.27|
|TFIT16240724|2011-01-07 11:00:00|     8.243|
|TFIT16240724|2011-01-11 10:00:00|      8.24|
|TFIT16240724|2011-01-11 11:00:00|      8.17|
|TFIT16240724|2011-01-11 12:00:00|     8.165|
|TFIT16240724|2011-01-12 08:00:00|

## Generación de Dataset para ingesta en modelos

Se realiza el pivoteo de la data por los INSTRUMENTOS seleccionados, esto genera una nueva estructura de datos que permite ver los valores de "YTM" para cada Instrumento sleccionada en cada intervalo de tiempo del Dataset original

In [None]:
df_pivot = df_4.groupBy("hourly_datetime").pivot("INSTRUMENTO").agg(F.first('last_yield'))

In [None]:
df_pivot = df_pivot.dropna()

In [None]:
df_pivot = df_pivot.orderBy('hourly_datetime')
df_pivot.show()

+-------------------+------------+------------+------------+
|    hourly_datetime|TFIT15260826|TFIT16240724|TFIT16280428|
+-------------------+------------+------------+------------+
|2013-01-10 12:00:00|       5.601|       5.469|        5.77|
|2013-01-14 10:00:00|        5.56|       5.414|       5.739|
|2013-01-14 11:00:00|       5.572|       5.418|        5.74|
|2013-01-15 09:00:00|       5.575|       5.435|       5.758|
|2013-01-15 11:00:00|       5.587|       5.445|        5.76|
|2013-01-15 12:00:00|       5.599|       5.465|       5.773|
|2013-01-16 09:00:00|       5.531|       5.404|        5.71|
|2013-01-16 10:00:00|        5.52|        5.37|       5.651|
|2013-01-16 11:00:00|       5.523|        5.37|        5.67|
|2013-01-16 12:00:00|       5.524|       5.355|       5.679|
|2013-01-17 08:00:00|       5.499|       5.341|        5.64|
|2013-01-17 11:00:00|       5.535|        5.36|       5.705|
|2013-01-17 12:00:00|        5.55|       5.371|        5.67|
|2013-01-18 11:00:00|   

In [None]:
from pyspark.sql.functions import lag
windowSpec = Window.orderBy('hourly_datetime')
df_pivot = df_pivot.withColumn('label', lag('TFIT16240724').over(windowSpec))
df_pivot = df_pivot.dropna()
df_pivot.show()

+-------------------+------------+------------+------------+-----+
|    hourly_datetime|TFIT15260826|TFIT16240724|TFIT16280428|label|
+-------------------+------------+------------+------------+-----+
|2013-01-14 10:00:00|        5.56|       5.414|       5.739|5.469|
|2013-01-14 11:00:00|       5.572|       5.418|        5.74|5.414|
|2013-01-15 09:00:00|       5.575|       5.435|       5.758|5.418|
|2013-01-15 11:00:00|       5.587|       5.445|        5.76|5.435|
|2013-01-15 12:00:00|       5.599|       5.465|       5.773|5.445|
|2013-01-16 09:00:00|       5.531|       5.404|        5.71|5.465|
|2013-01-16 10:00:00|        5.52|        5.37|       5.651|5.404|
|2013-01-16 11:00:00|       5.523|        5.37|        5.67| 5.37|
|2013-01-16 12:00:00|       5.524|       5.355|       5.679| 5.37|
|2013-01-17 08:00:00|       5.499|       5.341|        5.64|5.355|
|2013-01-17 11:00:00|       5.535|        5.36|       5.705|5.341|
|2013-01-17 12:00:00|        5.55|       5.371|        5.67| 5

In [None]:
df_pivot = df_pivot.withColumn('TFIT15260826', col('TFIT15260826').cast('float'))
df_pivot = df_pivot.withColumn('TFIT16240724', col('TFIT16280428').cast('float'))
df_pivot = df_pivot.withColumn('TFIT16280428', col('TFIT16280428').cast('float'))
df_pivot = df_pivot.withColumn('label', col('label').cast('float'))

In [None]:
df_pivot.printSchema()

root
 |-- hourly_datetime: timestamp (nullable = true)
 |-- TFIT15260826: float (nullable = true)
 |-- TFIT16240724: float (nullable = true)
 |-- TFIT16280428: float (nullable = true)
 |-- label: float (nullable = true)



#Modelamiento

# Inslatación de librerias para modelado y estructuracion de datos para este

Se instalan librerias de Pyspark necesarias para la estructuración de los datos a ser entregados a los modelos, así como los modelos en si. Los modelos utilizados, así como el análisis de los resultados y su significancia financiera serán discutidos a profundidad en el informe que acompaña la entrega de este código. 

## Random Forest

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.window import Window

assembler = VectorAssembler(inputCols=['TFIT15260826', 'TFIT16240724', 'TFIT16280428'], outputCol='features')

trainData, testData = df_pivot.randomSplit([0.7, 0.3], seed=42)

rf = RandomForestRegressor(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[assembler, rf])

model = pipeline.fit(trainData)

predictions = model.transform(testData)

evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

rf_model = model.stages[-1]
feature_importances = rf_model.featureImportances
print("Feature Importances:")
for feature, importance in zip(['TFIT15260826', 'TFIT16240724', 'TFIT16280428'], feature_importances):
    print(f"{feature}: {importance}")

RMSE: 0.33042670718546235
Feature Importances:
TFIT15260826: 0.41335828705478556
TFIT16240724: 0.36841227712059144
TFIT16280428: 0.2182294358246229


Se relacionan las tasas de mercado del título en cuestión, contra la predicción del modelo.

El modelo tiene un RMSE bastante bajo, lo que indica precisión en el valor puntual de las predicciones.

In [None]:
predictions.select('label', 'prediction').show()

+-----+-----------------+
|label|       prediction|
+-----+-----------------+
|5.418|4.997124018345327|
|5.404|4.945243871863896|
| 5.37|4.997124018345327|
|5.355|4.945243871863896|
|5.325|4.997124018345327|
| 5.31|4.997124018345327|
|  5.3|4.997124018345327|
|5.257|4.945243871863896|
|5.248|4.945243871863896|
| 5.27|4.945243871863896|
| 5.26|4.945243871863896|
|5.263|4.945243871863896|
|5.233|4.945243871863896|
|5.215|4.945243871863896|
|  5.2|4.945243871863896|
|5.195|4.945243871863896|
|5.196|4.945243871863896|
| 5.08| 4.67640018272474|
|5.055|4.945243871863896|
|5.099| 4.67640018272474|
+-----+-----------------+
only showing top 20 rows



Al evaluar en términos de accuracy sobre la capacidad del modelo, no de predecir el valor puntual de la tasa de interés de mercado, sino de su dirección para el día siguiente, encontramos que esta medida de desempeño tiene un valor aproximado de 0.6, que, en el contexto financiero de bursátil, es un gran resultado, y que abre la posibilidad de que el modelo esté cumpliendo con su objetivo de generar rentabilidad.

En ese sentido, evaluamos para cada día la cantidad de puntos básicos (1) que la estrategia es general al tomar posición. 

El modelo toma posición de compra cuando la predicción de la dirección sea a la baja, y de venta, cuando la predicción sea que la tasa de interés del título se incrementará (2).

Por último, teniendo en cuenta la posición tomada, calculamos la cantidad de puntos básicos (1) que la estrategia obtuvo, sean de utilidad o pérdida.

Ref: \
1. Puntos Básicos: Dado que la renta fija se negocia en el mercado de valores por tasa, las ganancias o pérdidas se dan según los cambios de esas tasas. Un punto básico es la centésima parte de 1%, o 1/10000, o 0.01%.
2. A mayor tasa de interés de mercado, menor será el precio de mercado del título. Por lo tanto, en trading de renta fija, se genera incremento en el precio cuando la tasa de interés cae X puntos básicos, y se genera disminución del precio cuando la tasa de interés sube.

In [None]:
predictions = predictions.withColumn('prev_label', lag('label').over(Window.orderBy('hourly_datetime')))

# Create a new column 'label_direction' based on the change between previous label and actual label
predictions = predictions.withColumn('label_direction',
                                     when((col('label') - col('prev_label')) > 0, 1)  # Positive change
                                     .when((col('label') - col('prev_label')) < 0, -1)  # Negative change
                                     .otherwise(0)  # No change
                                     )

# Create a new column 'prediction_direction' based on the change between actual prediction and previous label
predictions = predictions.withColumn('prediction_direction',
                                     when((col('prediction') - col('prev_label')) > 0, 1)  # Positive change
                                     .when((col('prediction') - col('prev_label')) < 0, -1)  # Negative change
                                     .otherwise(0)  # No change
                                     )

predictions = predictions.withColumn('basis_points_earned',
                                     when((col('label_direction') == col('prediction_direction')), abs(col('label') - col('prev_label')))
                                     .otherwise(-abs(col('label') - col('prev_label')))  # 
                                     )

# Calculate the accuracy in terms of direction
accuracy_direction = predictions.filter(col('label_direction') == col('prediction_direction')).count() / predictions.count()

print(f"Accuracy in terms of direction: {accuracy_direction}")

Accuracy in terms of direction: 0.5899725274725275


In [None]:
predictions.show()

+-------------------+------------+------------+------------+-----+--------------------+-----------------+----------+---------------+--------------------+-------------------+
|    hourly_datetime|TFIT15260826|TFIT16240724|TFIT16280428|label|            features|       prediction|prev_label|label_direction|prediction_direction|basis_points_earned|
+-------------------+------------+------------+------------+-----+--------------------+-----------------+----------+---------------+--------------------+-------------------+
|2013-01-15 09:00:00|       5.575|       5.758|       5.758|5.418|[5.57499980926513...|4.997124018345327|      null|              0|                   0|               null|
|2013-01-16 10:00:00|        5.52|       5.651|       5.651|5.404|[5.51999998092651...|4.945243871863896|     5.418|             -1|                  -1|        0.014000416|
|2013-01-16 12:00:00|       5.524|       5.679|       5.679| 5.37|[5.52400016784668...|4.997124018345327|     5.404|             -

Sumamos los puntos básicos ganados o pérdidos para cada una de las predicciones en los intervalos definidos y para todo el periodo de análisis. Se multiplica por mil para dejar los puntos básicos en escala 1:1.

Se debe tener en cuenta que se ganan puntos básicos cuando el modelo predice la dirección del mercado, y toma posiciones de compra o venta que ganarán puntos básicos en la siguiente hora. Y se pierden cuando la predicción es incorrecta, se toman posiciones basadas en esa predicción y el mercado toma la dirección contraria. 

In [None]:
basis_earned_total = predictions.select(sum('basis_points_earned')).first()[0]

print(f"Basis earned: {basis_earned_total*100}")

Basis earned: 4299.900054931641


In [None]:
pandas_df = predictions.toPandas()

# Export the pandas DataFrame to Excel
pandas_df.to_excel('/content/drive/MyDrive/Big Data/Proyecto/output_randomforest.xlsx', index=False)

## Gradient Boosting

In [None]:
from pyspark.ml.regression import GBTRegressor 

# Create a Gradient Boosted Trees regressor
gbt = GBTRegressor(featuresCol='features', labelCol='label')  # Use GBTRegressor instead of RandomForestRegressor

# Build the ML pipeline
pipeline = Pipeline(stages=[assembler, gbt])  # Replace 'rf' with 'gbt'

# Train the model on the training data
model = pipeline.fit(trainData)

# Make predictions on the test data
predictions = model.transform(testData)

# Evaluate the model (you can use different metrics for regression)
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# You can also access the feature importances
gbt_model = model.stages[-1]  # Get the GBTRegressor model
feature_importances = gbt_model.featureImportances
print("Feature Importances:")
for feature, importance in zip(['TFIT15260826', 'TFIT16240724', 'TFIT16280428'], feature_importances):
    print(f"{feature}: {importance}")

RMSE: 0.24911457916915009
Feature Importances:
TFIT15260826: 0.3848258630945847
TFIT16240724: 0.6151741369054153
TFIT16280428: 0.0


In [None]:
predictions = predictions.withColumn('prev_label', lag('label').over(Window.orderBy('hourly_datetime')))

# Create a new column 'label_direction' based on the change between previous label and actual label
predictions = predictions.withColumn('label_direction',
                                     when((col('label') - col('prev_label')) > 0, 1)  # Positive change
                                     .when((col('label') - col('prev_label')) < 0, -1)  # Negative change
                                     .otherwise(0)  # No change
                                     )

# Create a new column 'prediction_direction' based on the change between actual prediction and previous label
predictions = predictions.withColumn('prediction_direction',
                                     when((col('prediction') - col('prev_label')) > 0, 1)  # Positive change
                                     .when((col('prediction') - col('prev_label')) < 0, -1)  # Negative change
                                     .otherwise(0)  # No change
                                     )

predictions = predictions.withColumn('basis_points_earned',
                                     when((col('label_direction') == col('prediction_direction')), abs(col('label') - col('prev_label')))
                                     .otherwise(-abs(col('label') - col('prev_label')))  # 
                                     )

# Calculate the accuracy in terms of direction
accuracy_direction = predictions.filter(col('label_direction') == col('prediction_direction')).count() / predictions.count()

basis_earned_total = predictions.select(sum('basis_points_earned')).first()[0]

print(f"Accuracy in terms of direction: {accuracy_direction}")
print(f"Basis earned: {basis_earned_total*100}")

Accuracy in terms of direction: 0.6572802197802198
Basis earned: 5883.50043296814


Repetimos para un modelo de Gradient Boosted, y obtenemos un mejor RMSE y mejor Accuracy, que implica una mayor utilidad en términos de puntos básicos.

Después de concluir que ambos modelos pueden generar una gran cantidad de puntos básicos, es decir, de utilidad, si se quiere, se puede estimar dicha utilidad en términos monetarios multiplicando esa cantidad de puntos básicos por un DV01 (1) promedio para los TES con vencimiento en julio de 2024. Este DV01 promedio es de 500 mil pesos para un portafolio de mil millones de pesos (Se puede escalar dependiendo del tamaño del portafolio).

Ref:\
1. DV01: O 'dollar duration', es una medida de sensibilidad de portafolios de renta fija en términos monetarios. Para las prácticas del mercado financiero colombiano, el DV01 es la utilidad en pesos que se genera en un portafolio de renta fija de x activos por la variación en un punto básico en la tasa de interés.