## Setting
- Desilo key 생성

In [2]:
from desilofhe import Engine

# Create the FHE engine with bootstrapping support switched on.
engine = Engine(use_bootstrap=True)

# Generate the secret key first; every other key below is derived from it.
secret_key = engine.create_secret_key()
public_key = engine.create_public_key(secret_key)
relinearization_key = engine.create_relinearization_key(secret_key)
conjugation_key = engine.create_conjugation_key(secret_key)
bootstrap_key = engine.create_bootstrap_key(secret_key)
# Rotation key created for delta=1; the demos below pass it to engine.rotate
# to line each value up with its neighbour.
fixed_rotation_key = engine.create_fixed_rotation_key(secret_key, delta=1)

Import가 안 될 때 해결법
- `__pycache__` 속 파일 삭제
- 하단 코드에서 `Basic_operations` 대신 원하는 파일(모듈) 이름을 입력한 뒤 실행

In [3]:
import importlib
import Basic_operations
# Re-execute the already imported module so that a stale copy is replaced.
# To reload a different helper module, substitute its name here.
importlib.reload(Basic_operations)

<module 'Basic_operations' from '/home/junhyung/study/Approximation_in_CKKS/desilo/Basic_operations.py'>

Min/max Algorithm
- Min/max between two numbers
- Min/max between several numbers
- Max index는 Comp.에서 진행

In [None]:
# Min/max between two numbers
from Basic_operations import Basic_sqrt

def min(engine, a, b, d, relinearization_key, conjugation_key, bootstrap_key):
    """Return the encrypted minimum of ``a`` and ``b``.

    Evaluates ``(a + b)/2 - Basic_sqrt(((a - b)/2)^2)``.  ``Basic_sqrt`` comes
    from ``Basic_operations`` (presumably an approximate square root - confirm
    there); it receives ``d`` and the three evaluation keys unchanged.

    The original author's note gives the expected input range as 0 <= x < 1.

    NOTE: this definition shadows the builtin ``min`` in its namespace.
    """
    # (a + b) / 2
    half_sum = engine.multiply(engine.add(a, b), 0.5)
    # ((a - b) / 2)^2, the value handed to Basic_sqrt
    half_diff_squared = engine.square(
        engine.multiply(engine.subtract(a, b), 0.5),
        relinearization_key,
    )
    root = Basic_sqrt(
        engine,
        half_diff_squared,
        d,
        relinearization_key,
        conjugation_key,
        bootstrap_key,
    )
    return engine.subtract(half_sum, root)
def max(engine, a, b, d, relinearization_key, conjugation_key, bootstrap_key):
    """Return the encrypted maximum of ``a`` and ``b``.

    Evaluates ``(a + b)/2 + Basic_sqrt(((a - b)/2)^2)``.  ``Basic_sqrt`` comes
    from ``Basic_operations`` (presumably an approximate square root - confirm
    there); it receives ``d`` and the three evaluation keys unchanged.

    The original author's note gives the expected input range as 0 <= x < 1.

    NOTE: this definition shadows the builtin ``max`` in its namespace.
    """
    # (a + b) / 2
    half_sum = engine.multiply(engine.add(a, b), 0.5)
    # ((a - b) / 2)^2, the value handed to Basic_sqrt
    half_diff_squared = engine.square(
        engine.multiply(engine.subtract(a, b), 0.5),
        relinearization_key,
    )
    root = Basic_sqrt(
        engine,
        half_diff_squared,
        d,
        relinearization_key,
        conjugation_key,
        bootstrap_key,
    )
    return engine.add(half_sum, root)

def main():
    """Demo: encrypted min/max of every value and its right-hand neighbour.

    The data is scaled to ``0.5 + x / scale_down_factor``, encrypted once, and
    compared against a copy rotated with ``fixed_rotation_key`` so that all
    neighbouring pairs are processed in a single min/max evaluation.

    Returns:
        0 on completion.
    """
    data = [1,2,7,9,5,6,3,8,4,10]
    scale_down_factor = 10
    offset = 0.5  # shift applied before encryption; must be undone on output

    val1_scaled_down = [offset + (x / scale_down_factor) for x in data]
    enc_orig1 = engine.encrypt(val1_scaled_down, public_key)
    # Assumes the delta=1 rotation brings element i+1 into slot i - TODO confirm
    # the rotation direction against the engine documentation.
    enc_orig2 = engine.rotate(enc_orig1, fixed_rotation_key)

    # min/max are evaluated on the whole vector at once (SIMD over the slots).
    min_val_scaled_down = min(engine, enc_orig1, enc_orig2, 23, relinearization_key, conjugation_key, bootstrap_key)
    max_val_scaled_down = max(engine, enc_orig1, enc_orig2, 23, relinearization_key, conjugation_key, bootstrap_key)

    plt_min_val = engine.decrypt(min_val_scaled_down, secret_key)
    plt_max_val = engine.decrypt(max_val_scaled_down, secret_key)

    # Print one result per neighbouring pair.  The last element has no
    # right-hand neighbour, so stop at len(data) - 1 (the previous bound read
    # data[i+1] past the end of the list).
    for i in range(len(data) - 1):
        # Undo both the offset and the scaling to get back to the data's units.
        restored_min = (plt_min_val[i] - offset) * scale_down_factor
        restored_max = (plt_max_val[i] - offset) * scale_down_factor
        print(f"\n=== {i}번째 비교: {data[i]}와 {data[i+1]}간의 비교===")
        print(f"Min(data[{i}]) = {restored_min}")
        print(f"Max(data[{i}]) = {restored_max}")

    return 0



if __name__ == '__main__':
    main()


Bootstrapping performed


In [None]:
# Min/max between several numbers

from Basic_operations import min, max

def array_max(engine, enc_values, d, relinearization_key, conjugation_key, bootstrap_key):
    """Reduce a sequence of encrypted values to their (approximate) maximum.

    Runs a pairwise tournament: neighbouring elements are combined with
    ``max`` level by level until one value is left, i.e. about log2(n) levels.
    The original author's note gives the expected input range as 0 <= x < 1
    and remarks that the paper's analysis is simplest for n a power of two;
    other lengths are handled by carrying an unpaired last element forward.

    Args:
        engine: FHE engine, forwarded to ``max``.
        enc_values: non-empty sequence of encrypted values.
        d: forwarded to ``max`` unchanged.
        relinearization_key, conjugation_key, bootstrap_key: forwarded to
            ``max`` unchanged.

    Returns:
        The single value left after the last tournament level.

    Raises:
        ValueError: if ``enc_values`` is empty.
    """
    current_level = list(enc_values)
    if not current_level:
        raise ValueError("enc_values must contain at least one element")

    while len(current_level) > 1:
        # Combine elements (0,1), (2,3), ... of the current level.
        next_level = [
            max(engine, left, right, d, relinearization_key, conjugation_key, bootstrap_key)
            for left, right in zip(current_level[0::2], current_level[1::2])
        ]
        # Odd length: the last element has no partner and advances as-is.
        if len(current_level) % 2:
            next_level.append(current_level[-1])
        current_level = next_level

    # The only remaining element approximates the maximum of the whole input.
    return current_level[0]

def array_min(engine, enc_values, d, relinearization_key, conjugation_key, bootstrap_key):
    """Reduce a sequence of encrypted values to their (approximate) minimum.

    Runs a pairwise tournament: neighbouring elements are combined with
    ``min`` level by level until one value is left, i.e. about log2(n) levels.
    The original author's note gives the expected input range as 0 <= x < 1
    and remarks that the paper's analysis is simplest for n a power of two;
    other lengths are handled by carrying an unpaired last element forward.

    Args:
        engine: FHE engine, forwarded to ``min``.
        enc_values: non-empty sequence of encrypted values.
        d: forwarded to ``min`` unchanged.
        relinearization_key, conjugation_key, bootstrap_key: forwarded to
            ``min`` unchanged.

    Returns:
        The single value left after the last tournament level.

    Raises:
        ValueError: if ``enc_values`` is empty.
    """
    current_level = list(enc_values)
    if not current_level:
        raise ValueError("enc_values must contain at least one element")

    while len(current_level) > 1:
        # Combine elements (0,1), (2,3), ... of the current level.
        next_level = [
            min(engine, left, right, d, relinearization_key, conjugation_key, bootstrap_key)
            for left, right in zip(current_level[0::2], current_level[1::2])
        ]
        # Odd length: the last element has no partner and advances as-is.
        if len(current_level) % 2:
            next_level.append(current_level[-1])
        current_level = next_level

    # The only remaining element approximates the minimum of the whole input.
    return current_level[0]


def main():
    """Demo: encrypted minimum and maximum of a whole list.

    Each value is scaled by ``1 / scale_down_factor`` and encrypted as its own
    ciphertext, because ``array_max`` / ``array_min`` take ``len()`` of their
    input and index into it.  The two results are decrypted and scaled back
    before printing.

    Returns:
        0 on completion.
    """
    data = [1,2,7,9,5,6,3,8,4,10]
    scale_down_factor = 10

    # 1. Scale the data, then encrypt every value separately.
    val1_scaled_down = [x / scale_down_factor for x in data]
    enc_values = [engine.encrypt([value], public_key) for value in val1_scaled_down]

    # Pad to a power-of-two length (the size the tournament reduction is
    # analysed for) with copies of the first ciphertext; duplicates change
    # neither the minimum nor the maximum.
    padded_length = 1
    while padded_length < len(enc_values):
        padded_length *= 2
    enc_values.extend([enc_values[0]] * (padded_length - len(enc_values)))

    # 2. Apply array_max / array_min.
    max_val_scaled_down = array_max(engine, enc_values, 10, relinearization_key, conjugation_key, bootstrap_key)
    min_val_scaled_down = array_min(engine, enc_values, 10, relinearization_key, conjugation_key, bootstrap_key)

    # 3. Decrypt first, then undo the scaling on the plaintext.  Each input
    #    held its value in slot 0, so the result is read from slot 0 as well.
    max_val_restored = engine.decrypt(max_val_scaled_down, secret_key)[0] * scale_down_factor
    min_val_restored = engine.decrypt(min_val_scaled_down, secret_key)[0] * scale_down_factor

    print(f"Min(data) = {min_val_restored}")
    print(f"Max(data) = {max_val_restored}")

    return 0


if __name__ == '__main__':
    main()


Comp. Algorithm

- Comp. between two numbers
- MaxIndex/MinIndex

In [None]:
# Comp. between two numbers
# Min/max between several numbers와 유사하게 바로 옆 원소와의 2개 수 간 비교 진행
from Basic_operations import Basic_Inv


def comp_max(engine, a, b, d_a, d_b, t, m, relinearization_key): # 1/2 <= x < 3/2
    """Approximate comparison indicator of ``a`` against ``b``.

    Steps:
      1. Normalise: ``a <- a / (a + b)``, ``b <- 1 - a``.
      2. Repeat ``t`` times: ``a <- a^m / (a^m + b^m)``, ``b <- 1 - a``.

    Every round pushes the larger share towards 1 and the smaller towards 0,
    so the returned share of ``a`` is intended to be close to 1 where a > b
    and close to 0 where a < b.

    Args:
        engine: FHE engine.
        a, b: values to compare; the original author's note gives the
            expected range as 1/2 <= x < 3/2.
        d_a: passed to ``Basic_Inv`` for the initial normalisation.
        d_b: passed to ``Basic_Inv`` in every refinement round.
        t: number of refinement rounds.
        m: exponent used in every round.
        relinearization_key: passed to the engine multiplications and to
            ``Basic_Inv``.

    Returns:
        The refined share of ``a``.
    """
    # a / (a + b), written as (a / 2) * Inv((a + b) / 2) with engine calls only.
    mean = engine.multiply(engine.add(a, b), 0.5)
    a = engine.multiply(
        engine.multiply(a, 0.5),
        Basic_Inv(engine, mean, d_a, relinearization_key),
        relinearization_key,
    )
    b = engine.subtract(1, a)  # b = 1 - a

    for _ in range(t):
        # The powers must be rebuilt from the current a and b in every round;
        # computing them once up front would make all rounds identical.
        a_m = a
        b_m = b
        for _ in range(m - 1):
            a_m = engine.multiply(a_m, a, relinearization_key)
            b_m = engine.multiply(b_m, b, relinearization_key)
        inv = Basic_Inv(engine, engine.add(a_m, b_m), d_b, relinearization_key)
        a = engine.multiply(a_m, inv, relinearization_key)
        b = engine.subtract(1, a)
    return a

def comp_min(engine, a, b, d_a, d_b, t, m, relinearization_key): # 1/2 <= x < 3/2
    """Approximate comparison indicator of ``a`` against ``b``, complemented.

    Steps:
      1. Normalise: ``a <- a / (a + b)``, ``b <- 1 - a``.
      2. Repeat ``t`` times: ``a <- a^m / (a^m + b^m)``, ``b <- 1 - a``.

    Every round pushes the larger share towards 1 and the smaller towards 0.
    The complement ``b = 1 - a`` is returned, so the result is intended to be
    close to 1 where a < b and close to 0 where a > b.

    Args:
        engine: FHE engine.
        a, b: values to compare; the original author's note gives the
            expected range as 1/2 <= x < 3/2.
        d_a: passed to ``Basic_Inv`` for the initial normalisation.
        d_b: passed to ``Basic_Inv`` in every refinement round.
        t: number of refinement rounds.
        m: exponent used in every round.
        relinearization_key: passed to the engine multiplications and to
            ``Basic_Inv``.

    Returns:
        The refined share of ``b``.
    """
    # a / (a + b), written as (a / 2) * Inv((a + b) / 2) with engine calls only.
    mean = engine.multiply(engine.add(a, b), 0.5)
    a = engine.multiply(
        engine.multiply(a, 0.5),
        Basic_Inv(engine, mean, d_a, relinearization_key),
        relinearization_key,
    )
    b = engine.subtract(1, a)  # b = 1 - a

    for _ in range(t):
        # Build a^m and b^m from the current a and b (m - 1 multiplications),
        # the same way comp_max does; the powers must be refreshed each round.
        a_m = a
        b_m = b
        for _ in range(m - 1):
            a_m = engine.multiply(a_m, a, relinearization_key)
            b_m = engine.multiply(b_m, b, relinearization_key)
        inv = Basic_Inv(engine, engine.add(a_m, b_m), d_b, relinearization_key)
        a = engine.multiply(a_m, inv, relinearization_key)
        b = engine.subtract(1, a)
    return b

def main():
    """Demo: encrypted comparison of every value with its right-hand neighbour.

    The data is scaled to ``0.5 + x / scale_down_factor``, encrypted once, and
    compared against a copy rotated with ``fixed_rotation_key``.

    Returns:
        0 on completion.
    """
    data = [1,2,7,9,5,6,3,8,4,10]
    scale_down_factor = 10
    val1_scaled_down = [0.5 + (x / scale_down_factor) for x in data]

    # Parameters of comp_min / comp_max: (d_a, d_b, t, m).  These are the
    # values the Threshold demo in this notebook uses; tune as needed.
    d_a, d_b, t, m = 5, 5, 6, 2

    # 1. Scale the data, then encrypt.
    enc_orig1 = engine.encrypt(val1_scaled_down, public_key)
    # Assumes the delta=1 rotation brings element i+1 into slot i - TODO confirm
    # the rotation direction against the engine documentation.
    enc_orig2 = engine.rotate(enc_orig1, fixed_rotation_key)

    # 2. Run both comparisons.  The signature is
    #    (engine, a, b, d_a, d_b, t, m, relinearization_key); the conjugation
    #    and bootstrap keys are not parameters of comp_min / comp_max.
    min_val_scaled_down = comp_min(engine, enc_orig1, enc_orig2, d_a, d_b, t, m, relinearization_key)
    max_val_scaled_down = comp_max(engine, enc_orig1, enc_orig2, d_a, d_b, t, m, relinearization_key)
    plt_min_val = engine.decrypt(min_val_scaled_down, secret_key)
    plt_max_val = engine.decrypt(max_val_scaled_down, secret_key)

    # 3. Print the decrypted results.  They are normalised shares between 0
    #    and 1, not data values, so they are shown without rescaling.
    for i in range(len(data)-1):
        print(f"\n=== {i}번째 비교: {data[i]}와 {data[i+1]}간의 비교===")
        print(f"Min(data[{i}]) = {plt_min_val[i]}")
        print(f"Max(data[{i}]) = {plt_max_val[i]}")
    return 0


if __name__ == '__main__':
    main()


In [None]:
# MaxIndex(수정 필요)

from Basic_operations import Basic_Inv


def MaxIdx(enc_value, d_a, d_b, t, m, relinearization_key): # 1/2 <= x < 3/2, d_a = d, d_b = d'
    """Return one encrypted score per input; the largest input's score tends to 1.

    Steps:
      1. Normalise: ``b_j = a_j * Inv(sum_k a_k)`` for j < n-1 and
         ``b_{n-1} = 1 - sum_{j<n-1} b_j``, so the scores add up to 1.
      2. Repeat ``t`` times: raise every score to the m-th power and
         renormalise the same way, which sharpens the largest score.

    The demo caller divides its inputs by n beforehand so that their sum stays
    small; the original author's note gives the expected range of the
    undivided values as 1/2 <= x < 3/2.

    Args:
        enc_value: non-empty sequence of encrypted values (one per element).
        d_a: passed to ``Basic_Inv`` for the initial normalisation.
        d_b: passed to ``Basic_Inv`` in every sharpening round.
        t: number of sharpening rounds.
        m: exponent used in every round.
        relinearization_key: key for ciphertext-ciphertext multiplications.

    Returns:
        A list with one encrypted score per input element.

    Raises:
        ValueError: if ``enc_value`` is empty.
    """
    values = list(enc_value)
    if not values:
        raise ValueError("enc_value must contain at least one ciphertext")

    def _total(terms):
        # Homomorphic sum of a non-empty list of ciphertexts.
        acc = terms[0]
        for term in terms[1:]:
            acc = engine.add(acc, term)
        return acc

    def _power(base):
        # base^m through m - 1 successive multiplications.
        result = base
        for _ in range(m - 1):
            result = engine.multiply(result, base, relinearization_key)
        return result

    def _normalise(terms, inverse):
        # Scale all but the last term by the inverse of the total; the last
        # score is the complement, so the scores sum to exactly 1.
        if len(terms) == 1:
            return [engine.multiply(terms[0], inverse, relinearization_key)]
        head = [
            engine.multiply(term, inverse, relinearization_key)
            for term in terms[:-1]
        ]
        return head + [engine.subtract(1, _total(head))]

    b = _normalise(values, Basic_Inv(engine, _total(values), d_a))
    for _ in range(t):
        powered = [_power(score) for score in b]
        b = _normalise(powered, Basic_Inv(engine, _total(powered), d_b))
    return b




def main():
    """Demo: locate the position of the largest value with ``MaxIdx``."""
    # The notebook rebinds the module-level name `max` to the FHE max, so the
    # builtin is taken from `builtins` explicitly.
    import builtins

    data = [1,2,7,9,5,10,6,3,8,4] # the maximum (10) sits at the 6th position
    scale_down_factor = 10
    # Scale into [1/2, 3/2] as the paper recommends.
    data_scaled = [0.5 + x / scale_down_factor for x in data]
    # MaxIdx expects its inputs already divided by n; that is done here.
    data_scaled_input = [x/len(data_scaled) for x in data_scaled]

    # MaxIdx takes len() of its input and indexes into it, so every value is
    # encrypted as its own ciphertext (value stored in slot 0).
    enc_orig = [engine.encrypt([value], public_key) for value in data_scaled_input]
    maxidx_result = MaxIdx(enc_orig, d_a=11, d_b=3, m=2, t=3, relinearization_key=relinearization_key)
    maxidx_plt = [engine.decrypt(score, secret_key)[0] for score in maxidx_result]
    # The index whose score is closest to 1 marks the maximum.  The search
    # runs over the decrypted scores, not over the ciphertexts.
    max_index = builtins.max(range(len(maxidx_plt)), key=maxidx_plt.__getitem__)
    print(f"최대값 위치: {max_index+1}, 데이터 원본값: {data[max_index]}")


if __name__ == '__main__':
    main()




def main():
    data = [1,2,7,9,5,10,6,3,8,4] # 최댓값 위치: 6번쨰
    scale_down_factor = 10
    # 논문 권장대로 [1/2, 3/2] 구간으로 스케일링
    data_scaled = [0.5 + x / scale_down_factor for x in data]
    data_scaled_input = [x/len(data_scaled) for x in data_scaled] #함수에 들어가게 하기 위해 n으로 나누는 작업을 main 함수에서 시행

    enc_orig = engine.encrypt(data_scaled_input, public_key)
    maxidx_result = MaxIdx(enc_orig, d_a=11, d_b=3, m=2, t=3, relinearization_key=relinearization_key)
    maxidx_plt = engine.decrypt(maxidx_result,secret_key)
    # b 값이 1에 가까운 인덱스가 최대값 위치
    max_index = maxidx_result.index(max(maxidx_plt))
    # scale up 1: n 곱하기
    print(f"최대값 위치: {max_index+1}, 데이터 원본값: {maxidx_plt[max_index]}")


