Commit 7ce476e

Merge branch 'main' of github.com:IITH-Compilers/MLCompilerBridge into main

pranav-159 committed Jan 13, 2024
2 parents b0701a9 + 499793a
Showing 17 changed files with 231 additions and 402 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
@@ -49,8 +49,7 @@ else()
  add_library(MLCompilerBridge STATIC tools.cpp)
  target_link_libraries(MLCompilerBridge PUBLIC SerDesLib ModelRunnerLib ONNXModelRunnerLib LLVM-10 ${llvm_libs})

-  add_executable(MLCompilerBridgeTest $<TARGET_OBJECTS:LLVMMLBridgeTest>)
-  # add_library(MLCompilerBridgeTest SHARED $<TARGET_OBJECTS:LLVMHelloMLBridgeTest>)
+  add_executable(MLCompilerBridgeTest $<TARGET_OBJECTS:MLBridgeCPPTest>)
  target_link_libraries(MLCompilerBridgeTest PUBLIC MLCompilerBridge)
  set_property(TARGET MLCompilerBridge PROPERTY POSITION_INDEPENDENT_CODE 1)

30 changes: 25 additions & 5 deletions CompilerInterface/SerDes.py
@@ -1,7 +1,16 @@
import os, io
import json
import log_reader
+import ctypes
+import struct

+class NpEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, ctypes.c_long):
+            return obj.value
+        if isinstance(obj, ctypes.c_double):
+            return obj.value
+        return super(NpEncoder, self).default(obj)

## Class for serialization and deserialization in various formats for communication.
class SerDes:
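Note: the new encoder simply unwraps ctypes scalars so json.dumps can serialize them. A minimal standalone sketch of the intended behavior (the payload values are illustrative, assuming the NpEncoder class above is in scope):

import ctypes, json

payload = {"out": [ctypes.c_long(7), ctypes.c_double(0.5)]}
# The default JSON encoder raises TypeError on ctypes scalars; NpEncoder
# substitutes their .value before encoding.
print(json.dumps(payload, cls=NpEncoder))  # -> {"out": [7, 0.5]}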
@@ -56,16 +65,27 @@ def serializeData(self, data):

    ## Serializes data to JSON
    def serializeJson(self, data):
-        msg = json.dumps({"out": data}).encode("utf-8")
+        msg = json.dumps({"out": data}, cls=NpEncoder).encode("utf-8")
        hdr = len(msg).to_bytes(8, "little")
        self.buffer = hdr + msg

    ## Serializes data to bitstream
    def serializeBytes(self, data):
-        if isinstance(data, list):
-            msg = b"".join([x.to_bytes(4, "little", signed=True) for x in data])
-        else:
-            msg = data.to_bytes(4, "little", signed=True)
+        def _pack(data):
+            if isinstance(data, int):
+                return struct.pack("i", data)
+            elif isinstance(data, float):
+                return struct.pack("f", data)
+            elif isinstance(data, str) and len(data) == 1:
+                return struct.pack("c", data.encode())  # struct "c" expects bytes, not str
+            elif isinstance(data, ctypes.c_double):
+                return struct.pack("d", data.value)
+            elif isinstance(data, ctypes.c_long):
+                return struct.pack("l", data.value)
+            elif isinstance(data, list):
+                return b"".join([_pack(x) for x in data])
+
+        msg = _pack(data)
        hdr = len(msg).to_bytes(8, "little")
        self.buffer = hdr + msg
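Note: serializeBytes now length-prefixes a struct-packed payload instead of hand-rolling to_bytes calls. A minimal standalone sketch of the resulting framing (values are illustrative; struct's "i" format is native-endian, which both ends of the pipe share here):

import struct

payload = struct.pack("iii", 1, 2, 3)                   # three packed 32-bit ints
buffer = len(payload).to_bytes(8, "little") + payload   # 8-byte little-endian length header

# A reader peels the header, then unpacks the payload:
size = int.from_bytes(buffer[:8], "little")
assert struct.unpack("iii", buffer[8:8 + size]) == (1, 2, 3)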

@@ -1,8 +1,6 @@
import log_reader
import argparse
import os, io, json
-import SerDes

+from PipeCompilerInterface import PipeCompilerInterface
+from GrpcCompilerInterface import GrpcCompilerInterface
import sys
import torch, torch.nn as nn

@@ -32,6 +30,12 @@
    required=False,
    default=False,
)
+parser.add_argument(
+    "--server_port",
+    type=int,
+    help="Server Port",
+    default=5050,
+)
args = parser.parse_args()


@@ -46,29 +50,30 @@ def forward(self, input):


def run_pipe_communication(data_format, pipe_name):
-    serdes = SerDes.SerDes(data_format, "/tmp/" + pipe_name)
-    print("Serdes init...")
-    serdes.init()
+    compiler_interface = PipeCompilerInterface(data_format, "/tmp/" + pipe_name)
+    print("PipeCompilerInterface init...")
+    compiler_interface.reset_pipes()

    while True:
        try:
-            data = serdes.readObservation()
+            data = compiler_interface.evaluate()
            if data_format == "json":
                data = data["tensor"]
                # print("Data: ", data["tensor"])
            elif data_format == "bytes":
                data = [x for x in data[0]]
                # print("Data: ", [x for x in data[0]])
            print("len(tensor): ", len(data))
            print("data: ", data)
            model = DummyModel(input_dim=len(data))
            action = model(torch.Tensor(data))
-            serdes.sendData(3)
+            compiler_interface.populate_buffer(3)
        except Exception as e:
            print("*******Exception*******", e)
-            serdes.init()
+            compiler_interface.reset_pipes()


class service_server(helloMLBridge_pb2_grpc.HelloMLBridgeService):
-    def __init__(self, data_format, pipe_name):
+    def __init__(self):
        # self.serdes = SerDes.SerDes(data_format, pipe_name)
        # self.serdes.init()
        pass
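Note: the test scripts now drive the bridge through PipeCompilerInterface instead of a raw SerDes object. A minimal sketch of the round-trip, assuming the semantics the updated loop implies (reset_pipes() opens the named pipes, evaluate() blocks for one observation, populate_buffer() queues the reply; "hellopipe" is an illustrative pipe name):

interface = PipeCompilerInterface("json", "/tmp/hellopipe")
interface.reset_pipes()                 # (re)open the named pipes

observation = interface.evaluate()      # block until the compiler sends data
interface.populate_buffer(3)            # queue a dummy advice, as in the test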
@@ -99,22 +104,9 @@ def test_func():


if __name__ == "__main__":
    # test_func()
    # exit(0)
    if args.use_pipe:
        run_pipe_communication(args.data_format, args.pipe_name)
    elif args.use_grpc:
-        server = grpc.server(
-            futures.ThreadPoolExecutor(max_workers=20),
-            options=[
-                ("grpc.max_send_message_length", 200 * 1024 * 1024),
-                ("grpc.max_receive_message_length", 200 * 1024 * 1024),
-            ],
-        )
-        helloMLBridge_pb2_grpc.add_HelloMLBridgeServiceServicer_to_server(
-            service_server(args.data_format, args.pipe_name), server
-        )
-        server.add_insecure_port("localhost:5050")
-        server.start()
-        print("Server Running")
-        server.wait_for_termination()
+        compiler_interface = GrpcCompilerInterface(
+            mode="server",
+            add_server_method=helloMLBridge_pb2_grpc.add_HelloMLBridgeServiceServicer_to_server,
+            grpc_service_obj=service_server(),
+            hostport=args.server_port,
+        )
+        compiler_interface.start_server()
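Note: the gRPC boilerplate deleted above (thread pool, 200 MB message-size options, port binding, wait_for_termination) now lives inside GrpcCompilerInterface; the test only supplies the generated add-servicer hook, a service object, and the port from --server_port.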

151 changes: 151 additions & 0 deletions CompilerInterface/log_reader.py
@@ -0,0 +1,151 @@
"""Reader for training log.
See lib/Analysis/TrainingLogger.cpp for a description of the format.
"""
import ctypes
import dataclasses
import io
import json
import math
import sys
from typing import List, Optional
from functools import reduce
import operator
import numpy

_element_types = {
"float": ctypes.c_float,
"double": ctypes.c_double,
"int8_t": ctypes.c_int8,
"uint8_t": ctypes.c_uint8,
"int16_t": ctypes.c_int16,
"uint16_t": ctypes.c_uint16,
"int32_t": ctypes.c_int32,
"uint32_t": ctypes.c_uint32,
"int64_t": ctypes.c_int64,
"uint64_t": ctypes.c_uint64,
}


@dataclasses.dataclass(frozen=True)
class TensorSpec:
name: str
port: int
shape: List[int]
element_type: type

@staticmethod
def from_dict(d: dict):
name = d["name"]
port = d["port"]
shape = [int(e) for e in d["shape"]]
element_type_str = d["type"]
if element_type_str not in _element_types:
raise ValueError(f"uknown type: {element_type_str}")
return TensorSpec(
name=name,
port=port,
shape=shape,
element_type=_element_types[element_type_str],
)


class TensorValue:
def __init__(self, spec: TensorSpec, buffer: bytes):
self._spec = spec
self._buffer = buffer
self._view = ctypes.cast(self._buffer, ctypes.POINTER(self._spec.element_type))
# self._len = math.prod(self._spec.shape)
self._len = reduce(operator.mul, self._spec.shape, 1)
# self._view = numpy.frombuffer(self._buffer, float)
# print("Value of", self._spec.name, "is:", self._view)

def spec(self) -> TensorSpec:
return self._spec

def __len__(self) -> int:
return self._len

def __getitem__(self, index):
if index < 0 or index >= self._len:
raise IndexError(f"Index {index} out of range [0..{self._len})")
return self._view[index]


def read_tensor(fs: io.BufferedReader, ts: TensorSpec) -> TensorValue:
size = reduce(operator.mul, ts.shape, 1) * ctypes.sizeof(ts.element_type)
# size = math.prod(ts.shape) * ctypes.sizeof(ts.element_type)
data = fs.read(size)
return TensorValue(ts, data)


def pretty_print_tensor_value(tv: TensorValue):
print(f'{tv.spec().name}: {",".join([str(v) for v in tv])}')


def read_header(f: io.BufferedReader):
line = f.readline()
header = json.loads(line)
tensor_specs = [TensorSpec.from_dict(ts) for ts in header["features"]]
score_spec = TensorSpec.from_dict(header["score"]) if "score" in header else None
advice_spec = TensorSpec.from_dict(header["advice"]) if "advice" in header else None
return tensor_specs, score_spec, advice_spec


def read_one_observation(
context: Optional[str],
event_str: str,
f: io.BufferedReader,
tensor_specs: List[TensorSpec],
score_spec: Optional[TensorSpec],
):
features = []
for ts in tensor_specs:
features.append(read_tensor(f, ts))
f.readline()
return context, None, features, None


def read_stream(fname: str):
with io.BufferedReader(io.FileIO(fname, "rb")) as f:
tensor_specs, score_spec, _ = read_header(f)
context = None
while True:
event_str = f.readline()
if not event_str:
break
context, observation_id, features, score = read_one_observation(
context, event_str, f, tensor_specs, score_spec
)
yield context, observation_id, features, score


def read_stream2(f: io.BufferedReader):
context = None
while True:
tensor_specs, score_spec, _ = read_header(f)
# event_str = f.readline()
# print("Event: ", event_str)
# if not event_str:
# break
context, observation_id, features, score = read_one_observation(
context, "", f, tensor_specs, score_spec
)
yield context, observation_id, features, score


def main(args):
last_context = None
for ctx, obs_id, features, score in read_stream(args[1]):
if last_context != ctx:
print(f"context: {ctx}")
last_context = ctx
print(f"observation: {obs_id}")
for fv in features:
pretty_print_tensor_value(fv)
if score:
pretty_print_tensor_value(score)


if __name__ == "__main__":
main(sys.argv)
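Note: a minimal usage sketch for the new module ("trace.log" is an illustrative path; the file must follow the TrainingLogger format referenced in the docstring):

from log_reader import read_stream, pretty_print_tensor_value

for context, obs_id, features, score in read_stream("trace.log"):
    print(f"context: {context}, observation: {obs_id}")
    for fv in features:                  # each fv is a TensorValue
        pretty_print_tensor_value(fv)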
@@ -1,9 +1,8 @@
import log_reader
import argparse
import os, io, json
-import SerDes
-import ctypes
+from PipeCompilerInterface import PipeCompilerInterface
+from GrpcCompilerInterface import GrpcCompilerInterface
import numpy as np
+import ctypes

import sys
import torch, torch.nn as nn
@@ -92,14 +91,14 @@ def forward(self, input):
}

def run_pipe_communication(data_format, pipe_name):
-    serdes = SerDes.SerDes(data_format, "/tmp/" + pipe_name)
-    print("Serdes init...")
-    serdes.init()
+    compiler_interface = PipeCompilerInterface(data_format, "/tmp/" + pipe_name)
+    print("PipeCompilerInterface init...")
+    compiler_interface.reset_pipes()
    i = 0
    while True:
        i += 1
        try:
-            data = serdes.readObservation()
+            data = compiler_interface.evaluate()
            if data_format == "json":
                key = list(data)[0]
                data = data[key]
@@ -120,10 +119,10 @@ def run_pipe_communication(data_format, pipe_name):
                print(f"Mismatch in {expected_type[i]}")
                # raise Exception(f"Mismatch in {expected_type[i]}")

-            serdes.sendData(returned_data[i])
+            compiler_interface.populate_buffer(returned_data[i])
        except Exception as e:
            print("*******Exception*******", e)
-            serdes.init()
+            compiler_interface.reset_pipes()

class service_server(helloMLBridge_pb2_grpc.HelloMLBridgeService):
    def __init__(self, data_format, pipe_name):
@@ -155,22 +154,8 @@ def test_func():
    print(decoded, type(decoded))

if __name__ == "__main__":
-    # test_func()
-    # exit(0)
    if args.use_pipe:
        run_pipe_communication(args.data_format, args.pipe_name)
    elif args.use_grpc:
-        server = grpc.server(
-            futures.ThreadPoolExecutor(max_workers=20),
-            options=[
-                ("grpc.max_send_message_length", 200 * 1024 * 1024),
-                ("grpc.max_receive_message_length", 200 * 1024 * 1024),
-            ],
-        )
-        helloMLBridge_pb2_grpc.add_HelloMLBridgeServiceServicer_to_server(
-            service_server(args.data_format, args.pipe_name), server
-        )
-        server.add_insecure_port(f"localhost:{args.server_port}")
-        server.start()
-        print("Server Running")
-        server.wait_for_termination()
+        compiler_interface = GrpcCompilerInterface(
+            mode="server",
+            add_server_method=helloMLBridge_pb2_grpc.add_HelloMLBridgeServiceServicer_to_server,
+            grpc_service_obj=service_server(),
+            hostport=args.server_port,
+        )
+        compiler_interface.start_server()
6 changes: 3 additions & 3 deletions MLModelRunner/Utils/MLConfig.cpp
@@ -1,5 +1,5 @@
#include "MLModelRunner/Utils/MLConfig.h"

llvm::cl::opt<std::string>
llvm::MLConfig::mlconfig("ml-config-path", cl::Hidden, cl::Optional,
cl::desc("Path to ML config files"), cl::init(""));
llvm::cl::opt<std::string> MLBridge::MLConfig::mlconfig(
"ml-config-path", llvm::cl::Hidden, llvm::cl::Optional,
llvm::cl::desc("Path to ML config files"), llvm::cl::init(""));
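Note: the option definition moves from llvm::MLConfig to MLBridge::MLConfig, and the cl:: helpers are now fully qualified, so the file no longer depends on a using-directive for the llvm namespace.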
1 change: 1 addition & 0 deletions MLModelRunner/gRPCModelRunner/CMakeLists.txt
@@ -29,6 +29,7 @@ set(CMAKE_CXX_STANDARD 11)
# add_custom_target(protobuf_grpc_version ALL
# COMMAND ${CMAKE_COMMAND} -E echo "protoc path = $<TARGET_FILE:protobuf::protoc> Using Protobuf ${Protobuf_VERSION} Using gRPC ${gRPC_VERSION} have AOT ${LLVM_HAVE_TF_AOT}")


file(GLOB proto_list ${PROTOS_DIRECTORY}/*.proto)
set(proto_dir ${PROTOS_DIRECTORY})
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/include/grpc)