Skip to content

Commit

Permalink
Backport #1886 (Fix Python memory issues). (#1913)
Browse files Browse the repository at this point in the history
  • Loading branch information
matiasinsaurralde authored and buger committed Sep 17, 2018
1 parent bc15488 commit 2209455
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 42 deletions.
2 changes: 1 addition & 1 deletion coprocess/lua/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

static void LuaInit();

static struct CoProcessMessage* LuaDispatchHook(struct CoProcessMessage*);
static void LuaDispatchHook(struct CoProcessMessage*, struct CoProcessMessage*);
static void LuaDispatchEvent(char*);

void LoadCachedMiddleware(void*);
Expand Down
2 changes: 1 addition & 1 deletion coprocess/native_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
// Dispatcher defines a basic interface for the CP dispatcher, check PythonDispatcher for reference.
type Dispatcher interface {
// Dispatch takes and returns a pointer to a CoProcessMessage struct, see coprocess/api.h for details. This is used by CP bindings.
Dispatch(unsafe.Pointer) unsafe.Pointer
Dispatch(unsafe.Pointer, unsafe.Pointer)

// DispatchEvent takes an event JSON, as bytes. Doesn't return.
DispatchEvent([]byte)
Expand Down
2 changes: 1 addition & 1 deletion coprocess/python/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static int Python_LoadDispatcher();
static int Python_NewDispatcher(char*, char*, char*);
static void Python_ReloadDispatcher();

static struct CoProcessMessage* Python_DispatchHook(struct CoProcessMessage*);
static void Python_DispatchHook(struct CoProcessMessage*, struct CoProcessMessage*);
static void Python_DispatchEvent(char*);

#endif
13 changes: 5 additions & 8 deletions coprocess_lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ static void LoadMiddlewareIntoState(lua_State* L, char* middleware_name, char* m
luaL_dostring(L, middleware_contents);
}
static struct CoProcessMessage* LuaDispatchHook(struct CoProcessMessage* object) {
struct CoProcessMessage* outputObject = malloc(sizeof *outputObject);
static void LuaDispatchHook(struct CoProcessMessage* object, struct CoProcessMessage* outputObject) {
lua_State *L = luaL_newstate();
luaL_openlibs(L);
Expand All @@ -52,7 +49,7 @@ static struct CoProcessMessage* LuaDispatchHook(struct CoProcessMessage* object)
outputObject->p_data = (void*)output;
outputObject->length = lua_output_length;
return outputObject;
return;
}
static void LuaDispatchEvent(char* event_json) {
Expand Down Expand Up @@ -107,10 +104,10 @@ type LuaDispatcher struct {
}

// Dispatch takes a CoProcessMessage and sends it to the CP.
func (d *LuaDispatcher) Dispatch(objectPtr unsafe.Pointer) unsafe.Pointer {
func (d *LuaDispatcher) Dispatch(objectPtr unsafe.Pointer, newObjectPtr unsafe.Pointer) {
object := (*C.struct_CoProcessMessage)(objectPtr)
newObjectPtr := C.LuaDispatchHook(object)
return unsafe.Pointer(newObjectPtr)
newObject := (*C.struct_CoProcessMessage)(newObjectPtr)
C.LuaDispatchHook(object, newObject)
}

// Reload will perform a middleware reload when a hot reload is triggered.
Expand Down
28 changes: 21 additions & 7 deletions coprocess_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package main
import "C"

import (
"errors"

"github.com/golang/protobuf/proto"

"github.com/TykTechnologies/tyk/coprocess"
Expand All @@ -29,13 +31,20 @@ import (

// Dispatch prepares a CoProcessMessage, sends it to the GlobalDispatcher and gets a reply.
func (c *CoProcessor) Dispatch(object *coprocess.Object) (*coprocess.Object, error) {
if GlobalDispatcher == nil {
return nil, errors.New("Dispatcher not initialized")
}

var objectMsg []byte
var err error
switch MessageType {
case coprocess.ProtobufMessage:
objectMsg, _ = proto.Marshal(object)
objectMsg, err = proto.Marshal(object)
case coprocess.JsonMessage:
objectMsg, _ = json.Marshal(object)
objectMsg, err = json.Marshal(object)
}
if err != nil {
return nil, err
}

objectMsgStr := string(objectMsg)
Expand All @@ -46,21 +55,26 @@ func (c *CoProcessor) Dispatch(object *coprocess.Object) (*coprocess.Object, err
objectPtr.p_data = unsafe.Pointer(CObjectStr)
objectPtr.length = C.int(len(objectMsg))

newObjectPtr := (*C.struct_CoProcessMessage)(GlobalDispatcher.Dispatch(unsafe.Pointer(objectPtr)))
newObjectPtr := (*C.struct_CoProcessMessage)(C.malloc(C.size_t(unsafe.Sizeof(C.struct_CoProcessMessage{}))))

// Call the dispatcher (objectPtr is freed during this call):
GlobalDispatcher.Dispatch(unsafe.Pointer(objectPtr), unsafe.Pointer(newObjectPtr))
newObjectBytes := C.GoBytes(newObjectPtr.p_data, newObjectPtr.length)

newObject := &coprocess.Object{}

switch MessageType {
case coprocess.ProtobufMessage:
proto.Unmarshal(newObjectBytes, newObject)
err = proto.Unmarshal(newObjectBytes, newObject)
case coprocess.JsonMessage:
json.Unmarshal(newObjectBytes, newObject)
err = json.Unmarshal(newObjectBytes, newObject)
}
if err != nil {
return nil, err
}

C.free(unsafe.Pointer(CObjectStr))
C.free(unsafe.Pointer(objectPtr))
// Free the returned object memory:
C.free(unsafe.Pointer(newObjectPtr.p_data))
C.free(unsafe.Pointer(newObjectPtr))

return newObject, nil
Expand Down
51 changes: 27 additions & 24 deletions coprocess_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,36 @@ static void Python_SetEnv(char* python_path) {
setenv("PYTHONPATH", python_path, 1 );
}
static struct CoProcessMessage* Python_DispatchHook(struct CoProcessMessage* object) {
struct CoProcessMessage* outputObject = malloc(sizeof *outputObject);
static void Python_DispatchHook(struct CoProcessMessage* object, struct CoProcessMessage* new_object) {
if (object->p_data == NULL) {
return outputObject;
free(object);
return;
}
gilState = PyGILState_Ensure();
PyObject *args = PyTuple_Pack( 1, PyBytes_FromStringAndSize(object->p_data, object->length) );
PyObject *result = PyObject_CallObject( dispatcher_hook, args );
PyObject* input = PyBytes_FromStringAndSize(object->p_data, object->length);
PyObject* args = PyTuple_Pack( 1, input );
PyObject* result = PyObject_CallObject( dispatcher_hook, args );
free(object->p_data);
free(object);
Py_DECREF(input);
Py_DECREF(args);
if( result == NULL ) {
PyErr_Print();
} else {
PyObject* new_object_msg_item = PyTuple_GetItem( result, 0 );
char* output = PyBytes_AsString(new_object_msg_item);
PyObject* new_object_msg_length = PyTuple_GetItem( result, 1 );
int msg_length = PyLong_AsLong(new_object_msg_length);
outputObject->p_data = (void*)output;
outputObject->length = msg_length;
PyGILState_Release(gilState);
return;
}
PyObject* new_object_msg_item = PyTuple_GetItem( result, 0 );
char* output = PyBytes_AsString(new_object_msg_item);
PyObject* new_object_msg_length = PyTuple_GetItem( result, 1 );
int msg_length = PyLong_AsLong(new_object_msg_length);
// Copy the message in order to avoid accessing the result PyObject internal buffer:
char* output_copy = malloc(msg_length);
memcpy(output_copy, output, msg_length);
Py_DECREF(result);
new_object->p_data= (void*)output_copy;
new_object->length = msg_length;
PyGILState_Release(gilState);
return outputObject;
return;
}
static void Python_DispatchEvent(char* event_json) {
Expand Down Expand Up @@ -191,10 +193,11 @@ type PythonDispatcher struct {
}

// Dispatch takes a CoProcessMessage and sends it to the CP.
func (d *PythonDispatcher) Dispatch(objectPtr unsafe.Pointer) unsafe.Pointer {
func (d *PythonDispatcher) Dispatch(objectPtr unsafe.Pointer, newObjectPtr unsafe.Pointer) {
object := (*C.struct_CoProcessMessage)(objectPtr)
newObjectPtr := C.Python_DispatchHook(object)
return unsafe.Pointer(newObjectPtr)
newObject := (*C.struct_CoProcessMessage)(newObjectPtr)
// TODO: restore error result
C.Python_DispatchHook(object, newObject)
}

// DispatchEvent dispatches a Tyk event.
Expand Down

0 comments on commit 2209455

Please sign in to comment.