Skip to content

Commit

Permalink
Merge 4ac81be into 480c23e
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Nov 8, 2016
2 parents 480c23e + 4ac81be commit 59afc83
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 6 deletions.
Expand Up @@ -50,12 +50,23 @@ trait FieldValue {
def schema: Schema
}

/**
 * Base class for the enumerations that list the fields of a single record type.
 *
 * Values created through SchemaValue remember the schema of the record they
 * belong to, so the schema can be read back from any individual field.
 *
 * @param recordSchema The schema of the record whose fields are enumerated.
 */
abstract class FieldEnumeration(val recordSchema: Schema) extends Enumeration {

  /**
   * One enumerated field of the record described by recordSchema.
   */
  class SchemaVal extends Val with FieldValue {

    /**
     * @return The schema of the record that contains this field. This is
     *   always the recordSchema of the enclosing enumeration.
     */
    def schema: Schema = recordSchema
  }

  /**
   * Creates a new enumeration value tied to recordSchema. Only visible to
   * subclasses; every invocation constructs a new SchemaVal.
   */
  protected final def SchemaValue: SchemaVal = new SchemaVal()

}
Expand Up @@ -26,39 +26,102 @@ import org.apache.avro.Schema.Field
*/
object Projection {

  /**
   * Builds a record schema that holds a subset of the fields of fullSchema.
   *
   * The new record copies the name, doc, namespace and error flag of
   * fullSchema. Every retained field is rebuilt as a fresh Field instance
   * rather than being shared with fullSchema.
   *
   * @param fullSchema The schema to project down.
   * @param fields Names of the fields to keep (or to drop, if exclude is true).
   * @param exclude If true, fields is treated as the set of fields to drop.
   * @return The projected schema.
   */
  private def createProjection(fullSchema: Schema,
                               fields: Set[String],
                               exclude: Boolean = false): Schema = {
    val projectedSchema = Schema.createRecord(fullSchema.getName,
      fullSchema.getDoc,
      fullSchema.getNamespace,
      fullSchema.isError)
    projectedSchema.setFields(fullSchema.getFields.filter(createFilterPredicate(fields, exclude))
      .map(p => new Field(p.name, p.schema, p.doc, p.defaultValue, p.order)))
    projectedSchema
  }

  /**
   * Builds the predicate that decides whether a field survives the projection.
   *
   * @param fieldNames Names of the fields that were passed to the projection.
   * @param exclude If false, a field is kept when its name is in fieldNames.
   *   If true, a field is kept when its name is not in fieldNames.
   * @return A function that returns true for fields that should be kept.
   */
  private def createFilterPredicate(fieldNames: Set[String],
                                    exclude: Boolean = false): Field => Boolean = {
    // membership must differ from the exclude flag:
    // include mode keeps members, exclude mode keeps non-members
    (f: Field) => fieldNames.contains(f.name) != exclude
  }

  // TODO: Unify these various methods
  /**
   * Creates a projection that includes a variable number of fields.
   *
   * @param includedFields Fields to include in the projection
   * @return Returns a copy of the inferred schema that only contains the
   *   provided fields.
   *
   * @note The schema is inferred from the provided FieldValues. Undefined
   *   behavior may result if you provide FieldValues from multiple different
   *   FieldEnumerations.
   *
   * @throws IllegalArgumentException if there are no fields included in the
   *   projection.
   */
  def apply(includedFields: FieldValue*): Schema = {
    require(includedFields.nonEmpty, "Can't project down to zero fields!")
    Projection(false, includedFields: _*)
  }

  /**
   * Creates a projection that includes a collection of fields.
   *
   * @param includedFields Fields to include in the projection
   * @return Returns a copy of the inferred schema that only contains the
   *   provided fields.
   *
   * @note The schema is inferred from the first provided FieldValue, and fields
   *   are matched by the string form of each FieldValue. Undefined behavior
   *   may result if you provide FieldValues from multiple different
   *   FieldEnumerations.
   *
   * @throws IllegalArgumentException if there are no fields included in the
   *   projection.
   */
  def apply(includedFields: Traversable[FieldValue]): Schema = {
    // nonEmpty avoids walking the whole collection just to compare its size to zero
    require(includedFields.nonEmpty, "Can't project down to zero fields!")
    val baseSchema = includedFields.head.schema
    createProjection(baseSchema, includedFields.map(_.toString).toSet, exclude = false)
  }

  /**
   * Creates a projection that either includes or excludes a variable number of
   * fields.
   *
   * @param exclude If false, includes the provided fields in the projection.
   *   If true, the provided fields will be excluded, and all other fields from
   *   the record will be included in the projection.
   * @param includedFields If exclude is false, fields to include in the
   *   projection. Else, fields to exclude from the projection.
   * @return Returns a copy of the inferred schema with the fields selected as
   *   described above.
   *
   * @note The schema is inferred from the first provided FieldValue. Undefined
   *   behavior may result if you provide FieldValues from multiple different
   *   FieldEnumerations.
   *
   * @note Unlike the other apply methods, this method will allow you to project
   *   down to zero fields (by excluding every field of the record), which may
   *   cause an exception when loading from disk.
   *
   * @throws IllegalArgumentException if no fields are provided, since there is
   *   then no FieldValue to infer the schema from.
   */
  def apply(exclude: Boolean, includedFields: FieldValue*): Schema = {
    // fail with a descriptive error instead of the NoSuchElementException
    // that calling head on an empty argument list would raise
    require(includedFields.nonEmpty,
      "Can't infer the schema of a projection without any fields!")
    val baseSchema = includedFields.head.schema
    createProjection(baseSchema, includedFields.map(_.toString).toSet, exclude)
  }
}

/**
* Helper object to create a projection that excludes fields from a schema.
*/
object Filter {

/**
 * Builds a schema that contains every field of a record except the given ones.
 *
 * This is a thin wrapper that forwards to Projection with exclude set to true.
 *
 * @param excludeFields The fields to leave out of the resulting schema.
 * @return Returns the inferred schema without the excluded fields.
 *
 * @note The schema is inferred from the provided FieldValues, so at least one
 *   field has to be given. Undefined behavior may result if you provide
 *   FieldValues from multiple different FieldEnumerations.
 *
 * @note Unlike the include-only apply methods in Projection, this method will
 *   allow you to project down to zero fields (by excluding every field),
 *   which may cause an exception when loading from disk.
 */
def apply(excludeFields: FieldValue*): Schema =
  Projection(true, excludeFields: _*)
Expand Down

0 comments on commit 59afc83

Please sign in to comment.