Skip to content

Commit

Permalink
Reuse stencil client between Spark Tasks (#58)
Browse files Browse the repository at this point in the history
* reuse stencil client between tasks

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* format

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 19, 2021
1 parent 4b4ade3 commit 5730cf4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ import com.gojek.de.stencil.StencilClientFactory
import com.gojek.de.stencil.client.StencilClient

class StencilProtoRegistry(val url: String) extends ProtoRegistry {
import StencilProtoRegistry.stencilClient

override def getProtoDescriptor(className: String): Descriptors.Descriptor = {
stencilClient(url).get(className)
}
}

object StencilProtoRegistry {
@transient
private var _stencilClient: StencilClient = _

def stencilClient: StencilClient = {
def stencilClient(url: String): StencilClient = {
if (_stencilClient == null) {
_stencilClient = StencilClientFactory.getClient(url, Collections.emptyMap[String, String])
}
_stencilClient
}

override def getProtoDescriptor(className: String): Descriptors.Descriptor = {
stencilClient.get(className)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,13 @@ object ProtoReflection {
}

def createMessageParser(protoRegistry: ProtoRegistry, className: String): Array[Byte] => Row = {
// perform request to registry in driver, so serialized protoRegistry will have cached descriptor
protoRegistry.getProtoDescriptor(className)
bytes =>
{
val protoDescriptor = protoRegistry.getProtoDescriptor(className)

bytes => {
val protoDescriptor = protoRegistry.getProtoDescriptor(className)

Try { DynamicMessage.parseFrom(protoDescriptor, bytes) }
.map(messageToRow(protoDescriptor, _))
.getOrElse(null)
}
Try { DynamicMessage.parseFrom(protoDescriptor, bytes) }
.map(messageToRow(protoDescriptor, _))
.getOrElse(null)
}
}
}

0 comments on commit 5730cf4

Please sign in to comment.