In [None]:
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 17 2023

@author: Meva
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as st
import importlib
import random
import pickle

# import our own files and reload
#import market_data
#importlib.reload(market_data)
#import capm
#importlib.reload(capm)

# Colab-only bootstrap: mount Google Drive into the runtime, then put the
# Drive folder holding the helper modules on sys.path so the two project
# imports below can be resolved. The order of these statements matters.
from google.colab import drive
drive.mount('/content/drive')
import sys
sys.path.append('/content/drive/MyDrive/ArchivosCSV/ArchivosAuxiliares')
# Project-local helpers loaded from the Drive folder added above.
import market_data
# NOTE(review): capm is not referenced in the visible part of this file;
# presumably kept for parity with the course scripts - confirm before removing.
import capm


'''

Tu gerente de portafolios quiere que "refactorices" el script 06_covar.py

Más precisamente, dado un portafolios o una lista de activos "rics", se te pide reducir todo el código del script 06_covar.py a dos líneas:

pca = pca_model(rics)
pca.compute()

Para lograr este objetivo, necesitas una clase: pca_model
y dos métodos: __init__() y compute()

En __init__() se te pide declarar todos los atributos necesarios, que son:
rics
mtx_var_covar
mtx_correl
eigenvalues
eigenvectors
variance_explained
volatility_min
volatility_max
pca_vector_1
pca_vector_2
pca_eigenvalue_1
pca_eigenvalue_2
pca_variance_explained
min_var_vector
min_var_eigenvalue
min_var_variance_explained

En compute() se te pide evaluar todos los atributos anteriormente declarados, siguiendo las fórmulas usadas en 06_covar.py

Lo que vamos a evaluar es que, por cada atributo:
1. Que tu clase pca_model tenga dicho atributo
2. Que el atributo sea del tipo esperado
3. Si es lista o arreglo, que sea de las dimensiones esperadas
4. Que el o los valores del atributo sean los esperados

Para no tener problemas de redondeo en la evaluación del Punto 4, no redondees tus resultados: usa las fórmulas de 06_covar.py tal y como aparecen.

ADVERTENCIA:
Sólo debes modificar los métodos __init__ y compute().
No debes modificar ningún otro método o parámetro de este archivo.
Si modificas otros métodos o parámetros entonces tu examen quedará automáticamente anulado y tendrás 0.0 (cero) de calificación.

'''


class pca_model:
    """Eigen-decomposition (PCA) of the scaled covariance matrix of a set of assets.

    Intended usage::

        pca = pca_model(rics)
        pca.compute()

    Every attribute is declared in ``__init__`` as ``None`` and populated by
    ``compute()``.
    """

    def __init__(self, rics):
        """Store the asset list and declare all result attributes as ``None``.

        Args:
            rics: asset identifiers, forwarded unchanged to
                ``market_data.synchronise_returns`` by ``compute()``.
        """
        self.rics = rics
        # Raw object returned by market_data.synchronise_returns.
        self.df = None
        # self.df without its 'date' column.
        self.mtx = None
        self.mtx_var_covar = None
        self.mtx_correl = None
        self.eigenvalues = None
        self.eigenvectors = None
        self.variance_explained = None
        self.volatility_min = None
        self.volatility_max = None
        self.pca_vector_1 = None
        self.pca_vector_2 = None
        self.pca_eigenvalue_1 = None
        self.pca_eigenvalue_2 = None
        self.pca_variance_explained = None
        self.min_var_vector = None
        self.min_var_eigenvalue = None
        self.min_var_variance_explained = None

    def compute(self):
        """Load the returns for ``self.rics`` and evaluate every attribute.

        No rounding is applied to any result.
        """
        self.df = market_data.synchronise_returns(self.rics)
        # Assumes the helper returns a DataFrame with a 'date' column plus
        # one return column per asset - TODO confirm against market_data.
        self.mtx = self.df.drop(columns=['date'])

        # Columns are the variables (rowvar=False). The factor 252 presumably
        # annualises daily returns (trading days per year) - confirm.
        self.mtx_var_covar = np.cov(self.mtx, rowvar=False) * 252
        self.mtx_correl = np.corrcoef(self.mtx, rowvar=False)

        # Decompose once and unpack both results instead of running the
        # decomposition twice. numpy documents that eigh returns eigenvalues
        # in ascending order, so index 0 is the smallest and -1 the largest;
        # eigenvectors are stored as columns.
        self.eigenvalues, self.eigenvectors = np.linalg.eigh(self.mtx_var_covar)
        self.variance_explained = self.eigenvalues / np.sum(self.eigenvalues)

        # Square roots of the extreme eigenvalues.
        self.volatility_min = np.sqrt(self.eigenvalues[0])
        self.volatility_max = np.sqrt(self.eigenvalues[-1])

        # Two leading principal components (largest eigenvalues).
        self.pca_vector_1 = self.eigenvectors[:, -1]
        self.pca_vector_2 = self.eigenvectors[:, -2]
        self.pca_eigenvalue_1 = self.eigenvalues[-1]
        self.pca_eigenvalue_2 = self.eigenvalues[-2]
        # Combined share of the two largest eigenvalues.
        self.pca_variance_explained = self.variance_explained[-2:].sum()

        # Direction associated with the smallest eigenvalue.
        self.min_var_vector = self.eigenvectors[:, 0]
        self.min_var_eigenvalue = self.eigenvalues[0]
        self.min_var_variance_explained = self.variance_explained[0]



