Skip to content

Commit

Permalink
FLUME-545: Upgrade to Avro 1.5 (and current Jackson).
Browse files Browse the repository at this point in the history
  • Loading branch information
waywardmonkeys committed Mar 16, 2011
1 parent 140614b commit 0b3f8e3
Show file tree
Hide file tree
Showing 23 changed files with 41 additions and 30 deletions.
7 changes: 6 additions & 1 deletion build.xml
Expand Up @@ -138,6 +138,7 @@ to call at top-level: ant deploy-contrib compile-core-test
<fileset dir="${build.lib.dir}">
<include name="**/*.jar" />
<exclude name="**/excluded/" />
<exclude name="**/avro-tools*.jar" />
</fileset>
</path>

Expand Down Expand Up @@ -337,6 +338,10 @@ to call at top-level: ant deploy-contrib compile-core-test
<path id="avro.classpath">
<fileset dir="${lib.dir}">
<include name="**/*.jar"/>
<exclude name="**/slf4j*.jar"/>
</fileset>
<fileset dir="${build.lib.dir}">
<include name="**/*.jar"/>
</fileset>
</path>
<echo message="Compiling avro file: ${avrofile}.avpr"/>
Expand All @@ -346,7 +351,7 @@ to call at top-level: ant deploy-contrib compile-core-test
<arg value="${avrofile}" />
<arg value="${avrofile}.avpr" />
</java>
<java classname="org.apache.avro.specific.SpecificCompiler">
<java classname="org.apache.avro.compiler.specific.SpecificCompiler">
<classpath refid="avro.classpath" />
<arg value="${avrofile}.avpr" />
<arg value="${avro.gen.dir}" />
Expand Down
Binary file removed lib/avro-1.4.0.jar
Binary file not shown.
Binary file added lib/avro-1.5.0.jar
Binary file not shown.
Binary file added lib/avro-compiler-1.5.0.jar
Binary file not shown.
Binary file added lib/avro-ipc-1.5.0.jar
Binary file not shown.
Binary file removed lib/jackson-core-asl-1.4.0.jar
Binary file not shown.
Binary file added lib/jackson-core-asl-1.7.3.jar
Binary file not shown.
Binary file removed lib/jackson-mapper-asl-1.4.0.jar
Binary file not shown.
Binary file added lib/jackson-mapper-asl-1.7.3.jar
Binary file not shown.
Binary file added libbuild/avro-tools-1.5.0.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions src/java/com/cloudera/flume/agent/AvroMasterRPC.java
Expand Up @@ -26,9 +26,9 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
4 changes: 2 additions & 2 deletions src/java/com/cloudera/flume/handlers/avro/AvroEventSink.java
Expand Up @@ -21,9 +21,9 @@
import java.net.URL;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.OutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
Expand Down Expand Up @@ -58,13 +59,14 @@ public AvroJsonOutputFormat() {
public void format(OutputStream o, Event e) throws IOException {
if (json == null) {
// first time, no current OutputStream
json = new JsonEncoder(schema, o);
json = EncoderFactory.get().jsonEncoder(schema, o);
cachedOut = o;
}

if (cachedOut != o) {
// different output than last time?
json.init(o);
json.flush();
json = EncoderFactory.get().jsonEncoder(schema, o);
cachedOut = o;
}

Expand Down
Expand Up @@ -19,9 +19,9 @@

import java.io.IOException;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
/**
* This implements the AvroEventServer.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/java/com/cloudera/flume/master/MasterAdminServerAvro.java
Expand Up @@ -24,9 +24,9 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -24,10 +24,10 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -22,9 +22,9 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
Expand Down
2 changes: 1 addition & 1 deletion src/java/com/cloudera/flume/util/AdminRPCAvro.java
Expand Up @@ -28,7 +28,7 @@

import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -23,10 +23,10 @@
import java.util.List;
import java.util.Map;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.thrift.transport.TTransportException;
import org.junit.Test;
import org.mortbay.log.Log;
Expand Down
4 changes: 2 additions & 2 deletions src/javatest/com/cloudera/flume/agent/TestRPCMechanisms.java
Expand Up @@ -24,10 +24,10 @@
import java.util.List;
import java.util.Map;

import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.mortbay.log.Log;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
Expand Down Expand Up @@ -119,8 +120,9 @@ public void testSerializeBinary() throws IOException {

ReflectDatumWriter<A> writer = new ReflectDatumWriter<A>(schm);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder json = new BinaryEncoder(out);
writer.write(anA, json);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(anA, encoder);
encoder.flush();

byte[] bs = out.toByteArray();
dump(bs);
Expand All @@ -144,7 +146,7 @@ public void testSerializeJson() throws IOException {

ReflectDatumWriter<A> writer = new ReflectDatumWriter<A>(schm);
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonEncoder json = new JsonEncoder(schm, out);
JsonEncoder json = EncoderFactory.get().jsonEncoder(schm, out);
writer.write(anA, json);

byte[] bs = out.toByteArray();
Expand All @@ -159,7 +161,7 @@ public void testSerializeJson() throws IOException {

ByteArrayInputStream bais = new ByteArrayInputStream(bs);
ReflectDatumReader<A> reader = new ReflectDatumReader<A>(schm);
Object decoded = reader.read(null, new JsonDecoder(schm, bais));
Object decoded = reader.read(null, DecoderFactory.get().jsonDecoder(schm, bais));
LOG.info(decoded);
}

Expand All @@ -186,7 +188,7 @@ public void testEventSchemaSerializeJson() throws IOException {
ReflectDatumWriter<EventImpl> writer = new ReflectDatumWriter<EventImpl>(
schm);
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonEncoder json = new JsonEncoder(schm, out);
JsonEncoder json = EncoderFactory.get().jsonEncoder(schm, out);
writer.write(e, json);
json.flush();
byte[] bs = out.toByteArray();
Expand All @@ -196,7 +198,7 @@ public void testEventSchemaSerializeJson() throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(bs);
ReflectDatumReader<EventImpl> reader = new ReflectDatumReader<EventImpl>(
schm);
EventImpl decoded = reader.read(null, new JsonDecoder(schm, bais));
EventImpl decoded = reader.read(null, DecoderFactory.get().jsonDecoder(schm, bais));
LOG.info(decoded);
}

Expand All @@ -212,7 +214,9 @@ public void testEventSchemaSerializeBin() throws IOException {
ReflectDatumWriter<EventImpl> writer = new ReflectDatumWriter<EventImpl>(
schm);
ByteArrayOutputStream out = new ByteArrayOutputStream();
writer.write(e, new BinaryEncoder(out));
Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
writer.write(e, enc);
enc.flush();

byte[] bs = out.toByteArray();
dump(bs);
Expand Down
Expand Up @@ -24,10 +24,10 @@
import java.util.List;
import java.util.Map;

import org.apache.avro.AvroRemoteException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;

import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.util.Utf8;
import org.junit.After;
import org.junit.Before;
Expand Down

0 comments on commit 0b3f8e3

Please sign in to comment.