Skip to content

Commit

Permalink
[Metadata Extraction] Examples showing full, incremental and filtered…
Browse files Browse the repository at this point in the history
… metadata extractions supported.

1. Functionality added in NavApi Cient to form POST requests for large queries. Body object is map with "query", "limit"  and "cursorMark" sent with POST requests to same url's. Incremental extraction is based on extractorRunIds to determine which files have been added/modified/extracted since previous extraction as specified in marker (and not modified since end marker, if specified).

2. Custom iterable and iterator created to handle pagination of results by using batch size limit and extractorRunId query parameters into batches. MetadataType Enum moved to examples package, and IncrementalExtractIterator stores type and calls appropriate method in NavApiCient for entity or relation extraction.

3. Wrapper classes for Entity and Relation results batches, aggregated in MetadataResultSet wrapper class. QueryCriteria class acts a wrapper for info to be sent in the body of a post request.

4. Examples in extraction examples module run full extraction unless a file path for a marker is specified. FilteredMetadataExtraction shows sample queries. Example scripts take in client configuration (required), start marker file path, end marker file path, file path to write next marker to.
  • Loading branch information
Nadia Wallace committed Jul 20, 2015
1 parent 8b5a718 commit e27b6c5
Show file tree
Hide file tree
Showing 34 changed files with 1,402 additions and 41 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,4 +1,6 @@
*.class
*.swp
/examples/performace_report.txt

# Mobile Tools for Java (J2ME)
.mtj.tmp/
Expand All @@ -15,4 +17,4 @@ hs_err_pid*
*.iml

