# Critical Line Algorithm

In [None]:
import numpy as np
import pandas as pd

from typing import List, Optional, Tuple

In [None]:
# Clase 
class CriticalLineAlgorithm:
    """
    Implementación del Critical Line Algorithm (CLA) para obtener
    la frontera eficiente de Markowitz bajo restricciones de cotas.

    Atributos principales:
        mean   : rendimientos esperados (n x 1)
        covar  : matriz de covarianzas (n x n)
        lB     : límites inferiores de pesos (n,)
        uB     : límites superiores de pesos (n,)
        solutions : lista de soluciones intermedias (turning points)
    """

    def __init__(self,mean:np.ndarray, covar:np.ndarray, lB:np.ndarray, uB:np.ndarray):
        
        self.mean = np.atleast_2d(np.asarray(mean, dtype=float))
        self.covar = np.atleast_2d(np.asarray(covar, dtype=float))
        self.lB = np.asarray(lB, dtype=float).flatten()
        self.uB = np.asarray(uB, dtype=float).flatten()

        # Validaciones básicas
        n = self.mean.shape[0]
        assert self.covar.shape == (n, n), "La covarianza debe ser cuadrada y del mismo tamaño que mean"
        assert len(self.lB) == n and len(self.uB) == n, "Bounds deben tener la misma longitud que mean"

        # Historial de resultados (como listas tipadas)
        self.weights: list[np.ndarray] = []   # portafolios
        self.lambdas: list[float | None] = [] # lambdas
        self.gammas: list[float | None] = [] # gammas
        self.free_weights: list[list[int]] = []    # índices de pesos libres


    def solve(self):
        """
        Ejecuta el algoritmo CLA y guarda las soluciones intermedias
        (turning points) en self.solutions.
        """
        # Compute the turning points,free sets and weights
        f,weights=self._initialize_algorithm()
        self.weights.append(weights.copy())
        self.lambdas.append(None)
        self.gammas.append(None)
        self.free_weights.append(list(f))
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l_in is None or l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.weights[-1][i])
                    if (self.lambdas[-1]==None or l<self.lambdas[-1]) and (l_out is None or l>l_out):
                        l_out,i_out=l,i
            #3) decide lambda
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):break
            if l_in is not None and (l_out is None or l_in > l_out):
                self.lambdas.append(l_in)
                f.remove(i_in)
                weights[i_in]=bi_in # set value at the correct boundary
            else:
                self.lambdas.append(l_out)
                f.append(i_out)
            #4) compute solution vector
            covarF,covarFB,meanF,wB=self.getMatrices(f)
            covarF_inv=np.linalg.inv(covarF)
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):weights[f[i]]=wF[i]
            self.weights.append(np.copy(weights)) # store solution
            self.gammas.append(g)
            self.free_weights.append(f[:])
            if len(f)==self.mean.shape[0]:
                #5) minimum variance solution
                wF,g=self.computeW(covarF_inv,covarFB,np.zeros(meanF.shape),wB)
                for i in range(len(f)):weights[f[i]]=wF[i]
                self.weights.append(np.copy(weights)) # store solution
                self.gammas.append(g)
                self.free_weights.append(f[:])


    def _initialize_algorithm(self):
        """Inicializa el algoritmo con el primer conjunto libre."""
        n = self.mean.shape[0]
        structured = np.zeros(n, dtype=[("id", int), ("mu", float)])
        mu_list = [self.mean[i, 0] for i in range(n)]
        structured[:] = list(zip(range(n), mu_list))

        # Ordenar por mu
        sorted_assets = np.sort(structured, order="mu")

        # Seleccionar primer peso libre
        i, weights = sorted_assets.shape[0], self.lB.copy()
        while weights.sum() < 1:
            i -= 1
            weights[sorted_assets[i][0]] = self.uB[sorted_assets[i][0]]
        weights[sorted_assets[i][0]] += 1 - weights.sum()
        free_sets = [sorted_assets[i][0]]

        return free_sets, weights


    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1]
        if c<0:
            bi=bi[0]
        return bi


    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if np.all(wB)==None:
            g,w1=float(-self.lambdas[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.lambdas[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.lambdas[-1]*w3,g


    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if np.all(wB==None):
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi


    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.weights[-1],b,[0])
        return covarF,covarFB,meanF,wB


    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)


    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))


    def reduceMatrix(self,matrix,listX,listY):
        matrix = np.array(matrix)
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__


    def getMinVar(self):
        # Get the minimum variance solution
        var=[]
        for weights in self.weights:
            a=np.dot(np.dot(weights.T,self.covar),weights)
            var.append(a)
        idx = int(np.argmin(var))

        min_var = float(var[idx]**0.5)
        min_var_weights = self.weights[idx]

        return min_var, min_var_weights


    def getMaxSR(self):
        # Get the max Sharpe ratio portfolio
        #1) Compute the local max SR portfolio between any two neighbor turning points
        w_sr,sr=[],[]
        for i in range(len(self.weights)-1):
            w0=np.copy(self.weights[i])
            w1=np.copy(self.weights[i+1])
            kargs={'minimum':False,'args':(w0,w1)}
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)
            w_sr.append(a*w0+(1-a)*w1)
            sr.append(b)
        return max(sr),w_sr[sr.index(max(sr))]


    def evalSR(self,a,w0,w1):
        # Evaluate SR of the portfolio within the convex combination
        weights=a*w0+(1-a)*w1
        b=np.dot(weights.T,self.mean)[0,0]
        c=np.dot(np.dot(weights.T,self.covar),weights)[0,0]**.5
        return b/c
#---------------------------------------------------------------
    def goldenSection(self,obj,a,b,**kargs):
        # Golden section method. Maximum if kargs['minimum']==False is passed
        from math import log,ceil
        tol,sign,args=1.0e-9,1,None
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1
        if 'args' in kargs:args=kargs['args']
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
        r=0.618033989
        c=1.0-r
        # Initialize
        x1=r*a+c*b;x2=c*a+r*b
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
        # Loop
        for i in range(numIter):
            if f1>f2:
                a=x1
                x1=x2;f1=f2
                x2=c*a+r*b;f2=sign*obj(x2,*args)
            else:
                b=x2
                x2=x1;f2=f1
                x1=r*a+c*b;f1=sign*obj(x1,*args)
        if f1<f2:return x1,sign*f1
        else:return x2,sign*f2
#---------------------------------------------------------------
    def efFrontier(self,points):
        # Get the efficient frontier
        mu,sigma,weights=[],[],[]
        points = int(100*len(self.weights))
        numero = points//len(self.weights)
        a=np.linspace(0,1,numero)[:-1] # remove the 1, to avoid duplications
        b=range(len(self.weights)-1)
        for i in b:
            w0,w1=self.weights[i],self.weights[i+1]
            if i==b[-1]:a=np.linspace(0,1,numero) # include the 1 in the last iteration
            for j in a:
                weights=w1*j+(1-j)*w0
                weights.append(np.copy(weights))
                mu.append(np.dot(weights.T,self.mean)[0,0])
                sigma.append(np.dot(np.dot(weights.T,self.covar),weights)[0,0]**.5)
        return mu,sigma,weights
#---------------------------------------------------------------
#---------------------------------------------------------------