# ACTIVIDAD 4: MÉTRICAS DE CALIDAD DE RESULTADOS

**Análisis de grandes volúmenes de datos**

José María Blancas Ortiz
A01363436

En esta actividad, a diferencia de la pasada, vamos a generar diversas muestras que nos permitan generar diferentes conjuntos de entrenamiento y prueba y que demuestren a través de métricas la eficiencia del modelo elegido.



# Carga de Datos

In [45]:
!pip install -q kaggle pyspark

In [46]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle (1).json


{'kaggle (1).json': b'{"username":"josmarablancasortiz","key":"17b699eb45f360a1dcea3f14aadd1423"}'}

In [47]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [48]:
!kaggle datasets download -d jakewright/9000-tickers-of-stock-market-data-full-history

Dataset URL: https://www.kaggle.com/datasets/jakewright/9000-tickers-of-stock-market-data-full-history
License(s): other
9000-tickers-of-stock-market-data-full-history.zip: Skipping, found more recently modified local copy (use --force to force download)


In [49]:
!unzip 9000-tickers-of-stock-market-data-full-history.zip -d stock_data
!ls stock_data | head -20

Archive:  9000-tickers-of-stock-market-data-full-history.zip
replace stock_data/all_stock_data.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: stock_data/all_stock_data.csv  y

replace stock_data/all_stock_data.parquet? [y]es, [n]o, [A]ll, [N]one, [r]ename:   inflating: stock_data/all_stock_data.parquet  
all_stock_data.csv
all_stock_data.parquet


In [50]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KaggleData").getOrCreate()

In [51]:
df = spark.read.csv("stock_data/all_stock_data.csv", header=True, inferSchema=True)
df.show(5)

+----------+------+----+-------------------+-------------------+-------------------+---------+---------+------------+
|      Date|Ticker|Open|               High|                Low|              Close|   Volume|Dividends|Stock Splits|
+----------+------+----+-------------------+-------------------+-------------------+---------+---------+------------+
|1962-01-02|    ED| 0.0| 0.2658275556233194|0.26178762316703796|0.26178762316703796|  25600.0|      0.0|         0.0|
|1962-01-02|   CVX| 0.0|0.04680890217423439|0.04606926600933256|0.04680890217423439| 105840.0|      0.0|         0.0|
|1962-01-02|    GD| 0.0|0.21003275954390174|0.20306070787008793| 0.2082897424697876|2648000.0|      0.0|         0.0|
|1962-01-02|    BP| 0.0|0.14143933090345925|0.13952797651290894|0.13952797651290894|  77440.0|      0.0|         0.0|
|1962-01-02|   MSI| 0.0| 0.7649229763450202| 0.7452535214492476| 0.7518101930618286|  65671.0|      0.0|         0.0|
+----------+------+----+-------------------+------------

### Muestreo y Preprocesamiento

En esta parte, tal y como lo hicimos en la actividad pasada, no solo quitamos outliers y registros nulos de toda la base, sino que también generamos particiones a través de combinaciones de volumen y precios. Esto nos permite "saltarnos" una limpieza adicional en el muestreo y pasar directo a unir datos y continuar con los modelos.

In [52]:
from pyspark.sql.functions import col

df = df.withColumn("Stock Splits", col("Stock Splits").cast("double"))

In [53]:
df = df.na.drop(subset=["Close", "Volume", "Dividends", "Stock Splits"])

In [54]:
from pyspark.sql.functions import isnan, when, count

numeric_cols = [c for c, dtype in df.dtypes if dtype in ['double', 'float', 'int']]

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in numeric_cols]).show()

+----+----+---+-----+------+---------+------------+
|Open|High|Low|Close|Volume|Dividends|Stock Splits|
+----+----+---+-----+------+---------+------------+
|   3|   3|  3|    0|     0|        0|           0|
+----+----+---+-----+------+---------+------------+



In [55]:
df.describe("Close", "Volume", "Dividends", "Stock Splits").show()

+-------+--------------------+--------------------+--------------------+------------+
|summary|               Close|              Volume|           Dividends|Stock Splits|
+-------+--------------------+--------------------+--------------------+------------+
|  count|            34646152|            34646152|            34646152|    34646152|
|   mean|1.115048747180934...|  1339233.9657774116|0.003979243062109...|    Infinity|
| stddev| 3.95473476978514E25|1.5671722641516324E7|   1.603518382769459|         NaN|
|    min|-8.21004442335131...|                 0.0|                 0.0|         0.0|
|    max|1.507289774427978...|          9.230856E9|              4500.0|    Infinity|
+-------+--------------------+--------------------+--------------------+------------+



In [56]:
columnas = ["Close", "Volume", "Dividends", "Stock Splits"]
qs = [0.25, 0.5, 0.75]

