In [1]:
import redis
from redis.commands.json.path import Path
import redis.commands.search.reducers as reducers
from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import NumericFilter, Query

import random

# Connect to the local Redis server. The logical database is selected in the
# constructor so that every connection drawn from the pool uses it; issuing
# SELECT afterwards would only affect whichever connection ran the command.
r = redis.Redis(host='localhost', port=6379, db=0)

client_num = 10

# Seed `client_num` JSON documents under the "client:" prefix. Each document
# carries its loop index as the timestamp plus three random metrics in 0..100.
for i in range(client_num):
    cpu, memory, os = random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)
    print(f"client {i}: {cpu}-{memory}-{os}")
    client = {
        "client":{
            "timestamp": i,
            "cpu": cpu,
            "memory": memory,
            "os": os
        }
    }
    r.json().set(f"client:{i}", Path.root_path(), client)


# Numeric index over the four fields of every "client:*" JSON document.
schema = (
          NumericField("$.client.timestamp", as_name="timestamp"),
          NumericField("$.client.cpu", as_name="cpu"),
          NumericField("$.client.memory", as_name="memory"),
          NumericField("$.client.os", as_name="os"))

try:
    r.ft("client").create_index(schema,
                    definition=IndexDefinition(prefix=["client:"], index_type=IndexType.JSON))
except redis.ResponseError as exc:
    # Re-running this cell is expected, so an "already exists" reply is
    # harmless. Anything else (e.g. the search module is missing) would make
    # every later cell fail in a confusing way, so let it surface here.
    if "already exists" not in str(exc).lower():
        raise

client 0: 78-48-51
client 1: 75-3-0
client 2: 23-44-67
client 3: 49-37-14
client 4: 15-83-92
client 5: 82-5-28
client 6: 78-20-32
client 7: 12-9-16
client 8: 54-20-66
client 9: 15-13-92


In [3]:
# Run a match-all query against the "client" index and print the raw Result
# object (total hit count plus the returned documents).
client_index = r.ft('client')
q = Query('*')
result = client_index.search(q)
print(result)

