Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fury Example #8

Open
chaokunyang opened this issue Mar 2, 2021 · 7 comments
Open

Fury Example #8

chaokunyang opened this issue Mar 2, 2021 · 7 comments

Comments

@chaokunyang
Copy link
Owner

import pyfury
import pyarrow as pa

# Plain Python classes converted to/from Fury rows; __slots__ lists the
# attribute names, which mirror the Arrow field names in the schemas below.
class Bar:
  __slots__ = ("f1", "f2")


class Foo:
  __slots__ = ("f1", "f2", "f3", "f4", "f5")


# Arrow schema describing a Bar row.
bar_schema = pa.schema([
    ("f1", pa.int32()),
    ("f2", pa.utf8()),
])
# Arrow schema describing a Foo row; f5 is a nested Bar struct. The "cls"
# metadata presumably tells the encoder which Python class to instantiate.
# NOTE(review): the snippet only imports `pyfury`, so the `cls` helper is
# referenced through that module -- confirm pyfury exposes it under this name.
foo_schema = pa.schema([
    ("f1", pa.int32()),
    ("f2", pa.utf8()),
    ("f3", pa.list_(pa.utf8())),
    ("f4", pa.map_(pa.utf8(), pa.int32())),
    pa.field("f5", pa.struct(bar_schema), metadata={"cls": pyfury.cls(Bar)}),
], metadata={"cls": pyfury.cls(Foo)})
encoder = pyfury.encoder(foo_schema)


def write(foo):
  """Encode ``foo`` as a Fury row and return the row's bytes."""
  return encoder.to_row(foo).to_bytes()


def read_bytes(data_bytes):
  """Decode ``data_bytes`` back into a ``Foo`` using the row encoder.

  NOTE(review): raw bytes are passed straight to ``from_row``; confirm the
  encoder accepts bytes here rather than a row object.
  """
  return encoder.from_row(data_bytes)
@chaokunyang
Copy link
Owner Author

chaokunyang commented Mar 2, 2021

// Build a Java-only Fury instance with reference tracking enabled and
// round-trip one object through it.
Fury fury = Fury.builder()
    .withLanguage(Fury.Language.JAVA)
    .withReferenceTracking(true)
    .build();
SomeClass source = new SomeClass();
byte[] encoded = fury.serialize(source);
Object decoded = fury.deserialize(encoded);
SomeClass object = new SomeClass();
// Variant 1: a plain Fury instance built from the builder.
{
  Fury fury = Fury.builder().withLanguage(Fury.Language.JAVA)
      .withReferenceTracking(true).build();
  byte[] bytes = fury.serialize(object);
  Object o = fury.deserialize(bytes);
}
// Variant 2: a ThreadSafeFury built directly from the builder.
{
  ThreadSafeFury fury = Fury.builder().withLanguage(Fury.Language.JAVA)
      .withReferenceTracking(true).buildThreadSafeFury();
  byte[] bytes = fury.serialize(object);
  Object o = fury.deserialize(bytes);
}
// Variant 3: a ThreadSafeFury created from a lambda that builds and configures
// the underlying Fury (here: registering the class being serialized).
{
  ThreadSafeFury fury = new ThreadSafeFury(() -> {
    // Must not be named `fury`: the enclosing `fury` is already in scope inside
    // its own initializer, and a lambda body cannot redeclare it.
    Fury f = Fury.builder().withLanguage(Fury.Language.JAVA)
        .withReferenceTracking(true).build();
    f.register(SomeClass.class);
    return f;
  });
  byte[] bytes = fury.serialize(object);
  Object o = fury.deserialize(bytes);
}

@chaokunyang
Copy link
Owner Author

chaokunyang commented Mar 2, 2021

Foo foo = Foo.create();
Encoder<Foo> encoder = Encoders.bean(Foo.class);
int initSize = 512;
MemoryBuffer buffer = MemoryUtils.buffer(initSize);
int numRows = 128;

