Skip to content

Commit

Permalink
add missing classes
Browse files Browse the repository at this point in the history
relates to #9
  • Loading branch information
costin committed Jan 17, 2014
1 parent 2708cca commit 3c822da
Show file tree
Hide file tree
Showing 8 changed files with 601 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/WritableBytesWriter.java
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.mr;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.serialization.JdkBytesWriter;
import org.elasticsearch.hadoop.util.BytesArray;

/**
 * {@link JdkBytesWriter} extension that adds direct handling for the Hadoop
 * {@link Text} and {@link BytesWritable} types. Any other object is passed on
 * to the parent implementation.
 */
public class WritableBytesWriter extends JdkBytesWriter {

    /**
     * Copies the content of the given object into the given {@link BytesArray}.
     *
     * @param from object to write; {@link Text} and {@link BytesWritable} are handled here,
     *             everything else by {@link JdkBytesWriter#write(Object, BytesArray)}
     * @param to   storage receiving the bytes
     */
    @Override
    public void write(Object from, BytesArray to) {
        if (from instanceof Text) {
            Text text = (Text) from;
            // the array is handed over together with its explicit length
            to.bytes(text.getBytes(), text.getLength());
        }
        else if (from instanceof BytesWritable) {
            BytesWritable writable = (BytesWritable) from;
            to.bytes(writable.getBytes(), writable.getLength());
        }
        else {
            // not a Hadoop type known to this class - let the JDK writer deal with it
            super.write(from, to);
        }
    }
}
77 changes: 77 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/pig/PigBytesWritable.java
@@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.pig;

import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.elasticsearch.hadoop.serialization.JdkBytesWriter;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.BytesArray;

/**
 * {@link JdkBytesWriter} for Pig input that is already JSON: the incoming
 * {@link PigTuple} is expected to wrap a tuple with exactly one field
 * (chararray or bytearray) whose content is written as is.
 */
public class PigBytesWritable extends JdkBytesWriter {

    /**
     * Writes the single field of the given {@link PigTuple} into the given {@link BytesArray}.
     * A tuple without a nested schema is written as the empty document <code>{}</code>.
     *
     * @param from a {@link PigTuple} holding a one-field tuple (chararray or bytearray)
     * @param to   storage receiving the bytes
     * @throws IllegalStateException    if the tuple field cannot be read
     * @throws IllegalArgumentException if the field is neither a chararray nor a bytearray
     */
    @Override
    public void write(Object from, BytesArray to) {

        // expect PigTuple holding a Tuple with only one field - chararray or bytearray
        if (!(from instanceof PigTuple)) {
            // Delegate to Assert so the failure type is the same as before; the message is
            // built only on failure and is null-safe (from.getClass() used to NPE on null).
            Assert.isTrue(false, String.format("Unexpected object type, expecting [%s], given [%s]",
                    PigTuple.class, (from != null ? from.getClass() : null)));
        }

        PigTuple pt = (PigTuple) from;
        ResourceFieldSchema schema = pt.getSchema();

        // unwrap the tuple
        ResourceSchema tupleSchema = schema.getSchema();

        // empty tuple shortcut
        if (tupleSchema == null) {
            // write empty doc and stop - there are no fields to inspect
            to.bytes("{}");
            return;
        }

        ResourceFieldSchema[] fields = tupleSchema.getFields();
        Assert.isTrue(fields.length == 1, "When using JSON input, only one field is expected");

        Object object;
        byte type;

        try {
            object = pt.getTuple().get(0);
            type = pt.getTuple().getType(0);
        } catch (Exception ex) {
            throw new IllegalStateException("Encountered exception while processing tuple", ex);
        }

        if (type == DataType.BIGCHARARRAY || type == DataType.CHARARRAY) {
            to.bytes(object.toString());
            return;
        }
        if (type == DataType.BYTEARRAY) {
            DataByteArray dba = (DataByteArray) object;
            to.bytes(dba.get(), dba.size());
            return;
        }

        // the field value may be null here; guard so the intended exception is not masked by an NPE
        throw new IllegalArgumentException(String.format("Cannot handle Pig type [%s]; expecting [%s,%s]",
                (object != null ? object.getClass() : null), String.class, DataByteArray.class));
    }
}
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import org.elasticsearch.hadoop.util.BytesArray;

/**
 * Contract for components that write an arbitrary object into a {@link BytesArray}.
 */
public interface BytesWriter {

/**
* Writes the given object to the given {@link BytesArray}.
*
* @param from object to write; which types are accepted is up to the implementation
* @param to storage receiving the written content
*/
void write(Object from, BytesArray to);
}
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import org.elasticsearch.hadoop.util.BytesArray;

/**
 * {@link BytesWriter} handling plain JDK input: raw {@code byte[]}, another
 * {@link BytesArray}, or, failing that, the object's {@code toString()} form.
 */