Result{10 total, docs: [Document {'id': 'client:4', 'payload': None, 'json': '{"client":{"timestamp":4,"cpu":15,"memory":83,"os":92}}'}, Document {'id': 'client:7', 'payload': None, 'json': '{"client":{"timestamp":7,"cpu":12,"memory":9,"os":16}}'}, Document {'id': 'client:8', 'payload': None, 'json': '{"client":{"timestamp":8,"cpu":54,"memory":20,"os":66}}'}, Document {'id': 'client:3', 'payload': None, 'json': '{"client":{"timestamp":3,"cpu":49,"memory":37,"os":14}}'}, Document {'id': 'client:1', 'payload': None, 'json': '{"client":{"timestamp":1,"cpu":75,"memory":3,"os":0}}'}, Document {'id': 'client:9', 'payload': None, 'json': '{"client":{"timestamp":9,"cpu":15,"memory":13,"os":92}}'}, Document {'id': 'client:6', 'payload': None, 'json': '{"client":{"timestamp":6,"cpu":78,"memory":20,"os":32}}'}, Document {'id': 'client:5', 'payload': None, 'json': '{"client":{"timestamp":5,"cpu":82,"memory":5,"os":28}}'}, Document {'id': 'client:0', 'payload': None, 'json': '{"client":{"timestamp"

In [3]:
# Put a 30-second time-to-live on client:0 (the EXPIRE argument is in seconds).
r.expire('client:0', 30)

True

In [5]:
# Remove client:1 outright; the reply is the number of keys that were deleted.
r.delete('client:1')

1

In [5]:
import time
def insert(id:int, metrics:tuple[int, int, int]):
    """Store a client document under the key ``client:<id>``.

    Args:
        id: Numeric suffix of the Redis key.
        metrics: ``(cpu, memory, os)`` values, read by position.

    The document's ``timestamp`` is the current Unix time truncated to whole
    seconds. Nothing is returned; the reply of the JSON write is discarded.
    """
    fields = {"timestamp": int(time.time())}
    fields["cpu"] = metrics[0]
    fields["memory"] = metrics[1]
    fields["os"] = metrics[2]
    r.json().set(f"client:{id}", Path.root_path(), {"client": fields})

# Add one more document, client:11, with cpu=1, memory=2, os=3; insert() stamps
# it with the current epoch second.
insert(11, (1, 2, 3))

In [16]:
# List every indexed client id in ascending timestamp order. NOCONTENT makes
# the server return ids only, so the printed documents carry no JSON body.
tq = Query('*')
tq = tq.sort_by('timestamp', asc=True)
tq = tq.no_content()
client_index = r.ft('client')
result = client_index.search(tq)
for doc in result.docs:
    print(doc)

Document {'id': 'client:0', 'payload': None}
Document {'id': 'client:1', 'payload': None}
Document {'id': 'client:2', 'payload': None}
Document {'id': 'client:3', 'payload': None}
Document {'id': 'client:4', 'payload': None}
Document {'id': 'client:5', 'payload': None}
Document {'id': 'client:6', 'payload': None}
Document {'id': 'client:7', 'payload': None}
Document {'id': 'client:8', 'payload': None}
Document {'id': 'client:9', 'payload': None}


In [4]:
import json
# Find clients whose cpu value falls in the queried range and print each hit
# followed by the cpu value parsed out of its JSON body.
result = r.ft('client').search('@cpu: [10, 50]')
for doc in result.docs:
    print(doc)
    # Deliberately not named `dict`: in a notebook that would shadow the
    # builtin for every later cell until the kernel is restarted.
    client_doc = json.loads(doc.json)
    print(client_doc['client']['cpu'])

Document {'id': 'client:5', 'payload': None, 'json': '{"client":{"timestamp":5,"cpu":13,"memory":3,"os":9}}'}
13
Document {'id': 'client:6', 'payload': None, 'json': '{"client":{"timestamp":6,"cpu":22,"memory":5,"os":50}}'}
22


In [2]:

r.select(0)


def _make_job(timestamp, cpu, demand, amount):
    """Build one job document.

    Only the timestamp, the cpu constraint, demand and amount vary between
    the seeded jobs; every other field is the same for all of them.
    """
    return {
        "job":{
            "ip": "localhost",
            "port": 61000,
            "timestamp": timestamp,
            "constraints":{
                "cpu": cpu,
                "memory": 70,
                "os": 70,
            },
            "demand": demand,
            "amount": amount,
            "round" : 10,
            "score": 0,
            "workload": 10,
        }
    }


# (redis key, varying fields) for each seeded job, in write order.
_SEED_JOBS = (
    ("job:10", {"timestamp": 0, "cpu": 90, "demand": 10, "amount": 0}),
    ("job:11", {"timestamp": 1, "cpu": 70, "demand": 10, "amount": 10}),
    ("job:3", {"timestamp": 3, "cpu": 70, "demand": 2, "amount": 0}),
    ("job:2", {"timestamp": 2, "cpu": 70, "demand": 5, "amount": 0}),
)

for job_key, job_fields in _SEED_JOBS:
    # `job` is left holding the last document written (job:2), as before.
    job = _make_job(**job_fields)
    r.json().set(job_key, Path.root_path(), job)

# Index over every "job:*" JSON document.
# NOTE(review): none of the documents written above contain a `$.job.id`
# field, so the `id` attribute is presumably never populated - confirm
# whether it is still needed.
schema = (TextField("$.job.id", as_name="id"),
          NumericField("$.job.timestamp", as_name="timestamp"),
          NumericField("$.job.constraints.cpu", as_name="cpu"),
          NumericField("$.job.constraints.memory", as_name="memory"),
          NumericField("$.job.constraints.os", as_name="os"),
          NumericField("$.job.demand", as_name="demand"),
          NumericField("$.job.amount", as_name="amount"),
          TextField("$.job.ip", as_name="ip"),
          NumericField("$.job.port", as_name="port"),
          NumericField("$.job.round", as_name="round"),
          NumericField("$.job.score", as_name="score"),
          NumericField("$.job.workload", as_name="workload"),)

try:
    r.ft("job").create_index(schema,
                    definition=IndexDefinition(prefix=["job:"], index_type=IndexType.JSON))
except redis.ResponseError as exc:
    # Tolerate re-runs of this cell, but do not hide any other failure.
    if "already exists" not in str(exc).lower():
        raise

In [16]:
job = {
        "job":{
            "ip": "localhost",
            "port": 61000,
            "timestamp": 2,
            "constraints":{
                "cpu": 70,
                "memory": 70,
                "os": 70,
            },
            "demand": 5,
            "amount": 0,
            "round" : 10,
            "score": 0,
            "workload": 10,
        }
    }

# Create job:2 only if it does not exist yet, using optimistic locking:
# WATCH the key, check it, then write inside MULTI/EXEC so that a concurrent
# writer between the check and the write aborts the transaction.
job_key = "job:2"
# The pipeline must be bound to the name the body uses; binding it to a
# different name only worked when a stale `pipe` was left over in the kernel.
with r.json().pipeline() as pipe:
    while True:
        try:
            pipe.watch(job_key)
            # After WATCH the pipeline runs commands immediately, so this is
            # a real read of the current value.
            if pipe.get(job_key):
                print("exist")
                pipe.unwatch()
                break
            pipe.multi()
            pipe.set(job_key, Path.root_path(), job)
            # EXEC raises WatchError if the key changed since WATCH; it also
            # clears the watch, so no explicit UNWATCH is needed afterwards.
            pipe.execute()
            print("success")
            break
        except redis.WatchError:
            # Someone else touched the key; re-check from the top.
            continue


exist


In [44]:
import time
# Optimistic-locking demo on job:0: WATCH the key, and if it exists queue an
# increment of `amount` and an overwrite of `demand` in one transaction.
# The 10-second pause between MULTI and EXEC leaves a window in which another
# client can modify the key, which makes EXEC fail with WatchError and the
# loop start over.
job_key = "job:0"
with r.json().pipeline() as pipe:
    finished = False
    while not finished:
        try:
            pipe.watch(job_key)
            if pipe.get(job_key):
                pipe.multi()
                time.sleep(10)
                pipe.execute_command('JSON.NUMINCRBY', job_key, "$.job.amount", 1)
                pipe.execute_command('JSON.SET', job_key, '$.job.demand', 100)
                pipe.execute()
            else:
                print("Not exist")
                pipe.unwatch()
            finished = True
        except redis.WatchError:
            print("Watch error")




Watch error
Watch error


In [16]:
# Fetch the whole job:0 document (no path given) and print it.
job0_doc = r.json().get('job:0')
print(job0_doc)

{'job': {'ip': 'localhost', 'port': 61000, 'timestamp': 0, 'constraints': {'cpu': 90, 'memory': 70, 'os': 70}, 'demand': 10, 'amount': 0, 'round': 10, 'score': 0, 'workload': 10}}


In [7]:
import json
# List every indexed job, highest score first, and unpack the fields a
# scheduler would need from each hit.
q = Query('*').sort_by('score', asc=False)
try:
    result = r.ft('job').search(q)
except redis.RedisError as exc:
    # Only Redis-side failures (missing index, connection loss, ...) are
    # tolerated; programming errors must not be silenced.
    print(f"job search failed: {exc}")
    result = None
if result:
    size = result.total
    print(size)
    for doc in result.docs:
        job = json.loads(doc.json)
        constraints = job['job']['constraints']
        job_constraints = (constraints['cpu'],
                           constraints['memory'],
                           constraints['os'])
        # The numeric part of the key, e.g. "job:3" -> "3".
        offer = doc.id.split(':')[1]
        print(doc)
        

4
Document {'id': 'job:10', 'payload': None, 'score': '0', 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":0,"constraints":{"cpu":90,"memory":70,"os":70},"demand":10,"amount":0,"round":10,"score":0,"workload":10}}'}
Document {'id': 'job:3', 'payload': None, 'score': '0', 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":3,"constraints":{"cpu":70,"memory":70,"os":70},"demand":2,"amount":0,"round":10,"score":0,"workload":10}}'}
Document {'id': 'job:11', 'payload': None, 'score': '0', 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":1,"constraints":{"cpu":70,"memory":70,"os":70},"demand":10,"amount":10,"round":10,"score":0,"workload":10}}'}
Document {'id': 'job:2', 'payload': None, 'score': '0', 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":2,"constraints":{"cpu":70,"memory":70,"os":70},"demand":5,"amount":0,"round":10,"score":0,"workload":10}}'}


In [66]:
# Increase job:3's `amount` by one, but only while it is still below
# `demand`. WATCH + MULTI/EXEC makes the check-then-increment atomic with
# respect to other writers.
job_key = 'job:3'
with r.json().pipeline() as pipe:
    while True:
        try:
            pipe.watch(job_key)
            amounts = r.json().get(job_key, "$.job.amount")
            demands = r.json().get(job_key, "$.job.demand")
            if not amounts or not demands:
                # Missing key or missing field: nothing to update.
                print("Not exist")
                pipe.unwatch()
                break
            amount = int(amounts[0])
            demand = int(demands[0])
            if amount >= demand:
                print("satisfied")
                pipe.unwatch()
                break
            pipe.multi()
            # JSON.NUMINCRBY updates the one field server-side. Round-tripping
            # the whole document through a Lua cjson decode/encode to add 1
            # rewrites every field and can alter how values are encoded.
            pipe.execute_command('JSON.NUMINCRBY', job_key, "$.job.amount", 1)
            pipe.execute()
            print("Increased")
            break
        except redis.WatchError:
            # The key changed between WATCH and EXEC; re-read and retry.
            print("Watch Error")

satisfied


In [34]:
# Delete job:0 through a JSON pipeline. The key is watched first and the
# watch released afterwards; no MULTI is issued, so the delete is sent
# immediately rather than queued.
job_key = "job:0"
with r.json().pipeline() as pipe:
    pipe.watch(job_key)
    pipe.delete(job_key)
    pipe.unwatch()

In [20]:
# Ask the search module for its index list and show the Python type of the
# raw reply.
result = r.execute_command('FT._LIST')
result_type = type(result)
print(result_type)

<class 'list'>


In [22]:
# Dump the raw FT.INFO reply for the "job" index (a flat list of byte-string
# keys and values, not the parsed dict that r.ft(...).info() returns).
index_name = 'job'
result = r.execute_command('FT.INFO', index_name)
print(result)

[b'index_name', b'job', b'index_options', [], b'index_definition', [b'key_type', b'JSON', b'prefixes', [b'job:'], b'default_score', b'1'], b'attributes', [[b'identifier', b'$.job.id', b'attribute', b'id', b'type', b'TEXT', b'WEIGHT', b'1'], [b'identifier', b'$.job.timestamp', b'attribute', b'timestamp', b'type', b'NUMERIC'], [b'identifier', b'$.job.constraints.cpu', b'attribute', b'cpu', b'type', b'NUMERIC'], [b'identifier', b'$.job.constraints.memory', b'attribute', b'memory', b'type', b'NUMERIC'], [b'identifier', b'$.job.constraints.os', b'attribute', b'os', b'type', b'NUMERIC'], [b'identifier', b'$.job.demand', b'attribute', b'demand', b'type', b'NUMERIC'], [b'identifier', b'$.job.amount', b'attribute', b'amount', b'type', b'NUMERIC'], [b'identifier', b'$.job.ip', b'attribute', b'ip', b'type', b'TEXT', b'WEIGHT', b'1'], [b'identifier', b'$.job.port', b'attribute', b'port', b'type', b'NUMERIC'], [b'identifier', b'$.job.round', b'attribute', b'round', b'type', b'NUMERIC'], [b'identifi

In [13]:
# Setting a nested path on a key that does not exist is rejected by RedisJSON
# ("new objects must be created at the root"). Catch and display the error so
# the demonstration does not abort a Restart & Run All.
# NOTE(review): job:13 is never created in this notebook - confirm whether
# job:3 was the intended target.
try:
    reply = r.execute_command('JSON.SET', f"job:13", "$.job.score", 100)
except redis.ResponseError as exc:
    reply = None
    print(f"ResponseError: {exc}")
reply

ResponseError: new objects must be created at the root

In [12]:
import json
# Find jobs whose cpu / memory / os constraints equal the given metrics
# exactly (each range has identical lower and upper bounds), then print the
# hit, the numeric part of its key, and its demand.
metrics = (70, 70, 70)
range_clauses = [
    f'@{field}: [{value},{value}]'
    for field, value in zip(('cpu', 'memory', 'os'), metrics)
]
q = Query(' '.join(range_clauses))
result = r.ft('job').search(q)
print(result.total)
for doc in result.docs:
    print(doc)
    doc_id = doc.id
    key_suffix = doc_id.split(':')[1]
    print(key_suffix)
    job_body = json.loads(doc.json)
    print(job_body["job"]["demand"])

3
Document {'id': 'job:1', 'payload': None, 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":1,"constraints":{"cpu":70,"memory":70,"os":70},"demand":10,"amount":10,"round":10,"score":0,"workload":10}}'}
1
10
Document {'id': 'job:2', 'payload': None, 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":2,"constraints":{"cpu":70,"memory":70,"os":70},"demand":5,"amount":0,"round":10,"score":0,"workload":10}}'}
2
5
Document {'id': 'job:3', 'payload': None, 'json': '{"job":{"ip":"localhost","port":61000,"timestamp":3,"constraints":{"cpu":70,"memory":70,"os":70},"demand":2,"amount":0,"round":10,"score":100,"workload":10}}'}
3
2


In [8]:
# Report how many documents the "client" index currently holds.
client_index = r.ft('client')
info = client_index.info()
cur_size = info['num_docs']
print(cur_size)

10