percentiles = {}
for col in columnas:
    percentiles[col] = df.approxQuantile(col, qs, 0.01)

In [57]:
for col in percentiles:
    print(f"Percentiles para la columna '{col}':")
    print(f"Q1 = {percentiles[col][0]}, Q2 = {percentiles[col][1]}, Q3 = {percentiles[col][2]}")
    print()

Percentiles para la columna 'Close':
Q1 = 3.2948734760284424, Q2 = 10.808192253112793, Q3 = 27.760000228881836

Percentiles para la columna 'Volume':
Q1 = 2600.0, Q2 = 52700.0, Q3 = 420300.0

Percentiles para la columna 'Dividends':
Q1 = 0.0, Q2 = 0.0, Q3 = 0.0

Percentiles para la columna 'Stock Splits':
Q1 = 0.0, Q2 = 0.0, Q3 = 0.0



In [58]:
from pyspark.sql.functions import col

Q1Close, _, Q3Close = percentiles["Close"]
IQRClose = Q3Close - Q1Close
LowClose = Q1Close - 1.5 * IQRClose
HighClose = Q3Close + 1.5 * IQRClose

Q1Volume, _, Q3Volume = percentiles["Volume"]
IQRVolume = Q3Volume - Q1Volume
LowVolume = Q1Volume - 1.5 * IQRVolume
HighVolume = Q3Volume + 1.5 * IQRVolume

nooutliersdf = df.filter((col("Close") >= LowClose) & (col("Close") <= HighClose) &(col("Volume") >= LowVolume) & (col("Volume") <= HighVolume))

In [59]:
nooutliersdf.describe("Close", "Volume").show()

+-------+------------------+------------------+
|summary|             Close|            Volume|
+-------+------------------+------------------+
|  count|          25756757|          25756757|
|   mean|12.639581447841836|138584.14815056103|
| stddev| 13.65059090561122|223930.60922598085|
|    min|-33.37889099121094|               0.0|
|    max|  64.4576416015625|         1046850.0|
+-------+------------------+------------------+



In [60]:
C1 = nooutliersdf.filter(col("Close") < 5).count()
C2 = nooutliersdf.filter((col("Close") >= 5) & (col("Close") <= 25)).count()
C3 = nooutliersdf.filter(col("Close") > 25).count()                                                              # Quitamos outliers y calculamos probabilidades de las variables C (posteriormente hacemos lo mismo con V y D)

print("Probabilidad C1 =", C1 / (C1 + C2 + C3))
print("Probabilidad C2 =", C2 / (C1 + C2 + C3))
print("Probabilidad C3 =", C3 / (C1 + C2 + C3))

Probabilidad C1 = 0.38309457980288436
Probabilidad C2 = 0.4578938256862073
Probabilidad C3 = 0.15901159451090835


In [61]:
V1 = nooutliersdf.filter(col("Volume") >=100000).count()
V2 = nooutliersdf.filter(col("Volume") < 100000).count()

print("Probabilidad V1 =", V1 / (V1 + V2))
print("Probabilidad V2 =", V2 / (V1 + V2))

Probabilidad V1 = 0.32900232742809976
Probabilidad V2 = 0.6709976725719002


In [62]:
D1 = nooutliersdf.filter(col("Dividends") == 0).count()
D2 = nooutliersdf.filter(col("Dividends") > 0).count()

print("Probabilidad D1 =", D1 / (D1 + D2))
print("Probabilidad D2 =", D2 / (D1 + D2))

Probabilidad D1 = 0.9927325089878357
Probabilidad D2 = 0.0072674910121643035


In [63]:
C1V1 = nooutliersdf.filter((col("Close")  < 5) & (col("Volume") >=100000))                                                                  # Generamos las combinaciones correspondientes y se generan las particiones
C1V1.show()

+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|      Date|Ticker|                Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|1962-01-02|   CVX|                 0.0| 0.04680890217423439| 0.04606926600933256| 0.04680890217423439|105840.0|      0.0|         0.0|
|1962-01-02|   MMM|                 0.0|   0.541890852671279|  0.5259528842775644|  0.5299372673034668|254509.0|      0.0|         0.0|
|1962-01-02|   DIS| 0.05835945904254913|  0.0603180077410653| 0.05835945904254913| 0.05835945904254913|841958.0|      0.0|         0.0|
|1962-01-02|    MO|                 0.0| 0.00333833846261782|0.003293024608865...|0.003293024608865...|345600.0|      0.0|         0.0|
|1962-01-02|    PG|                 0.0|  0.2749

In [64]:
C1V2 = nooutliersdf.filter((col("Close")  < 5) & (col("Volume") < 100000))
C1V2.show()

