Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Refine SchemaSuggest so that it doesn't examine schemas that are so d…

…issimilar from the target that they can't possibly be returned. This speeds up schema inference by roughly 3x
  • Loading branch information...
commit f9346c16a7e8af423cc2ef2f7b75b7d1ff0ec5f4 1 parent 681532b
@mikecafarella mikecafarella authored
View
13 src/java/com/cloudera/recordbreaker/schemadict/SchemaStatisticalSummary.java
@@ -1476,6 +1476,19 @@ SummaryNode buildStructure(Schema s, String docStr) {
/////////////////////////////////////////////////////////
// Schema distance computation
/////////////////////////////////////////////////////////
+ /**
+ * Get the minimum mapping cost from a schema of size k to one of size m.
+ * This helps us avoid mapping computations that couldn't possibly produce
+ * a low-distance mapping.
+ */
+ public static double getMinimumMappingCost(int k, int m) {
+ return Math.abs(k - m) * Math.min(MATCHCOST_CREATE, MATCHCOST_DELETE);
+ }
+
+ /**
+ * Find the best mapping between the current schema summary and the one provided
+ * by the parameter.
+ */
public SchemaMapping getBestMapping(SchemaStatisticalSummary other) {
SummaryNode t1 = root;
SummaryNode t2 = other.root;
View
97 src/java/com/cloudera/recordbreaker/schemadict/SchemaSuggest.java
@@ -50,13 +50,34 @@
* @author mjc
****************************************************************/
public class SchemaSuggest {
+ int MIN_ELTS_SUGGESTED = 10;
+ int NUM_BUCKETS = 20;
SchemaDictionary dict;
-
+ List<List<SchemaDictionaryEntry>> dictBySize;
+
/**
* Load in the Schema Dictionary from the indicated file.
*/
public SchemaSuggest(File dataDir) throws IOException {
this.dict = new SchemaDictionary(dataDir);
+
+ // The 'dictBySize' structure allows us to perform schema inference
+ // more quickly, by avoiding examination of schemas that can't possibly
+ // be returned by inferSchemaMapping().
+ this.dictBySize = new ArrayList<List<SchemaDictionaryEntry>>();
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ dictBySize.add(new ArrayList<SchemaDictionaryEntry>());
+ }
+
+ for (SchemaDictionaryEntry elt: dict.contents()) {
+ Schema comparisonSchema = elt.getSchema();
+ int comparisonSchemaSize = comparisonSchema.getFields().size();
+ if (comparisonSchemaSize < dictBySize.size()-1) {
+ dictBySize.get(comparisonSchemaSize-1).add(elt);
+ } else {
+ dictBySize.get(dictBySize.size()-1).add(elt);
+ }
+ }
}
/**
@@ -75,12 +96,78 @@ public SchemaSuggest(File dataDir) throws IOException {
// Compare the statistics to the database of schema statistics. Find the closest matches, both
// on a per-attribute basis and structurally.
//
+ int schemaSize = srcSchema.getFields().size();
+ //
+ // We start testing the input database against known schemas that have an identical
+ // number of attributes, which should allow for the best matches. This gives us an
+ // initial set of distances. We then expand the search to schemas of greater or fewer
+ // attributes, as long as a given bucket of size-k schemas has a min-distance of less
+ // than the current top-k matches.
+ //
+ //
TreeSet<DictionaryMapping> sorter = new TreeSet<DictionaryMapping>();
- for (SchemaDictionaryEntry elt: dict.contents()) {
- SchemaMapping mapping = srcSummary.getBestMapping(elt.getSummary());
- sorter.add(new DictionaryMapping(mapping, elt));
- }
+ int numMatches = 0;
+ List<Integer> seenIndexes = new ArrayList<Integer>();
+ int searchRadius = 0;
+ boolean seenAllCandidates = false;
+ int srcSchemaSize = srcSchema.getFields().size();
+
+ while (! seenAllCandidates) {
+ // Examine the relevant schema buckets, compute all matches to those schemas
+ for (int j = Math.max(0, srcSchemaSize - searchRadius);
+ j <= Math.min(NUM_BUCKETS, srcSchemaSize + searchRadius); j++) {
+ if (seenIndexes.contains(j-1)) {
+ continue;
+ }
+ for (SchemaDictionaryEntry elt: dictBySize.get(j-1)) {
+ SchemaMapping mapping = srcSummary.getBestMapping(elt.getSummary());
+ sorter.add(new DictionaryMapping(mapping, elt));
+ if (numMatches % 10 == 0) {
+ //System.err.println(" Matched object " + numMatches);
+ }
+ numMatches++;
+ }
+ seenIndexes.add(j-1);
+ }
+
+ // Have we examined the entire corpus of known schemas?
+ if ((srcSchemaSize - searchRadius) <= 0 && (srcSchemaSize + searchRadius) >= NUM_BUCKETS) {
+ seenAllCandidates = true;
+ } else {
+ // Test to see if the best matches are good enough that we can stop looking.
+ // We compare the lowest known match distance to the minimum distance for matches
+ // in the closest non-examined buckets.
+ int lowestSize = srcSchemaSize - searchRadius - 1;
+ int highestSize = srcSchemaSize + searchRadius + 1;
+ double minNearbyDistance = Double.MAX_VALUE;
+ if (lowestSize >= 1) {
+ minNearbyDistance = Math.min(minNearbyDistance,
+ SchemaStatisticalSummary.getMinimumMappingCost(srcSchemaSize, lowestSize));
+ }
+ if (highestSize <= NUM_BUCKETS) {
+ minNearbyDistance = Math.min(minNearbyDistance,
+ SchemaStatisticalSummary.getMinimumMappingCost(srcSchemaSize, highestSize));
+ }
+ // Grab from the Sorter the elt that is MIN_ELTS_SUGGESTED into the sorted list
+ if (sorter.size() >= MIN_ELTS_SUGGESTED) {
+ DictionaryMapping testDictMapping = null;
+ int idx = 0;
+ for (DictionaryMapping cur: sorter) {
+ idx++;
+ if (idx == MIN_ELTS_SUGGESTED) {
+ testDictMapping = cur;
+ break;
+ }
+ }
+ if (testDictMapping.getMapping().getDist() < minNearbyDistance) {
+ seenAllCandidates = true;
+ }
+ }
+ }
+ searchRadius++;
+ }
+
// Return the k best schema mappings
List<DictionaryMapping> dsts = new ArrayList<DictionaryMapping>();
for (DictionaryMapping dp: sorter) {
View
3  test/com/cloudera/recordbreaker/schemadictionary/test/TestSchemaDictionary.java
@@ -76,7 +76,7 @@ public void prepare() {
@Test(timeout=100000)
public void testSchemaDictionary() throws IOException {
- int maxDictSize = 100;
+ int maxDictSize = 500;
int maxTestSize = Math.min(10, maxDictSize);
int MAX_MAPPINGS = 5;
double MINIMUM_MEAN_RECIPROCAL_RANK = 0.75;
@@ -113,6 +113,7 @@ public void testSchemaDictionary() throws IOException {
int i = 0;
// Iterate through all files in the test dir
+ System.err.println("Examining: " + testDbDir);
for (File f: testDbDir.listFiles()) {
if (f.getName().endsWith(".avro")) {
String testName = f.getName();
Please sign in to comment.
Something went wrong with that request. Please try again.