This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Merge branch 'master' of https://github.com/apache/falcon
sandeepSamudrala committed Mar 3, 2017
2 parents 0780363 + 1ded641 commit 432a03a
Showing 27 changed files with 314 additions and 71 deletions.
@@ -92,7 +92,7 @@
<arg>-sourcePaths</arg>
<arg>${sourceDir}</arg>
<arg>-targetPath</arg>
- <arg>${targetClusterFS}${targetDir}</arg>
+ <arg>${targetDir}</arg>
<arg>-falconFeedStorageType</arg>
<arg>FILESYSTEM</arg>
<arg>-availabilityFlag</arg>
@@ -42,7 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {

private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
private EventUtils eventUtils;
- ScheduledThreadPoolExecutor timer;
+ private ScheduledThreadPoolExecutor timer;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -48,10 +48,10 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- FileSystem fs= FileSystem.get(
+ FileSystem fs = FileSystem.get(
FileUtils.getConfiguration(context.getConfiguration(),
- conf.get(HiveDRArgs.TARGET_NN.getName()),
- conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
+ conf.get(HiveDRArgs.TARGET_NN.getName()),
+ conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
hiveDRStore = new HiveDRStatusStore(fs);
}

@@ -67,7 +67,7 @@ public int compare(ReplicationStatus r1, ReplicationStatus r2) {

@Override
protected void reduce(Text key, Iterable<Text> values, final Context context)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
ReplicationStatus rs;
timer = new ScheduledThreadPoolExecutor(1);
5 changes: 5 additions & 0 deletions client/pom.xml
@@ -86,6 +86,11 @@
<artifactId>jersey-multipart</artifactId>
</dependency>

+ <dependency>
+ <groupId>org.jvnet.mimepull</groupId>
+ <artifactId>mimepull</artifactId>
+ </dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
7 changes: 5 additions & 2 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -27,6 +27,7 @@
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.util.StringUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
@@ -54,6 +55,7 @@ public final class ExtensionHandler {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class);
private static final String UTF_8 = CharEncoding.UTF_8;
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
+ private static final String PATH_SEPARATOR = "_";
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "extensionName";
@@ -110,6 +112,7 @@ public static List<Entity> loadAndPrepare(String extensionName, String jobName,
public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls)
throws IOException, FalconException {
ClassLoader extensionClassLoader = ExtensionClassLoader.load(urls);
+ LOG.debug("Urls loaded:" + StringUtils.join(", ", urls));
if (extensionClassLoader.getResourceAsStream(EXTENSION_BUILDER_INTERFACE_SERVICE_FILE) == null) {
throw new FalconCLIException("The extension build time jars do not contain "
+ EXTENSION_BUILDER_INTERFACE_SERVICE_FILE);
@@ -144,8 +147,8 @@ private static void stageEntities(List<Entity> entities, String stagePath) {
}

private static String createStagePath(String extensionName, String jobName) {
- String stagePath = TMP_BASE_DIR + File.separator + extensionName + File.separator + jobName
- + File.separator + System.currentTimeMillis()/1000;
+ String stagePath = TMP_BASE_DIR + File.separator + extensionName + PATH_SEPARATOR + jobName
+ + PATH_SEPARATOR + System.currentTimeMillis()/1000;
File tmpPath = new File(stagePath);
if (tmpPath.mkdir()) {
throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString());
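The staging path change above replaces the nested extensionName/jobName/timestamp tree with a single flat directory name per run. A rough sketch of the resulting layout (the extension and job names below are made-up examples, not values from this commit):

// Sketch only: hypothetical names, illustrating the new flat staging layout.
public final class StagePathExample {
    public static void main(String[] args) {
        String tmpBaseDir = String.format("file://%s", System.getProperty("java.io.tmpdir"));
        String extensionName = "hdfs-mirroring";
        String jobName = "sales-monthly";
        // e.g. file:///tmp/hdfs-mirroring_sales-monthly_1488529800
        String stagePath = tmpBaseDir + java.io.File.separator
                + extensionName + "_" + jobName + "_" + System.currentTimeMillis() / 1000;
        System.out.println(stagePath);
    }
}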
@@ -77,7 +77,12 @@ public void init() throws FalconException {

@Override
public void destroy() throws FalconException {
- graphiteReporter.stop();
+ try {
+ // reporting final metrics into graphite before stopping
+ graphiteReporter.report();
+ } finally {
+ graphiteReporter.stop();
+ }
}

private MetricGauge createMetric(final String metricName){
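The destroy() change above flushes one last report before shutting the reporter down, so metrics gathered since the previous scheduled run are not lost. A minimal sketch of that lifecycle with the Dropwizard Metrics GraphiteReporter (host, port, and registry below are illustrative, not taken from Falcon's plugin):

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public final class GraphiteLifecycleSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        Graphite graphite = new Graphite(new InetSocketAddress("graphite.example.com", 2003));
        GraphiteReporter reporter = GraphiteReporter.forRegistry(registry)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build(graphite);
        reporter.start(1, TimeUnit.MINUTES);
        // ... metrics get registered and updated while the service runs ...
        try {
            // flush anything collected since the last scheduled report
            reporter.report();
        } finally {
            reporter.stop();
        }
    }
}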
69 changes: 69 additions & 0 deletions common/src/main/java/org/apache/falcon/util/FSDRUtils.java
@@ -0,0 +1,69 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Util class for FS DR.
 */
public final class FSDRUtils {
    private FSDRUtils() {
    }

    private static final List<String> HDFS_SCHEME_PREFIXES =
            Arrays.asList("file", "hdfs", "hftp", "hsftp", "webhdfs", "swebhdfs");

    private static Configuration defaultConf = new Configuration();

    public static Configuration getDefaultConf() {
        return defaultConf;
    }

    public static void setDefaultConf(Configuration conf) {
        defaultConf = conf;
    }

    public static boolean isHCFS(Path filePath) throws FalconException {
        if (filePath == null) {
            throw new FalconException("filePath cannot be empty");
        }

        String scheme;
        try {
            FileSystem f = FileSystem.get(filePath.toUri(), getDefaultConf());
            scheme = f.getScheme();
            if (StringUtils.isBlank(scheme)) {
                throw new FalconException("Cannot get valid scheme for " + filePath);
            }
        } catch (IOException e) {
            throw new FalconException(e);
        }

        return HDFS_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim()) ? false : true;
    }
}
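A hedged sketch of how a caller might use the new helper: isHCFS returns true only for schemes outside HDFS_SCHEME_PREFIXES, so cloud stores such as s3n or wasb count as HCFS while local, hdfs, hftp and webhdfs paths do not. The target path below is hypothetical, and resolving it requires the matching Hadoop FileSystem implementation (and credentials) on the classpath:

import org.apache.falcon.FalconException;
import org.apache.falcon.util.FSDRUtils;
import org.apache.hadoop.fs.Path;

public final class FsDrTargetCheck {
    public static void main(String[] args) throws FalconException {
        Path target = new Path("s3n://backup-bucket/apps/dr");   // hypothetical DR target
        if (FSDRUtils.isHCFS(target)) {
            System.out.println("HCFS target: treat as a cloud store copy");
        } else {
            System.out.println("HDFS target: HDFS-specific features remain available");
        }
    }
}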
61 changes: 61 additions & 0 deletions common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
@@ -0,0 +1,61 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.falcon.util;

import org.apache.falcon.FalconException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests FSDRUtils.
 */
public final class FSDRUtilsTest {

    @BeforeClass
    private void setup() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3n.awsAccessKeyId", "testS3KeyId");
        conf.set("fs.s3n.awsSecretAccessKey", "testS3AccessKey");
        conf.set("fs.azure.account.key.mystorage.blob.core.windows.net", "dGVzdEF6dXJlQWNjZXNzS2V5");
        FSDRUtils.setDefaultConf(conf);
    }

    @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "filePath cannot be empty")
    public void testIsHCFSEmptyPath() throws Exception {
        FSDRUtils.isHCFS(null);
    }

    @Test
    public void testIsHCFS() throws Exception {
        boolean isHCFSPath = FSDRUtils.isHCFS(new Path("/apps/dr"));
        Assert.assertFalse(isHCFSPath);

        isHCFSPath = FSDRUtils.isHCFS(new Path("hdfs://localhost:54136/apps/dr"));
        Assert.assertFalse(isHCFSPath);

        isHCFSPath = FSDRUtils.isHCFS(new Path("hftp://localhost:54136/apps/dr"));
        Assert.assertFalse(isHCFSPath);

        isHCFSPath = FSDRUtils.isHCFS(new Path("s3n://testBucket/apps/dr"));
        Assert.assertTrue(isHCFSPath);
    }
}
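The setup above also configures an Azure storage key, although no wasb path is exercised; such a case would look roughly like the sketch below. This is hypothetical, not part of the commit, and it assumes the hadoop-azure filesystem (like the s3n support needed by the existing assertion) is available on the test classpath:

@Test
public void testIsHCFSAzure() throws Exception {
    // Hypothetical extra case, mirroring the s3n assertion above; not part of this commit.
    boolean isHCFSPath = FSDRUtils.isHCFS(new Path("wasb://mycontainer@mystorage.blob.core.windows.net/apps/dr"));
    Assert.assertTrue(isHCFSPath);
}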
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionDelete.twiki
@@ -17,7 +17,7 @@ Result of the delete operation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/delete/sales-monthly
+ POST http://localhost:15000/api/extension/delete/sales-monthly
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionInstances.twiki
@@ -25,7 +25,7 @@ A list of entities of the job, each followed by a set of instances.
---++ Examples
---+++ Rest Call
<verbatim>
- GET http://localhost:15000/api/extensions/instances/daily-health-bill?start=2012-04-01T00:00
+ GET http://localhost:15000/api/extension/instances/daily-health-bill?start=2012-04-01T00:00
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionList.twiki
@@ -18,7 +18,7 @@ Total number of results and a list of jobs implementing the given extension.
---++ Examples
---+++ Rest Call
<verbatim>
- GET http://localhost:15000/api/extensions/list/billCollection
+ GET http://localhost:15000/api/extension/list/billCollection
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionResume.twiki
@@ -17,7 +17,7 @@ Result of the resume operation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/resume/sales-monthly
+ POST http://localhost:15000/api/extension/resume/sales-monthly
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionSchedule.twiki
@@ -17,7 +17,7 @@ Result of the schedule operation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/schedule/sales-monthly
+ POST http://localhost:15000/api/extension/schedule/sales-monthly
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionSubmit.twiki
@@ -17,7 +17,7 @@ Result of submission.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/submit/hdfs-mirroring
+ POST http://localhost:15000/api/extension/submit/hdfs-mirroring
jobName=sales-monthly
jobClustername=primaryCluster
jobClusterValidityStart=2015-03-13T00:00Z
@@ -17,7 +17,7 @@ Result of the submit and schedule operation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/submitAndSchedule/hdfs-mirroring
+ POST http://localhost:15000/api/extension/submitAndSchedule/hdfs-mirroring
jobName=sales-monthly
jobClustername=primaryCluster
jobClusterValidityStart=2015-03-13T00:00Z
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionSuspend.twiki
@@ -17,7 +17,7 @@ Result of the suspend operation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/suspend/sales-monthly
+ POST http://localhost:15000/api/extension/suspend/sales-monthly
</verbatim>
---+++ Result
<verbatim>
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionUpdate.twiki
@@ -17,7 +17,7 @@ Result of update.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/update/hdfs-mirroring
+ POST http://localhost:15000/api/extension/update/hdfs-mirroring
jobName=sales-monthly
jobClustername=primaryCluster
jobClusterValidityStart=2015-03-13T00:00Z
2 changes: 1 addition & 1 deletion docs/src/site/twiki/restapi/ExtensionValidate.twiki
@@ -17,7 +17,7 @@ Result of validation.
---++ Examples
---+++ Rest Call
<verbatim>
- POST http://localhost:15000/api/extensions/validate/hdfs-mirroring
+ POST http://localhost:15000/api/extension/validate/hdfs-mirroring
jobName=sales-monthly
jobClustername=primaryCluster
jobClusterValidityStart=2015-03-13T00:00Z
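All of the extension REST examples above now use the singular /api/extension/* base path. A minimal sketch of invoking one of those endpoints from Java (it assumes a local Falcon server on port 15000 with simple authentication; the user.name parameter and job name are illustrative):

import java.net.HttpURLConnection;
import java.net.URL;

public final class ExtensionScheduleCall {
    public static void main(String[] args) throws Exception {
        // POST /api/extension/schedule/<jobName>, as in the ExtensionSchedule example above
        URL url = new URL("http://localhost:15000/api/extension/schedule/sales-monthly?user.name=falcon");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Accept", "application/json");
        System.out.println("HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}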