+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|      Date|Ticker|              Open|               High|                Low|              Close| Volume|Dividends|Stock Splits|
+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|1962-01-02|    ED|               0.0| 0.2658275556233194|0.26178762316703796|0.26178762316703796|25600.0|      0.0|         0.0|
|1962-01-02|    BP|               0.0|0.14143933090345925|0.13952797651290894|0.13952797651290894|77440.0|      0.0|         0.0|
|1962-01-02|   MSI|               0.0| 0.7649229763450202| 0.7452535214492476| 0.7518101930618286|65671.0|      0.0|         0.0|
|1962-01-02|   HON|               0.0| 1.5596422972915405|  1.549127912225958| 1.5561375617980957|40740.0|      0.0|         0.0|
|1962-01-02|    FL|               0.0| 0.9722493028732154| 0.9538055893643659| 0.959075152

In [65]:
C1D1 = nooutliersdf.filter((col("Close")  < 5) & (col("Dividends") == 0))
C1D1.show()

+----------+------+-------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|      Date|Ticker|               Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|
+----------+------+-------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|1962-01-02|    ED|                0.0|  0.2658275556233194| 0.26178762316703796| 0.26178762316703796| 25600.0|      0.0|         0.0|
|1962-01-02|   CVX|                0.0| 0.04680890217423439| 0.04606926600933256| 0.04680890217423439|105840.0|      0.0|         0.0|
|1962-01-02|    BP|                0.0| 0.14143933090345925| 0.13952797651290894| 0.13952797651290894| 77440.0|      0.0|         0.0|
|1962-01-02|   MSI|                0.0|  0.7649229763450202|  0.7452535214492476|  0.7518101930618286| 65671.0|      0.0|         0.0|
|1962-01-02|   HON|                0.0|  1.559642297291

In [66]:
C1D2 = nooutliersdf.filter((col("Close")  < 5) & (col("Dividends") > 0))
C1D2.show()

+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|      Date|Ticker|                Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|1962-01-16|   CAT| 0.13898438379846487| 0.13898438379846487| 0.13727900221265876| 0.13813169300556183|163200.0|  0.01042|         0.0|
|1962-01-16|    PG|                 0.0| 0.25896285513980793|  0.2559690659631997|  0.2574658691883087|236800.0|  0.00586|         0.0|
|1962-01-29|    FL|                 0.0|  0.8984163061371896|  0.8811646699905397|  0.8811646699905396| 30000.0|  0.05208|         0.0|
|1962-01-30|    ED|                 0.0| 0.25213567951282895|  0.2513197064399719|  0.2513197064399719| 40800.0|  0.09375|         0.0|
|1962-01-31|   CVX|                 0.0| 0.04681

In [67]:
C2V1 = nooutliersdf.filter(((col("Close") >= 5) & (col("Close") <= 25)) & (col("Volume") >=100000))
C2V1.show()

+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|      Date|Ticker|Open|              High|               Low|             Close|  Volume|Dividends|Stock Splits|
+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|1965-08-04|   XRX| 0.0|5.0746825399590785|5.0039827855817425|5.0471882820129395|202881.0|      0.0|         0.0|
|1965-08-05|   XRX| 0.0|5.0550435254915405|5.0196932415279045| 5.043260097503662|137986.0|      0.0|         0.0|
|1965-08-06|   XRX| 0.0| 5.145384788513184| 5.039335105540595| 5.145384788513184|165310.0|      0.0|         0.0|
|1965-08-09|   XRX| 0.0| 5.216085575683452| 5.129674121771667| 5.141457557678223|143451.0|      0.0|         0.0|
|1965-08-10|   XRX| 0.0|  5.19251776235102|  5.08254027537341| 5.176806926727295|117493.0|      0.0|         0.0|
|1965-08-11|   XRX| 0.0| 5.239653215683643| 5.208231122273801| 5.235725402832031|110662.

In [68]:
C2V2 = nooutliersdf.filter(((col("Close") >= 5) & (col("Close") <= 25)) & (col("Volume") < 100000))
C2V2.show()

+----------+------+----+------------------+------------------+------------------+-------+---------+------------+
|      Date|Ticker|Open|              High|               Low|             Close| Volume|Dividends|Stock Splits|
+----------+------+----+------------------+------------------+------------------+-------+---------+------------+
|1965-08-23|   XRX| 0.0| 5.168952241460186| 5.098252035423808| 5.141457557678223|79240.0|      0.0|         0.0|
|1965-08-27|   XRX| 0.0| 5.326065009510483|5.2828594716248105| 5.310353755950928|68993.0|      0.0|         0.0|
|1965-09-01|   XRX| 0.0|5.3378518490104465| 5.310357137771633|  5.32606840133667|60113.0|      0.0|         0.0|
|1965-09-17|   XRX| 0.0| 5.805254102958754| 5.726698649050873|  5.78561544418335|81289.0|      0.0|         0.0|
|1965-09-20|   XRX| 0.0| 5.797400111985117| 5.726699860983337| 5.758121967315674|68993.0|      0.0|         0.0|
|1965-09-21|   XRX| 0.0| 5.781684848353557| 5.695273399353027| 5.695273399353027|62845.0|      0

