Add compatibility with ES 2.0
Remove compatibility with ES 0.90

relates elastic#524
relates elastic#525
costin committed Aug 18, 2015
1 parent 83f1781 commit ce4df55
Showing 36 changed files with 368 additions and 299 deletions.
4 changes: 3 additions & 1 deletion build.gradle
@@ -208,8 +208,10 @@ allprojects { project ->
// force the use of commons-http from Hadoop 1.x
// to avoid SLF4J warnings, force only one version
force 'commons-httpclient:commons-httpclient:3.0.1' //, 'org.slf4j:slf4j-log4j12:1.7.6', 'org.slf4j:slf4j-api:1.7.6'
-force 'com.google.guava:guava:14.0.1' // force guava version for Spark
+//force 'com.google.guava:guava:14.0.1' // force guava version for Spark
+force 'joda-time:joda-time:2.8' // force joda-time version for Spark
force "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion", "org.codehaus.jackson:jackson-core-asl:$jacksonVersion"
+force "commons-cli:commons-cli:1.2"

eachDependency { details ->
// for slf4j use each dependency since there are so many variants
EsHadoopScheme.java
@@ -30,9 +30,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
+import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
-import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
@@ -64,7 +64,7 @@ class EsHadoopScheme extends Scheme<JobConf, RecordReader, OutputCollector, Obje
private final String nodes;
private final int port;
private final Properties props;
-private boolean IS_ES_10;
+private boolean IS_ES_20;

private static Log log = LogFactory.getLog(EsHadoopScheme.class);

@@ -91,7 +91,7 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
Settings settings = loadSettings(flowProcess.getConfigCopy(), true);
context[2] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
-IS_ES_10 = SettingsUtils.isEs10(settings);
+IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -110,7 +110,7 @@ public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], Out
Settings settings = loadSettings(flowProcess.getConfigCopy(), false);
context[0] = CascadingUtils.fieldToAlias(settings, getSinkFields());
sinkCall.setContext(context);
-IS_ES_10 = SettingsUtils.isEs10(settings);
+IS_ES_20 = SettingsUtils.isEs20(settings);
}

public void sinkCleanup(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
@@ -176,24 +176,17 @@ public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], Rec
if (entry.getFields().isDefined()) {
    // lookup using writables
    Text lookupKey = new Text();
-   // TODO: it's worth benchmarking whether using an index/offset yields significantly better performance
    for (Comparable<?> field : entry.getFields()) {
-       if (IS_ES_10) {
-           // check for multi-level alias
-           Object result = data;
-           for (String level : StringUtils.tokenize(alias.toES(field.toString()), ".")) {
-               lookupKey.set(level);
-               result = ((Map) result).get(lookupKey);
-               if (result == null) {
-                   break;
-               }
-           }
-           CascadingUtils.setObject(entry, field, result);
-       }
-       else {
-           lookupKey.set(alias.toES(field.toString()));
-           CascadingUtils.setObject(entry, field, data.get(lookupKey));
-       }
+       // check for multi-level alias (since ES 1.0)
+       Object result = data;
+       for (String level : StringUtils.tokenize(alias.toES(field.toString()), ".")) {
+           lookupKey.set(level);
+           result = ((Map) result).get(lookupKey);
+           if (result == null) {
+               break;
+           }
+       }
+       CascadingUtils.setObject(entry, field, result);
    }
}
else {
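With the ES 0.90 branch gone, the scheme takes the multi-level alias path unconditionally: a dotted field name is resolved by walking nested Maps one level at a time. A minimal standalone sketch of that traversal (the class and method names and the plain String keys are illustrative; the scheme above reuses a Hadoop Text key and its alias table):

import java.util.Map;

public class DottedPathLookup {

    // Walks nested Maps following a dotted path such as "user.name.first",
    // returning null as soon as any level is missing -- the same shape as
    // the loop in EsHadoopScheme#source above.
    @SuppressWarnings("rawtypes")
    static Object resolve(Map root, String dottedPath) {
        Object result = root;
        for (String level : dottedPath.split("\\.")) {
            if (!(result instanceof Map)) {
                return null;
            }
            result = ((Map) result).get(level);
            if (result == null) {
                break;
            }
        }
        return result;
    }
}

For example, resolve(doc, "user.name") returns the value nested under "user" then "name", or null if either level is absent.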
EsLocalScheme.java
@@ -24,8 +24,8 @@
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
-import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
+import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.Counter;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.RestRepository;
@@ -58,7 +58,7 @@ class EsLocalScheme extends Scheme<Properties, ScrollQuery, Object, Object[], Ob
private final Properties props;
private transient RestRepository client;

-private boolean IS_ES_10;
+private boolean IS_ES_20;

EsLocalScheme(String host, int port, String index, String query, Fields fields, Properties props) {
this.resource = index;
@@ -80,7 +80,7 @@ public void sourcePrepare(FlowProcess<Properties> flowProcess, SourceCall<Object
Settings settings = HadoopSettingsManager.loadFrom(flowProcess.getConfigCopy()).merge(props);
context[0] = CascadingUtils.alias(settings);
sourceCall.setContext(context);
-IS_ES_10 = SettingsUtils.isEs10(settings);
+IS_ES_20 = SettingsUtils.isEs20(settings);
}

@Override
@@ -158,23 +158,16 @@ public boolean source(FlowProcess<Properties> flowProcess, SourceCall<Object[],

if (entry.getFields().isDefined()) {
    // lookup using writables
-   // TODO: it's worth benchmarking whether using an index/offset yields significantly better performance
    for (Comparable<?> field : entry.getFields()) {
-       if (IS_ES_10) {
-           Object result = data;
-           // check for multi-level alias
-           for (String level : StringUtils.tokenize(alias.toES(field.toString()), ".")) {
-               result = ((Map) result).get(level);
-               if (result == null) {
-                   break;
-               }
-           }
-           entry.setObject(field, result);
-       }
-       else {
-           //NB: coercion should be applied automatically by the TupleEntry
-           entry.setObject(field, data.get(alias.toES(field.toString())));
-       }
+       Object result = data;
+       // check for multi-level alias
+       for (String level : StringUtils.tokenize(alias.toES(field.toString()), ".")) {
+           result = ((Map) result).get(level);
+           if (result == null) {
+               break;
+           }
+       }
+       entry.setObject(field, result);
    }
}
else {
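Both schemes now call SettingsUtils.isEs20, which is new in this commit; its body is not part of the diff. A hedged sketch of what such a check typically does, assuming the discovered cluster version is available as a "major.minor..." string (the parsing below is an assumption, not the actual es-hadoop implementation):

// Hypothetical stand-in for SettingsUtils.isEs20 -- the real helper reads the
// discovered cluster version from Settings; only the shape is sketched here.
public class EsVersionCheck {

    static boolean isEs20(String clusterVersion) {
        if (clusterVersion == null) {
            // version not discovered yet; callers would treat this as "not 2.x"
            return false;
        }
        // e.g. "2.0.0-beta1-SNAPSHOT" -> major version 2
        int major = Integer.parseInt(clusterVersion.split("\\.")[0]);
        return major >= 2;
    }
}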
178 changes: 89 additions & 89 deletions dist.gradle
@@ -8,30 +8,30 @@ def customizePom(pom, gradleProject) {
generatedPom.dependencies.removeAll { dep ->
dep.scope == 'test' || dep.artifactId == 'elasticsearch-hadoop-mr'
}
// for es-hadoop optional is best served as provided/optional vs compile/optional
generatedPom.dependencies.findAll { it.optional == true }.each {
it.scope = "provided"
}
ext.cascading = generatedPom.dependencies.any { it.groupId == 'cascading' }
ext.storm = generatedPom.dependencies.any { it.groupId == 'org.apache.storm' }
if (cascading || storm)
generatedPom.project {
repositories {
if (cascading)
repository {
id = 'conjars.org'
url = 'http://conjars.org/repo'
}
if (storm)
repository {
id = 'clojars.org'
url = 'http://clojars.org/repo'
}
}
}

// add all items necessary for maven central publication
generatedPom.project {
@@ -65,42 +65,42 @@ def customizePom(pom, gradleProject) {
}
}
}
groupId = "org.elasticsearch"
artifactId = project.archivesBaseName
}
}

ext.deployUsername = { project.hasProperty("repoUsername") ? getProperty("repoUsername") : "" }
ext.deployPassword = { project.hasProperty("repoPassword") ? getProperty("repoPassword") : "" }

repositories {
flatDir {
name "fileRepo"
dirs "repo"
}
}

uploadArchives {
repositories {
//add project.repositories.fileRepo
mavenDeployer {
customizePom(pom, project)
repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") {
authentication(userName: deployUsername(), password: deployPassword())
}
snapshotRepository(url: "https://oss.sonatype.org/content/repositories/snapshots/") {
authentication(userName: deployUsername(), password: deployPassword())
}
}
}
}

install {
repositories.mavenInstaller {
customizePom(pom, project)
}
}


@@ -112,50 +112,50 @@ install {
// need to share it across scripts so the method is defined as a task instead of a def
// lazy depends could work but we avoid it since not all projects use distZip
task uploadToS3() {
group = "Distribution"
description = "Upload ZIPs to S3"

logging.level = LogLevel.INFO
// execution phase only
doLast() {
// distZip might not create an s3 config so check its existence first
if (configurations.find({ it.name == 's3' })) {
uploadArtifactsToS3(project, toDir)
}
}
}
def uploadArtifactsToS3(target, toDir) {
configurations { antlibs }
dependencies {
antlibs "org.springframework.build:org.springframework.build.aws.ant:3.0.6.RELEASE"
antlibs "net.java.dev.jets3t:jets3t:0.8.1"
}

// see ant logging
target.logging.level = LogLevel.INFO
ant {
taskdef(resource: 'org/springframework/build/aws/ant/antlib.xml', classpath: configurations.antlibs.asPath)
s3(accessKey: s3AccessKey, secretKey: s3SecretAccessKey) {
target.configurations["s3"].artifacts.each { artifact ->
def archive = artifact.archiveTask
upload(bucketName: 'download.elasticsearch.org', file: archive.archivePath,
toFile: toDir + "/${archive.archiveName}",
publicRead: 'false') {
metadata(name: 'project.name', value: project)
metadata(name: 'package.file.name', value: archive.archiveName)
}
// checksum
def checksum = file(archive.archivePath.absolutePath + ".sha1.txt")
if (checksum.exists()) {
upload(bucketName: 'download.elasticsearch.org', file: archive.archivePath.absolutePath + ".sha1.txt",
toFile: toDir + "/${archive.archiveName}" + ".sha1.txt",
publicRead: 'false')
}
}
}
}
}
7 changes: 4 additions & 3 deletions gradle.properties
@@ -35,10 +35,11 @@ hamcrestVersion = 1.3
# Hive 0.11 finally updated antlr to 3.4 so there are no more conflicts with Pig
antlrVersion = 3.4
thriftVersion = 0.5.0
-esVersion = 1.7.1
+#esVersion = 1.7.1
+esVersion = 2.0.0-beta1-SNAPSHOT

-luceneVersion = 4.10.4
-groovyVersion = 2.3.2
+luceneVersion = 5.2.1
+groovyVersion = 2.4.4

# --------------------
# Project wide version
Expand Down
AbstractHiveExtraTests.java
@@ -25,15 +25,11 @@
import org.junit.Before;
import org.junit.Test;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;

-import static org.elasticsearch.hadoop.integration.hive.HiveSuite.provisionEsLib;
-import static org.elasticsearch.hadoop.integration.hive.HiveSuite.server;
+import static org.elasticsearch.hadoop.integration.hive.HiveSuite.*;

-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.*;

public class AbstractHiveExtraTests {

@@ -81,6 +77,7 @@ public void testDate() throws Exception {
String resource = "hive/date-as-long";
RestUtils.touch("hive");
RestUtils.putMapping(resource, "org/elasticsearch/hadoop/hive/hive-date.json");

RestUtils.postData(resource + "/1", "{\"type\" : 1, \"&t\" : 1407239910771}".getBytes());

RestUtils.refresh("hive");
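The testDate fixture posts the timestamp as a bare long; 1407239910771 is milliseconds since the Unix epoch. For reference, a small JDK-only sketch of what that value decodes to (this illustrates the fixture data, not the Hive/ES date conversion itself):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DateAsLong {
    public static void main(String[] args) {
        long millis = 1407239910771L; // value posted by the test above
        SimpleDateFormat iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        iso.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(iso.format(new Date(millis))); // 2014-08-05T11:58:30.771Z
    }
}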