def pca_model_compare(pca_student, pca_teacher):
    """Score a student's pca_model instance against the teacher's reference.

    Each graded attribute is worth ``counter_attribute_max`` (4) points:
    one for the attribute existing on ``pca_student`` plus whatever
    ``compare_attributes`` awards for the student/teacher pair. An attribute
    missing on the student scores zero and prints no warning.

    Args:
        pca_student: object under evaluation.
        pca_teacher: reference object; must expose every graded attribute
            that the student exposes (otherwise AttributeError is raised).

    Returns:
        Tuple ``(counter_success, counter_answers, note)`` where ``note`` is
        ``10 * counter_success / counter_answers``.
    """
    counter_attribute_max = 4
    # Graded attributes, in evaluation order (this also fixes the order in
    # which warnings are printed).
    graded_attributes = (
        'rics',
        'mtx_var_covar',
        'mtx_correl',
        'eigenvalues',
        'eigenvectors',
        'variance_explained',
        'volatility_min',
        'volatility_max',
        'pca_vector_1',
        'pca_vector_2',
        'pca_eigenvalue_1',
        'pca_eigenvalue_2',
        'pca_variance_explained',
        'min_var_vector',
        'min_var_eigenvalue',
        'min_var_variance_explained',
    )

    counter_answers = 0
    counter_success = 0
    for attribute in graded_attributes:
        counter_answers += counter_attribute_max
        if not hasattr(pca_student, attribute):
            continue
        # One point for simply having the attribute.
        counter_success += 1
        s = getattr(pca_student, attribute)
        t = getattr(pca_teacher, attribute)
        counter = compare_attributes(s, t)
        counter_success += counter
        error_warning(counter, counter_attribute_max, attribute)

    note = 10 * counter_success / counter_answers
    return counter_success, counter_answers, note