// Write numRows copies of `foo` into `buffer` as one Arrow record batch in
// stream format. VectorSchemaRoot is AutoCloseable and holds the vectors'
// memory, so it is closed once the stream has been fully written.
try (VectorSchemaRoot root = ArrowUtils.createVectorSchemaRoot(encoder.schema())) {
  ArrowWriter arrowWriter = new ArrowWriter(root);
  try (ArrowStreamWriter writer = new ArrowStreamWriter(
      root, null, new FuryOutputStream(buffer))) {
    writer.start();
    for (int i = 0; i < numRows; i++) {
      arrowWriter.write(encoder.toRow(foo));
    }
    // Presumably finalizes the root's contents (e.g. row count) so that
    // writeBatch emits the rows written above -- must precede writeBatch.
    arrowWriter.finish();
    writer.writeBatch();
    writer.end();
  }
}
// Standalone alternative to the ArrowStreamWriter example (it redeclares the
// same variables): build the record batch with Fury's ArrowWriter, then write
// schema, batch and end-of-stream marker into `buffer` by hand. The Python
// test_record_batch example presumably reads these bytes via pa.ipc.open_stream.
Foo foo = Foo.create();
Encoder<Foo> encoder = Encoders.bean(Foo.class);
int initSize = 512;
MemoryBuffer buffer = MemoryUtils.buffer(initSize);
int numRows = 128;

// Start writing at the beginning of the buffer.
buffer.writerIndex(0);
ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(encoder.schema());
// Append the same encoded row numRows times.
for (int i = 0; i < numRows; i++) {
  BinaryRow row = encoder.toRow(foo);
  arrowWriter.write(row);
}
ArrowRecordBatch recordBatch = arrowWriter.finishAsRecordBatch();
// Order matters: schema first, then the record batch, then end-of-stream.
DataTypes.serializeSchema(encoder.schema(), buffer);
ArrowUtils.serializeRecordBatch(recordBatch, buffer);
// NOTE(review): presumably clears the writer so it can be reused for another batch.
arrowWriter.reset();
ArrowStreamWriter.writeEndOfStream(new WriteChannel(Channels.newChannel(
    new FuryOutputStream(buffer))), new IpcOption());
def test_record_batch(record_batch_bytes):
    """Verify an Arrow IPC stream against a record batch built locally with pyfury.

    The stream in ``record_batch_bytes`` must carry exactly one batch whose
    schema equals ``foo_schema`` rebuilt from name/type/nullability only, and
    that batch must equal a 128-row batch written from ``create_foo()`` values.
    """
    stream = pa.ipc.open_stream(pa.py_buffer(record_batch_bytes))
    # Rebuild every field from name/type/nullable so the comparison ignores metadata.
    expected_schema = pa.schema(
        [pa.field(field.name, field.type, field.nullable) for field in foo_schema]
    )
    assert stream.schema == expected_schema
    received_batches = list(stream)
    assert len(received_batches) == 1
    received = received_batches[0]

    row_encoder = pyfury.encoder(foo_schema)
    arrow_writer = pyfury.ArrowWriter(foo_schema)
    row_count = 128
    for _ in range(row_count):
        arrow_writer.write(row_encoder.to_row(create_foo()))
    local_batch = arrow_writer.finish()
    assert received == local_batch

@chaokunyang
Copy link
Owner Author

chaokunyang commented Apr 9, 2021

public class Foo {
  public int f1;
  public String f2;
  public List<String> f3;
  public Map<String, Integer> f4;
  // Named f5 to match the nested struct field `f5` in this example's Python schema.
  public Bar f5;
}

public class Bar {
  public int f1;
  public String f2;
}

// Row encoder for Foo; a Class literal is required (a bare type name is not an expression).
Encoder<Foo> encoder = Encoders.encoder(Foo.class);

/** Encodes {@code foo} in Fury row format and returns the row's bytes. */
public byte[] write(Foo foo) {
  return encoder.toRow(foo).toBytes();
}

/** Decodes a {@code Foo} from bytes produced by {@link #write}. */
public Foo read(byte[] bytes) {
  BinaryRow newRow = new BinaryRow(encoder.schema());
  // Arrays expose `length`, not `size()`; point the row at the whole array.
  newRow.pointTo(bytes, 0, bytes.length);
  return encoder.fromRow(newRow);
}

@chaokunyang
Copy link
Owner Author

chaokunyang commented May 28, 2021

Java API

public class Foo {
  public int f1;
  public Map<String, Bar> f2;
  public Long f3;
}

public class Bar {
  public String f1;
  public List<String> f2;
}

