In [1]:
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# ============== CÓDIGO BASE ==============

def mult_matrices_local(A, B, n):
    """Multiply two n x n matrices with plain Python loops.

    Args:
        A: Left operand, indexable as ``A[i][k]`` for ``0 <= i, k < n``.
        B: Right operand, indexable as ``B[k][j]`` for ``0 <= k, j < n``.
        n: Number of rows/columns taken from each operand.

    Returns:
        A new list of ``n`` lists holding the product; the operands are
        not modified.
    """
    producto = []
    for fila in range(n):
        fila_a = A[fila]
        renglon = []
        for columna in range(n):
            # Accumulate left to right, starting from the integer 0, so the
            # floating-point rounding order stays fixed.
            acumulado = 0
            for idx in range(n):
                acumulado += fila_a[idx] * B[idx][columna]
            renglon.append(acumulado)
        producto.append(renglon)
    return producto

def simplificar_exponente_local(A, k, n):
    """Raise the n x n matrix ``A`` to the power ``k`` by repeated squaring.

    Args:
        A: Square matrix as a list of lists.
        k: Non-negative integer exponent.
        n: Order of the matrix.

    Returns:
        ``A`` itself when ``k == 1``, a freshly built identity matrix when
        ``k == 0``, and otherwise a new list of lists holding ``A**k``.

    Raises:
        ValueError: If ``k`` is negative.
    """
    # Without these two guards k <= 0 never reaches the k == 1 base case and
    # the recursion only stops with a RecursionError.
    if k < 0:
        raise ValueError("k debe ser un entero no negativo")
    if k == 0:
        return [[1 if i == j else 0 for j in range(n)] for i in range(n)]
    if k == 1:
        return A
    if k % 2 == 0:
        # A^k = (A^(k/2))^2: compute the half power once and square it.
        mitad = simplificar_exponente_local(A, k // 2, n)
        return mult_matrices_local(mitad, mitad, n)
    # Odd exponent: peel off one factor so the remaining exponent is even.
    return mult_matrices_local(A, simplificar_exponente_local(A, k - 1, n), n)

def generar_matriz_simetrica(n):
    """Build a random symmetric n x n matrix rounded to two decimals.

    Each entry on or above the diagonal is one draw from
    ``np.random.randn()`` whose sign is flipped when ``row + col`` is odd;
    the value is mirrored below the diagonal. Every diagonal entry is then
    increased by 1.

    Args:
        n: Order of the matrix.

    Returns:
        A NumPy array of shape ``(n, n)``.
    """
    matriz = np.zeros((n, n))
    for fila in range(n):
        for columna in range(fila, n):
            # Same factor as (-1) ** (fila + columna).
            signo = 1 if (fila + columna) % 2 == 0 else -1
            valor = np.random.randn() * signo
            matriz[fila, columna] = matriz[columna, fila] = valor
    diagonal = np.arange(n)
    matriz[diagonal, diagonal] += 1
    return np.round(matriz, 2)

def matriz_a_df(spark, M):
    """Store the upper triangle of a square matrix as a Spark DataFrame.

    Only entries with ``col >= row`` are emitted, one row per entry.

    Args:
        spark: Session object whose ``createDataFrame`` builds the result.
        M: Square matrix indexable as ``M[i][j]``; ``len(M)`` is its order.

    Returns:
        A DataFrame with non-nullable columns ``row`` (int), ``col`` (int)
        and ``value`` (double).
    """
    esquema = StructType([
        StructField("row", IntegerType(), False),
        StructField("col", IntegerType(), False),
        StructField("value", DoubleType(), False),
    ])
    orden = len(M)
    triangulo = [
        (fila, columna, float(M[fila][columna]))
        for fila in range(orden)
        for columna in range(fila, orden)
    ]
    return spark.createDataFrame(triangulo, esquema)

def _expandir_simetrica(df):
    """Expand upper-triangular symmetric storage into every (row, col) entry."""
    # Keep only row <= col first so an input that is already fully populated
    # does not end up with duplicated entries after mirroring.
    superior = df.filter(col("row") <= col("col")).select("row", "col", "value")
    espejo = superior.filter(col("row") < col("col")).select(
        col("col").alias("row"),
        col("row").alias("col"),
        col("value"),
    )
    return superior.union(espejo)


def mult_matrices_df(dfA, dfB):
    """Multiply two symmetric matrices stored as (row, col, value) DataFrames.

    Both operands are read as symmetric matrices of which only the entries
    with ``row <= col`` are used. They are mirrored to full form before the
    join; joining the triangular storage directly would sum only over
    ``row <= k <= col`` and yield the product of two triangular matrices
    instead of the symmetric ones.

    Args:
        dfA: Left operand with columns ``row``, ``col``, ``value``.
        dfB: Right operand with the same columns.

    Returns:
        A DataFrame with columns ``row``, ``col``, ``value`` holding the
        entries of the product with ``row <= col``. Dropping the lower
        triangle is only lossless when the product is itself symmetric
        (e.g. both operands are powers of the same matrix).
    """
    completa_a = _expandir_simetrica(dfA).alias("A")
    completa_b = _expandir_simetrica(dfB).alias("B")
    # Pair A[i, k] with B[k, j] through the shared inner index k.
    join_df = completa_a.join(completa_b, col("A.col") == col("B.row"))
    df_mult = join_df.withColumn("partial", col("A.value") * col("B.value"))
    df_res = df_mult.groupBy(col("A.row").alias("i"), col("B.col").alias("j")) \
                    .agg(spark_sum("partial").alias("value"))
    # Return to the compact upper-triangular storage.
    df_res = df_res.filter(col("i") <= col("j"))
    return df_res.select(col("i").alias("row"), col("j").alias("col"), col("value"))

def exponenciacion_distribuida(dfA, k, n, spark):
    """Raise a matrix DataFrame to the power ``k`` by binary exponentiation.

    Args:
        dfA: Matrix in the (row, col, value) layout produced by
            ``matriz_a_df``.
        k: Non-negative integer exponent.
        n: Order of the matrix; only needed to build the identity for
            ``k == 0``.
        spark: Session used to build the identity DataFrame for ``k == 0``.

    Returns:
        A DataFrame in the same layout holding ``A**k``. For ``k == 1`` this
        is ``dfA`` itself.

    Raises:
        ValueError: If ``k`` is negative. Previously ``bin(k)[2:]`` kept the
            binary digits of ``abs(k)``, so a negative exponent silently
            produced the positive power.
    """
    if k < 0:
        raise ValueError("k debe ser un entero no negativo")
    if k == 0:
        return matriz_a_df(spark, np.eye(n))

    df_res = None
    df_pow = dfA
    while True:
        if k & 1:
            # The first selected power is the result so far; multiplying it
            # by an identity DataFrame would only add a join and a shuffle.
            df_res = df_pow if df_res is None else mult_matrices_df(df_res, df_pow)
        k >>= 1
        if k == 0:
            # No bits left: skip the squaring whose result nobody would use.
            break
        df_pow = mult_matrices_df(df_pow, df_pow)
    return df_res

# ============== TESTS Y COMPARACIÓN ==============

def test_local(A, k):
    """Time the pure-Python computation of ``A**k``.

    Args:
        A: Square matrix as a list of lists.
        k: Exponent passed to ``simplificar_exponente_local``.

    Returns:
        A ``(result, seconds)`` tuple with the computed matrix and the
        elapsed time.
    """
    n = len(A)
    # perf_counter is monotonic and high resolution; time.time() can be too
    # coarse to measure the very small matrices and may jump if the system
    # clock is adjusted.
    inicio = time.perf_counter()
    R = simplificar_exponente_local(A, k, n)
    transcurrido = time.perf_counter() - inicio
    return R, transcurrido

def test_distribuido(M, k, spark):
    """Time the Spark computation of ``M**k`` and rebuild it as a local matrix.

    Args:
        M: Square symmetric matrix indexable as ``M[i][j]``.
        k: Exponent passed to ``exponenciacion_distribuida``.
        spark: Session used to build and run the DataFrames.

    Returns:
        A ``(result, seconds)`` tuple: the full n x n matrix as a list of
        lists, and the elapsed time including the Spark job itself.
    """
    n = len(M)
    dfA = matriz_a_df(spark, M)
    t0 = time.perf_counter()
    df_res = exponenciacion_distribuida(dfA, k, n, spark)
    # DataFrame transformations are lazy: nothing is computed until an action
    # runs, so collect() must sit inside the timed region or the measurement
    # only covers building the query plan.
    filas = df_res.collect()
    t1 = time.perf_counter()
    # Rebuild the local matrix; the DataFrame holds one triangle, so each
    # value is written to both symmetric positions.
    R = [[0] * n for _ in range(n)]
    for fila in filas:
        i, j, v = fila["row"], fila["col"], fila["value"]
        R[i][j] = v
        R[j][i] = v
    return R, (t1 - t0)

def comparar(n, k, spark):
    """Run both implementations on one random matrix and print a report.

    Generates a random symmetric n x n matrix, computes its k-th power with
    the local and the Spark implementation, and prints both timings plus the
    sum of absolute element-wise differences between the two results.

    Args:
        n: Order of the generated matrix.
        k: Exponent.
        spark: Session handed to the distributed test.
    """
    matriz = generar_matriz_simetrica(n)

    # The local path works on nested lists, the Spark path on the array.
    resultado_local, segundos_local = test_local(matriz.tolist(), k)
    resultado_dist, segundos_dist = test_distribuido(matriz, k, spark)

    print(f"Matriz {n}x{n}, potencia={k}")
    print(f"  Tiempo Local: {segundos_local:.6f} s")
    print(f"  Tiempo Distribuido: {segundos_dist:.6f} s")

    # Accumulate in row-major order with plain additions.
    diferencia = 0
    for fila_local, fila_dist in zip(resultado_local, resultado_dist):
        for valor_local, valor_dist in zip(fila_local, fila_dist):
            diferencia += abs(valor_local - valor_dist)
    print(f"  Diferencia acumulada: {diferencia:.6f}")
    print("-" * 40)

if __name__ == "__main__":
    # Script entry point: run the local-vs-Spark comparison for every
    # configured case on a local Spark session.
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("ExponenciarMatricesDist")
        .getOrCreate()
    )

    # Each pair is (matrix order n, exponent k).
    configuraciones = [
        (1, 1), (1, 5), (1, 10),
        (5, 1), (5, 5), (5, 10),
        (10, 1), (10, 5), (10, 10),
        (20, 1), (20, 5), (20, 10),
        (50, 1), (50, 5), (50, 10),
        (100, 1), (100, 5), (100, 10),
        (200, 6), (300, 8), (400, 9),
    ]

    try:
        for n, k in configuraciones:
            comparar(n, k, spark)
    finally:
        # Stop the session even when a comparison raises, so the local Spark
        # context is not left running.
        spark.stop()


ModuleNotFoundError: No module named 'pyspark'