Skip to content
This repository has been archived by the owner on Dec 19, 2023. It is now read-only.

Commit

Permalink
Removing base64 decode option and moving expand gzip option to Kinesi…
Browse files Browse the repository at this point in the history
…s handler config (#99)
  • Loading branch information
khanuzair committed Apr 17, 2020
1 parent f028e39 commit ca58a2a
Show file tree
Hide file tree
Showing 17 changed files with 651 additions and 465 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.util;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Utility to present {@link ByteBuffer} data as an {@link InputStream}.
 *
 * <p>The buffers are consumed in list order. Reading advances the position of
 * the supplied buffers themselves; no copy of the list or of its buffers is
 * made by this class.
 */
public class ByteBufferInputStream extends InputStream {
  /** Buffers backing this stream, read front to back. */
  private final List<ByteBuffer> buffers;

  /** Index into {@link #buffers} of the buffer currently being read. */
  private int current;

  /**
   * Creates a stream over the given buffers.
   *
   * @param buffers the buffers to read from, in order
   */
  public ByteBufferInputStream(List<ByteBuffer> buffers) {
    this.buffers = buffers;
  }

  /**
   * Reads a single byte.
   *
   * @return the next byte as a value in {@code 0..255}, or {@code -1} when no
   *         buffer has data left
   * @see InputStream#read()
   */
  @Override
  public int read() throws IOException {
    ByteBuffer buffer = getBuffer();
    if (buffer == null) {
      return -1;
    }
    // Mask so that negative byte values are not mistaken for the -1 EOF marker.
    return buffer.get() & 0xff;
  }

  /**
   * Reads up to {@code len} bytes from the current buffer only, so a single
   * call never spans two buffers and may return fewer bytes than requested.
   *
   * @return the number of bytes copied, {@code 0} when {@code len} is zero, or
   *         {@code -1} when no buffer has data left
   * @see InputStream#read(byte[], int, int)
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    ByteBuffer buffer = getBuffer();
    if (buffer == null) {
      return -1;
    }
    int count = Math.min(len, buffer.remaining());
    buffer.get(b, off, count);
    return count;
  }

  /**
   * Read a buffer from the input without copying, if possible.
   *
   * <p>When the current buffer holds exactly {@code length} remaining bytes it
   * is handed back as-is (the caller then shares it with the list passed to
   * the constructor). Otherwise a new buffer is allocated and filled.
   *
   * @param length the number of bytes wanted
   * @return a buffer with {@code length} bytes remaining, or an empty buffer
   *         when {@code length} is zero or the input is already exhausted
   * @throws EOFException if the input ends after some, but fewer than
   *         {@code length}, bytes have been copied
   * @throws IOException if reading fails
   */
  public ByteBuffer readBuffer(int length) throws IOException {
    if (length == 0) {
      return ByteBuffer.allocate(0);
    }
    ByteBuffer buffer = getBuffer();
    if (buffer == null) {
      return ByteBuffer.allocate(0);
    }
    if (buffer.remaining() == length) { // can return current as-is?
      current++;
      return buffer; // return w/o copying
    }
    // punt: allocate a new buffer & copy into it
    ByteBuffer result = ByteBuffer.allocate(length);
    byte[] target = result.array();
    int filled = 0;
    while (filled < length) {
      int count = read(target, filled, length - filled);
      if (count < 0) {
        // Without this check the -1 would be added to the offset and the loop
        // would keep spinning instead of reporting the truncated input.
        throw new EOFException(
            "Expected " + length + " bytes but input ended after " + filled);
      }
      filled += count;
    }
    return result;
  }

  /**
   * Returns the next non-empty buffer, skipping past drained ones.
   *
   * @return the buffer to read from, or {@code null} when every buffer is
   *         drained
   */
  private ByteBuffer getBuffer() throws IOException {
    while (current < buffers.size()) {
      ByteBuffer buffer = buffers.get(current);
      if (buffer.hasRemaining()) {
        return buffer;
      }
      current++;
    }
    return null;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.nextdoor.bender.handler;

/**
 * Unchecked exception wrapping a failure raised while using a Kinesis
 * iterator.
 */
public class KinesisIteratorException extends RuntimeException {

  /** Message used when the caller supplies only a cause. */
  private static final String DEFAULT_MESSAGE = "Unable to use the iterator";

  /**
   * Wraps {@code e} with the default message.
   *
   * @param e the underlying failure
   */
  public KinesisIteratorException(Exception e) {
    this(DEFAULT_MESSAGE, e);
  }

  /**
   * Wraps {@code e} with a caller-supplied message.
   *
   * @param msg the detail message
   * @param e the underlying failure
   */
  public KinesisIteratorException(String msg, Exception e) {
    super(msg, e);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@

package com.nextdoor.bender.deserializer.json;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.zip.GZIPInputStream;

import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
Expand All @@ -33,7 +28,6 @@
import com.nextdoor.bender.deserializer.DeserializedEvent;
import com.nextdoor.bender.deserializer.Deserializer;
import com.nextdoor.bender.deserializer.json.GenericJsonDeserializerConfig.FieldConfig;
import org.apache.commons.io.IOUtils;

/**
* Converts a JSON string into a JsonElement object.
Expand All @@ -42,50 +36,21 @@ public class GenericJsonDeserializer extends Deserializer {
protected JsonParser parser;
private final List<FieldConfig> nestedFieldConfigs;
private String rootNodeOverridePath;
private Base64.Decoder base64decoder;
private ByteArrayOutputStream byteArrayOutputStream;
private final boolean performBase64DecodeAndExpandGzip;
private final int bufferSize;

public GenericJsonDeserializer(List<FieldConfig> nestedFieldConfigs) {
this(nestedFieldConfigs, null, false, 1024);
this(nestedFieldConfigs, null);
}

public GenericJsonDeserializer(List<FieldConfig> nestedFieldConfigs,
String rootNodeOverridePath,
boolean performBase64DecodeAndUnzip,
int bufferSize) {
String rootNodeOverridePath) {
this.nestedFieldConfigs = nestedFieldConfigs;
this.rootNodeOverridePath = rootNodeOverridePath;
this.base64decoder = Base64.getDecoder();
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.performBase64DecodeAndExpandGzip = performBase64DecodeAndUnzip;
this.bufferSize = bufferSize;
}

public byte[] readGzipCompressedData(byte[] data) throws IOException {
try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(data))) {
IOUtils.copy(gzipInputStream, byteArrayOutputStream, bufferSize);
return byteArrayOutputStream.toByteArray();
} finally {
byteArrayOutputStream.reset(); //clears output so it can be used again later
}
}

@Override
public DeserializedEvent deserialize(String raw) {
GenericJsonEvent devent = new GenericJsonEvent(null);

if (performBase64DecodeAndExpandGzip) {
try {
byte[] decoded = base64decoder.decode(raw);
byte[] unzipped = readGzipCompressedData(decoded);
raw = new String(unzipped);
} catch (Exception e) {
throw new DeserializationException(e);
}
}

JsonElement elm;
try {
elm = parser.parse(raw);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDefault;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription;
import com.nextdoor.bender.deserializer.DeserializerConfig;

Expand All @@ -33,16 +32,6 @@ public class GenericJsonDeserializerConfig extends DeserializerConfig {
@JsonProperty(required = false)
private List<FieldConfig> nestedFieldConfigs = Collections.emptyList();

@JsonSchemaDescription("When true the deserializer will assume that the raw event strings are base64 encoded and will"
+ "attempt to decode them and then expand the gzip file.")
@JsonSchemaDefault(value = "false")
@JsonProperty(required = false)
private Boolean performBase64DecodeAndExpandGzip = false;

@JsonSchemaDescription("This sets the buffer size (default 1024) when an event object is a gzip and needs to be expanded.")
@JsonProperty(required = false)
private Integer bufferSize = 1024;

@JsonSchemaDescription("Path to a JSON node which is promoted to root node. See https://github.com/jayway/JsonPath")
@JsonProperty(required = false)
private String rootNodeOverridePath;
Expand Down Expand Up @@ -89,22 +78,6 @@ public void setRootNodeOverridePath(String rootNodeOverridePath) {
this.rootNodeOverridePath = rootNodeOverridePath;
}

public Boolean getPerformBase64DecodeAndExpandGzip() {
return performBase64DecodeAndExpandGzip;
}

public void setPerformBase64DecodeAndExpandGzip(Boolean performBase64DecodeAndExpandGzip) {
this.performBase64DecodeAndExpandGzip = performBase64DecodeAndExpandGzip;
}

public Integer getBufferSize() {
return bufferSize;
}

public void setBufferSize(Integer bufferSize) {
this.bufferSize = bufferSize;
}

@Override
public Class<GenericJsonDeserializerFactory> getFactoryClass() {
return GenericJsonDeserializerFactory.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ public class GenericJsonDeserializerFactory implements DeserializerFactory {

@Override
public Deserializer newInstance() {
return new GenericJsonDeserializer(this.config.getNestedFieldConfigs(),
this.config.getRootNodeOverridePath(),
this.config.getPerformBase64DecodeAndExpandGzip(),
this.config.getBufferSize());
return new GenericJsonDeserializer(this.config.getNestedFieldConfigs(), this.config.getRootNodeOverridePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,16 @@ private DeserializedEvent getEvent(String filename) throws IOException {
return getEvent(filename, null);
}

private DeserializedEvent getEvent(String filename, boolean isEncodedAndZipped) throws IOException {
return getEvent(filename, null, isEncodedAndZipped);
}

private DeserializedEvent getEvent(String filename, String path) throws IOException {
return getEvent(filename, path, false);
}

private DeserializedEvent getEvent(String filename, String path, boolean isEncodedAndGzip)
throws IOException {
String input = TestUtils.getResourceString(this.getClass(), filename);
GenericJsonDeserializerConfig.FieldConfig fconfig =
new GenericJsonDeserializerConfig.FieldConfig();
fconfig.setField("MESSAGE");
GenericJsonDeserializer deser = new GenericJsonDeserializer(Arrays.asList(fconfig), path, isEncodedAndGzip, 1024);
GenericJsonDeserializer deser = new GenericJsonDeserializer(Arrays.asList(fconfig), path);
deser.init();
return deser.deserialize(input);
}

@Test
public void testBase64EncodedString() throws IOException {
DeserializedEvent devent = getEvent("base_64_encoded.txt", true);

/*
* Verify payload type
*/
assertNotNull(devent.getPayload());
assertEquals(devent.getPayload().getClass(), JsonObject.class);

/*
* Verify payload data
*/
JsonObject obj = (JsonObject) devent.getPayload();

assertTrue(obj.has("messageType"));
assertTrue(obj.get("messageType").isJsonPrimitive());
assertTrue(obj.get("messageType").getAsJsonPrimitive().isString());

assertTrue(obj.has("owner"));
assertTrue(obj.get("owner").isJsonPrimitive());
assertTrue(obj.get("owner").getAsJsonPrimitive().isString());


assertTrue(obj.has("logGroup"));
assertTrue(obj.get("logGroup").isJsonPrimitive());
assertTrue(obj.get("logGroup").getAsJsonPrimitive().isString());

assertTrue(obj.has("logStream"));
assertTrue(obj.get("logStream").isJsonPrimitive());
assertTrue(obj.get("logStream").getAsJsonPrimitive().isString());

assertTrue(obj.has("subscriptionFilters"));
assertTrue(obj.get("subscriptionFilters").isJsonArray());
assertTrue(obj.get("subscriptionFilters").getAsJsonArray().isJsonArray());

assertTrue(obj.has("logEvents"));
assertTrue(obj.get("logEvents").isJsonArray());
assertTrue(obj.get("logEvents").getAsJsonArray().get(0).getAsJsonObject().has("id"));
assertTrue(obj.get("logEvents").getAsJsonArray().get(0).getAsJsonObject().has("timestamp"));
assertTrue(obj.get("logEvents").getAsJsonArray().get(0).getAsJsonObject().has("message"));
}

@Test
public void testBasicJson() throws UnsupportedEncodingException, IOException {
DeserializedEvent devent = getEvent("basic.json");
Expand Down

This file was deleted.

0 comments on commit ca58a2a

Please sign in to comment.