From 76b8aadcc618bfacc64e682f726c7a20692ce0dc Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 14 Jun 2018 16:20:06 -0700 Subject: [PATCH 1/3] Switch to SimpleBulletRecord --- examples/spark/pom.xml | 9 +++- .../examples/receiver/RandomReceiver.scala | 53 ++++++++++--------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml index 593c17e3..f918cf58 100644 --- a/examples/spark/pom.xml +++ b/examples/spark/pom.xml @@ -10,7 +10,8 @@ 2.11.7 2.11 2.3.0 - 0.1.0 + 0.1.1 + 0.2.0 @@ -44,11 +45,17 @@ ${spark.version} provided + com.yahoo.bullet bullet-spark ${bullet.spark.version} + + com.yahoo.bullet + bullet-record + ${bullet.record.version} + src/main/scala diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index e696cbbe..981bf9ae 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -6,11 +6,13 @@ package com.yahoo.bullet.spark.examples.receiver import java.util.UUID +import java.util.HashMap +import java.util.Arrays.asList -import scala.collection.JavaConverters._ import scala.util.Random import com.yahoo.bullet.record.BulletRecord +import com.yahoo.bullet.record.SimpleBulletRecord import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver @@ -90,36 +92,37 @@ class RandomReceiver(val config: BulletSparkConfig) } private def generateRecord(): BulletRecord = { - val record = new BulletRecord() + val record = new SimpleBulletRecord() val uuid = UUID.randomUUID().toString record.setString(RandomReceiver.STRING, uuid) record.setLong(RandomReceiver.LONG, generatedThisPeriod) 
record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble()) record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length))) - val booleanMap = Map[java.lang.String, java.lang.Boolean]( - uuid.substring(0, 8) -> Random.nextBoolean(), - uuid.substring(9, 13) -> Random.nextBoolean(), - uuid.substring(14, 18) -> Random.nextBoolean(), - uuid.substring(19, 23) -> Random.nextBoolean() - ) - record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava) - val statsMap = Map[java.lang.String, java.lang.Long]( - RandomReceiver.PERIOD_COUNT -> periodCount, - RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod), - RandomReceiver.NANO_TIME -> System.nanoTime(), - RandomReceiver.TIMESTAMP -> System.nanoTime() - ) - record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava) - val randomMapA = Map[java.lang.String, java.lang.String]( - RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)), - RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)) - ) - val randomMapB = Map[java.lang.String, java.lang.String]( - RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)), - RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)) - ) - record.setListOfStringMap(RandomReceiver.LIST, List(randomMapA.asJava, randomMapB.asJava).asJava) + + // Do not build a Scala Map and convert it with asJava when calling the BulletRecord map setters (setBooleanMap, setLongMap, setListOfStringMap). + // asJava wraps the Scala Map in scala.collection.convert.Wrappers$MapWrapper, which is not serializable in Scala 2.11.x (https://issues.scala-lang.org/browse/SI-8911). 
+ val booleanMap = new HashMap[java.lang.String, java.lang.Boolean](4) + booleanMap.put(uuid.substring(0, 8), Random.nextBoolean()) + booleanMap.put(uuid.substring(9, 13), Random.nextBoolean()) + booleanMap.put(uuid.substring(14, 18), Random.nextBoolean()) + booleanMap.put(uuid.substring(19, 23), Random.nextBoolean()) + record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap) + + val statsMap = new HashMap[java.lang.String, java.lang.Long](4) + statsMap.put(RandomReceiver.PERIOD_COUNT, periodCount) + statsMap.put(RandomReceiver.RECORD_NUMBER, periodCount * maxPerPeriod + generatedThisPeriod) + statsMap.put(RandomReceiver.NANO_TIME, System.nanoTime()) + statsMap.put(RandomReceiver.TIMESTAMP, System.nanoTime()) + record.setLongMap(RandomReceiver.STATS_MAP, statsMap) + + val randomMapA = new HashMap[java.lang.String, java.lang.String](2) + randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + val randomMapB = new HashMap[java.lang.String, java.lang.String](2) + randomMapB.put( RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + record.setListOfStringMap(RandomReceiver.LIST, asList(randomMapA, randomMapB)) record } } From d2ae1c9118245b7d852a202888ddf75e275e3f99 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 14 Jun 2018 16:21:52 -0700 Subject: [PATCH 2/3] minor change --- .../yahoo/bullet/spark/examples/receiver/RandomReceiver.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala 
b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index 981bf9ae..c1734b4a 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -11,8 +11,7 @@ import java.util.Arrays.asList import scala.util.Random -import com.yahoo.bullet.record.BulletRecord -import com.yahoo.bullet.record.SimpleBulletRecord +import com.yahoo.bullet.record.{BulletRecord, SimpleBulletRecord} import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver From d7aab4a097060c563cd2d7ad8d963bbb69de99dc Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 14 Jun 2018 16:26:18 -0700 Subject: [PATCH 3/3] format --- .../yahoo/bullet/spark/examples/receiver/RandomReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala index c1734b4a..e22f8b22 100644 --- a/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala +++ b/examples/spark/src/main/scala/com/yahoo/bullet/spark/examples/receiver/RandomReceiver.scala @@ -119,7 +119,7 @@ class RandomReceiver(val config: BulletSparkConfig) randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) val randomMapB = new HashMap[java.lang.String, java.lang.String](2) - randomMapB.put( RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) + 
randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))) record.setListOfStringMap(RandomReceiver.LIST, asList(randomMapA, randomMapB)) record