# Implementación de sPCA con pyspark!

In [27]:
import pyspark
import numpy as np

In [6]:
# inicializar spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sPCA").getOrCreate()
sc = spark.sparkContext  # así obtienes el SparkContext moderno


25/05/08 09:04:52 WARN Utils: Your hostname, killua resolves to a loopback address: 127.0.1.1; using 10.14.13.175 instead (on interface wlo1)
25/05/08 09:04:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 09:04:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/05/08 09:05:05 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Algorithm 4 sPCA (Matrix Y , int N, int D, int d)
1: C = normrnd(D, d)
2: ss = normrnd(1, 1)
3: Ym = meanJob(Y) # media de cada columna de Y
4: ss1 = FnormJob(Y, Ym) # forbenius de la matriz de entrada
5: while not STOP_CONDITION do
	6: M = C′ ∗C + ss ∗ I
	7: CM = C ∗ M−1
	8: Xm = Y m ∗CM
	9: {XtX,YtX} = YtXJob(Y,Y m, Xm,CM) # distribuido
	10: XtX+ = ss ∗ M−1
	11: C = YtX/XtX
	12: ss2 = trace(XtX ∗C′ ∗C)
	13: ss3 = ss3Job(Y,Y m, Xm,CM,C) # distribuido
	14: ss = (ss1 + ss2 − 2 ∗ ss3)/N/D
15: end while

In [None]:
# Calcular la media de cada columna
def meanJob(Y):
    suma_vectores = Y.reduce(lambda a, b: [x + y for x, y in zip(a, b)])
    n = Y.count()
    media = [x / n for x in suma_vectores]
    return media

def center_data(Y, Ym):
    return Y.map(lambda fila: [x - m for x, m in zip(fila, Ym)])

In [266]:
import numpy as np

def FJob(Y_rdd, Ym):
    diff_squared = Y_rdd.map(lambda row: sum((xi - mi) ** 2 for xi, mi in zip(row, Ym)))
    fro_squared_sum = diff_squared.reduce(lambda a, b: a + b)
    return np.sqrt(fro_squared_sum)


# calcular norma de forbenius 
def frobenius_optimizado(Y, Ym):
    msum = 0
    sum = 0
    for i in range(len(Ym)):
        msum += (Ym[i] ** 2)
    for i in range(len(Y)):
        for j in range(len(Y[i])):
            sum += (Y[i][j] - Ym[j]) ** 2
            sum -= Ym[j]**2
        sum += msum
    return sum

# Norma de Frobenius optimizada
def FJob_no_sirve(Y, Ym):
    # Calcular msum: la suma de los cuadrados de las medias
    msum = sum([ym**2 for ym in Ym])
    
    # Calcular la suma de las diferencias cuadradas (Y_ij - Ym_j)^2
    
    sum_squared_diffs = Y.map(lambda row: sum([(row[j] - Ym[j])**2 for j in range(len(row))])).sum()

    # Restar la suma de los cuadrados de las medias de cada columna (Ym_j^2)
    sum_squared_diffs -= sum([ym**2 for ym in Ym])  # Se hace una corrección por los Ym^2

    # Añadir msum, que es la suma de Ym_j^2
    sum_squared_diffs += msum
    
    return sum_squared_diffs


In [169]:
from pyspark.accumulators import AccumulatorParam

#Clase acumuladora (usando listas de listas)
class MatrixAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        rows = len(value)
        cols = len(value[0])
        return [[0.0 for _ in range(cols)] for _ in range(rows)]

    def addInPlace(self, val1, val2):
        for i in range(len(val1)):
            for j in range(len(val1[0])):
                val1[i][j] += val2[i][j]
        return val1
    
# Wrapper de la clase acumuladora
class MatrixAccumulatorWrapper:
    def __init__(self, sc, shape):
        self.shape = shape
        self.accumulator = sc.accumulator(
            [[0.0 for _ in range(shape[1])] for _ in range(shape[0])],
            MatrixAccumulatorParam()
        )

    def addRow(self, row_index, row_contribution):
        temp = [[0.0 for _ in range(self.shape[1])] for _ in range(self.shape[0])]
        temp[row_index] = list(row_contribution)
        self.accumulator.add(temp) 
        
    def add(self, matrix):
        self.accumulator.add(matrix)

    def get(self):
        return self.accumulator.value