target/
*/target/
*/target/
@@ -0,0 +1,181 @@
/*
* Copyright (c) 2015 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.nav.plugin.client.examples.extraction;

import com.cloudera.nav.plugin.client.ClientUtils;
import com.cloudera.nav.plugin.client.NavApiCient;
import com.cloudera.nav.plugin.client.PluginConfigurationFactory;
import com.cloudera.nav.plugin.client.PluginConfigurations;
import com.cloudera.nav.plugin.model.Source;
import com.cloudera.nav.plugin.model.SourceType;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Example calls using the MetadataExtractor class to perform
* incremental extraction. Examples shown utilize the
* {@link MetadataExtractor#extractMetadata(String, String, String)}
* method with a marker and with query strings for specifying Entities and
* Relations to be retrieved.
*/
public class FilteredMetadataExtraction {

private static final Logger LOG =
LoggerFactory.getLogger(FilteredMetadataExtraction.class);

public static void getHDFSEntities(NavApiCient client,
MetadataExtractor extractor,
String marker){
Iterable<Map<String, Object>> HdfsAll =
extractor.extractMetadata(marker, null, "sourceType:HDFS", null)
.getEntities();
getFirstResult(HdfsAll);

Source hdfsSource = client.getOnlySource(SourceType.HDFS);
Iterable<Map<String, Object>> HdfsSingleSource =
extractor.extractMetadata(marker, null, "sourceType:HDFS AND " +
"sourceId:" + hdfsSource.getIdentity(), null).getEntities();
getFirstResult(HdfsSingleSource);
}

public static void getHive(MetadataExtractor extractor,
String marker,
String colName){
Iterable<Map<String, Object>> hiveDb = extractor.extractMetadata(marker,
null, "sourceType:HIVE AND type:DATABASE", null).getEntities();
getFirstResult(hiveDb);

Iterable<Map<String, Object>> hiveTable = extractor.extractMetadata(marker,
null, "sourceType:HIVE AND type:TABLE", null).getEntities();
getFirstResult(hiveTable);

Iterable<Map<String, Object>> hiveView = extractor.extractMetadata(marker,
null, "sourceType:HIVE AND type:VIEW", null).getEntities();
getFirstResult(hiveView);

Iterable<Map<String, Object>> hiveColumn = extractor.extractMetadata(marker,
null, "sourceType:HIVE AND type:FIELD " +
"AND originalName:" + colName, null).getEntities();
getFirstResult(hiveColumn);

Iterable<Map<String, Object>> hiveRelation = extractor.extractMetadata(
marker, null,
"sourceType:HIVE AND type:(DIRECTORY OR FILE)",
"endpoint1SourceType:HIVE AND endpoint2SourceType: HIVE " +
"AND type:PARENT_CHILD AND endpoint1Type: DIRECTORY " +
"AND endpoint2Type: FILE").getRelations();
getFirstResult(hiveRelation);
//Further data processing with iterable.iterator()
}

public static void getHiveOperations(MetadataExtractor extractor, String marker){
Iterable<Map<String, Object>> hiveOpEntities = extractor.extractMetadata(
marker, null, "sourceType:HIVE AND type:OPERATION_EXECUTION", null)
.getEntities();
getFirstResult(hiveOpEntities);

Iterable<Map<String, Object>> hiveOpRelations = extractor.extractMetadata(
marker, null,
"sourceType:HIVE AND type:OPERATION_EXECUTION",
"type:LOGICAL_PHYSICAL AND endpoint1SourceType:HIVE " +
"AND endpoint1Type:OPERATION_EXECUTION").getRelations();
getFirstResult(hiveOpRelations);
//Further data processing with iterable.iterator()
}

public static String getMRandYarn(MetadataExtractor extractor, String marker){
Iterable<Map<String, Object>> yarnOpEntities = extractor.extractMetadata(
marker, null, "sourceType:(MAPREDUCE OR YARN) AND type:OPERATION_EXECUTION",
null).getEntities();
getFirstResult(yarnOpEntities);

//Alternative with buildQuery
List<String> sourceTypes = Lists.newArrayList("MAPREDUCE", "YARN");
List<String> types = Lists.newArrayList("OPERATION EXECUTION");
String entityQuery = ClientUtils.buildQuery(sourceTypes, types);
Iterable<Map<String, Object>> yarnOpEntities2 = extractor.extractMetadata(
marker, null, entityQuery, "").getEntities();
getFirstResult(yarnOpEntities2);

MetadataResultSet yarnOp = extractor.extractMetadata(
marker, null,
"sourceType:(MAPREDUCE OR YARN) AND type:OPERATION_EXECUTION",
"type:DATA_FLOW AND endpoint1SourceType:HDFS OR endpoint2SourceType:HDFS");
Iterable<Map<String, Object>> yarnOpRelations = yarnOp.getRelations();
getFirstResult(yarnOpRelations);
//Further data processing with iterable.iterator()
return yarnOp.getMarker();
}

private static void getFirstResult(Iterable<Map<String, Object>> iterable){
Iterator<Map<String, Object>> iterator = iterable.iterator();
if(iterator.hasNext()) {
Map<String, Object> result = iterator.next();
LOG.info("source: " + result.get("sourceType") +
" type: " + result.get("type"));
} else {
LOG.info("no elements found");
}
}

public static void main(String[] args){
String configFilePath = args[0];
PluginConfigurations config = (new PluginConfigurationFactory())
.readConfigurations(configFilePath);
NavApiCient client = new NavApiCient(config);
MetadataExtractor extractor = new MetadataExtractor(client, null);
String marker;
try {
String markerReadPath = args[1];
File markerFile = new File(markerReadPath);
FileReader fr = new FileReader(markerFile);
BufferedReader markerReader = new BufferedReader(fr);
marker = markerReader.readLine();
markerReader.close();
fr.close();
} catch (IOException e) {
throw Throwables.propagate(e);
} catch (ArrayIndexOutOfBoundsException e){
marker=null;
}
getHDFSEntities(client, extractor, marker);
getHive(extractor, marker, "city_id");
getHiveOperations(extractor, marker);
String nextMarker = getMRandYarn(extractor, marker);
try {
String markerWritePath = args[2];
PrintWriter markerWriter = new PrintWriter(markerWritePath, "UTF-8");
markerWriter.println(nextMarker);
markerWriter.close();
} catch(IOException e) {
throw Throwables.propagate(e);
} catch (ArrayIndexOutOfBoundsException e){
LOG.error("Please specify a file to save next marker");
}
}
}
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2015 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.nav.plugin.client.examples.extraction;

import com.cloudera.nav.plugin.client.NavApiCient;

