In [1]:
import pyarrow as pa
import pyarrow.plasma as plasma

import numpy as np
import cv2
import os

In [3]:
class CustomClass:
    def __init__(self, var1, var2, var3):
        self.var1 = var1
        self.var2 = var2
        self.var3 = var3

In [13]:
plasma_store_path = '/tmp/plasma_store'
client = plasma.connect(plasma_store_path)
client.list()

{}

## Custom Data Types를 직렬화 하려면 context를 지정해 주어야 함

### 일반적인 경우

In [14]:
cc = CustomClass(1, 'a', 0.5)

In [16]:
def _serialize_cc(data): # 직렬화 시 이 method를 실행하여 CustomClass 를 dictionary 형식으로 변환해 주도록 함 
    return {'var1': data.var1,
            'var2': data.var2,
            'var3': data.var3}

def _deserialize_cc(data): # 역직렬화 시 dictionary로 변환된 CustomClass를 다시 CustomClass 형식으로 변환해 주도록 함 
    return CustomClass(data['var1'], data['var2'], data['var3'])
    

In [19]:
cc_ctx = pa.SerializationContext() # serialization context 생성, 위에 만든 직렬화, 역직렬화 시 수행할 함수를 등록해 줌
cc_ctx.register_type(CustomClass, 'example', 
                     custom_serializer=_serialize_cc,
                     custom_deserializer=_deserialize_cc)

In [17]:
serialized_cc_buf = pa.serialize(cc).to_buffer() # 오류 발생 : CustomClass 형식을 직렬화할때 사용할 context 지정 필요

SerializationCallbackError: pyarrow does not know how to serialize objects of type <class '__main__.CustomClass'>.

In [25]:
serialized_cc_buf = pa.serialize(cc, context=cc_ctx).to_buffer() # 성공 
object_id = plasma.ObjectID(b'a'*20)
client.put(serialized_cc_buf, object_id)
client.list()

{ObjectID(6161616161616161616161616161616161616161): {'data_size': 2568,
  'metadata_size': 0,
  'ref_count': 0,
  'create_time': 1595834318,
  'construct_duration': 0,
  'state': 'sealed'}}

In [36]:
[serialized_cc_object] = client.get([object_id]) # get
deserialized_cc_object = pa.deserialize(serialized_cc_object.to_pybytes(), context=cc_ctx) # 역직렬화 context 지정, 역직렬화 시 직렬화된 object에 to_pybytes()를 사용해야 추후 del을 통해 ref 해제 가능
deserialized_cc_object.__dict__ 

{'var1': 1, 'var2': 'a', 'var3': 0.5}

In [37]:
del(serialized_cc_object)
client.delete([object_id])
client.list()

{}

### Class 변수에 object id를 넣는 경우 

In [46]:
object_id_var = plasma.ObjectID(b'b'*20)
cc_obj_id = CustomClass(1, 'a', object_id_var)

In [61]:
serialized_cc_buf = pa.serialize(cc_obj_id, context=cc_ctx).to_buffer() # 실패 : plasma.ObjectID 타입의 object를 직렬화 하는 방법을 모름 -> 지정해 주어야 함 

SerializationCallbackError: pyarrow does not know how to serialize objects of type <class 'pyarrow._plasma.ObjectID'>.

In [63]:
def _serialize_cc_obj_id(data): 
    return {'var1': data.var1,
            'var2': data.var2,
            'var3': data.var3.binary()} # plasma.ObjectID의 경우 .binary() 함수를 이용

def _deserialize_cc_obj_id(data): 
    return CustomClass(data['var1'], data['var2'], 
                       plasma.ObjectID(data['var3'])) # 다시 ObjectID 형식으로 변환

cc_obj_id_ctx = pa.SerializationContext() # serialization context 생성, 위에 만든 직렬화, 역직렬화 시 수행할 함수를 등록해 줌
cc_obj_id_ctx.register_type(CustomClass, 'example2', 
                     custom_serializer=_serialize_cc_obj_id,
                     custom_deserializer=_deserialize_cc_obj_id)

In [69]:
serialized_cc_buf = pa.serialize(cc_obj_id, context=cc_obj_id_ctx).to_buffer() # 성공 
object_id = client.put(serialized_cc_buf)
client.list()

{ObjectID(46d55d4bb0251d8540ae9faca1554a80ec609df7): {'data_size': 2568,
  'metadata_size': 0,
  'ref_count': 0,
  'create_time': 1595834880,
  'construct_duration': 0,
  'state': 'sealed'}}

In [70]:
[serialized_cc_obj_id_object] = client.get([object_id])
deserialized_cc_obj_id_object = pa.deserialize(serialized_cc_obj_id_object.to_pybytes(), context=cc_obj_id_ctx)
deserialized_cc_obj_id_object.__dict__ # var3이 plasma.ObjectID 형식으로 변환되어 역직렬화 된 것 확인 

{'var1': 1,
 'var2': 'a',
 'var3': ObjectID(6262626262626262626262626262626262626262)}

In [72]:
del (serialized_cc_obj_id_object) # clear
client.delete([object_id])
client.list()

{}

## 직렬화/역직렬화 되는 데이터에 포함되는 데이터는 항상 binary 형식으로 변환 되어야 하는 듯 하다
##  pyarrow가 모르는 데이터 타입의 경우에는 수동으로 custom serializer/deserializer -> SerializationContext를 등록해 주어야 함 