In [None]:
def YtXSparkJob(Y, Ym, Xm, CM, D, d):
    YtXSum = MatrixAccumulatorWrapper(sc, shape=(D, d))
    XtXSum = MatrixAccumulatorWrapper(sc, shape=(d, d))
    Ym = np.array(Ym)
    Xm = np.array(Xm)
    CM = np.array(CM)

    # Broadcast variables to use inside workers
    Ym_b = sc.broadcast(Ym)
    Xm_b = sc.broadcast(Xm)
    CM_b = sc.broadcast(CM)
    YtXSum_b = YtXSum  # since it's a custom wrapper around an accumulator
    XtXSum_b = XtXSum
    def compute_partial(Yi):
        try:
            Yi = np.array(Yi)

            CM = CM_b.value
            Ym = Ym_b.value
            Xm = Xm_b.value

            Xi = Yi @ CM - Ym @ CM
            Xi_minus_Xm = Xi - Xm

            YtX_i = np.outer(Yi.T, Xi_minus_Xm) - np.outer(Ym.T, Xi_minus_Xm)
            XtX_i = np.outer(Xi.T, Xi_minus_Xm) - np.outer(Xm.T, Xi_minus_Xm)

            non_zero_indices = np.nonzero(Yi)[0]
            for idx in non_zero_indices:
                YtXSum_b.addRow(idx, YtX_i[idx, :])

            XtXSum_b.add(XtX_i)
        except Exception as e:
            import traceback
            print("Error in compute_partial:")
            traceback.print_exc()
            raise e


    Y.foreach(compute_partial)
    YtX = YtXSum.accumulator.value
    XtX = XtXSum.accumulator.value
    return YtX, XtX


In [206]:
def ss3Job(Y, Ym, Xm, CM, C):
    ss3 = sc.accumulator(0)
    Ym = np.array(Ym)
    Xm = np.array(Xm)
    CM = np.array(CM)
    C = np.array(C)

    # Broadcast variables to use inside workers
    Ym_b = sc.broadcast(Ym)
    Xm_b = sc.broadcast(Xm)
    CM_b = sc.broadcast(CM)
    C_b = sc.broadcast(C)

    def calculate_ss3(Yi):
        try: 
            Yi = np.array(Yi)
            CM = CM_b.value
            Ym = Ym_b.value
            Xm = Xm_b.value
            C = C_b.value
        
            Xi = Yi @ CM - Ym @ CM
            Xi = Xi - Xm
            cy = C.T @ Yi.T
            xcy = Xi @ cy
            ss3.add(xcy)
        except Exception as e:
            import traceback
            print("Error in calculate_ss3:")
            traceback.print_exc()
            raise e
    Y.foreach(calculate_ss3)
    return ss3.value
    

In [None]:
# Leer archivo TXT como RDD
rdd = sc.textFile("./input/datos_1.txt")  # carga como RDD de líneas
Y = rdd.map(lambda line: [float(x) for x in line.split(",")])
D = 3  # number of columns
d = 2  # dimension of the projection
N = 17
C = np.random.normal(loc=0.0, scale=1.0, size=(D, d))
ss = np.random.normal(loc=0.0, scale=1.0)
Ym = np.array(meanJob(Y))
ss1 = FJob(Y, Ym)
I = np.eye(d)

In [194]:
# Compute M = Cᵀ * C + ss * I
M = C.T @ C + ss * I

# Compute CM = C * M⁻¹
M_inv = np.linalg.inv(M)
CM = C @ M_inv

# Compute Xm = Ym * CM
Xm = Ym @ CM  # result has shape (d,)

In [179]:
YtX, XtX = YtXSparkJob(Y, Ym, Xm, CM, D, d)

In [188]:
XtX += ss * M_inv
XtX_inv = np.linalg.inv(XtX)
C = YtX @ XtX_inv
ss2 = np.trace(XtX @ C.T @ C)

In [212]:
ss3 = ss3Job(Y, Ym, Xm, CM, C)

In [213]:
ss = (ss1 + ss2 -2 * ss3) / N / D

In [218]:
# Ahora todo varias veces
def has_converged(C_old, C_new, ss_old, ss_new, tol=1e-4):
    delta_C = np.linalg.norm(C_new - C_old, ord='fro')
    delta_ss = abs(ss_new - ss_old)
    return delta_C < tol and delta_ss < tol


In [366]:
import random
random.seed(42)
np.random.seed(42)
# Leer archivo TXT como RDD
rdd = sc.textFile("./input/datos_1.txt")  # carga como RDD de líneas
Y = rdd.map(lambda line: [float(x) for x in line.split(",")])
D = 3  # number of columns
d = 2  # dimension of the projection
N = Y.count()
C = np.random.normal(loc=0.0, scale=1.0, size=(D, d))
ss = abs(np.random.normal(loc=1.0, scale=0.1))
Ym = np.array(meanJob(Y))
ss1 = FJob(Y, Ym)
I = np.eye(d)

