Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,28 +229,31 @@ private class DatasetWorkUnitGenerator implements Callable<Void> {
Iterator<FileSet<CopyEntity>> fileSets =
this.copyableDataset.getFileSetIterator(this.targetFs, this.copyConfiguration);

while (fileSets.hasNext() && !this.workUnitList.hasRejectedFileSet()) {
while (fileSets.hasNext() && !shouldStopGeneratingWorkUnits(this.workUnitList)) {
FileSet<CopyEntity> fileSet = fileSets.next();
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX, fileSet.getName());
List<WorkUnit> workUnitsForPartition = Lists.newArrayList();
for (CopyEntity copyEntity : fileSet.getFiles()) {

CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(this.copyableDataset);
CopyEntity.DatasetAndPartition datasetAndPartition = copyEntity.getDatasetAndPartition(metadata);

WorkUnit workUnit = new WorkUnit(extract);
workUnit.addAll(this.state);
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
GobblinMetrics.addCustomTagToState(workUnit, new Tag<>(CopyEventSubmitterHelper.DATASET_ROOT_METADATA_NAME,
this.copyableDataset.datasetURN()));
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetAndPartition.toString());
workUnit.setProp(SlaEventKeys.DATASET_URN_KEY, this.copyableDataset.datasetURN());
workUnit.setProp(SlaEventKeys.PARTITION_KEY, copyEntity.getFileSet());
computeAndSetWorkUnitGuid(workUnit);
workUnitsForPartition.add(workUnit);

if (!fileSet.getFiles().isEmpty()) {
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, CopyConfiguration.COPY_PREFIX, fileSet.getName());
List<WorkUnit> workUnitsForPartition = Lists.newArrayList();
for (CopyEntity copyEntity : fileSet.getFiles()) {

CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(this.copyableDataset);
CopyEntity.DatasetAndPartition datasetAndPartition = copyEntity.getDatasetAndPartition(metadata);

WorkUnit workUnit = new WorkUnit(extract);
workUnit.addAll(this.state);
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
GobblinMetrics.addCustomTagToState(workUnit,
new Tag<>(CopyEventSubmitterHelper.DATASET_ROOT_METADATA_NAME, this.copyableDataset.datasetURN()));
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetAndPartition.toString());
workUnit.setProp(SlaEventKeys.DATASET_URN_KEY, this.copyableDataset.datasetURN());
workUnit.setProp(SlaEventKeys.PARTITION_KEY, copyEntity.getFileSet());
computeAndSetWorkUnitGuid(workUnit);
workUnitsForPartition.add(workUnit);
}
this.workUnitList.addFileSet(fileSet, workUnitsForPartition);
}
this.workUnitList.addFileSet(fileSet, workUnitsForPartition);
}
} catch (IOException ioe) {
throw new RuntimeException("Failed to generate work units for dataset " + this.copyableDataset.datasetURN(), ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import com.google.common.base.Function;
import com.google.common.base.Optional;
Expand All @@ -38,54 +41,88 @@


/**
* Finds {@link HiveDataset}s. Will look for tables in a database specified by {@link #DB_KEY}, possibly filtering them
* Finds {@link HiveDataset}s. Will look for tables in a database specified by {@link #DB_PATTERN_KEY}, possibly filtering them
* with pattern {@link #TABLE_PATTERN_KEY}, and create a {@link HiveDataset} for each one.
*/
public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {

public static final String HIVE_DATASET_PREFIX = "hive.dataset";
public static final String HIVE_METASTORE_URI_KEY = HIVE_DATASET_PREFIX + ".hive.metastore.uri";
public static final String DB_KEY = HIVE_DATASET_PREFIX + ".database";
public static final String DB_PATTERN_KEY = HIVE_DATASET_PREFIX + ".database.pattern";
public static final String TABLE_PATTERN_KEY = HIVE_DATASET_PREFIX + ".table.pattern";
public static final String DEFAULT_TABLE_PATTERN = "*";

private final HiveRegProps hiveProps;
private final Properties properties;
private final HiveMetastoreClientPool clientPool;
private final FileSystem fs;
private final String db;
private final String dbPattern;
private final String tablePattern;

public HiveDatasetFinder(FileSystem fs, Properties properties) throws IOException {

Preconditions.checkArgument(properties.containsKey(DB_KEY));
Preconditions.checkArgument(properties.containsKey(DB_PATTERN_KEY));

this.fs = fs;
this.clientPool = HiveMetastoreClientPool.get(properties,
Optional.fromNullable(properties.getProperty(HIVE_METASTORE_URI_KEY)));
this.hiveProps = this.clientPool.getHiveRegProps();

this.db = properties.getProperty(DB_KEY);
this.dbPattern = properties.getProperty(DB_PATTERN_KEY);
this.tablePattern = properties.getProperty(TABLE_PATTERN_KEY, DEFAULT_TABLE_PATTERN);
this.properties = properties;
}

/**
* Get all tables in db with given table pattern.
*/
public Collection<Table> getTables(String db, String tablePattern) throws IOException {
List<Table> tables = Lists.newArrayList();

try(AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
List<String> tableNames = client.get().getTables(db, tablePattern);
for (String tableName : tableNames) {
tables.add(client.get().getTable(db, tableName));
}
/**
 * Returns a lazy iterator over all Hive {@link Table}s whose database matches
 * {@code this.dbPattern}. Database names are resolved eagerly here; the tables of each
 * database are listed lazily by the returned {@link TableIterator} as it is consumed.
 *
 * @return an iterator over matching tables (tables are fetched from the metastore on demand)
 * @throws IOException if listing the databases from the metastore fails for any reason
 *         (the underlying cause — e.g. a Thrift error — is wrapped)
 */
public Iterator<Table> getTables() throws IOException {
// Borrow a pooled metastore client only for the database listing; the iterator
// borrows its own client per next() call.
try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
List<String> dbNames = client.get().getDatabases(this.dbPattern);
return new TableIterator(dbNames);
} catch (Exception exc) {
// Broad catch: the metastore client can throw checked Thrift exceptions as well as
// runtime failures; normalize everything to IOException per this method's contract.
throw new IOException(exc);
}
}

/**
 * Lazily iterates over the {@link Table}s of a list of databases, fetching each table's
 * metadata from the metastore on demand using the enclosing finder's client pool and
 * {@code tablePattern}. Not thread-safe.
 */
private class TableIterator implements Iterator<Table> {

  /** Databases whose tables have not yet been listed. */
  private final Iterator<String> databases;

  /** Remaining table names of {@link #currentDb}; {@code null} until the first database is opened. */
  private Iterator<String> tablesInDb;
  private String currentDb;

  public TableIterator(List<String> databases) {
    this.databases = databases.iterator();
  }

  @Override
  public boolean hasNext() {
    // Bug fix: the original dereferenced this.tablesInDb before it was ever assigned,
    // throwing NullPointerException on the very first call.
    // NOTE(review): this may over-report when every remaining database matches no tables;
    // next() then throws NoSuchElementException. Avoiding that would require eagerly
    // listing each database's tables here.
    return (this.tablesInDb != null && this.tablesInDb.hasNext()) || this.databases.hasNext();
  }

  /**
   * Returns the next matching table, advancing across database boundaries as needed.
   *
   * @throws NoSuchElementException if no further table is available
   * @throws RuntimeException wrapping any metastore I/O or Thrift failure
   */
  @Override
  public Table next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    try (AutoReturnableObject<IMetaStoreClient> client = clientPool.getClient()) {
      // Bug fix: use a loop (the original used a single 'if') so that databases with no
      // matching tables are skipped instead of breaking the iteration; also handles the
      // initial null state of tablesInDb.
      while (this.tablesInDb == null || !this.tablesInDb.hasNext()) {
        this.currentDb = this.databases.next();
        this.tablesInDb = client.get().getTables(this.currentDb, tablePattern).iterator();
      }

      return client.get().getTable(this.currentDb, this.tablesInDb.next());
    } catch (IOException | TException e) {
      // Iterator.next() cannot throw checked exceptions; preserve the cause.
      throw new RuntimeException("Failed to fetch table from database " + this.currentDb, e);
    }
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

@Override public List<HiveDataset> findDatasets() throws IOException {
Expand All @@ -95,7 +132,7 @@ public Collection<Table> getTables(String db, String tablePattern) throws IOExce
@Override
public Iterator<HiveDataset> getDatasetsIterator()
throws IOException {
return Iterators.transform(getTables(this.db, this.tablePattern).iterator(), new Function<Table, HiveDataset>() {
return Iterators.transform(getTables(), new Function<Table, HiveDataset>() {
@Nullable
@Override
public HiveDataset apply(@Nullable Table table) {
Expand Down