def compare_attributes(s, t):
    """Return 0-3 points for how well student value ``s`` matches teacher value ``t``.

    Points are cumulative:
      1. ``s`` has exactly the same type as ``t`` (Python floats and
         ``np.float32`` are first promoted to ``np.float64``).
      2. Sizes agree: equal length for lists, equal first dimension for
         arrays; always granted for ``np.float64`` scalars.
      3. Values agree on a sample: for lists the middle element of ``s``
         must be contained in ``t``; for scalars and arrays the (middle)
         values must differ by less than 1e-3. For arrays the sample is the
         middle element, or column 0 of the middle row when ``t`` is 2-D.

    When no comparable sample exists (empty list/array, 0-d array, mismatched
    number of dimensions, non-numeric data) the corresponding point is simply
    not awarded instead of raising.

    Args:
        s: student value.
        t: teacher (reference) value.

    Returns:
        int between 0 and 3.
    """
    counter_success = 0
    if isinstance(s, (float, np.float32)):
        s = np.float64(s)

    # Exact type identity is intended here (not isinstance): the attribute
    # must be of the expected type, not merely a compatible one.
    if type(s) is not type(t):
        return counter_success
    counter_success += 1

    if isinstance(s, list):
        if len(s) == len(t):
            counter_success += 1
            # `s and ...` guards the empty list, which has no middle element.
            if s and s[len(s) // 2] in t:
                counter_success += 1
    elif isinstance(s, np.float64):
        counter_success += 1
        if abs(s - t) < 10**-3:
            counter_success += 1
    elif isinstance(s, np.ndarray):
        # 0-d arrays have no first dimension to compare.
        if s.ndim >= 1 and t.ndim >= 1 and s.shape[0] == t.shape[0]:
            counter_success += 1
            middle = s.shape[0] // 2
            try:
                if t.ndim == 2:
                    s1 = s[middle][0]
                    t1 = t[middle][0]
                else:
                    s1 = s[middle]
                    t1 = t[middle]
                values_match = bool(abs(s1 - t1) < 10**-3)
            except (IndexError, TypeError, ValueError):
                # Empty axis, wrong number of dimensions, or data that cannot
                # be subtracted/compared: no sample to validate.
                values_match = False
            if values_match:
                counter_success += 1
    return counter_success


def error_warning(counter, counter_attribute_max, attribute):
    """Print a warning when an attribute scored below the passing threshold.

    Args:
        counter: points obtained for the attribute.
        counter_attribute_max: maximum points per attribute; the warning is
            emitted when ``counter`` is below this value minus one.
        attribute: attribute name appended to the warning message.
    """
    threshold = counter_attribute_max - 1
    if not counter < threshold:
        return
    print('Warning: I found errors on attribute = ' + attribute)


# Evaluation loop: grade the student's pca_model against a random sample of
# pre-computed teacher instances and print the average note.
nb_tests = 10

# NOTE(review): pickle.load can execute arbitrary code while deserialising;
# only use it on a file from a trusted source.
with open('/content/drive/MyDrive/ArchivosCSV/ArchivosAuxiliares/examen_02_instances.pickle','rb') as f:
    instances = pickle.load(f)

# random.sample raises ValueError if fewer than nb_tests instances are stored.
list_pca_teacher = random.sample(instances, nb_tests)

final_note = 0.0
for n, pca_teacher in enumerate(list_pca_teacher, start=1):
    print('---')
    print('Test ' + str(n))

    # Rebuild the model from the same asset list the teacher instance used.
    rics = pca_teacher.rics
    print('rics = ' + str(rics))
    pca_student = pca_model(rics)
    pca_student.compute()

    counter_success, counter_answers, note = pca_model_compare(pca_student, pca_teacher)
    summary = ('counter_success = ' + str(counter_success)
               + ' | counter_answers = ' + str(counter_answers)
               + ' | note = ' + str(note))
    print(summary)
    final_note += note

# Average the per-test notes.
final_note = final_note / nb_tests
print('---')
print('final_note = ' + str(final_note))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
---
Test 1
rics = ['XLC', 'JPYUSD=X', 'GOOG', 'BLK', 'XLV', 'GBPUSD=X', 'EWW', '^STOXX', 'LLY', 'AAPL', 'EURUSD=X', 'XLK', 'NOKUSD=X', 'BTC-USD', 'ETH-USD', 'USDC-USD']
counter_success = 64 | counter_answers = 64 | note = 10.0
---
Test 2
rics = ['^MXX', 'MS', 'EWW', '^GDAXI', 'XLRE', 'ABBV', 'EURUSD=X', '^FCHI', '^STOXX', 'PFE', 'XLK', 'XLU', 'XLF', 'MTUM', 'DAI-USD']
counter_success = 64 | counter_answers = 64 | note = 10.0
---
Test 3
rics = ['MXNUSD=X', 'JPM', 'GBPUSD=X', 'XLK', 'SEKUSD=X', 'BLK', 'IVW', '^SPX', 'XLI', 'EWW', 'BRK-B', 'USDC-USD', 'XLP', 'JPYUSD=X', 'XLV']
counter_success = 64 | counter_answers = 64 | note = 10.0
---
Test 4
rics = ['^STOXX', 'BTC-USD', 'SEKUSD=X', 'IVW']
counter_success = 64 | counter_answers = 64 | note = 10.0
---
Test 5
rics = ['BAC', 'GOOG', 'MA', 'XLU', 'EURUSD=X', '^FCHI', 'ETH-USD', 'XLB', 'GS', 'MTUM', 'SOL-USD', 'GBP