Skip to content

Commit

Permalink
Refactored Integrity Check into column Rule
Browse files Browse the repository at this point in the history
  • Loading branch information
valydia committed Nov 13, 2015
1 parent c7bfcaa commit d0012ef
Show file tree
Hide file tree
Showing 17 changed files with 649 additions and 498 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait AllErrorsMetaDataValidator extends MetaDataValidator {
results.reverse
} else {
val row = rows.next()
val result = validateRow(row, schema)
val result = validateRow(row, schema, Some(rows.hasNext))
validateRows(result :: results)
}
}
Expand All @@ -36,26 +36,27 @@ trait AllErrorsMetaDataValidator extends MetaDataValidator {
v.sequence[MetaDataValidation, Any]
}

override protected def rules(row: Row, schema: Schema): MetaDataValidation[List[Any]] = {

override protected def rules(row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[List[Any]] = {
val cells: (Int) => Option[Cell] = row.cells.lift
val v = schema.columnDefinitions.zipWithIndex.map {
case (columnDefinition, columnIndex) =>
validateCell(columnIndex, cells, row, schema)
validateCell(columnIndex, cells, row, schema, mayBeLast)
}

v.sequence[MetaDataValidation, Any]
}

override protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema): MetaDataValidation[Any] = {
override protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[Any] = {

val columnDefinition = schema.columnDefinitions(columnIndex)

def isWarningDirective: Boolean = columnDefinition.directives.contains(Warning())
def isOptionDirective: Boolean = columnDefinition.directives.contains(Optional())

if(row.cells(columnIndex).value.trim.isEmpty && isOptionDirective) true.successNel
else columnDefinition.rules.map(_.evaluate(columnIndex, row, schema)).map{ ruleResult:Rule#RuleValidation[Any] => {
else columnDefinition.rules.map(_.evaluate(columnIndex, row, schema, mayBeLast)).map{ ruleResult:Rule#RuleValidation[Any] => {
if(isWarningDirective) toWarnings(ruleResult, row.lineNumber, columnIndex) else toErrors(ruleResult, row.lineNumber, columnIndex)
}}.sequence[MetaDataValidation, Any]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait FailFastMetaDataValidator extends MetaDataValidator {
results.reverse
} else {
val row = rows.next()
val result = validateRow(row, schema)
val result = validateRow(row, schema, Some(rows.hasNext))
validateRows(result :: results)
}
}
Expand All @@ -41,7 +41,7 @@ trait FailFastMetaDataValidator extends MetaDataValidator {
v.sequence[MetaDataValidation, Any]
}

override protected def rules(row: Row, schema: Schema): MetaDataValidation[List[Any]] = {
override protected def rules(row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[List[Any]] = {
val cells: (Int) => Option[Cell] = row.cells.lift

@tailrec
Expand All @@ -54,7 +54,7 @@ trait FailFastMetaDataValidator extends MetaDataValidator {
accum.reverse

case (columnDefinition, columnIndex) :: tail =>
validateCell(columnIndex, cells, row, schema) match {
validateCell(columnIndex, cells, row, schema, mayBeLast) match {
case failure @ Failure(_) if(!schema.columnDefinitions(columnIndex).directives.contains(Warning())) =>
validateRules(List.empty, failure :: accum) //stop on first failure which is not a warning
case result =>
Expand All @@ -67,7 +67,7 @@ trait FailFastMetaDataValidator extends MetaDataValidator {
v.sequence[MetaDataValidation, Any]
}

override protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema): MetaDataValidation[Any] = {
override protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[Any] = {
val columnDefinition = schema.columnDefinitions(columnIndex)

def isWarningDirective: Boolean = columnDefinition.directives.contains(Warning())
Expand All @@ -76,16 +76,16 @@ trait FailFastMetaDataValidator extends MetaDataValidator {
@tailrec
def validateRulesForCell(rules: List[Rule]): MetaDataValidation[Any] = rules match {
case Nil => true.successNel[FailMessage]
case rule :: tail => rule.evaluate(columnIndex, row, schema) match {
case rule :: tail => rule.evaluate(columnIndex, row, schema, mayBeLast) match {
case e@Failure(_) => toErrors(e, row.lineNumber, columnIndex)
case _ => validateRulesForCell(tail)
}
}

def validateAllRulesForCell(rules: List[Rule]): MetaDataValidation[Any] = rules.map(_.evaluate(columnIndex, row, schema)).map(toWarnings(_, row.lineNumber, columnIndex)).sequence[MetaDataValidation, Any]
def validateAllRulesForCell(rules: List[Rule]): MetaDataValidation[Any] = rules.map(_.evaluate(columnIndex, row, schema, mayBeLast)).map(toWarnings(_, row.lineNumber, columnIndex)).sequence[MetaDataValidation, Any]

if(row.cells(columnIndex).value.trim.isEmpty && isOptionDirective) true.successNel
else if(isWarningDirective) validateAllRulesForCell(columnDefinition.rules)
else validateRulesForCell(columnDefinition.rules)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ trait MetaDataValidator {
type MetaDataValidation[S] = ValidationNel[FailMessage, S]

def validate(csv: JReader, schema: Schema, progress: Option[ProgressCallback]): MetaDataValidation[Any] = {

//try to find the number of rows for the
//purposes of reporting progress
//can only do that if we can reset()
Expand All @@ -53,12 +52,12 @@ trait MetaDataValidator {
}

/**
* Reads the CSV file and returns all the values of one column as a list
* @param csv the CSV reader
* @param schema the Schema
* @param columnIndex the index of the column to be returned
* @return all the elements of the column at columnIndex
*/
* Reads the CSV file and returns all the values of one column as a list
* @param csv the CSV reader
* @param schema the Schema
* @param columnIndex the index of the column to be returned
* @return all the elements of the column at columnIndex
*/
def getColumn(csv: JReader, schema: Schema, columnIndex: Int): List[String] = {

val separator = schema.globalDirectives.collectFirst {
Expand Down Expand Up @@ -134,6 +133,7 @@ trait MetaDataValidator {
// if 'no header' is not set and the file is empty - this is an error
// if 'no header' is not set and 'permit empty' is not set but the file contains only one line - this is an error


val rowIt = new RowIterator(reader, progress)

val maybeNoData =
Expand Down Expand Up @@ -173,16 +173,25 @@ trait MetaDataValidator {
case Left(ts) =>
//TODO emit all errors not just first!
ErrorMessage(ts(0).toString).failureNel[Any]
//ts.toList.map(t => ErrorMessage(t.toString).failureNel[Any]).sequence[MetaDataValidation, Any]
//ts.toList.map(t => ErrorMessage(t.toString).failureNel[Any]).sequence[MetaDataValidation, Any]
}
}

/**
* Returns the column at the index columnIndex
* @param rows the row iterator
* @param columnIndex the index of the column
* @return list of strings holding all the elements at columnIndex
*/
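* Performs some extra validation once all rows have been validated
* @param csv the CSV reader
* @param schema the Schema
* @param progress optional progress information
* @return the result of the extra validation; this default implementation always succeeds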
*/
def postValidate(csv: JReader, schema: Schema, progress: Option[ProgressFor]): MetaDataValidation[Any] = true.successNel[FailMessage]

/**
* Returns the column at the index columnIndex
* @param rows the row iterator
* @param columnIndex the index of the column
* @return list of strings holding all the elements at columnIndex
*/
def getColumn(rows: Iterator[Row], columnIndex: Int): List[String] =
rows.foldLeft(List[String]()){ (acc,row) =>
acc :+ filename(row, columnIndex)
Expand All @@ -193,6 +202,7 @@ trait MetaDataValidator {

def validateRows(rows: Iterator[Row], schema: Schema): MetaDataValidation[Any]


def validateHeader(header: Row, schema: Schema): Option[MetaDataValidation[Any]] = {
val icnc: Option[IgnoreColumnNameCase] = schema.globalDirectives.collectFirst {case i @ IgnoreColumnNameCase() => i }

Expand All @@ -209,9 +219,9 @@ trait MetaDataValidator {
Some(ErrorMessage(s"Metadata header, cannot find the column headers - ${Util.diff(schemaHeader.toSet, headerList.toSet).mkString(", ")} - .${if (icnc.isEmpty) " (Case sensitive)" else ""}").failNel[Any])
}

def validateRow(row: Row, schema: Schema): MetaDataValidation[Any] = {
def validateRow(row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[Any] = {
val totalColumnsV = totalColumns(row, schema)
val rulesV = rules(row, schema)
val rulesV = rules(row, schema, mayBeLast)
(totalColumnsV |@| rulesV) { _ :: _ }
}

Expand All @@ -224,19 +234,19 @@ trait MetaDataValidator {
else ErrorMessage(s"Expected @totalColumns of ${tc.get.numberOfColumns} and found ${row.cells.length} on line ${row.lineNumber}", Some(row.lineNumber), Some(row.cells.length)).failureNel[Any]
}

protected def rules(row: Row, schema: Schema): MetaDataValidation[List[Any]]
protected def rules(row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[List[Any]]

protected def validateCell(columnIndex: Int, cells: (Int) => Option[Cell], row: Row, schema: Schema): MetaDataValidation[Any] = {
protected def validateCell(columnIndex: Int, cells: (Int) => Option[Cell], row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[Any] = {
cells(columnIndex) match {
case Some(c) => rulesForCell(columnIndex, row, schema)
case Some(c) => rulesForCell(columnIndex, row, schema, mayBeLast)
case _ => ErrorMessage(s"Missing value at line: ${row.lineNumber}, column: ${schema.columnDefinitions(columnIndex).id}", Some(row.lineNumber), Some(columnIndex)).failureNel[Any]
}
}

protected def toWarnings(results: Rule#RuleValidation[Any], lineNumber: Int, columnIndex: Int): MetaDataValidation[Any] = results.leftMap(_.map(WarningMessage(_, Some(lineNumber), Some(columnIndex))))
protected def toErrors(results: Rule#RuleValidation[Any], lineNumber: Int, columnIndex: Int): MetaDataValidation[Any] = results.leftMap(_.map(ErrorMessage(_, Some(lineNumber), Some(columnIndex))))

protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema): MetaDataValidation[Any]
protected def rulesForCell(columnIndex: Int, row: Row, schema: Schema, mayBeLast: Option[Boolean] = None): MetaDataValidation[Any]

protected def countRows(textFile: TextFile): Int = {
withReader(textFile) {
Expand Down Expand Up @@ -278,9 +288,9 @@ trait MetaDataValidator {
trait ProgressCallback {

/**
* A percentage is always between
* 0 and 100 inclusive
*/
* A percentage is always between
* 0 and 100 inclusive
*/
type Percentage = Float


Expand Down Expand Up @@ -325,4 +335,4 @@ class RowIterator(reader: CSVReader, progress: Option[ProgressFor]) extends Iter
override def hasNext: Boolean = current.nonEmpty

private def toRow(rowData: Option[Array[String]]): Option[Row] = rowData.map(data => Row(data.toList.map(Cell(_)), index))
}
}
Loading

0 comments on commit d0012ef

Please sign in to comment.