diff --git a/project.clj b/project.clj
index 16bebc2..d4ea6f6 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,4 @@
-(defproject com.twitter/maple "0.1.9"
+(defproject com.twitter/maple "0.2.0"
   :source-path "src/clj"
   :java-source-path "src/jvm"
   :description "All the Cascading taps we have to offer."
@@ -9,6 +9,6 @@
                  [asm "3.2"]
                  [org.apache.hbase/hbase "0.90.2" :exclusions [org.apache.hadoop/hadoop-core asm]]
-                 [cascading/cascading-hadoop "2.0.0-wip-288"
+                 [cascading/cascading-hadoop "2.0.0"
                   :exclusions [org.codehaus.janino/janino org.apache.hadoop/hadoop-core]]])
diff --git a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java
index 6b204e8..4b8be88 100644
--- a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java
+++ b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java
@@ -12,10 +12,15 @@
 
 package com.twitter.maple.hbase;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import cascading.util.Util;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -28,15 +33,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import cascading.flow.FlowProcess;
-import cascading.scheme.Scheme;
-import cascading.scheme.SinkCall;
-import cascading.scheme.SourceCall;
-import cascading.tap.Tap;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import cascading.util.Util;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 
 /**
  * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to
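
Note: the HBaseScheme hunks above only reorder imports (Cascading's now sit below the Hadoop/HBase ones), so behavior is unchanged. For context, a minimal sketch of how this scheme pairs with an HBaseTap; the constructor shapes follow this repo's HBase classes, while the table, family, and field names are made up:

    // imports: cascading.tuple.Fields, cascading.tap.Tap,
    //          com.twitter.maple.hbase.HBaseScheme, com.twitter.maple.hbase.HBaseTap
    Fields keyField = new Fields( "rowKey" );
    Fields valueField = new Fields( "value" );
    // the "rowKey" tuple field becomes the HBase row key; "value" lands in column family "cf"
    HBaseScheme scheme = new HBaseScheme( keyField, "cf", valueField );
    Tap hBaseTap = new HBaseTap( "sample_table", scheme );
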
diff --git a/src/jvm/com/twitter/maple/hbase/HBaseTap.java b/src/jvm/com/twitter/maple/hbase/HBaseTap.java
index da619a2..3b461a3 100644
--- a/src/jvm/com/twitter/maple/hbase/HBaseTap.java
+++ b/src/jvm/com/twitter/maple/hbase/HBaseTap.java
@@ -12,15 +12,14 @@
 
 package com.twitter.maple.hbase;
 
-import java.io.IOException;
-import java.util.UUID;
-
+import cascading.flow.FlowProcess;
+import cascading.tap.SinkMode;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.TupleEntryIterator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -29,15 +28,11 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import cascading.flow.FlowProcess;
-import cascading.tap.SinkMode;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
-import cascading.tuple.TupleEntryCollector;
-import cascading.tuple.TupleEntryIterator;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.util.UUID;
 
 /**
  * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme}
  * to allow for the reading and writing of data to and from a HBase cluster.
@@ -153,12 +148,12 @@ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
   @Override
   public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
-    return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
+    return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
   }
 
   @Override
   public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
-    throw new NotImplementedException();
+    throw new NotImplementedException();
   }
 
   @Override
   public boolean createResource(JobConf jobConf) throws IOException {
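
HBaseTap's behavior is unchanged here: reads delegate to the (relocated) HadoopTupleEntrySchemeIterator, sinking still happens cluster-side through the TableOutputFormat configured in sinkConfInit, and openForWrite stays unimplemented. A hedged sketch of driving the tap as a sink; sourceTap and the pipe name are hypothetical, and HadoopFlowConnector is Cascading 2's stock connector:

    // imports: cascading.flow.Flow, cascading.flow.hadoop.HadoopFlowConnector,
    //          cascading.pipe.Pipe, java.util.Properties
    Pipe copy = new Pipe( "copy" );
    Flow flow = new HadoopFlowConnector( new Properties() ).connect( sourceTap, hBaseTap, copy );
    flow.complete(); // tuples reach HBase via TableOutputFormat, not via openForWrite
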
diff --git a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java
index 7828781..f708480 100644
--- a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java
+++ b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java
@@ -17,7 +17,7 @@
 import cascading.tap.SinkMode;
 import cascading.tap.Tap;
 import cascading.tap.TapException;
-import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
 import cascading.tuple.TupleEntryCollector;
 import cascading.tuple.TupleEntryIterator;
 import com.twitter.maple.jdbc.db.DBConfiguration;
@@ -279,19 +279,9 @@ private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, S
   }
 
   @Override
-  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input )
-    throws IOException {
-    // this is only called when, on the client side, a user wants to open a tap for writing on a client
-    // MultiRecordReader will create a new RecordReader instance for use across any file parts
-    // or on the cluster side during accumulation for a Join
-    //
-    // if custom jobConf properties need to be passed down, use the FlowProcess copy constructor
-    //
-    if( input == null )
-      return new HadoopTupleEntrySchemeIterator( flowProcess, this );
-
-    // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
-    // during processing of an InputSplit
+  public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException {
+    // input may be null when this method is called on the client side or cluster side when accumulating
+    // for a HashJoin
     return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
   }
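
The rewritten comment in JDBCTap.openForRead captures the new contract: in Cascading 2.0.0 GA, HadoopTupleEntrySchemeIterator itself accepts a null RecordReader and builds its own reader from the tap's source configuration, so the old two-branch body collapses into a single constructor call. A hedged sketch of the client-side path where input is null (jdbcTap is a hypothetical, already-constructed tap, and the calling method is assumed to throw IOException):

    // imports: cascading.flow.hadoop.HadoopFlowProcess, cascading.tuple.TupleEntryIterator,
    //          org.apache.hadoop.mapred.JobConf
    HadoopFlowProcess process = new HadoopFlowProcess( new JobConf() );
    // null RecordReader: the iterator creates its own reader from the tap's sourceConf
    TupleEntryIterator it = jdbcTap.openForRead( process, null );
    while( it.hasNext() )
        System.out.println( it.next().getTuple() );
    it.close();
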
diff --git a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java
index 9ce6966..1d07de3 100644
--- a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java
+++ b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java
@@ -6,7 +6,7 @@
 import cascading.scheme.SourceCall;
 import cascading.tap.SourceTap;
 import cascading.tap.Tap;
-import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
+import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
 import cascading.tuple.Fields;
 import cascading.tuple.Tuple;
 import cascading.tuple.TupleEntryIterator;
@@ -15,8 +15,6 @@
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -25,11 +23,9 @@
 public class MemorySourceTap extends SourceTap<JobConf, RecordReader<TupleWrapper, NullWritable>> implements Serializable {
-    private static final Logger logger = LoggerFactory.getLogger(MemorySourceTap.class);
 
     public static class MemorySourceScheme extends Scheme<JobConf, RecordReader<TupleWrapper, NullWritable>, Void, Object[], Void> {
-        private static final Logger logger = LoggerFactory.getLogger(MemorySourceScheme.class);
 
         private transient List<Tuple> tuples;
 
         private final String id;
@@ -133,17 +129,8 @@ public boolean equals(Object object) {
     @Override
     public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException {
-        // this is only called when, on the client side, a user wants to open a tap for writing on a client
-        // MultiRecordReader will create a new RecordReader instance for use across any file parts
-        // or on the cluster side during accumulation for a Join
-        //
-        // if custom jobConf properties need to be passed down, use the HadoopFlowProcess copy constructor
-        //
-        if( input == null )
-            return new HadoopTupleEntrySchemeIterator( flowProcess, this );
-
-        // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
-        // during processing of an InputSplit
+        // input may be null when this method is called on the client side or cluster side when accumulating
+        // for a HashJoin
         return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
     }
diff --git a/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java b/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java
index a77eb47..bf346de 100644
--- a/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java
+++ b/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java
@@ -143,7 +143,14 @@ public static List<Tuple> retrieveTuples(JobConf conf, String key) {
 
         String[] pieces = s.split(":");
         int size = Integer.valueOf(pieces[0]);
-        byte[] val = decodeBytes(pieces[1]);
+
+        byte[] val;
+
+        if (pieces.length > 1){
+            val = decodeBytes(pieces[1]);
+        }else{
+            val = new byte[0];
+        }
 
         SerializationFactory factory = new SerializationFactory(conf);
         Deserializer<Tuple> deserializer = factory.getDeserializer(Tuple.class);
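
The retrieveTuples guard fixes a genuine crash: Java's String.split discards trailing empty fields, so a serialized entry whose byte section is empty (such as "0:") splits into a one-element array, and the old pieces[1] threw ArrayIndexOutOfBoundsException. A self-contained demonstration of the split behavior:

    public class SplitDemo {
        public static void main( String[] args ) {
            // trailing empty fields are dropped by split(String)
            String[] empty = "0:".split( ":" );
            System.out.println( empty.length );    // prints 1 -> pieces[1] would throw
            String[] nonEmpty = "5:AbCd".split( ":" );
            System.out.println( nonEmpty.length ); // prints 2 -> pieces[1] is safe
        }
    }
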