Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-1859 Feed export instances are not added to Graph DB
Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: "Balu Vellanki <balu@apache.org>"

Closes #165 from vramachan/FALCON-1859.Export.GraphDB

(cherry picked from commit 16d2b39)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
  • Loading branch information
vramachan authored and bvellanki committed Jun 3, 2016
1 parent a8d6dcd commit 5b06e5f6bb3483aec8aa1c7dac749a1803b63897
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 1 deletion.
@@ -200,6 +200,16 @@ private Map<Node, Set<Node>> getEdgesFor(Feed feed) {
feedEdges.add(dbNode);
dbEdges.add(feedNode);
}

if (FeedHelper.isExportEnabled(cluster)) {
Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
if (!nodeEdges.containsKey(dbNode)) {
nodeEdges.put(dbNode, new HashSet<Node>());
}
Set<Node> dbEdges = nodeEdges.get(dbNode);
feedEdges.add(dbNode);
dbEdges.add(feedNode);
}
}
return nodeEdges;
}
@@ -20,17 +20,22 @@

import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Load;
import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.Arguments;
import org.apache.falcon.entity.v0.feed.Clusters;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Extract;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.FieldsType;
import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
import org.apache.falcon.entity.v0.feed.Import;
import org.apache.falcon.entity.v0.feed.MergeType;
import org.apache.falcon.entity.v0.feed.Export;
import org.apache.falcon.entity.v0.feed.LoadMethod;


import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.process.Input;
@@ -184,6 +189,36 @@ private Import getAnImport(MergeType mergeType, Datasource ds) {
return imp;
}

private Feed addFeedExport(String feed, Cluster cluster, Datasource ds) {

Feed f1 = new Feed();
f1.setName(feed);
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
new org.apache.falcon.entity.v0.feed.Cluster();
feedCluster.setName(cluster.getName());
feedCluster.setType(ClusterType.SOURCE);
Clusters clusters = new Clusters();
clusters.getClusters().add(feedCluster);
f1.setClusters(clusters);

Export exp = getAnExport(LoadMethod.UPDATEONLY, ds);
f1.getClusters().getClusters().get(0).setExport(exp);
return f1;
}

private Export getAnExport(LoadMethod loadMethod, Datasource ds) {

org.apache.falcon.entity.v0.feed.Datasource target = new org.apache.falcon.entity.v0.feed.Datasource();
target.setName(ds.getName());
target.setTableName("test-table");
Load load = new Load();
load.setType(loadMethod);
target.setLoad(load);
Export exp = new Export();
exp.setTarget(target);
return exp;
}

private void attachInput(Process process, Feed feed) {
if (process.getInputs() == null) {
process.setInputs(new Inputs());
@@ -381,6 +416,42 @@ public void testOnAddImport() throws Exception {
store.remove(EntityType.CLUSTER, "ci1");
}

@Test
public void testOnAddExport() throws Exception {

Datasource ds = new Datasource();
ds.setName("test-db");
ds.setColo("c1");

Cluster cluster = new Cluster();
cluster.setName("ci1");
cluster.setColo("c1");

Feed f1 = addFeedExport("fe1", cluster, ds);

store.publish(EntityType.CLUSTER, cluster);
store.publish(EntityType.DATASOURCE, ds);
store.publish(EntityType.FEED, f1);

Set<Entity> entities = graph.getDependents(cluster);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));

entities = graph.getDependents(ds);
Assert.assertEquals(entities.size(), 1);
Assert.assertTrue(entities.contains(f1));

entities = graph.getDependents(f1);
Assert.assertEquals(entities.size(), 2);
Assert.assertTrue(entities.contains(cluster));
Assert.assertTrue(entities.contains(ds));

store.remove(EntityType.FEED, "fe1");
store.remove(EntityType.DATASOURCE, "test-db");
store.remove(EntityType.CLUSTER, "ci1");
}


@Test
public void testOnRemoveDatasource() throws Exception {

0 comments on commit 5b06e5f

Please sign in to comment.