Encoder<Foo> encoder = Encoders
  .encoder(Foo.class);
byte[] bytes = encoder.toBytes(foo))

Python API

# Python counterparts of the Java Foo/Bar classes. Field types mix pyarrow
# type objects (pa.int32(), pa.int64()) with typing generics.
@dataclass
class Foo:
    f1: pa.int32()
    f2: Dict[str, 'Bar']
    f3: pa.int64()


@dataclass
class Bar:
    f1: str
    f2: List[str]


# Decode bytes produced on the Java side into a Foo instance.
encoder = pyfury.encoder(Foo)
new_foo: Foo = encoder.from_bytes(bytes_from_java)

Proto

// Protobuf message mirroring the fields of the Java/Python Foo class.
message Foo {
    int32 f1 = 1;
    // Map values are Bar messages.
    map<string, Bar> f2 = 2;
    int64 f3 = 3;
}

// Protobuf message mirroring the fields of Bar.
message Bar {
    string f1 = 1;
    repeated string f2 = 2;
} 
/**
 * Converts {@code foo} to its protobuf form ({@code Generated.Foo}) and returns
 * the encoded bytes.
 *
 * <p>Protobuf builder setters throw {@link NullPointerException} on {@code null},
 * so null reference fields of {@code foo} (including the boxed {@code f3}) are
 * left unset instead of crashing.
 */
public static byte[] serialize(Foo foo) {
  Generated.Foo.Builder builder = Generated.Foo.newBuilder().setF1(foo.f1);
  if (foo.f2 != null) {
    Map<String, Generated.Bar> pbF2 = foo.f2.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> toProto(e.getValue())));
    builder.putAllF2(pbF2);
  }
  if (foo.f3 != null) {
    builder.setF3(foo.f3);
  }
  return builder.build().toByteArray();
}

/** Converts one {@link Bar} to protobuf, leaving null fields unset. */
private static Generated.Bar toProto(Bar bar) {
  Generated.Bar.Builder builder = Generated.Bar.newBuilder();
  // Protobuf map values cannot be null; an absent Bar becomes an empty message.
  if (bar == null) {
    return builder.build();
  }
  if (bar.f1 != null) {
    builder.setF1(bar.f1);
  }
  if (bar.f2 != null) {
    builder.addAllF2(bar.f2);
  }
  return builder.build();
}

/**
 * Decodes bytes produced by {@link #serialize} back into a {@link Foo}.
 *
 * @throws Exception if {@code bytes} cannot be parsed as a {@code Generated.Foo}
 */
public Foo deserialize(byte[] bytes) throws Exception {
  Generated.Foo pbFoo = Generated.Foo.parseFrom(bytes);
  Foo foo = new Foo();
  foo.f1 = pbFoo.getF1();
  foo.f2 = pbFoo.getF2Map().entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProto(e.getValue())));
  foo.f3 = pbFoo.getF3();
  return foo;
}

/** Converts one protobuf Bar back into a {@link Bar}. */
private static Bar fromProto(Generated.Bar value) {
  Bar bar = new Bar();
  bar.f1 = value.getF1();
  // Copy into a mutable list; protobuf's repeated-field list is read-only.
  bar.f2 = new ArrayList<>(value.getF2List());
  return bar;
}

@chaokunyang
Copy link
Owner Author

chaokunyang commented May 29, 2021

Old Java API

// Normal task
RayObject<String> hello = Ray.call(MyClass::echo, "hello");
RayObject<String> res = Ray.call(
        new PyRemoteFunction<>("module", "func", String.class));

// Actor
ActorCreationOptions options = new ActorCreationOptions.Builder()
    .setMaxRestarts(-1).createActorCreationOptions();
RayActor<MyActor> myActor = Ray.createActor(MyActor::new, 10, options);
// Call a method on the Java actor created just above.
Integer result = Ray.call(MyActor::increaseAndGet, myActor, 1).get();
RayPyActor actor = Ray.createActor(new PyActorClass("module", "func"));
// Call the Python actor's `fetch` method, expecting a byte array.
RayObject<byte[]> fetched = Ray.call(new PyActorMethod<>("fetch", byte[].class), actor);

New Java API