import java.util.Map;

/**
* Iterable used for entities and relations in MetadataResultSet object returned
* by extractMetadata() calls.
*/
public class IncrementalExtractIterable implements Iterable<Map<String, Object>> {

private final NavApiCient client;
private final MetadataType type;
private final String query;
private final Integer limit;
private final Iterable<String> extractorRunIds;

public IncrementalExtractIterable(NavApiCient client, MetadataType type,
String query, Integer limit,
Iterable<String> extractorRunIds){
this.query = query;
this.type = type;
this.client = client;
this.limit =limit;
this.extractorRunIds = extractorRunIds;
}

@Override
public IncrementalExtractIterator iterator() {
return new IncrementalExtractIterator(client, type, query, limit, extractorRunIds);
}
}
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2015 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.nav.plugin.client.examples.extraction;

import com.cloudera.nav.plugin.client.ClientUtils;
import com.cloudera.nav.plugin.client.NavApiCient;
import com.cloudera.nav.plugin.client.QueryCriteria;
import com.cloudera.nav.plugin.client.ResultsBatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.List;

/**
* Custom iterator class used to retrieve entities and relations results from
* incremental extraction in extractMetadata(). Handles iterating through batches
* of results with a cursor per query, and iterates through a set of queries.
*/
public class IncrementalExtractIterator implements Iterator<Map<String, Object>> {

private final NavApiCient client;
private final Integer limit;
private final Integer MAX_QUERY_PARTITION_SIZE = 800;
private final MetadataType type;
private final String userQuery;
private boolean hasNext;
private Iterator<List<String>> partitionIterator;
private List<Map<String, Object>> resultsBatch;
private Iterator<Map<String, Object>> resultsBatchIterator;
private String cursorMark="*";
private String fullQuery;

public IncrementalExtractIterator(NavApiCient client,
MetadataType type, String query, Integer limit,
Iterable<String> extractorRunIds){
this.client = client;
this.type = type;
this.userQuery = query;
this.limit = limit;
this.partitionIterator =
Iterables.partition(extractorRunIds,MAX_QUERY_PARTITION_SIZE).iterator();
if(!Iterables.isEmpty(extractorRunIds)) {
getNextQuery();
} else {
fullQuery = userQuery;
}
getNextBatch();
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public Map<String, Object> next() {
if(!hasNext()){
throw new NoSuchElementException();
}
Map<String, Object> nextResult = resultsBatchIterator.next();
//if at last element in batch
if(!resultsBatchIterator.hasNext()){
//if on last batch
if(resultsBatch.size()<limit) {
//if on last query, leave loop
if (!partitionIterator.hasNext()) {
hasNext = false;
//Update query and get next batch
} else {
getNextQuery();
getNextBatch();
}
//fetch next batch
} else {
getNextBatch();
}
}
return nextResult;
}

@VisibleForTesting
public void getNextBatch(){
try {
ResultsBatch<Map<String, Object>> response = getResultsBatch();
resultsBatch = response.getResults();
resultsBatchIterator = resultsBatch.iterator();
hasNext = resultsBatchIterator.hasNext();
cursorMark = response.getCursorMark();
if (!hasNext && partitionIterator.hasNext()) {
getNextQuery();
getNextBatch();
}
} catch (Exception e){
throw Throwables.propagate(e);
}
}

private ResultsBatch<Map<String, Object>> getResultsBatch() throws EnumConstantNotPresentException{
QueryCriteria queryCriteria = new QueryCriteria(fullQuery, limit, cursorMark);
switch(type) {
case ENTITIES:
return client.getEntityBatch(queryCriteria);
case RELATIONS:
return client.getRelationBatch(queryCriteria);
default:
throw new EnumConstantNotPresentException(MetadataType.class, type.name());
}
}

private void getNextQuery(){
cursorMark="*";
List<String> extractorRunIdNext = partitionIterator.next();
String extractorString = ClientUtils.buildConjunctiveClause("extractorRunId",
extractorRunIdNext);
fullQuery = ClientUtils.conjoinSolrQueries(userQuery, extractorString);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

0 comments on commit e27b6c5

Please sign in to comment.