Skip to content
Permalink
Browse files

[GOBBLIN-852] Reorganize the code for hive registration to isolate function

Closes #2708 from ZihanLi58/ETL-8815
  • Loading branch information...
Zihan Li suvasude
Zihan Li authored and suvasude committed Aug 13, 2019
1 parent 50280ee commit be97acfd596b47edaa0e613d15f0ca267e525fc5
@@ -76,9 +76,9 @@
private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true;

private final Closer closer = Closer.create();
private final HiveRegister hiveRegister;
private final ExecutorService hivePolicyExecutor;
private final MetricContext metricContext;
protected final HiveRegister hiveRegister;
protected final ExecutorService hivePolicyExecutor;
protected final MetricContext metricContext;

/**
* The configuration to determine if path deduplication should be enabled during Hive Registration process.
@@ -88,7 +88,7 @@
*
* e.g. In streaming mode, there could be cases that files(e.g. avro) under single topic folder carry different schema.
*/
private boolean isPathDedupeEnabled;
protected boolean isPathDedupeEnabled;

/**
* Make the deduplication of path to be registered in the Publisher level,
@@ -118,25 +118,14 @@ public void close() throws IOException {
}
}

@Deprecated
@Override
public void initialize() throws IOException {}

/**
* @param states This is a collection of TaskState.
*/
@Override
public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
CompletionService<Collection<HiveSpec>> completionService =
new ExecutorCompletionService<>(this.hivePolicyExecutor);

protected int computeSpecs(Collection<? extends WorkUnitState> states, CompletionService<Collection<HiveSpec>> completionService) {
// Each state in states is task-level State, while superState is the Job-level State.
// Using both State objects to distinguish each HiveRegistrationPolicy so that
// they can carry task-level information to pass into Hive Partition and its corresponding Hive Table.

// Here all runtime task-level props are injected into superstate which installed in each Policy Object.
// runtime.props are comma-separated props collected in runtime.
int toRegisterPathCount = 0 ;
int toRegisterPathCount = 0;
for (State state:states) {
State taskSpecificState = state;
if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
@@ -147,17 +136,20 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
LIST_SPLITTER_COMMA.splitToList(this.hiveRegister.getProps().getUpstreamDataAttrName().get())){
if (state.contains(attrName)) {
taskSpecificState.appendToListProp(HiveMetaStoreUtils.RUNTIME_PROPS,
attrName + ":" + state.getProp(attrName));
attrName + ":" + state.getProp(attrName));
}
}
}

final HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
for ( final String path : state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
if (isPathDedupeEnabled && pathsToRegisterFromSingleState.contains(path)){
continue;
if (isPathDedupeEnabled){
if (pathsToRegisterFromSingleState.contains(path)) {
continue;
} else {
pathsToRegisterFromSingleState.add(path);
}
}
pathsToRegisterFromSingleState.add(path);
toRegisterPathCount += 1;
completionService.submit(new Callable<Collection<HiveSpec>>() {
@Override
@@ -169,8 +161,22 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
});
}
}
else continue;
}
return toRegisterPathCount;
}
@Deprecated
@Override
public void initialize() throws IOException {}

/**
* @param states This is a collection of TaskState.
*/
@Override
public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
CompletionService<Collection<HiveSpec>> completionService =
new ExecutorCompletionService<>(this.hivePolicyExecutor);

int toRegisterPathCount = computeSpecs(states, completionService);
for (int i = 0; i < toRegisterPathCount; i++) {
try {
for (HiveSpec spec : completionService.take().get()) {

0 comments on commit be97acf

Please sign in to comment.
You can’t perform that action at this time.