// Normal task
ObjectRef<String> obj = Ray.task(MyClass::echo, "hello").remote();
ObjectRef<String> res = Ray.task(
          PyFunction.of("module", "func", String.class)).remote();

// Actor
ActorHandle<MyActor> myActor = Ray.actor(MyActor::new, 10)
    .setMaxRestarts(-1).remote();
Integer obj = myActor.task(MyActor::increaseAndGet, 1).remote().get();
PyActorHandle pyActor = Ray.actor(PyActorClass.of("module", "Actor")).remote();
ObjectRef<byte[]> res = actor.task(PyActorMethod.of("fetch", byte[].class),

@chaokunyang
Copy link
Owner Author

chaokunyang commented Jun 12, 2021

Ray Java Call Python

// Illustrative signature, in Java types, of the function being called.
public Foo foo(Bar bar, Map<String, List<Integer>> map){
  return new Foo(bar, map);
}

// Call the Python function `ray_demo.foo` as a normal task with a Foo return type.
Ray.task(PyFunction.of("ray_demo", "foo", Foo.class),
  bar, map).remote();
// Call a Python actor method: arguments are passed to task(...); remote() takes none.
pyActorHandle.task(PyActorMethod.of("foo", Foo.class), bar, map).remote();

Ray Python Call Java

# Illustrative signature, in Python types, of the function being called.
def foo(bar: Bar, map: Dict[str, List[pa.int32()]]) -> Foo:
    return Foo(bar, map)

# Handle to the Java function io.ray.Demo.foo, named after the method it wraps.
foo_function = ray.java_function(
    "io.ray.Demo", "foo")
foo_ref = foo_function.remote(bar, map)
# Call the `foo` method on an existing Java actor handle.
java_actor_handle.foo.remote(bar, map)

@chaokunyang
Copy link
Owner Author

chaokunyang commented Feb 13, 2022

Java API

public class Vertex {
  public int index;
  public Map<String, Double> props;
  public Vertex to;
}
fury.registerClass(Vertex.class, "example.Vertex");
List<Vertex> vertices = Arrays.asList(create(), create());
// Make the two vertices reference each other, forming a cycle.
// NOTE(review): presumably requires a Fury built with reference tracking enabled.
vertices.get(0).to = vertices.get(1);
vertices.get(1).to = vertices.get(0);
byte[] bytes = fury.serialize(vertices);

Python API

@dataclass
class Vertex:
    index: pa.int32()
    props: Dict[str, pa.float64()]
    # Quoted forward reference: `Vertex` is not bound yet while its own class
    # body runs, so an unquoted annotation would raise NameError.
    to: "Vertex"
fury.registerclass(Vertex, "example.Vertex")
data = [newvertex(), newvertex()]
# Point the two vertices at each other, forming a cycle.
data[0].to, data[1].to = data[1], data[0]
serialized = fury.serialize(data)

Golang API

type Vertex struct {
  Index int32
  Props map[string][float64]       
  To Foo
}
fury.RegisterType(*Vertex(nil), "example.Vertex")
data := []*Vertex{CreateVertex(), CreateVertex()}
data[0].to, data[1].to = data[1],  data[0]
serialized := fury.serialize(data)

Decode (Java API)

public class Vertex {
  public int index;
  public Map<String, Double> props;
  public Vertex to;
}
fury.registerClass(Vertex.class, "example.Vertex");
// deserialize returns Object (as in `Object newobj = fury.deserialize(bytes)`),
// so a cast is needed; it is unchecked because of generic type erasure.
@SuppressWarnings("unchecked")
List<Vertex> vertices = (List<Vertex>) fury.deserialize(binary);

Python API

@dataclass
class Vertex:
    index: pa.int32()
    props: Dict[str, pa.float64()]
    # Quoted forward reference: `Vertex` is not bound yet while its own class
    # body runs, so an unquoted annotation would raise NameError.
    to: "Vertex"
fury.registerclass(Vertex, "example.Vertex")
# Deserialize bytes, presumably produced by one of the encode examples above.
vertices = fury.deserialize(binary)

Golang API

type Vertex struct {
  Index int32
  Props map[string][float64]       
  To Foo
}
fury.RegisterType(*Vertex(nil), "example.Vertex")
vertices := fury.deserialize(binary).([]*Vertex)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant