Skip to content

Commit

Permalink
Serialize Fragment metadata using Kryo (#486)
Browse files Browse the repository at this point in the history
In PXF 5.16.0, we introduced Kryo to serialize the Fragment's userData for
Hive profiles. The original version of boot combines Fragment metadata
and userData into metadata. It also serializes metadata into an escaped
JSON string, which is then deserialized by the HttpRequestParser during the
bridge call. This introduces a regression in the Hive metadata
optimization. To reduce the payload size, we now serialize metadata
using Kryo instead of JSON. This commit fixes the regression introduced
by the original version of boot.
  • Loading branch information
frankgh committed Nov 18, 2020
1 parent ec3b986 commit 905b47d
Show file tree
Hide file tree
Showing 49 changed files with 434 additions and 598 deletions.
Expand Up @@ -170,20 +170,12 @@ private void openTable() throws IOException {
* This assumption is made through HBase's code as well
*/
private void selectTableSplits() {

HBaseFragmentMetadata metadata = context.getFragmentMetadata();
if (metadata == null) {
throw new IllegalArgumentException("Missing fragment metadata information");
}
try {
byte[] startKey = metadata.getStartKey();
byte[] endKey = metadata.getEndKey();
byte[] startKey = metadata.getStartKey();
byte[] endKey = metadata.getEndKey();

if (withinScanRange(startKey, endKey)) {
splits.add(new SplitBoundary(startKey, endKey));
}
} catch (Exception e) {
throw new RuntimeException("Exception while reading expected fragment metadata", e);
if (withinScanRange(startKey, endKey)) {
splits.add(new SplitBoundary(startKey, endKey));
}
}

Expand Down
Expand Up @@ -78,9 +78,8 @@ public List<Fragment> getFragments() throws Exception {
Properties properties = getSchema(tbl);

for (int i = 0; i < fragmentsNum; i++) {
byte[] userData = hiveUtilities.toKryo(properties);
String filePath = getFilePath(tbl);
fragments.add(new Fragment(filePath, localHosts, new HiveFragmentMetadata(i * SPLIT_SIZE, SPLIT_SIZE, userData)));
fragments.add(new Fragment(filePath, localHosts, new HiveFragmentMetadata(i * SPLIT_SIZE, SPLIT_SIZE, properties)));
}

return fragments;
Expand Down
3 changes: 3 additions & 0 deletions server/pxf-api/build.gradle
Expand Up @@ -45,13 +45,16 @@ dependencies {
implementation("com.fasterxml.woodstox:woodstox-core:5.0.3") { transitive = false }
implementation("com.fasterxml.jackson.core:jackson-core") { transitive = false }
implementation("com.fasterxml.jackson.core:jackson-databind") { transitive = false }
implementation("com.esotericsoftware:kryo:3.0.3") { transitive = false }

/*******************************
* Test Dependencies
*******************************/

testCompileOnly("org.apache.hadoop:hadoop-annotations:${hadoopVersion}")

testImplementation("com.esotericsoftware:minlog:1.3.0")
testImplementation("com.esotericsoftware:reflectasm:1.11.6")
testImplementation('org.mockito:mockito-inline:3.1.0')
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.greenplum.pxf.api.OneRow;
import org.greenplum.pxf.api.model.Accessor;
import org.greenplum.pxf.api.model.BasePlugin;
import org.greenplum.pxf.api.utilities.FragmentMetadata;

/**
* Internal interface that would define the access to a file on HDFS, but in
Expand Down Expand Up @@ -55,14 +54,9 @@ public OneRow readNextObject() {
if (fragmentNumber > 0)
return null; /* signal EOF, close will be called */
int fragment = context.getDataFragment();
FragmentMetadata metadata = context.getFragmentMetadata();
DemoFragmentMetadata demoMetadata = context.getFragmentMetadata();
int colCount = context.getColumns();

if (!(metadata instanceof DemoFragmentMetadata))
throw new IllegalArgumentException("invalid metadata");

DemoFragmentMetadata demoMetadata = (DemoFragmentMetadata) metadata;

/* generate row with (colCount) columns */
StringBuilder colValue = new StringBuilder(demoMetadata.getPath() + " row" + (rowNumber + 1));
for (int colIndex = 1; colIndex < colCount; colIndex++) {
Expand Down
@@ -1,19 +1,18 @@
package org.greenplum.pxf.api.examples;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.greenplum.pxf.api.utilities.FragmentMetadata;

@NoArgsConstructor
public class DemoFragmentMetadata implements FragmentMetadata {

@Getter
@Setter
private String path;

@JsonCreator
public DemoFragmentMetadata(@JsonProperty("path") String path) {
public DemoFragmentMetadata(String path) {
this.path = path;
}
}
Expand Up @@ -3,7 +3,6 @@
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
* Base class for all plugin types (Accessor, Resolver, Fragmenter, ...).
Expand All @@ -19,7 +18,6 @@ public class BasePlugin implements Plugin {
/**
* {@inheritDoc}
*/
@Autowired
public void setRequestContext(RequestContext context) {
this.context = context;
this.configuration = context.getConfiguration();
Expand Down
Expand Up @@ -19,16 +19,9 @@

package org.greenplum.pxf.api.utilities;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
* Interface that represents Fragment metadata. Each profile can implement its
* own metadata object.
*/
@JsonIgnoreProperties(value={ "className" }, allowGetters=true)
public interface FragmentMetadata {

default String getClassName() {
return this.getClass().getName();
}
}
@@ -1,55 +1,56 @@
package org.greenplum.pxf.api.utilities;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.commons.codec.binary.Base64;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* This class serializes and deserializes {@link FragmentMetadata} objects into
* JSON.
* This class serializes and deserializes {@link FragmentMetadata} objects.
*/
@Component
public class FragmentMetadataSerDe extends StdSerializer<FragmentMetadata> {

private static final long serialVersionUID = 123173996615107417L;
private static final String CLASSNAME = "className";

private final ObjectMapper mapper;
private final SerializationService serializationService;

/**
* Constructs a serializer/deserializer for {@link FragmentMetadata} objects
*/
public FragmentMetadataSerDe() {
public FragmentMetadataSerDe(SerializationService serializationService) {
super(FragmentMetadata.class);
mapper = new ObjectMapper();
this.serializationService = serializationService;
}

@Override
public void serialize(FragmentMetadata value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(mapper.writeValueAsString(value));
}

@SuppressWarnings({"unchecked", "rawtypes"})
public FragmentMetadata deserialize(String json) throws JsonProcessingException {
JsonNode node = mapper.readTree(json);
String className = node.get(CLASSNAME).textValue();

Class klass = getObjectClass(className);
return (FragmentMetadata) mapper.readValue(json, klass);
public void serialize(FragmentMetadata value, JsonGenerator gen, SerializerProvider provider)
throws IOException {
Output out = new Output(4 * 1024, 10 * 1024 * 1024);
Kryo kryo = serializationService.borrowKryo();
try {
kryo.writeClassAndObject(out, value);
out.close();
// Serialized fragment metadata is base64 encoded
gen.writeBinary(out.toBytes());
} finally {
serializationService.releaseKryo(kryo);
}
}

@SuppressWarnings("rawtypes")
private Class getObjectClass(String className) {
@SuppressWarnings("unchecked")
public <T extends FragmentMetadata> T deserialize(String metadata) {
Kryo kryo = serializationService.borrowKryo();
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getMessage());
byte[] decoded = Base64.decodeBase64(metadata);
return (T) kryo.readClassAndObject(new Input(decoded));
} finally {
serializationService.releaseKryo(kryo);
}
}
}
@@ -0,0 +1,47 @@
package org.greenplum.pxf.api.utilities;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import org.springframework.stereotype.Service;

/**
 * Hands out {@link Kryo} instances for serialization/deserialization of
 * objects. Instances are borrowed from, and given back to, a shared
 * {@link KryoPool}.
 */
@Service
public class SerializationService {

    private final KryoPool kryoPool;

    /**
     * Creates the service together with its backing pool of kryo instances.
     */
    public SerializationService() {
        this.kryoPool = createPool();
    }

    /**
     * Borrows a kryo instance from the pool and sets its class loader to the
     * current thread's context class loader before returning it.
     *
     * <p>By default, kryo pool uses ConcurrentLinkedQueue which is unbounded.
     * Callers should hand the instance back through
     * {@link #releaseKryo(Kryo)} once they are done with it so that it can be
     * reused. The pool is built with soft references, so pooled instances
     * can be reclaimed when there is GC memory pressure.
     *
     * @return kryo instance
     */
    public Kryo borrowKryo() {
        final Kryo borrowed = kryoPool.borrow();
        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        borrowed.setClassLoader(contextLoader);
        return borrowed;
    }

    /**
     * Returns a previously borrowed kryo instance to the pool.
     *
     * @param kryo - kryo instance to be released
     */
    public void releaseKryo(Kryo kryo) {
        kryoPool.release(kryo);
    }

    /**
     * Builds the soft-reference pool, using a factory that creates a plain
     * {@link Kryo} object on demand.
     *
     * @return the newly built pool
     */
    private static KryoPool createPool() {
        KryoFactory kryoFactory = () -> new Kryo();
        KryoPool.Builder poolBuilder = new KryoPool.Builder(kryoFactory);
        return poolBuilder.softReferences().build();
    }
}
Expand Up @@ -10,6 +10,5 @@ class DemoFragmentMetadataTest {
public void testDemoFragmentMetadata() {
DemoFragmentMetadata fragmentMetadata = new DemoFragmentMetadata("my-path");
assertEquals("my-path", fragmentMetadata.getPath());
assertEquals("org.greenplum.pxf.api.examples.DemoFragmentMetadata", fragmentMetadata.getClassName());
}
}
@@ -1,11 +1,10 @@
package org.greenplum.pxf.api.utilities;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.greenplum.pxf.api.examples.DemoFragmentMetadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -23,7 +22,7 @@ class FragmentMetadataSerDeTest {

@BeforeEach
public void setup() {
metadataSerDe = new FragmentMetadataSerDe();
metadataSerDe = new FragmentMetadataSerDe(new SerializationService());
}

@Test
Expand All @@ -36,60 +35,48 @@ public void testSerialize() throws JsonProcessingException {
mapper.registerModule(module);

DemoFragmentMetadata metadata = new DemoFragmentMetadata("abc");
assertEquals("\"{\\\"path\\\":\\\"abc\\\",\\\"className\\\":\\\"org.greenplum.pxf.api.examples.DemoFragmentMetadata\\\"}\"", mapper.writeValueAsString(metadata));
assertEquals("\"AQBvcmcuZ3JlZW5wbHVtLnB4Zi5hcGkuZXhhbXBsZXMuRGVtb0ZyYWdtZW50TWV0YWRhdOEBAWFi4w==\"",
mapper.writeValueAsString(metadata));

TestFragmentMetadata testMetadata = new TestFragmentMetadata("test", 5, 10, new Date(1590649200000L), "foo".getBytes(StandardCharsets.UTF_8));
assertEquals("\"{\\\"a\\\":\\\"test\\\",\\\"b\\\":5,\\\"c\\\":10,\\\"d\\\":1590649200000,\\\"e\\\":\\\"Zm9v\\\",\\\"className\\\":\\\"org.greenplum.pxf.api.utilities.FragmentMetadataSerDeTest$TestFragmentMetadata\\\"}\"",
assertEquals("\"AQDPAW9yZy5ncmVlbnBsdW0ucHhmLmFwaS51dGlsaXRpZXMuRnJhZ21lbnRNZXRhZGF0YVNlckRlVGVzdCRUZXN0RnJhZ21lbnRNZXRhZGF0YQEBdGVz9AoUAQFqYXZhLnNxbC5EYXTlAYC70tClLgEEZm9v\"",
mapper.writeValueAsString(testMetadata));
}

@Test
public void testDeserialize() throws JsonProcessingException {
public void testDeserialize() {

String metadataJson = "{\"path\": \"deserialize me\", \"className\": \"org.greenplum.pxf.api.examples.DemoFragmentMetadata\" }";
String metadataString = "\"AQBvcmcuZ3JlZW5wbHVtLnB4Zi5hcGkuZXhhbXBsZXMuRGVtb0ZyYWdtZW50TWV0YWRhdOEBAWFi4w==\"";

FragmentMetadata metadata = metadataSerDe.deserialize(metadataJson);
FragmentMetadata metadata = metadataSerDe.deserialize(metadataString);
assertNotNull(metadata);
assertTrue(metadata instanceof DemoFragmentMetadata);
assertEquals("deserialize me", ((DemoFragmentMetadata) metadata).getPath());
assertEquals("abc", ((DemoFragmentMetadata) metadata).getPath());

String testMetadataJson = "{\"b\": 25, \"c\": 150, \"a\": \"test me\", \"d\": \"1590649200000\", \"e\": \"Zm9v\", \"className\": \"org.greenplum.pxf.api.utilities.FragmentMetadataSerDeTest$TestFragmentMetadata\"}";
String testMetadataString = "\"AQDPAW9yZy5ncmVlbnBsdW0ucHhmLmFwaS51dGlsaXRpZXMuRnJhZ21lbnRNZXRhZGF0YVNlckRlVGVzdCRUZXN0RnJhZ21lbnRNZXRhZGF0YQEBdGVz9AoUAQFqYXZhLnNxbC5EYXTlAYC70tClLgEEZm9v\"";

FragmentMetadata testMetadata = metadataSerDe.deserialize(testMetadataJson);
FragmentMetadata testMetadata = metadataSerDe.deserialize(testMetadataString);
assertNotNull(testMetadata);
assertTrue(testMetadata instanceof TestFragmentMetadata);
TestFragmentMetadata testFragmentMetadata = (TestFragmentMetadata) testMetadata;
assertEquals("test me", testFragmentMetadata.getA());
assertEquals(25, testFragmentMetadata.getB());
assertEquals(150, testFragmentMetadata.getC());
assertEquals("test", testFragmentMetadata.getA());
assertEquals(5, testFragmentMetadata.getB());
assertEquals(10, testFragmentMetadata.getC());
assertEquals(new Date(1590649200000L), testFragmentMetadata.getD());
assertEquals("foo", new String(testFragmentMetadata.getE(), StandardCharsets.UTF_8));
}

@NoArgsConstructor
@Getter
static class TestFragmentMetadata implements FragmentMetadata {

@Getter
private final String a;
private String a;
private int b;
private int c;
private Date d;
private byte[] e;

@Getter
private final int b;

@Getter
private final int c;

@Getter
private final Date d;

@Getter
private final byte[] e;

@JsonCreator
public TestFragmentMetadata(
@JsonProperty("a") String a,
@JsonProperty("b") int b,
@JsonProperty("c") int c,
@JsonProperty("d") Date d,
@JsonProperty("e") byte[] e) {
public TestFragmentMetadata(String a, int b, int c, Date d, byte[] e) {
this.a = a;
this.b = b;
this.c = c;
Expand Down
2 changes: 2 additions & 0 deletions server/pxf-hbase/build.gradle
Expand Up @@ -38,6 +38,8 @@ dependencies {
* Test Dependencies
*******************************/

testImplementation("com.esotericsoftware:minlog:1.3.0")
testImplementation("com.esotericsoftware:reflectasm:1.11.6")
testCompileOnly("com.google.code.findbugs:annotations:1.3.9")
testCompileOnly("org.apache.hbase:hbase-annotations:${hbaseVersion}")

Expand Down

0 comments on commit 905b47d

Please sign in to comment.