Small attempt

In [3]:
import numpy as np
import random
import time
from datetime import datetime, timedelta

dense_params = np.random.rand(5)  # dense parameters
sparse_params = {}  # sparse parameters stored as a dict

def update_parameters(worker_dense_update, worker_sparse_update):
    global dense_params
    global sparse_params

    # Updating dense parameters
    dense_params = dense_params + worker_dense_update

    # Updating sparse parameters
    for id, embedding in worker_sparse_update.items():
        if id in sparse_params:
            sparse_params[id] = sparse_params[id] + embedding
        else:
            sparse_params[id] = embedding

def train_step():
    time.sleep(10)
    # Small dense updates
    worker_dense_update = np.random.rand(5) * 0.01  
    worker_sparse_update = {random.randint(1, 100): np.random.rand(3) * 0.01 for _ in range(2)}
    update_parameters(worker_dense_update, worker_sparse_update)

def sync_parameters():
    print("Synchronizing parameters...")
    print("Dense parameters:", dense_params)
    print("Sparse parameters:")
    for key, value in sparse_params.items():
        print(f"  ID {key}: {value}")

def expire_old_sparse_parameters(threshold_minutes=1):
    current_time = datetime.now()
    new_sparse_params = {}
    for id, (embedding, timestamp) in sparse_params.items():
        if (current_time - timestamp) < timedelta(minutes=threshold_minutes):
            new_sparse_params[id] = (embedding, timestamp)
    return new_sparse_params


for _ in range(10):
    train_step()

sparse_params = {id: (embedding, datetime.now()) for id, embedding in sparse_params.items()}
sparse_params = expire_old_sparse_parameters()

sync_parameters()


GPT response

In [None]:
import numpy as np
import random
import hashlib
import time

# --------------------- Simulating Dense Parameters ----------------------
# Dense parameters can be represented as a vector or a matrix (numpy array in this case)
class DenseParameters:
    def __init__(self, num_params):
        self.params = np.random.randn(num_params)

    def update(self, gradients):
        learning_rate = 0.1
        self.params -= learning_rate * gradients

    def get_params(self):
        return self.params


# --------------------- Simulating Sparse Parameters ---------------------
# Custom HashTable to simulate sparse parameter storage using Cuckoo hashing as described
class CuckooHashTable:
    def __init__(self):
        self.table0 = {}
        self.table1 = {}

    def _hash0(self, key):
        return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16) % 10  # Simplified hash function

    def _hash1(self, key):
        return int(hashlib.sha1(key.encode('utf-8')).hexdigest(), 16) % 10  # Simplified hash function

    def insert(self, key, value):
        index0 = self._hash0(key)
        if key not in self.table0:
            self.table0[index0] = (key, value)
            return True
        index1 = self._hash1(key)
        if key not in self.table1:
            self.table1[index1] = (key, value)
            return True
        return False  # For simplicity, we don’t handle collisions or reinsertion

    def lookup(self, key):
        index0 = self._hash0(key)
        if key in self.table0 and self.table0[index0][0] == key:
            return self.table0[index0][1]
        index1 = self._hash1(key)
        if key in self.table1 and self.table1[index1][0] == key:
            return self.table1[index1][1]
        return None

    def update(self, key, new_value):
        index0 = self._hash0(key)
        if key in self.table0 and self.table0[index0][0] == key:
            self.table0[index0] = (key, new_value)
            return True
        index1 = self._hash1(key)
        if key in self.table1 and self.table1[index1][0] == key:
            self.table1[index1] = (key, new_value)
            return True
        return False  # For simplicity, we don’t handle updating an item that doesn't exist


# -------------------- Simulating Training and Synchronization Process -------------------

# Create a model container with dense and sparse parameters
class MonolithModel:
    def __init__(self):
        self.dense_params = DenseParameters(5)  # Fixed size dense parameters
        self.sparse_params = CuckooHashTable()  # Dynamic sparse parameters represented by hash table

    def train_dense(self, gradients):
        self.dense_params.update(gradients)

    def train_sparse(self, key, value):
        self.sparse_params.insert(key, value)

    def get_dense_params(self):
        return self.dense_params.get_params()

    def get_sparse_params(self, key):
        return self.sparse_params.lookup(key)

# Simulate worker and PS (parameter server) interactions
def train_and_sync_model():
    model_in_training_PS = MonolithModel()
    model_in_serving_PS = MonolithModel()

    # Training Loop, for simplicity we simulate both dense and sparse updates
    for _ in range(10):  # Simulate 10 training steps
        # Simulate sparse parameter training
        example_id = "item"+str(random.randint(0, 9))   # Example sparse feature (item ID)
        embedding_update = np.random.randn(5)            # Random embedding representing training update
        model_in_training_PS.train_sparse(example_id, embedding_update)

        # Simulate dense parameter training
        dense_gradients = np.random.randn(5)             # Random gradient update
        model_in_training_PS.train_dense(dense_gradients)

        # Simulate synchronization from training_PS to serving_PS (dense parameters less frequently)
        if _ % 2 == 0:  # Sync dense every 2 steps
            model_in_serving_PS.dense_params.params = model_in_training_PS.get_dense_params()
        # Sparse parameters can be synchronized more frequently
        model_in_serving_PS.sparse_params.insert(example_id, model_in_training_PS.get_sparse_params(example_id))

        time.sleep(1)  # Simulate time required for training step

    # Print out the final parameters for comparison
    print("Final dense parameters in training PS:", model_in_training_PS.get_dense_params())
    print("Final dense parameters in serving PS:", model_in_serving_PS.get_dense_params())

    # Print out an example of final sparse parameters for a comparison
    example_id = "item5"
    print("Final sparse parameters for item5 in training PS:", model_in_training_PS.get_sparse_params(example_id))
    print("Final sparse parameters for item5 in serving PS:", model_in_serving_PS.get_sparse_params(example_id))

train_and_sync_model()


Final dense parameters in training PS: [-0.34595181  1.1959901  -0.93633135 -0.51692196  0.01520432]
Final dense parameters in serving PS: [-0.34595181  1.1959901  -0.93633135 -0.51692196  0.01520432]
Final sparse parameters for item5 in training PS: None
Final sparse parameters for item5 in serving PS: None