In [69]:
C2D1 = nooutliersdf.filter(((col("Close") >= 5) & (col("Close") <= 25)) & (col("Dividends") == 0))
C2D1.show()

+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|      Date|Ticker|Open|              High|               Low|             Close|  Volume|Dividends|Stock Splits|
+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|1965-08-04|   XRX| 0.0|5.0746825399590785|5.0039827855817425|5.0471882820129395|202881.0|      0.0|         0.0|
|1965-08-05|   XRX| 0.0|5.0550435254915405|5.0196932415279045| 5.043260097503662|137986.0|      0.0|         0.0|
|1965-08-06|   XRX| 0.0| 5.145384788513184| 5.039335105540595| 5.145384788513184|165310.0|      0.0|         0.0|
|1965-08-09|   XRX| 0.0| 5.216085575683452| 5.129674121771667| 5.141457557678223|143451.0|      0.0|         0.0|
|1965-08-10|   XRX| 0.0|  5.19251776235102|  5.08254027537341| 5.176806926727295|117493.0|      0.0|         0.0|
|1965-08-11|   XRX| 0.0| 5.239653215683643| 5.208231122273801| 5.235725402832031|110662.

In [70]:
C2D2 = nooutliersdf.filter(((col("Close") >= 5) & (col("Close") <= 25)) & (col("Dividends") > 0))
C2D2.show()

+----------+------+------------------+------------------+------------------+------------------+--------+---------+------------+
|      Date|Ticker|              Open|              High|               Low|             Close|  Volume|Dividends|Stock Splits|
+----------+------+------------------+------------------+------------------+------------------+--------+---------+------------+
|1973-04-09|   GCO|               0.0|              11.5|            11.125|              11.5|  5600.0|     0.17|         0.0|
|1973-04-13|   CMS|               0.0|10.100946426391602|10.010759404727391|10.100946426391602|  5800.0|      0.5|         0.0|
|1977-01-24|    WY| 5.730135724753417| 5.730135724753417| 5.560604572296143| 5.560604572296143|181350.0|  0.13333|         0.0|
|1977-05-02|    WY| 5.231765864142896| 5.231765864142896| 5.163599790311327| 5.214724540710449|114900.0|  0.13333|         0.0|
|1977-08-29|   XRX|  5.19245304941591| 5.216547765892535| 5.168358332939284| 5.204500198364258|155747.0|

In [71]:
C3V1 = nooutliersdf.filter((col("Close") > 25) & (col("Volume") >=100000))
C3V1.show()

+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|      Date|Ticker|Open|              High|               Low|             Close|  Volume|Dividends|Stock Splits|
+----------+------+----+------------------+------------------+------------------+--------+---------+------------+
|1980-07-14|   THC| 0.0| 32.47426024440727|30.981189611068675| 32.00767517089844|173906.0|      0.0|         0.0|
|1980-07-15|   THC| 0.0| 32.75419409497783| 31.07449340820313|31.074493408203125|111844.0|      0.0|         0.0|
|1980-09-05|    HL| 0.0| 25.58759939452056|23.983600699565773| 25.05293083190918|246600.0|      0.0|         0.0|
|1980-09-08|    HL| 0.0| 27.49711799621582| 26.58054739634196| 27.49711799621582|241650.0|      0.0|         0.0|
|1980-09-09|    HL| 0.0|27.038846613660578|26.045895784853993|26.886083602905273|150150.0|      0.0|         0.0|
|1980-09-10|    HL| 0.0|28.642831246058147|27.191595628404322| 27.49711799621582|136350.

In [72]:
C3V2 = nooutliersdf.filter((col("Close") > 25) & (col("Volume") < 100000))
C3V2.show()

+----------+------+------------------+------------------+------------------+------------------+------+---------+------------+
|      Date|Ticker|              Open|              High|               Low|             Close|Volume|Dividends|Stock Splits|
+----------+------+------------------+------------------+------------------+------------------+------+---------+------------+
|1973-01-11|  TELL|31.753454208374023|31.753454208374023|31.753454208374023|31.753454208374023|   0.0|      0.0|         0.0|
|1973-01-17|  TELL|28.253089904785156|28.253089904785156|28.253089904785156|28.253089904785156|   0.0|      0.0|         0.0|
|1973-01-19|  TELL| 29.00290870666504| 29.00290870666504| 29.00290870666504| 29.00290870666504|   0.0|      0.0|         0.0|
|1973-01-31|  TELL|26.253089904785156|26.253089904785156|26.253089904785156|26.253089904785156|   0.0|      0.0|         0.0|
|1973-02-01|  TELL|26.253089904785156|26.253089904785156|26.253089904785156|26.253089904785156|   0.0|      0.0|      

