From e2fad7ebccf7013aac32651c722a6a9894a78905 Mon Sep 17 00:00:00 2001 From: Ryan Senior Date: Mon, 7 May 2012 08:04:40 -0500 Subject: [PATCH 1/6] Added support for memory taps that are empty --- .../com/twitter/maple/tap/TupleMemoryInputFormat.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java b/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java index a77eb47..bf346de 100644 --- a/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java +++ b/src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java @@ -143,7 +143,14 @@ public static List retrieveTuples(JobConf conf, String key) { String[] pieces = s.split(":"); int size = Integer.valueOf(pieces[0]); - byte[] val = decodeBytes(pieces[1]); + + byte[] val; + + if (pieces.length > 1){ + val = decodeBytes(pieces[1]); + }else{ + val = new byte[0]; + } SerializationFactory factory = new SerializationFactory(conf); Deserializer deserializer = factory.getDeserializer(Tuple.class); From be8e69574a878f03c811c3a5c8af779637e81b69 Mon Sep 17 00:00:00 2001 From: Argyris Zymnis Date: Fri, 1 Jun 2012 10:32:16 -0700 Subject: [PATCH 2/6] Upgrade to cascading 2.0 wip 310 --- project.clj | 2 +- src/jvm/com/twitter/maple/hbase/HBaseTap.java | 2 +- src/jvm/com/twitter/maple/jdbc/JDBCTap.java | 14 ++------------ src/jvm/com/twitter/maple/tap/MemorySourceTap.java | 14 ++------------ 4 files changed, 6 insertions(+), 26 deletions(-) diff --git a/project.clj b/project.clj index 16bebc2..9403ebc 100644 --- a/project.clj +++ b/project.clj @@ -9,6 +9,6 @@ [asm "3.2"] [org.apache.hbase/hbase "0.90.2" :exclusions [org.apache.hadoop/hadoop-core asm]] - [cascading/cascading-hadoop "2.0.0-wip-288" + [cascading/cascading-hadoop "2.0.0-wip-310" :exclusions [org.codehaus.janino/janino org.apache.hadoop/hadoop-core]]]) diff --git a/src/jvm/com/twitter/maple/hbase/HBaseTap.java b/src/jvm/com/twitter/maple/hbase/HBaseTap.java index da619a2..a4e5d04 100644 --- a/src/jvm/com/twitter/maple/hbase/HBaseTap.java +++ b/src/jvm/com/twitter/maple/hbase/HBaseTap.java @@ -33,7 +33,7 @@ import cascading.flow.FlowProcess; import cascading.tap.SinkMode; import cascading.tap.Tap; -import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; import cascading.tuple.TupleEntryCollector; import cascading.tuple.TupleEntryIterator; import sun.reflect.generics.reflectiveObjects.NotImplementedException; diff --git a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java index 7828781..166a20e 100644 --- a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java +++ b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java @@ -17,7 +17,7 @@ import cascading.tap.SinkMode; import cascading.tap.Tap; import cascading.tap.TapException; -import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; import cascading.tuple.TupleEntryCollector; import cascading.tuple.TupleEntryIterator; import com.twitter.maple.jdbc.db.DBConfiguration; @@ -281,17 +281,7 @@ private JobConf getSourceConf( FlowProcess flowProcess, JobConf conf, S @Override public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader input ) throws IOException { - // this is only called when, on the client side, a user wants to open a tap for writing on a client - // MultiRecordReader will create a new RecordReader instance for use across any file parts - // or on the cluster side during accumulation for a Join - // - // if custom jobConf properties need to be passed down, use the FlowProcess copy constructor - // - if( input == null ) - return new HadoopTupleEntrySchemeIterator( flowProcess, this ); - - // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns - // during processing of an InputSplit + // This deals with null inputs too return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); } diff --git a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java index 9ce6966..0f6ad15 100644 --- a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java +++ b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java @@ -6,7 +6,7 @@ import cascading.scheme.SourceCall; import cascading.tap.SourceTap; import cascading.tap.Tap; -import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; import cascading.tuple.Fields; import cascading.tuple.Tuple; import cascading.tuple.TupleEntryIterator; @@ -133,17 +133,7 @@ public boolean equals(Object object) { @Override public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader input ) throws IOException { - // this is only called when, on the client side, a user wants to open a tap for writing on a client - // MultiRecordReader will create a new RecordReader instance for use across any file parts - // or on the cluster side during accumulation for a Join - // - // if custom jobConf properties need to be passed down, use the HadoopFlowProcess copy constructor - // - if( input == null ) - return new HadoopTupleEntrySchemeIterator( flowProcess, this ); - - // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns - // during processing of an InputSplit + // This deals with null inputs too return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); } From bdb547c7a1fbf8bf7391dbe07c9a8ba689e676d8 Mon Sep 17 00:00:00 2001 From: Argyris Zymnis Date: Fri, 1 Jun 2012 10:39:19 -0700 Subject: [PATCH 3/6] Bump version to 0.1.10 --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 9403ebc..54565c5 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.twitter/maple "0.1.9" +(defproject com.twitter/maple "0.1.10" :source-path "src/clj" :java-source-path "src/jvm" :description "All the Cascading taps we have to offer." From 8542e94da003664939d8d8d333eb872c972cc04c Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 5 Jun 2012 10:22:25 -0500 Subject: [PATCH 4/6] bump to Cascading 2.0 and upgrade openForRead method in each tap. --- project.clj | 4 +-- src/jvm/com/twitter/maple/hbase/HBaseTap.java | 28 ++++++++----------- src/jvm/com/twitter/maple/jdbc/JDBCTap.java | 6 ++-- .../twitter/maple/tap/MemorySourceTap.java | 7 ++--- 4 files changed, 19 insertions(+), 26 deletions(-) diff --git a/project.clj b/project.clj index 54565c5..b00ebe1 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.twitter/maple "0.1.10" +(defproject com.twitter/maple "0.2.0-SNAPSHOT" :source-path "src/clj" :java-source-path "src/jvm" :description "All the Cascading taps we have to offer." @@ -9,6 +9,6 @@ [asm "3.2"] [org.apache.hbase/hbase "0.90.2" :exclusions [org.apache.hadoop/hadoop-core asm]] - [cascading/cascading-hadoop "2.0.0-wip-310" + [cascading/cascading-hadoop "2.0.0" :exclusions [org.codehaus.janino/janino org.apache.hadoop/hadoop-core]]]) diff --git a/src/jvm/com/twitter/maple/hbase/HBaseTap.java b/src/jvm/com/twitter/maple/hbase/HBaseTap.java index a4e5d04..b8490dd 100644 --- a/src/jvm/com/twitter/maple/hbase/HBaseTap.java +++ b/src/jvm/com/twitter/maple/hbase/HBaseTap.java @@ -12,15 +12,14 @@ package com.twitter.maple.hbase; -import java.io.IOException; -import java.util.UUID; - +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.mapred.FileInputFormat; @@ -30,14 +29,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import cascading.flow.FlowProcess; -import cascading.tap.SinkMode; -import cascading.tap.Tap; -import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; -import cascading.tuple.TupleEntryCollector; -import cascading.tuple.TupleEntryIterator; import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import java.io.IOException; +import java.util.UUID; + /** * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme} * to allow for the reading and writing of data to and from a HBase cluster. @@ -153,12 +149,12 @@ public void sinkConfInit(FlowProcess process, JobConf conf) { @Override public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { - return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); } @Override public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { - throw new NotImplementedException(); + throw new NotImplementedException(); } @Override public boolean createResource(JobConf jobConf) throws IOException { diff --git a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java index 166a20e..f708480 100644 --- a/src/jvm/com/twitter/maple/jdbc/JDBCTap.java +++ b/src/jvm/com/twitter/maple/jdbc/JDBCTap.java @@ -279,9 +279,9 @@ private JobConf getSourceConf( FlowProcess flowProcess, JobConf conf, S } @Override - public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader input ) - throws IOException { - // This deals with null inputs too + public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader input ) throws IOException { + // input may be null when this method is called on the client side or cluster side when accumulating + // for a HashJoin return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); } diff --git a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java index 0f6ad15..1d07de3 100644 --- a/src/jvm/com/twitter/maple/tap/MemorySourceTap.java +++ b/src/jvm/com/twitter/maple/tap/MemorySourceTap.java @@ -15,8 +15,6 @@ import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -25,11 +23,9 @@ public class MemorySourceTap extends SourceTap> implements Serializable { - private static final Logger logger = LoggerFactory.getLogger(MemorySourceTap.class); public static class MemorySourceScheme extends Scheme, Void, Object[], Void> { - private static final Logger logger = LoggerFactory.getLogger(MemorySourceScheme.class); private transient List tuples; private final String id; @@ -133,7 +129,8 @@ public boolean equals(Object object) { @Override public TupleEntryIterator openForRead( FlowProcess flowProcess, RecordReader input ) throws IOException { - // This deals with null inputs too + // input may be null when this method is called on the client side or cluster side when accumulating + // for a HashJoin return new HadoopTupleEntrySchemeIterator( flowProcess, this, input ); } From 8c541612dc2d468f66bd5f935085630e4a251bf5 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 5 Jun 2012 11:19:33 -0500 Subject: [PATCH 5/6] optimize imports. --- .../com/twitter/maple/hbase/HBaseScheme.java | 25 +++++++++---------- src/jvm/com/twitter/maple/hbase/HBaseTap.java | 1 - 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java index 6b204e8..4b8be88 100644 --- a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java +++ b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java @@ -12,10 +12,15 @@ package com.twitter.maple.hbase; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; - +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -28,15 +33,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import cascading.flow.FlowProcess; -import cascading.scheme.Scheme; -import cascading.scheme.SinkCall; -import cascading.scheme.SourceCall; -import cascading.tap.Tap; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import cascading.util.Util; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; /** * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to diff --git a/src/jvm/com/twitter/maple/hbase/HBaseTap.java b/src/jvm/com/twitter/maple/hbase/HBaseTap.java index b8490dd..3b461a3 100644 --- a/src/jvm/com/twitter/maple/hbase/HBaseTap.java +++ b/src/jvm/com/twitter/maple/hbase/HBaseTap.java @@ -28,7 +28,6 @@ import org.apache.hadoop.mapred.RecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.io.IOException; From eac91543c2606708bd851118e9ee5ce7699ec40b Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 5 Jun 2012 15:13:11 -0500 Subject: [PATCH 6/6] Bump for release --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index b00ebe1..d4ea6f6 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.twitter/maple "0.2.0-SNAPSHOT" +(defproject com.twitter/maple "0.2.0" :source-path "src/clj" :java-source-path "src/jvm" :description "All the Cascading taps we have to offer."