if __name__ == '__main__':
    main()


Threshold Count
- 특정 값(b)보다 큰(작은)수를 count하는 함수의 CKKS(Desilo) 근사 구현

In [None]:
# Threshold Count

from Basic_operations import Basic_Inv

def comp_max(engine, a, b, d_a, d_b, t, m, relinearization_key): # 1/2 <= x < 3/2
    """Approximate comparison indicator of ``a`` against ``b``.

    Steps:
      1. Normalise: ``a <- a / (a + b)``, ``b <- 1 - a``.
      2. Repeat ``t`` times: ``a <- a^m / (a^m + b^m)``, ``b <- 1 - a``.

    Every round pushes the larger share towards 1 and the smaller towards 0,
    so the returned share of ``a`` is intended to be close to 1 where a > b
    and close to 0 where a < b.

    Args:
        engine: FHE engine.
        a, b: values to compare; the original author's note gives the
            expected range as 1/2 <= x < 3/2.
        d_a: passed to ``Basic_Inv`` for the initial normalisation.
        d_b: passed to ``Basic_Inv`` in every refinement round.
        t: number of refinement rounds.
        m: exponent used in every round.
        relinearization_key: passed to the engine multiplications and to
            ``Basic_Inv``.

    Returns:
        The refined share of ``a``.
    """
    # a / (a + b), written as (a / 2) * Inv((a + b) / 2) with engine calls only.
    mean = engine.multiply(engine.add(a, b), 0.5)
    a = engine.multiply(
        engine.multiply(a, 0.5),
        Basic_Inv(engine, mean, d_a, relinearization_key),
        relinearization_key,
    )
    b = engine.subtract(1, a)  # b = 1 - a

    for _ in range(t):
        # The powers must be rebuilt from the current a and b in every round;
        # computing them once up front would make all rounds identical.
        a_m = a
        b_m = b
        for _ in range(m - 1):
            a_m = engine.multiply(a_m, a, relinearization_key)
            b_m = engine.multiply(b_m, b, relinearization_key)
        inv = Basic_Inv(engine, engine.add(a_m, b_m), d_b, relinearization_key)
        a = engine.multiply(a_m, inv, relinearization_key)
        b = engine.subtract(1, a)
    return a

def comp_min(engine, a, b, d_a, d_b, t, m, relinearization_key): # 1/2 <= x < 3/2
    """Approximate comparison indicator of ``a`` against ``b``, complemented.

    Steps:
      1. Normalise: ``a <- a / (a + b)``, ``b <- 1 - a``.
      2. Repeat ``t`` times: ``a <- a^m / (a^m + b^m)``, ``b <- 1 - a``.

    Every round pushes the larger share towards 1 and the smaller towards 0.
    The complement ``b = 1 - a`` is returned, so the result is intended to be
    close to 1 where a < b and close to 0 where a > b.

    Args:
        engine: FHE engine.
        a, b: values to compare; the original author's note gives the
            expected range as 1/2 <= x < 3/2.
        d_a: passed to ``Basic_Inv`` for the initial normalisation.
        d_b: passed to ``Basic_Inv`` in every refinement round.
        t: number of refinement rounds.
        m: exponent used in every round.
        relinearization_key: passed to the engine multiplications and to
            ``Basic_Inv``.

    Returns:
        The refined share of ``b``.
    """
    # a / (a + b), written as (a / 2) * Inv((a + b) / 2) with engine calls only.
    mean = engine.multiply(engine.add(a, b), 0.5)
    a = engine.multiply(
        engine.multiply(a, 0.5),
        Basic_Inv(engine, mean, d_a, relinearization_key),
        relinearization_key,
    )
    b = engine.subtract(1, a)  # b = 1 - a

    for _ in range(t):
        # Build a^m and b^m from the current a and b (m - 1 multiplications),
        # the same way comp_max does; the powers must be refreshed each round.
        a_m = a
        b_m = b
        for _ in range(m - 1):
            a_m = engine.multiply(a_m, a, relinearization_key)
            b_m = engine.multiply(b_m, b, relinearization_key)
        inv = Basic_Inv(engine, engine.add(a_m, b_m), d_b, relinearization_key)
        a = engine.multiply(a_m, inv, relinearization_key)
        b = engine.subtract(1, a)
    return b