In [73]:
C3D1 = nooutliersdf.filter((col("Close") > 25) & (col("Dividends") == 0))
C3D1.show()

+----------+------+------------------+------------------+------------------+------------------+------+---------+------------+
|      Date|Ticker|              Open|              High|               Low|             Close|Volume|Dividends|Stock Splits|
+----------+------+------------------+------------------+------------------+------------------+------+---------+------------+
|1973-01-11|  TELL|31.753454208374023|31.753454208374023|31.753454208374023|31.753454208374023|   0.0|      0.0|         0.0|
|1973-01-17|  TELL|28.253089904785156|28.253089904785156|28.253089904785156|28.253089904785156|   0.0|      0.0|         0.0|
|1973-01-19|  TELL| 29.00290870666504| 29.00290870666504| 29.00290870666504| 29.00290870666504|   0.0|      0.0|         0.0|
|1973-01-31|  TELL|26.253089904785156|26.253089904785156|26.253089904785156|26.253089904785156|   0.0|      0.0|         0.0|
|1973-02-01|  TELL|26.253089904785156|26.253089904785156|26.253089904785156|26.253089904785156|   0.0|      0.0|      

In [74]:
C3D2 = nooutliersdf.filter((col("Close") > 25) & (col("Dividends") > 0))
C3D2.show()

+----------+------+------------------+------------------+------------------+------------------+--------+---------+------------+
|      Date|Ticker|              Open|              High|               Low|             Close|  Volume|Dividends|Stock Splits|
+----------+------+------------------+------------------+------------------+------------------+--------+---------+------------+
|1983-05-19|   THC|28.186155954355833| 28.30661053126891|27.463434006808246|27.945249557495117|200925.0|  0.13333|         0.0|
|1983-08-19|   THC|25.632895900505538|25.753806713649205| 25.14925818274642|25.391077041625977|147675.0|  0.13333|         0.0|
|1984-11-27|   LEE|               0.0| 34.39426040649414| 34.05540069805578| 34.39426040649414|   480.0|      0.2|         0.0|
|1984-12-03|   AIG|32.787512855327456|32.787512855327456|31.643763985364505|31.897930145263672|276341.0|  0.09271|         0.0|
|1985-01-18|   GHC|               0.0|30.276840209960938|  30.0032528685539|30.276840209960938| 14895.0|

In [75]:
V1D1 = nooutliersdf.filter((col("Volume") >=100000) & (col("Dividends") == 0))
V1D1.show()

+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|      Date|Ticker|                Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|1962-01-02|   CVX|                 0.0| 0.04680890217423439| 0.04606926600933256| 0.04680890217423439|105840.0|      0.0|         0.0|
|1962-01-02|   MMM|                 0.0|   0.541890852671279|  0.5259528842775644|  0.5299372673034668|254509.0|      0.0|         0.0|
|1962-01-02|   DIS| 0.05835945904254913|  0.0603180077410653| 0.05835945904254913| 0.05835945904254913|841958.0|      0.0|         0.0|
|1962-01-02|    MO|                 0.0| 0.00333833846261782|0.003293024608865...|0.003293024608865...|345600.0|      0.0|         0.0|
|1962-01-02|    PG|                 0.0|  0.2749

In [76]:
V1D2 = nooutliersdf.filter((col("Volume") >=100000) & (col("Dividends") > 0))
V1D2.show()

+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|      Date|Ticker|                Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------+---------+------------+
|1962-01-16|   CAT| 0.13898438379846487| 0.13898438379846487| 0.13727900221265876| 0.13813169300556183|163200.0|  0.01042|         0.0|
|1962-01-16|    PG|                 0.0| 0.25896285513980793|  0.2559690659631997|  0.2574658691883087|236800.0|  0.00586|         0.0|
|1962-01-31|   CVX|                 0.0| 0.04681572589458939| 0.04574930228332603|0.046175871044397354|226800.0|  0.02976|         0.0|
|1962-02-05|    BA| 0.20855923195732787| 0.20855923195732787| 0.20478437911898212| 0.20525632798671722|893025.0|  0.00823|         0.0|
|1962-02-06|   IBM|  1.4997241316491243|  1.5024

In [77]:
V2D1 = nooutliersdf.filter((col("Volume") < 100000) & (col("Dividends") == 0))
V2D1.show()

