Commit 41f9817

Merge branch 'release/0.2.0'
sritchie committed Jun 5, 2012
2 parents abd5958 + eac9154
Showing 6 changed files with 41 additions and 63 deletions.
4 changes: 2 additions & 2 deletions project.clj
@@ -1,4 +1,4 @@
(defproject com.twitter/maple "0.1.9"
(defproject com.twitter/maple "0.2.0"
:source-path "src/clj"
:java-source-path "src/jvm"
:description "All the Cascading taps we have to offer."
@@ -9,6 +9,6 @@
[asm "3.2"]
[org.apache.hbase/hbase "0.90.2"
:exclusions [org.apache.hadoop/hadoop-core asm]]
[cascading/cascading-hadoop "2.0.0-wip-288"
[cascading/cascading-hadoop "2.0.0"
:exclusions [org.codehaus.janino/janino
org.apache.hadoop/hadoop-core]]])
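
The jump from the "2.0.0-wip-288" snapshot to the Cascading 2.0.0 GA release likely drives the import changes in the Java files below; in particular, HadoopTupleEntrySchemeIterator moved into an io subpackage, a rename visible in the JDBCTap and MemorySourceTap hunks:

// Pre-GA WIP builds:
import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;

// Cascading 2.0.0 GA:
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;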
25 changes: 12 additions & 13 deletions src/jvm/com/twitter/maple/hbase/HBaseScheme.java
@@ -12,10 +12,15 @@

package com.twitter.maple.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -28,15 +33,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

/**
* The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to
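
The changes to this file appear limited to regrouping imports. For orientation, a minimal sketch of constructing the scheme; the three-argument constructor is an assumption based on the class javadoc above, not something this diff shows:

import cascading.tuple.Fields;
import com.twitter.maple.hbase.HBaseScheme;

public class HBaseSchemeSketch {
    public static void main(String[] args) {
        // Assumed signature: HBaseScheme(Fields keyFields, String familyName, Fields valueFields);
        // the key field maps to the HBase row key, the value fields to
        // qualifiers in a single column family.
        HBaseScheme scheme = new HBaseScheme(
            new Fields("rowkey"), "cf", new Fields("value"));
        System.out.println(scheme);
    }
}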
29 changes: 12 additions & 17 deletions src/jvm/com/twitter/maple/hbase/HBaseTap.java
@@ -12,15 +12,14 @@

package com.twitter.maple.hbase;

import java.io.IOException;
import java.util.UUID;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -29,15 +28,11 @@
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.util.UUID;

/**
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme}
* to allow for the reading and writing of data to and from a HBase cluster.
@@ -153,12 +148,12 @@ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {

@Override public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess,
RecordReader recordReader) throws IOException {
    return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
}

@Override public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess,
OutputCollector outputCollector) throws IOException {
    throw new NotImplementedException();
}

@Override public boolean createResource(JobConf jobConf) throws IOException {
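
A hedged sketch pairing HBaseTap with the scheme above; the HBaseTap(String, HBaseScheme) constructor is an assumption from the javadoc, and as the hunk above shows, openForWrite throws NotImplementedException, so this tap is written to as a Flow sink rather than through a directly opened collector:

import cascading.tuple.Fields;
import com.twitter.maple.hbase.HBaseScheme;
import com.twitter.maple.hbase.HBaseTap;

public class HBaseTapSketch {
    public static void main(String[] args) {
        // Assumed constructor: HBaseTap(String tableName, HBaseScheme scheme).
        HBaseScheme scheme = new HBaseScheme(
            new Fields("rowkey"), "cf", new Fields("value"));
        HBaseTap tap = new HBaseTap("my_table", scheme);
        // Reads use openForRead, now backed by the relocated
        // cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.
        System.out.println(tap.getIdentifier());
    }
}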
18 changes: 4 additions & 14 deletions src/jvm/com/twitter/maple/jdbc/JDBCTap.java
@@ -17,7 +17,7 @@
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import com.twitter.maple.jdbc.db.DBConfiguration;
@@ -279,19 +279,9 @@ private JobConf getSourceConf( FlowProcess<JobConf> flowProcess, JobConf conf, S
}

@Override
public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input )
throws IOException {
// this is only called when, on the client side, a user wants to open a tap for writing on a client
// MultiRecordReader will create a new RecordReader instance for use across any file parts
// or on the cluster side during accumulation for a Join
//
// if custom jobConf properties need to be passed down, use the FlowProcess<JobConf> copy constructor
//
if( input == null )
return new HadoopTupleEntrySchemeIterator( flowProcess, this );

// this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
// during processing of an InputSplit
public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader input ) throws IOException {
// input may be null when this method is called on the client side or cluster side when accumulating
// for a HashJoin
return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
}

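
The rewritten openForRead collapses the old two-branch body into a single delegation: the Cascading 2.0.0 iterator accepts a null RecordReader and creates its own input in that case. A sketch of the client-side call, where null is what gets passed; the raw Tap parameter is a placeholder for a configured JDBCTap:

import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.mapred.JobConf;

public class ClientSideReadSketch {
    @SuppressWarnings("unchecked")
    static TupleEntryIterator openClientSide(Tap tap) throws Exception {
        // On the client side there is no Hadoop-provided RecordReader yet,
        // so input is null and the iterator sets up its own input internally.
        return tap.openForRead(new HadoopFlowProcess(new JobConf()), null);
    }
}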
19 changes: 3 additions & 16 deletions src/jvm/com/twitter/maple/tap/MemorySourceTap.java
@@ -6,7 +6,7 @@
import cascading.scheme.SourceCall;
import cascading.tap.SourceTap;
import cascading.tap.Tap;
import cascading.tap.hadoop.HadoopTupleEntrySchemeIterator;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
@@ -15,8 +15,6 @@
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
@@ -25,11 +23,9 @@

public class MemorySourceTap extends SourceTap<JobConf, RecordReader<TupleWrapper, NullWritable>>
implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(MemorySourceTap.class);

public static class MemorySourceScheme
extends Scheme<JobConf, RecordReader<TupleWrapper, NullWritable>, Void, Object[], Void> {
private static final Logger logger = LoggerFactory.getLogger(MemorySourceScheme.class);

private transient List<Tuple> tuples;
private final String id;
@@ -133,17 +129,8 @@ public boolean equals(Object object) {
@Override
public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader<TupleWrapper,
NullWritable> input ) throws IOException {
// this is only called when, on the client side, a user wants to open a tap for writing on a client
// MultiRecordReader will create a new RecordReader instance for use across any file parts
// or on the cluster side during accumulation for a Join
//
// if custom jobConf properties need to be passed down, use the HadoopFlowProcess copy constructor
//
if( input == null )
return new HadoopTupleEntrySchemeIterator( flowProcess, this );

// this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
// during processing of an InputSplit
// input may be null when this method is called on the client side or cluster side when accumulating
// for a HashJoin
return new HadoopTupleEntrySchemeIterator( flowProcess, this, input );
}

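
Same null-input contract as in JDBCTap above. For orientation, a sketch of building an in-memory source for tests; the (List<Tuple>, Fields) constructor is an assumption about maple's API, so verify it against the sources:

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import com.twitter.maple.tap.MemorySourceTap;
import java.util.Arrays;

public class MemorySourceTapSketch {
    public static void main(String[] args) {
        // Assumed constructor: MemorySourceTap(List<Tuple> tuples, Fields fields).
        // Handy for seeding test flows without staging fixtures on HDFS.
        MemorySourceTap source = new MemorySourceTap(
            Arrays.asList(new Tuple("a", 1), new Tuple("b", 2)),
            new Fields("word", "count"));
        System.out.println(source.getIdentifier());
    }
}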
9 changes: 8 additions & 1 deletion src/jvm/com/twitter/maple/tap/TupleMemoryInputFormat.java
@@ -143,7 +143,14 @@ public static List<Tuple> retrieveTuples(JobConf conf, String key) {

String[] pieces = s.split(":");
int size = Integer.valueOf(pieces[0]);
byte[] val = decodeBytes(pieces[1]);

byte[] val;

if (pieces.length > 1) {
    val = decodeBytes(pieces[1]);
} else {
    val = new byte[0];
}

SerializationFactory factory = new SerializationFactory(conf);
Deserializer<Tuple> deserializer = factory.getDeserializer(Tuple.class);
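
The new length check guards the case where nothing follows the colon: Java's String.split drops trailing empty strings, so "0:" splits into a single element and pieces[1] would throw ArrayIndexOutOfBoundsException. A small demonstration; the "0:" input assumes an empty tuple list serializes to an empty payload:

public class SplitEdgeCase {
    public static void main(String[] args) {
        // split(String) removes trailing empty strings from its result, so
        // the (empty) payload after ":" disappears entirely.
        String[] pieces = "0:".split(":");
        System.out.println(pieces.length); // prints 1, not 2
        // pieces[1] would throw ArrayIndexOutOfBoundsException here,
        // which is exactly what the new pieces.length > 1 guard prevents.
    }
}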
