Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add code that will automatically simplify induced schema while parsin…

…g at least X% of the input data. The user indicates what X should be.
  • Loading branch information...
commit ee99bac813a73798defe3ad680adf2092201e775 1 parent e4308c8
@mikecafarella mikecafarella authored
View
4 src/main/scala/com/cloudera/recordbreaker/learnstructure2/Infer.scala
@@ -30,10 +30,12 @@ object Infer {
*/
def discover(cs: Chunks): HigherType = {
HigherType.resetFieldCount()
- internalDiscover(cs) match {
+ val result = internalDiscover(cs) match {
case a: HTStruct => a
case x: HigherType => HTStruct(List(x))
}
+
+ Rewrite.refineAll(result, cs)
}
private def internalDiscover(cs:Chunks): HigherType = {
//println("internalDiscover()...")
View
39 src/main/scala/com/cloudera/recordbreaker/learnstructure2/RBTypes.scala
@@ -193,6 +193,17 @@ object RBTypes {
println()
ht.prettyprint(0, true, denomCount)
}
+
+ def getLowestCountUnionBranch(ht: HigherType): Tuple2[Int, Option[String]] = {
+ ht match {
+ case a: HTUnion => (a.value.map(x=> getLowestCountUnionBranch(x)) ++ a.value.map(z => (z.linesProcessed, Some(z.name())))).minBy(_._1)
+ case b: HTStruct => b.value.map(x=> getLowestCountUnionBranch(x)).minBy(_._1)
+ case c: HTArray => getLowestCountUnionBranch(c.value)
+ case d: HTArrayFW => getLowestCountUnionBranch(d.value)
+ case e: HTOption => getLowestCountUnionBranch(e.value)
+ case f: HTBaseType => (Int.MaxValue, None)
+ }
+ }
}
abstract class HigherType {
var fc = HigherType.getFieldCount()
@@ -213,7 +224,7 @@ object RBTypes {
def getDefaultValue(): JsonNode = {
return null
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int)
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit
}
case class HTStruct(value: List[HigherType]) extends HigherType {
@@ -230,11 +241,11 @@ object RBTypes {
linesProcessed = 0
value.foreach(_.resetUsageStatistics())
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTStruct " + name())
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
for (v <- value) {
@@ -286,11 +297,11 @@ object RBTypes {
linesProcessed = 0
value.resetUsageStatistics()
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTArray " + name())
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
@@ -338,11 +349,11 @@ object RBTypes {
linesProcessed = 0
value.foreach(_.resetUsageStatistics())
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTUnion " + name())
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
@@ -364,11 +375,11 @@ object RBTypes {
def resetUsageStatistics(): Unit = {
linesProcessed = 0
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTBaseType(" + value + ") " + name())
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
}
@@ -389,7 +400,7 @@ object RBTypes {
def resetUsageStatistics(): Unit = {
linesProcessed = 0
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
println("HTNoop")
}
@@ -406,11 +417,11 @@ object RBTypes {
linesProcessed = 0
value.resetUsageStatistics()
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTArrayFW " + name() + " size=" + size )
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
value.prettyprint(offset+1, showStatistics, denom)
@@ -449,11 +460,11 @@ object RBTypes {
linesProcessed = 0
value.resetUsageStatistics()
}
- def prettyprint(offset: Int, showStatistics: Boolean, denom: Int) {
+ def prettyprint(offset: Int, showStatistics: Boolean, denom: Int): Unit = {
print(" " * offset)
print("HTOption " + name())
if (showStatistics && denom > 0) {
- print(" (" + (100 * (linesProcessed / denom.toFloat)) + "%)")
+ print(" (" + (100 * (linesProcessed / denom.toDouble)) + "%)")
}
println()
value.prettyprint(offset+1, showStatistics, denom)
View
59 src/main/scala/com/cloudera/recordbreaker/learnstructure2/Rewrite.scala
@@ -27,10 +27,14 @@ object Rewrite {
// Code to refine initial schema estimate
///////////////////////////////////////////
- /** refineAll() applies refinement rules to a given HigherType hierarchy and some Chunks
- * It is the only public method in this package
+ /**
+ * refineAll() applies refinement rules to a given HigherType hierarchy and some Chunks.
+ *
+ * It is not optional: many inferred HigherType structures will translate to illegal Avro
+ * structures before they go through the refinment process
*/
val dataIndependentRules = List(removeNestedUnions _, rewriteSingletons _, cleanupStructUnion _, transformUniformStruct _, commonUnionPostfix _, filterNoops _, combineAdjacentStringConstants _, combineHomogeneousArrays _)
+
def refineAll(orig: HigherType, input: Chunks) = {
val dataDependentRules = List()
val round1 = refine(orig, dataIndependentRules, costEncoding)
@@ -42,8 +46,14 @@ object Rewrite {
round4
}
- def dropComponent(orig: HigherType, label: String): HigherType = {
- val dropComponents = List(dropComponent(label) _)
+ /**
+ * dropComponent() removes a targeted UNION branch from a given HigherType structure.
+ * The intention here is that the user picks a UNION branch that processes only a small
+ * and unimportant portion of the overall input file; its removal simplifies the structure
+ * but has minimal impact on the parser's actual operation
+ */
+ def dropComponent(orig: HigherType, labels: List[String]): HigherType = {
+ val dropComponents = List(dropComponent(labels) _)
val origScore = costEncoding(orig)
val round1 = refine(orig, dropComponents, costEncoding)
val newScore = costEncoding(round1)
@@ -56,6 +66,36 @@ object Rewrite {
}
/**
+ * automaticDropComponent() uses an algorithm to figure out how many and which
+ * UNION branches to remove. It repeatedly removes the lowest-impact branches
+ * that still permit it to satisfy the user-given parse fraction requirement
+ */
+ def automaticDropComponent(orig: HigherType, input: ParsedChunks, minParseFraction: Double): HigherType = {
+ def computeMissingFraction(ht: HigherType, inc: ParsedChunks): Double = {
+ HigherType.resetUsageStatistics(ht)
+ Processor.process(inc, ht)
+ HigherType.missingCount / HigherType.denomCount.toDouble
+ }
+
+ // Very un-scala-like. What is the scala idiom for the below case?
+ var curStructure = orig
+ HigherType.resetUsageStatistics(curStructure)
+ Processor.process(input, curStructure)
+ while (true) {
+ val bestTupleToRemove = HigherType.getLowestCountUnionBranch(curStructure)
+ if (bestTupleToRemove._2.isEmpty) {
+ return curStructure
+ }
+ val refinedStructure = Rewrite.dropComponent(curStructure, List(bestTupleToRemove._2.get))
+ if ((1-computeMissingFraction(refinedStructure, input)) < minParseFraction) {
+ return curStructure
+ }
+ curStructure = refinedStructure
+ }
+ return curStructure
+ }
+
+ /**
* A single step in the refinement process
*/
private def refine(orig: HigherType, rewriteRules:List[HigherType=>HigherType], costFn:HigherType=>Double): HigherType = {
@@ -77,6 +117,11 @@ object Rewrite {
n.fc = d.fc
n
}
+ case e: HTOption => {
+ val n = HTOption(refine(e.value, rewriteRules, costFn))
+ n.fc = e.fc
+ n
+ }
case _ => orig
}
oneStep(newVersion, rewriteRules, costFn)
@@ -185,11 +230,11 @@ object Rewrite {
}
}
- private def dropComponent(label: String)(in: HigherType): HigherType = {
+ private def dropComponent(labels: List[String])(in: HigherType): HigherType = {
in match {
- case a: HTUnion if (a.value.exists(_.name() == label) &&
+ case a: HTUnion if (a.value.exists(x=>labels.contains(x.name())) &&
a.value.length > 1) => {
- HTUnion(a.value.filterNot(_.name() == label))
+ HTUnion(a.value.filterNot(x=>labels.contains(x.name())))
}
case _ => in
}
Please sign in to comment.
Something went wrong with that request. Please try again.