public class JdkBytesWriter implements BytesWriter {

    /**
     * Writes the given object to the given {@link BytesArray}.
     *
     * @param from {@code byte[]}, {@link BytesArray} or any other object (written via {@code toString()})
     * @param to   storage receiving the content
     */
    @Override
    public void write(Object from, BytesArray to) {
        if (from instanceof byte[]) {
            byte[] raw = (byte[]) from;
            to.bytes(raw, raw.length);
        }
        else if (from instanceof BytesArray) {
            BytesArray source = (BytesArray) from;
            source.copyTo(to);
        }
        else {
            // last resort - rely on the textual representation
            to.bytes(from.toString());
        }
    }
}
@@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.integration.cascading;

import java.util.Properties;

import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Test;

import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

/**
 * Integration tests that push the lines of a local JSON file to Elasticsearch
 * through a Cascading local flow.
 */
public class CascadingLocalJsonSaveTest {

    private static final String INPUT = "src/test/resources/artists.json";

    /**
     * Copies every line of {@link #INPUT} into the {@code json-cascading-local/artists} resource.
     */
    @Test
    public void testWriteToES() throws Exception {
        // local file-system source, one tuple field ("line") per text line
        Tap source = new FileTap(new TextLine(new Fields("line")), INPUT);
        Tap sink = new EsTap("json-cascading-local/artists");

        Pipe copy = new Pipe("copy");
        Properties settings = new TestSettings().getProperties();
        // NOTE(review): "true" as the value of a *_CLASS option looks unusual - confirm the intended setting
        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_BYTES_CLASS, "true");
        new LocalFlowConnector(settings).connect(source, sink, copy).complete();
    }

    /**
     * Runs the same copy against a different resource with index auto-creation
     * switched off; the flow is expected to fail with an exception.
     */
    @Test(expected = Exception.class)
    public void testIndexAutoCreateDisabled() throws Exception {
        Properties settings = new TestSettings().getProperties();
        settings.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "false");
        settings.setProperty(ConfigurationOptions.ES_SERIALIZATION_WRITER_BYTES_CLASS, "true");

        // local file-system source
        Tap source = new FileTap(new TextLine(new Fields("line")), INPUT);
        Tap sink = new EsTap("json-cascading-local/non-existing", new Fields("line"));
        Pipe copy = new Pipe("copy");
        new LocalFlowConnector(settings).connect(source, sink, copy).complete();
    }
}
@@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.integration.cascading;

import java.io.OutputStream;
import java.util.Collection;
import java.util.Properties;

import org.elasticsearch.hadoop.cascading.EsTap;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.integration.QueryTestParams;
import org.elasticsearch.hadoop.integration.Stream;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import cascading.flow.local.LocalFlowConnector;
import cascading.operation.AssertionLevel;
import cascading.operation.aggregator.Count;
import cascading.operation.assertion.AssertSizeLessThan;
import cascading.operation.filter.FilterNotNull;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;

/**
 * Parameterized integration test reading the JSON-indexed artists back from
 * Elasticsearch through a Cascading local flow, once per supplied query.
 */
@RunWith(Parameterized.class)
public class CascadingLocalJsonSearchTest {

    /** Supplies the queries the test is executed with. */
    @Parameters
    public static Collection<Object[]> queries() {
        return QueryTestParams.localParams();
    }

    private final String indexPrefix = "json-";
    private final String query;

    public CascadingLocalJsonSearchTest(String query) {
        this.query = query;
    }

    // destination of the flow output
    private OutputStream output = Stream.NULL.stream();

    /**
     * Reads from the {@code json-cascading-local/artists} resource, drops null values,
     * asserts the tuple size, then groups and counts before writing to the output stream.
     */
    @Test
    public void testReadFromES() throws Exception {
        Tap source = new EsTap(indexPrefix + "cascading-local/artists");

        Pipe head = new Pipe("copy");
        Pipe nonNull = new Each(head, new FilterNotNull());
        Pipe sized = new Each(nonNull, AssertionLevel.STRICT, new AssertSizeLessThan(5));
        // a per-field not-null assertion on "name" is left out: can't select when using unknown
        Pipe grouped = new GroupBy(sized);
        Pipe counted = new Every(grouped, new Count());

        // print out
        Tap sink = new OutputStreamTap(new TextLine(), output);
        new LocalFlowConnector(queryProperties()).connect(source, sink, counted).complete();
    }

    /** Builds the test settings with the current query applied. */
    private Properties queryProperties() {
        Properties settings = new TestSettings().getProperties();
        settings.put(ConfigurationOptions.ES_QUERY, query);
        return settings;
    }
}

0 comments on commit 3c822da

Please sign in to comment.