Skip to content

Commit

Permalink
Refine API.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Dec 21, 2016
1 parent 68d1e98 commit 6677beb
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ abstract class Collect[T] extends TypedImperativeAggregate[T] {
// actual order of input rows.
override def deterministic: Boolean = false

def _serialize(obj: Iterable[Any]): Array[Byte] = {
protected def _serialize(obj: Iterable[Any]): Array[Byte] = {
val buffer = new Array[Byte](4 << 10) // 4K
val bos = new ByteArrayOutputStream()
val out = new DataOutputStream(bos)
Expand All @@ -69,7 +69,7 @@ abstract class Collect[T] extends TypedImperativeAggregate[T] {
}
}

def _deserialize(bytes: Array[Byte], updater: (Any) => Any): Unit = {
protected def _deserialize(bytes: Array[Byte], updater: (Any) => Unit): Unit = {
val bis = new ByteArrayInputStream(bytes)
val ins = new DataInputStream(bis)
try {
Expand All @@ -90,7 +90,6 @@ abstract class Collect[T] extends TypedImperativeAggregate[T] {
bis.close()
}
}

}

/**
Expand Down Expand Up @@ -144,7 +143,7 @@ case class CollectList(
override def serialize(obj: ArrayBuffer[Any]): Array[Byte] = _serialize(obj)
override def deserialize(bytes: Array[Byte]): ArrayBuffer[Any] = {
val buffer = new ArrayBuffer[Any]()
val updater = (value: Any) => buffer += value
val updater: (Any) => Unit = (value) => buffer += value
_deserialize(bytes, updater)
buffer
}
Expand Down Expand Up @@ -209,7 +208,7 @@ case class CollectSet(
override def serialize(obj: HashSet[Any]): Array[Byte] = _serialize(obj)
override def deserialize(bytes: Array[Byte]): HashSet[Any] = {
val buffer = new HashSet[Any]()
val updater = (value: Any) => buffer += value
val updater: (Any) => Unit = (value) => buffer += value
_deserialize(bytes, updater)
buffer
}
Expand Down

0 comments on commit 6677beb

Please sign in to comment.