In [None]:
!python3 -m pip install psycopg2-binary

In [None]:
!python3 -m pip install mmh3

In [14]:
!python3 -m pip install kafka-python

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[K     |████████████████████████████████| 246 kB 549 kB/s eta 0:00:01
[?25hInstalling collected packages: kafka-python
[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m
Successfully installed kafka-python-2.0.2


In [1]:
import psycopg2

In [2]:
def get_postgres_connection(host=None, database=None, user=None, password=None):
    """Open a psycopg2 connection and return ``(conn, cursor)``.

    Each setting is taken from the argument if given, otherwise from the standard libpq
    environment variable (PGHOST, PGDATABASE, PGUSER, PGPASSWORD), otherwise from the
    local defaults "localhost" / "postgres" / "postgres".

    The password is deliberately never hard-coded: when neither `password` nor PGPASSWORD
    is set, it is passed as None, which psycopg2 drops from the DSN so libpq can fall back
    to ~/.pgpass (or trust auth).

    Returns:
        tuple: ``(connection, cursor)`` where the cursor belongs to the connection.
    """
    import os

    conn = psycopg2.connect(
        host=host or os.environ.get("PGHOST", "localhost"),
        database=database or os.environ.get("PGDATABASE", "postgres"),
        user=user or os.environ.get("PGUSER", "postgres"),
        password=password or os.environ.get("PGPASSWORD"),
    )
    return conn, conn.cursor()

In [3]:
import redis
# Client for the local Redis server holding the consistent-hash rings; redis-py's
# defaults (localhost:6379, database 0) are spelled out for readability.
redis_con = redis.Redis(host='localhost', port=6379, db=0)

In [None]:
from kafka import KafkaProducer
import json

def _json_utf8(value):
    """Serialize a message value as UTF-8 encoded JSON (used as the producer's value_serializer)."""
    return json.dumps(value).encode('utf-8')


# Kafka producer for the local broker; message values are sent as JSON bytes.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=_json_utf8,
)

In [4]:
# Open the Postgres connection and cursor shared by the create_test / add_model cells below.
conn, cur = get_postgres_connection()

In [5]:
import hashlib

def create_test(test_name, conn, cur):
    """Idempotently create the Postgres storage for A/B test `test_name`.

    Registers the test as active in ``global.tests`` and creates a per-test schema
    ``test__<md5(test_name)>`` containing ``user_assignment`` (indexed on model_id),
    ``model_config`` and ``ctr_tracking``. Every statement uses IF NOT EXISTS or
    ON CONFLICT DO NOTHING, so re-running it is safe.

    Args:
        test_name: Human-readable test name; only its MD5 hex digest reaches the SQL.
        conn: Open psycopg2-style connection; committed on success, rolled back on error.
        cur: Cursor created from `conn`.

    Raises:
        Whatever ``cur.execute`` raises, after rolling the transaction back so the
        connection stays usable.
    """
    # 'test__' + hex digits only, so concatenating it into DDL cannot inject SQL.
    schema_name = 'test__' + hashlib.md5(test_name.encode()).hexdigest()

    try:
        cur.execute('CREATE SCHEMA IF NOT EXISTS global')
        cur.execute('CREATE TABLE IF NOT EXISTS global.tests(schema text primary key, is_active boolean)')
        cur.execute('INSERT INTO global.tests(schema, is_active) VALUES (%s, TRUE) ON CONFLICT DO NOTHING', (schema_name,))

        cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schema_name)
        cur.execute('CREATE TABLE IF NOT EXISTS ' + schema_name + '.user_assignment(user_id varchar(10) primary key, model_id bigint)')
        # Postgres always creates an index in its table's schema, so the bare name is per-test.
        cur.execute('CREATE INDEX IF NOT EXISTS model_index ON ' + schema_name + '.user_assignment(model_id)')
        cur.execute('CREATE TABLE IF NOT EXISTS ' + schema_name + '.model_config(model_id bigint primary key, model_endpoint text)')
        # This DDL has no %s placeholders, so it takes no parameters.
        cur.execute('CREATE TABLE IF NOT EXISTS ' + schema_name + '.ctr_tracking(model_id bigint primary key, num_displayed bigint, num_clicked bigint)')
    except Exception:
        conn.rollback()
        raise

    conn.commit()

In [6]:
# Create (or reuse) the schema and tables backing the movie-recommendation test.
create_test(test_name='movie_recommendations', conn=conn, cur=cur)

In [7]:
import mmh3, redis, pickle, time

def _vnode_member(model_id, vid):
    """Redis member name for virtual node `vid` of model `model_id`: "<model_id>:<vid>"."""
    return str(model_id) + ':' + str(vid)


def _ring_position_taken(redis_conn, position):
    """True if a model vnode or a user already sits at exactly `position` on the ring."""
    return (redis_conn.zcount('consistent_hash_model', position, position) > 0
            or redis_conn.zcount('consistent_hash_user', position, position) > 0)


def _allocate_virtual_nodes(model_id, model_endpoint, redis_conn, num_vnodes):
    """Choose ring positions for this model's virtual nodes that are not registered yet.

    The i-th position is the unsigned mmh3 hash of ``model_endpoint + str(i)``. If that exact
    vnode is already registered for this model (the endpoint was added before), it is skipped,
    so re-running the registration does not pile up extra vnodes. If the position is occupied
    by any other vnode or user, or by a vnode picked earlier in this call, it is re-hashed with
    a time-based salt until it lands on a free position.
    """
    new_vids = []
    picked = set()
    for i in range(num_vnodes):
        seed = model_endpoint + str(i)
        vid = mmh3.hash(seed, signed=False)
        if redis_conn.zscore('consistent_hash_model', _vnode_member(model_id, vid)) is not None:
            continue
        while vid in picked or _ring_position_taken(redis_conn, vid):
            vid = mmh3.hash(seed + str(time.time()), signed=False)
        picked.add(vid)
        new_vids.append(vid)
    return new_vids


def _users_in_new_arcs(model_ring, user_ring, new_vids):
    """Return the ids of users that move onto one of `new_vids` (decoded, de-duplicated, in order).

    `model_ring` and `user_ring` are ``(member, score)`` pairs sorted by score, as returned by
    ``ZRANGE ... WITHSCORES``; `model_ring` must not contain `new_vids` yet. A user moves to the
    new vnode v when its score lies strictly between v and v's predecessor on the existing model
    ring. For a v below every existing vnode, the predecessor wraps to the highest vnode.
    NOTE(review): this treats a user as owned by the next vnode clockwise — confirm the serving
    side uses the same convention.
    """
    import bisect

    if not new_vids:
        return []
    if not model_ring:
        # No vnodes yet: the new model owns the whole ring.
        moved = [member for member, _ in user_ring]
    else:
        model_scores = [score for _, score in model_ring]
        user_scores = [score for _, score in user_ring]
        moved = []
        for vid in new_vids:
            below = bisect.bisect_left(model_scores, vid)  # existing vnodes with score < vid
            pred = model_scores[below - 1] if below else model_scores[-1]
            start = bisect.bisect_right(user_scores, pred)  # first user strictly above pred
            stop = bisect.bisect_left(user_scores, vid)     # first user at or above vid
            if below:
                moved.extend(member for member, _ in user_ring[start:stop])
            else:
                # The arc wraps past the top of the ring: (pred, max] plus [min, vid).
                moved.extend(member for member, _ in user_ring[start:])
                moved.extend(member for member, _ in user_ring[:stop])
    decoded = (m.decode('utf-8') if isinstance(m, bytes) else m for m in moved)
    return list(dict.fromkeys(decoded))


def add_model_random_assignment(test_name, model_endpoint, conn, cur, redis_conn,
                                num_vnodes=100, sampled_data_path='../sampled_data.pkl'):
    """Register `model_endpoint` in A/B test `test_name` and rebalance user assignments.

    Uses consistent hashing on two Redis sorted sets: 'consistent_hash_model' (members
    "<model_id>:<vnode>", score = vnode position) and 'consistent_hash_user' (member = user id,
    score = unsigned mmh3 of the id).

    Steps:
      1. Upsert the model into ``<schema>.model_config``; model_id is the unsigned mmh3 hash
         of the endpoint.
      2. Pick up to `num_vnodes` new virtual-node positions for the model.
      3. If the test exists and is active in ``global.tests``:
         * with no users assigned yet, load user ids from `sampled_data_path`, put them on the
           user ring and assign all of them to this model;
         * otherwise reassign to this model every user whose arc is claimed by a new vnode.
      4. Publish the new vnodes on the model ring.

    NOTE(review): the Redis keys are not namespaced per test, so all tests share one ring.

    Args:
        test_name: Test name; its schema is ``test__<md5(test_name)>``.
        model_endpoint: Endpoint string identifying the model.
        conn: psycopg2-style connection.
        cur: Cursor of `conn`.
        redis_conn: redis-py client; ring members are decoded if returned as bytes.
        num_vnodes: Virtual nodes placed per model (default 100).
        sampled_data_path: Pickle with the sampled users; only read for the first model.

    Raises:
        Any error raised while the database transaction is open, after rolling it back.
    """
    # 'test__' + hex digits only, so concatenating it into SQL cannot inject.
    schema_name = 'test__' + hashlib.md5(test_name.encode()).hexdigest()
    upsert_assignment = ('INSERT INTO ' + schema_name + '.user_assignment(user_id, model_id) VALUES (%s, %s) '
                         'ON CONFLICT (user_id) DO UPDATE SET model_id=%s')

    model_id = mmh3.hash(model_endpoint, signed=False)
    try:
        cur.execute('INSERT INTO ' + schema_name + '.model_config(model_id, model_endpoint) VALUES (%s, %s) '
                    'ON CONFLICT (model_id) DO UPDATE SET model_endpoint=%s',
                    (model_id, model_endpoint, model_endpoint))
        conn.commit()

        model_vids = _allocate_virtual_nodes(model_id, model_endpoint, redis_conn, num_vnodes)

        cur.execute('SELECT is_active FROM global.tests WHERE schema=%s', (schema_name,))
        row = cur.fetchone()
        # row is None when create_test() was never run for this test name.
        if row is not None and row[0]:
            cur.execute('SELECT 1 FROM ' + schema_name + '.user_assignment LIMIT 1')
            if cur.fetchone() is None:
                # First model of the test: it takes every sampled user.
                # NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
                with open(sampled_data_path, 'rb') as f:
                    sampled_data = pickle.load(f)
                # Presumably (user_id, _, _) triples; only the first field is used.
                user_ids = [user_id for user_id, _, _ in sampled_data]
                if user_ids:
                    redis_conn.zadd('consistent_hash_user',
                                    {u: mmh3.hash(u, signed=False) for u in user_ids})
                    cur.executemany(upsert_assignment, [(u, model_id, model_id) for u in user_ids])
            else:
                # Compare against the ring as it was before this model's vnodes are added.
                model_ring = redis_conn.zrange('consistent_hash_model', 0, -1, withscores=True)
                user_ring = redis_conn.zrange('consistent_hash_user', 0, -1, withscores=True)
                moved = _users_in_new_arcs(model_ring, user_ring, model_vids)
                if moved:
                    cur.executemany(upsert_assignment, [(u, model_id, model_id) for u in moved])
            conn.commit()
    except Exception:
        conn.rollback()
        raise

    if model_vids:
        redis_conn.zadd('consistent_hash_model',
                        {_vnode_member(model_id, vid): vid for vid in model_vids})

In [8]:
# Register the first endpoint; if no users are assigned yet, every sampled user goes to it.
add_model_random_assignment(test_name='movie_recommendations', model_endpoint='127.0.0.1:5001/recommendations', conn=conn, cur=cur, redis_conn=redis_con)

In [12]:
# Register a second endpoint; users in the ring arcs claimed by its virtual nodes move to it.
add_model_random_assignment(test_name='movie_recommendations', model_endpoint='127.0.0.1:5002/recommendations', conn=conn, cur=cur, redis_conn=redis_con)

In [13]:
# Register a third endpoint; users in the ring arcs claimed by its virtual nodes move to it.
add_model_random_assignment(test_name='movie_recommendations', model_endpoint='127.0.0.1:5003/recommendations', conn=conn, cur=cur, redis_conn=redis_con)

In [None]:
# Close the Postgres connection; the cells below only read from Redis.
conn.close()

In [9]:
# Snapshot the whole user ring as (member, score) pairs, ordered by score.
uhashes = redis_con.zrange(name='consistent_hash_user', start=0, end=-1, withscores=True)

In [11]:
# Number of users currently on the consistent-hash user ring.
len(uhashes)

996

In [10]:
# Print every user id on the ring, one per line, in ring (score) order.
for member, _score in uhashes:
    print(member.decode("utf-8"))

1513476
1897386
479148
999504
433490
1169378
35870
2108672
501346
1814991
1553318
1142799
670051
5507
339980
2410056
321724
160534
1282445
2286129
1971790
2127976
1793274
591748
2335003
1331887
2265780
802772
2444052
289356
2584544
1232966
2052271
124652
1509801
2522672
633505
1831673
2187301
1581500
1912002
2495459
1252829
2509568
947232
1966369
49593
2348951
911516
1660860
27467
619263
420266
716369
1234291
327691
1467955
214571
2628134
2324358
2114639
889776
2591796
988217
1581467
224836
1890099
1748681
1235008
2175440
1350191
2095341
708669
999012
1331764
2623009
2490870
1831489
2473882
1330032
896633
1826465
2205342
1192769
804004
2155529
1004748
1460631
2198612
921924
744250
383132
1834419
854831
2203465
1115451
1612756
293235
1450771
1018629
255611
382041
228833
50472
1075675
2446734
2389380
2044713
1122969
2596947
1222147
79224
2514559
966446
899799
910733
1900854
2003891
1303776
2019083
1867238
1523426
1346924
2149393
2422145
72663
2170831
1253737
122126
667203
1448110
90821
5