+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|      Date|Ticker|              Open|               High|                Low|              Close| Volume|Dividends|Stock Splits|
+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|1962-01-02|    ED|               0.0| 0.2658275556233194|0.26178762316703796|0.26178762316703796|25600.0|      0.0|         0.0|
|1962-01-02|    BP|               0.0|0.14143933090345925|0.13952797651290894|0.13952797651290894|77440.0|      0.0|         0.0|
|1962-01-02|   MSI|               0.0| 0.7649229763450202| 0.7452535214492476| 0.7518101930618286|65671.0|      0.0|         0.0|
|1962-01-02|   HON|               0.0| 1.5596422972915405|  1.549127912225958| 1.5561375617980957|40740.0|      0.0|         0.0|
|1962-01-02|    FL|               0.0| 0.9722493028732154| 0.9538055893643659| 0.959075152

In [78]:
V2D2 = df.filter((col("Volume") < 100000) & (col("Dividends") > 0))
V2D2.show()

+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|      Date|Ticker|              Open|               High|                Low|              Close| Volume|Dividends|Stock Splits|
+----------+------+------------------+-------------------+-------------------+-------------------+-------+---------+------------+
|1962-01-29|    FL|               0.0| 0.8984163061371896| 0.8811646699905397| 0.8811646699905396|30000.0|  0.05208|         0.0|
|1962-01-30|    ED|               0.0|0.25213567951282895| 0.2513197064399719| 0.2513197064399719|40800.0|  0.09375|         0.0|
|1962-02-06|    AA|1.4013793992556496| 1.4013793992556496| 1.3806400771288425|  1.392491102218628|40949.0|  0.03004|         0.0|
|1962-02-09|    GT|               0.0| 1.9405069106664414| 1.9131758274176183| 1.9186420440673828|26400.0|  0.05625|         0.0|
|1962-02-20|    IP|               0.0| 0.9038413166999817| 0.8765440502671659| 0.903841316

# Particiones de M

Aquí, al ya tener limpios los datos, sólo nos queda unir en un solo dataframe la muestra de cada combinación y proseguir a partir esa muestra en submuestras diferentes (Mi) y que eso nos permita de igual forma tener conjuntos de prueba y entrenamiento separados (Tri,Tsi) y que en la suma de todos ellos nos de la muestra total (M) de la población total de datos.

In [79]:
from pyspark.sql.functions import lit

In [80]:
# Añadimos una columna para identificar la combinación de cada DataFrame
V1D1 = V1D1.withColumn("combinacion", lit("V1D1"))
V1D2 = V1D2.withColumn("combinacion", lit("V1D2"))
V2D1 = V2D1.withColumn("combinacion", lit("V2D1"))
V2D2 = V2D2.withColumn("combinacion", lit("V2D2"))
C1V1 = C1V1.withColumn("combinacion", lit("C1V1"))
C1V2 = C1V2.withColumn("combinacion", lit("C1V2"))
C1D1 = C1D1.withColumn("combinacion", lit("C1D1"))
C1D2 = C1D2.withColumn("combinacion", lit("C1D2"))
C2V1 = C2V1.withColumn("combinacion", lit("C2V1"))
C2V2 = C2V2.withColumn("combinacion", lit("C2V2"))
C2D1 = C2D1.withColumn("combinacion", lit("C2D1"))
C2D2 = C2D2.withColumn("combinacion", lit("C2D2"))
C3V1 = C3V1.withColumn("combinacion", lit("C3V1"))
C3V2 = C3V2.withColumn("combinacion", lit("C3V2"))
C3D1 = C3D1.withColumn("combinacion", lit("C3D1"))
C3D2 = C3D2.withColumn("combinacion", lit("C3D2"))

# Creamos un diccionario con las probabilidades conjuntas que nos permitirá estratificar posteriormente
probabilidades = {
    "V1D1": 0.326397,
    "V1D2": 0.002303,
    "V2D1": 0.666303,
    "V2D2": 0.004697,
    "C1V1": 0.125907,
    "C1V2": 0.256493,
    "C1D1": 0.380019,
    "C1D2": 0.002681,
    "C2V1": 0.150682,
    "C2V2": 0.307318,
    "C2D1": 0.454194,
    "C2D2": 0.003206,
    "C3V1": 0.052311,
    "C3V2": 0.106689,
    "C3D1": 0.157887,
    "C3D2": 0.001113
}

A diferencia d ela actividad pasada, en esta se incrementaron el número total de muestreos para que al generar las submuestras (Mi) tuviéramos datos suficientes de cada tipo de combinación y evitáramos sesgos relevantes al no contener ciertas categorías.

In [81]:
# Asignamos arbitrariamente el total de muestras de cada tipo de combinación (de acuerdo a los resultados, podemos cambiar el volumen)
total_muestras = 100

# Calcularmos muestras por combinación de acuerdo a su probabilidad y al número arbitrario
muestras_por_grupo = {k: int(prob * total_muestras) for k, prob in probabilidades.items()}