def Threshold(enc_value, b, d_a, d_b, t, m, relinearization_key):
    """Compare ``enc_value`` against the threshold ``b`` with ``comp_max``.

    The original author's note gives the expected ranges as
    0 <= enc_value[i] < 1 and 0 <= b < 1.

    NOTE(review): the comparison result only has 0 added to it here; nothing
    in this function sums it across elements, so any counting has to be done
    by the caller.

    Returns:
        The ``comp_max`` result with 0 added through the engine.
    """
    indicator = comp_max(
        engine, enc_value, b, d_a, d_b, t, m, relinearization_key
    )
    return engine.add(indicator, 0)

def main():
    """Demo: count how many values exceed a threshold, under encryption."""
    data = [1,2,7,9,5,10,6,3,8,4]
    scale_down_factor = 10
    data_scaled = [x / scale_down_factor for x in data]
    Threshold_scaled = 5/ scale_down_factor

    enc_orig = engine.encrypt(data_scaled, public_key)

    Threshold_count = Threshold(enc_orig, Threshold_scaled, 5, 5, 6, 2, relinearization_key)
    # Threshold returns one comparison value per slot (about 1 above the
    # threshold, about 0 below it).  Only the first len(data) slots carry
    # data, so the count is the sum over exactly those slots.
    # NOTE(review): a value equal to the threshold gives a / (a + b) = 1/2,
    # so it presumably contributes about 0.5 to the total - confirm.
    indicators = engine.decrypt(Threshold_count, secret_key)
    Threshold_count_plt = sum(indicators[:len(data)])
    # Number of values greater than the threshold.
    print(f"Threshold인 {Threshold_scaled*scale_down_factor}보다 큰 수의 개수: {Threshold_count_plt}")


if __name__ == '__main__':
    main()

Top-k Max algorithm
- 상위 k개 max값을 추출하는 함수의 CKKS(Desilo) 근사 구현

In [None]:
#Top-k Max algorithm

from Basic_operations import MaxIdx


def Top_k_max(enc_value, d_a, d_b, t, m, k, relinearization_key):
    """Extract the ``k`` largest values one after another.

    Each round calls ``MaxIdx`` on the values still in play, multiplies the
    returned selector into them to isolate the current maximum, stores that
    product, and then masks the maximum out with ``(1 - selector)`` before
    the next round.

    Args:
        enc_value: encrypted input values.
        d_a, d_b, t, m: forwarded to ``MaxIdx`` unchanged in every round.
        k: number of maxima to extract.
        relinearization_key: key for ciphertext-ciphertext multiplications.

    Returns:
        A list of ``k`` encrypted results: top-1, top-2, ... in that order.
    """
    remaining = enc_value
    results = []
    for _ in range(k):
        selector = MaxIdx(remaining, d_a, d_b, t, m, relinearization_key)
        # Keep the isolated maximum under its own name: assigning it to `m`
        # would clobber the exponent that the next MaxIdx call needs.
        selected = engine.multiply(selector, remaining, relinearization_key)
        results.append(selected)
        # Remove the maximum that was just found from the remaining values.
        remaining = engine.multiply(
            engine.subtract(1, selector), remaining, relinearization_key
        )
    return results  # k entries: the top-1, top-2, ... results


def main():
    """Demo: extract the top-k values of an encrypted vector."""
    data = [1,2,7,9,5,10,6,3,8,4]
    scale_down_factor = 10
    data_scaled = [x / (scale_down_factor*len(data)) for x in data]
    k_value = 4 # k of the top-k query

    enc_orig = engine.encrypt(data_scaled, public_key)
    print(f"Scale down 후 암호화 -> {enc_orig}")

    Top_K_value = Top_k_max(enc_orig, d_a=5, d_b=5, t=6, m=2, k=k_value, relinearization_key=relinearization_key)
    # Top_k_max returns a list of results, so each entry is decrypted on its
    # own instead of handing the whole list to engine.decrypt.
    Top_K_value_plt = [engine.decrypt(entry, secret_key) for entry in Top_K_value]
    print(f"Top-{k_value} Max 값: {Top_K_value_plt}")


if __name__ == '__main__':
    main()