max_iters = 1
C_old =  np.inf
ss_old = np.inf
num_iters = 0

In [367]:
C

array([[ 0.49671415, -0.1382643 ],
       [ 0.64768854,  1.52302986],
       [-0.23415337, -0.23413696]])

In [368]:
while not has_converged(C_old, C, ss_old, ss) and num_iters == max_iters:
    # Compute M = Cᵀ * C + ss * I
    M = C.T @ C + ss * I

    # Compute CM = C * M⁻¹
    M_inv = np.linalg.inv(M)
    CM = C @ M_inv

    # Compute Xm = Ym * CM
    Xm = Ym @ CM  # result has shape (d,)
    YtX, XtX = YtXSparkJob(Y, Ym, Xm, CM, D, d)
    XtX += ss * M_inv
    XtX_inv = np.linalg.inv(XtX)
    C_old = C
    C = YtX @ XtX_inv
    ss2 = np.trace(XtX @ C.T @ C)
    ss3 = ss3Job(Y, Ym, Xm, CM, C)
    ss_old = ss
    ss = (ss1 + ss2 -2 * ss3) / N / D
    num_iters +=1

In [369]:
# Compute M = Cᵀ * C + ss * I
print("M:\n ", M)
print("Xm \n", Xm)
print("XtX\n", XtX)
print("YtX\n", YtX)
print("C\n", C)
print("ss2\n", ss2)
print("ss3\n", ss3)
print("ss\n", ss)

M:
  [[1.87897448 0.9725951 ]
 [0.9725951  3.55147836]]
Xm 
 [ 0.35867453 -1.30560805]
XtX
 [[  5.57201365 -15.32459479]
 [-15.32459479  52.89378186]]
YtX
 [[np.float64(6.198586744913513), np.float64(-18.88353346895369)], [np.float64(-3.0143604890505844), np.float64(9.305774505185928)], [np.float64(0.0), np.float64(0.0)]]
C
 [[ 0.49671415 -0.1382643 ]
 [ 0.64768854  1.52302986]
 [-0.23415337 -0.23413696]]
ss2
 8.93583614178467
ss3
 -1.0192034984372622
ss
 1.1579212815507391


In [None]:
print("M: \n", M)
print("X \n",X_p)
print("XtX\n", XtX_p)
print("YtX\n", YtX_p)
print("C\n", C_p)
print("ss2\n", ss2_p)
print("ss3\n", ss3_p)
print("ss\n", ss_p)

In [446]:
random.seed(42)
np.random.seed(42)
# Leer archivo TXT como RDD
rdd = sc.textFile("./input/datos_1.txt")  # carga como RDD de líneas
Y = rdd.map(lambda line: [float(x) for x in line.split(",")])
D = 3  # number of columns
d = 2  # dimension of the projection
N = Y.count()
C_p = np.random.normal(loc=0.0, scale=1.0, size=(D, d))
ss_p = abs(np.random.normal(loc=1.0, scale=0.1))
I = np.eye(d)
C_old =  np.inf
ss_old = np.inf

In [453]:
random.seed(42)
np.random.seed(42)

pathcsv = "./input/datos_1.txt"
Y_p = np.genfromtxt(pathcsv, delimiter=",")  # convierte a numpy array

D = 3  # number of columns
d = 2  # dimension of the projection
N = 17
C_p = np.random.normal(loc=0.0, scale=1.0, size=(D, d))
ss_p = abs(np.random.normal(loc=1.0, scale=0.1))
I = np.eye(d)
C_old =  np.inf
ss_old = np.inf
Ym = np.sum(Y_p, axis=0) / N
Yc = Y_p - Ym
max_iters = 1000
num_iters = 0
while num_iters < max_iters:
    M = C_p.T @ C_p + ss_p * I
    M_inv = np.linalg.inv(M)
    X_p = Yc @ C_p @ M_inv
    XtX_p = X_p.T @ X_p + ss_p * M_inv
    YtX_p = Yc.T @ X_p
    C_old = C_p
    C_p = YtX_p @ np.linalg.inv(XtX_p)
    ss2_p = np.trace(XtX_p @ C_p.T @ C_p)
    ss3_p = np.trace(X_p @ C_p.T @ Yc.T)
    ss_old = ss_p
    ss_p = (np.linalg.norm(Yc, ord='fro')**2 + ss2_p - 2 * ss3_p) / (N * D)
    num_iters += 1

In [454]:
C_p

array([[ 2.45931816, -0.86903657],
       [ 0.84413307,  2.13054494],
       [ 0.        ,  0.        ]])