# Muestreamos desde cada dataframe
df_muestra = (
    V1D1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["V1D1"])
    .union(V1D2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["V1D2"]))
    .union(V2D1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["V2D1"]))
    .union(V2D2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["V2D2"]))
    .union(C1V1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C1V1"]))
    .union(C1V2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C1V2"]))
    .union(C1D1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C1D1"]))
    .union(C1D2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C1D2"]))
    .union(C2V1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C2V1"]))
    .union(C2V2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C2V2"]))
    .union(C2D1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C2D1"]))
    .union(C2D2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C2D2"]))
    .union(C3V1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C3V1"]))
    .union(C3V2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C3V2"]))
    .union(C3D1.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C3D1"]))
    .union(C3D2.sample(withReplacement=True, fraction=1.0).limit(muestras_por_grupo["C3D2"]))
)

# Elaboramos un nuevo dataframe ya muestreado y lo mostramos
df_muestra.show()
print("Total de registros muestreados: ", df_muestra.count())

+----------+------+-------------------+--------------------+--------------------+--------------------+--------+---------+------------+-----------+
|      Date|Ticker|               Open|                High|                 Low|               Close|  Volume|Dividends|Stock Splits|combinacion|
+----------+------+-------------------+--------------------+--------------------+--------------------+--------+---------+------------+-----------+
|1962-01-02|   MMM|                0.0|   0.541890852671279|  0.5259528842775644|  0.5299372673034668|254509.0|      0.0|         0.0|       V1D1|
|1962-01-02|    MO|                0.0| 0.00333833846261782|0.003293024608865...|0.003293024608865...|345600.0|      0.0|         0.0|       V1D1|
|1962-01-02|    GE| 2.6095722714128895|  2.6530648102552266|  2.5834764822291847|  2.6008732318878174|432682.0|      0.0|         0.0|       V1D1|
|1962-01-02|    BA|0.19031055781074385| 0.19031055781074385| 0.18703724443912506| 0.18703724443912506|352350.0|      0

In [82]:
# Verificamos columnas para saber cuáles ocuparemos en los modelos
for col_name, dtype in df_muestra.dtypes:
    print(f"{col_name}: {dtype}")

Date: date
Ticker: string
Open: double
High: double
Low: double
Close: double
Volume: double
Dividends: double
Stock Splits: double
combinacion: string


In [83]:
# Asignamos columnas numéricas para que las otras no influyan en el resultado final
columnas_numericas = [
    "Open", "High", "Low", "Close",
    "Volume", "Dividends", "Stock Splits"
]

In [89]:
from pyspark.sql import functions as F

Aquí uyilizaremos M1, M2 y M3 (Mi's) y generaremos los conjutnos junto con el modelo y métricas para hacer un análisis a profundidad.

In [90]:
Muestra_particiones=df_muestra.orderBy(F.rand(seed=42))
M1=Muestra_particiones.limit(100)
M2=Muestra_particiones.limit(100)
M3=Muestra_particiones.subtract(M1).subtract(M2)

In [84]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, ClusteringEvaluator
from pyspark.ml.clustering import KMeans

In [85]:
from pyspark.ml.classification import RandomForestClassifier

# Construcción Train-Test y Entrenamiento de Modelos de Aprendizaje para cada conjunto de cada submuestra.

# Elección de métricas

Para cada modelo (en este caso de aprendizaje supervisado para la predicción de precios) se utilizan las métricas de RSME y R2 para poder medir realmente la efectividad de los modelos, e incluir análisis detallados de lo que estos indicadores nos quieren decir. Utilizamos estas mátricas ya que nos dan el número exacto de centavos diferenciales en los que el modelo se equivoca en promedio al predecir los precios de close, lo que además de darns un número certero, no ayuda a comprender si el gap es alto o bajo (no es lo mismo variar en un peso en acciones que valen 5, que errar en 1 peso en acciones que valen 1000)

In [92]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Definimos las variables
target = "Close"
features = ["Open", "High", "Low", "Volume", "Dividends", "Stock Splits"]

# VectorAssembler
ensamblador = VectorAssembler(inputCols=features, outputCol="features")

# Generamos los conjuntos de entrenamiento y prueba de manera arbitraria
df_entrenaM1, df_pruebaM1 = M1.randomSplit([0.8, 0.2], seed=42)

# Modelo de regresión
regresor = DecisionTreeRegressor(labelCol=target, featuresCol="features")

# Pipeline
pipeline = Pipeline(stages=[ensamblador, regresor])
modelo = pipeline.fit(df_entrenaM1)
predicciones = modelo.transform(df_pruebaM1)

# Evaluamos el modelo con RMSE y R2
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicciones)
r2 = evaluator_r2.evaluate(predicciones)

print("📊 Métricas del Modelo de Regresión:")
print(f"RMSE: {rmse:.4f}")
print(f"R2  : {r2:.4f}")

📊 Métricas del Modelo de Regresión:
RMSE: 0.0990
R2  : 0.9972


In [93]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Definimos las variables
target = "Close"
features = ["Open", "High", "Low", "Volume", "Dividends", "Stock Splits"]

# VectorAssembler
ensamblador = VectorAssembler(inputCols=features, outputCol="features")

# Generamos los conjuntos de entrenamiento y prueba de manera arbitraria
df_entrenaM2, df_pruebaM2 = M2.randomSplit([0.8, 0.2], seed=42)

# Modelo de regresión
regresor = DecisionTreeRegressor(labelCol=target, featuresCol="features")

# Pipeline
pipeline = Pipeline(stages=[ensamblador, regresor])
modelo = pipeline.fit(df_entrenaM2)
predicciones = modelo.transform(df_pruebaM2)

# Evaluamos el modelo con RMSE y R2
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicciones)
r2 = evaluator_r2.evaluate(predicciones)

print("📊 Métricas del Modelo de Regresión:")
print(f"RMSE: {rmse:.4f}")
print(f"R2  : {r2:.4f}")

📊 Métricas del Modelo de Regresión:
RMSE: 0.0990
R2  : 0.9972


In [95]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Definimos las variables
target = "Close"
features = ["Open", "High", "Low", "Volume", "Dividends", "Stock Splits"]

# VectorAssembler
ensamblador = VectorAssembler(inputCols=features, outputCol="features")

# Generamos los conjuntos de entrenamiento y prueba de manera arbitraria
df_entrenaM3, df_pruebaM3 = M3.randomSplit([0.8, 0.2], seed=42)

# Modelo de regresión
regresor = DecisionTreeRegressor(labelCol=target, featuresCol="features")

# Pipeline
pipeline = Pipeline(stages=[ensamblador, regresor])
modelo = pipeline.fit(df_entrenaM3)
predicciones = modelo.transform(df_pruebaM3)

# Evaluamos el modelo con RMSE y R2
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicciones)
r2 = evaluator_r2.evaluate(predicciones)

print("📊 Métricas del Modelo de Regresión:")
print(f"RMSE: {rmse:.4f}")
print(f"R2  : {r2:.4f}")

📊 Métricas del Modelo de Regresión:
RMSE: 0.6649
R2  : 0.9897


In [86]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Definimos las variables
target = "Close"
features = ["Open", "High", "Low", "Volume", "Dividends", "Stock Splits"]

# VectorAssembler
ensamblador = VectorAssembler(inputCols=features, outputCol="features")

# Generamos los conjuntos de entrenamiento y prueba de manera arbitraria
df_entrena, df_prueba = df_muestra.randomSplit([0.8, 0.2], seed=42)

# Modelo de regresión
regresor = DecisionTreeRegressor(labelCol=target, featuresCol="features")

# Pipeline
pipeline = Pipeline(stages=[ensamblador, regresor])
modelo = pipeline.fit(df_entrena)
predicciones = modelo.transform(df_prueba)

# Evaluamos el modelo con RMSE y R2
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicciones)
r2 = evaluator_r2.evaluate(predicciones)

print("📊 Métricas del Modelo de Regresión:")
print(f"RMSE: {rmse:.4f}")
print(f"R2  : {r2:.4f}")

📊 Métricas del Modelo de Regresión:
RMSE: 2.4634
R2  : 0.9627


La métricas salen correctas, incluso evaluando diferentes escenarios o muestreos se obtiene una prueba similar y esto se puede deber a que las correlaciones entre los precios es alta. En resumen, el RMSE significa que, en promedio, el modelo se equivoca por 33.49 centavos, lo que, si bien pudiera ser relevante en acciones que no presentan tanta variabilidad, en el conjunto, no representa mayor problema (hay unas que varían por miles de pesos, incluso).

Podemos observar que si bien el rendimiento es similar, el utilizar elementos aún más pequeños nos llevan a errores en centavos mucho menores, y tiene sentido, de igual manera, que en la muestra general el RSME salga más alto, debido a que ya está tomando en cuenta más datos y un mayor aprendizaje.

# Conclusiones

Esta actividad fue sumamente relevante y destacó de la anterior debido a que pudimos utilizar diferentes conjuntos de prueba y entrenamiento y evaluar realmente la calidad de los resultados, ya que, por ejemplo en mi caso, la efectividad salía bastante grande, y necesitaba checar si en las demás muestras mostraba un desempeño similar.

Se ajustaron diferentes híperparámetros, incluso ajustando los mejores con un Gridsearch que fueron los que al final dejé en la actividad.

Parte del reto fue encontrar la mezcla perfecta entre el número de registros a muestrear y la representatividad de las probabilidades en cada submuestra, lo que si bien no llevó mucho tiempo, si implicó iterar varias veces sobre el mismo particionamiento.