Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion examples/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
<scala.version>2.11.7</scala.version>
<scala.dep.version>2.11</scala.dep.version>
<spark.version>2.3.0</spark.version>
<bullet.spark.version>0.1.0</bullet.spark.version>
<bullet.spark.version>0.1.1</bullet.spark.version>
<bullet.record.version>0.2.0</bullet.record.version>
</properties>

<repositories>
Expand Down Expand Up @@ -44,11 +45,17 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-spark</artifactId>
<version>${bullet.spark.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-record</artifactId>
<version>${bullet.record.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
package com.yahoo.bullet.spark.examples.receiver

import java.util.UUID
import java.util.HashMap
import java.util.Arrays.asList

import scala.collection.JavaConverters._
import scala.util.Random

import com.yahoo.bullet.record.BulletRecord
import com.yahoo.bullet.record.{BulletRecord, SimpleBulletRecord}
import com.yahoo.bullet.spark.utils.{BulletSparkConfig, BulletSparkLogger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
Expand Down Expand Up @@ -90,36 +91,37 @@ class RandomReceiver(val config: BulletSparkConfig)
}

private def generateRecord(): BulletRecord = {
val record = new BulletRecord()
val record = new SimpleBulletRecord()
val uuid = UUID.randomUUID().toString
record.setString(RandomReceiver.STRING, uuid)
record.setLong(RandomReceiver.LONG, generatedThisPeriod)
record.setDouble(RandomReceiver.DOUBLE, Random.nextDouble())
record.setString(RandomReceiver.TYPE, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
record.setLong(RandomReceiver.DURATION, System.nanoTime() % RandomReceiver.INTEGER_POOL(Random.nextInt(RandomReceiver.INTEGER_POOL.length)))
val booleanMap = Map[java.lang.String, java.lang.Boolean](
uuid.substring(0, 8) -> Random.nextBoolean(),
uuid.substring(9, 13) -> Random.nextBoolean(),
uuid.substring(14, 18) -> Random.nextBoolean(),
uuid.substring(19, 23) -> Random.nextBoolean()
)
record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap.asJava)
val statsMap = Map[java.lang.String, java.lang.Long](
RandomReceiver.PERIOD_COUNT -> periodCount,
RandomReceiver.RECORD_NUMBER -> (periodCount * maxPerPeriod + generatedThisPeriod),
RandomReceiver.NANO_TIME -> System.nanoTime(),
RandomReceiver.TIMESTAMP -> System.nanoTime()
)
record.setLongMap(RandomReceiver.STATS_MAP, statsMap.asJava)
val randomMapA = Map[java.lang.String, java.lang.String](
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
)
val randomMapB = Map[java.lang.String, java.lang.String](
RandomReceiver.RANDOM_MAP_KEY_A -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)),
RandomReceiver.RANDOM_MAP_KEY_B -> RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length))
)
record.setListOfStringMap(RandomReceiver.LIST, List(randomMapA.asJava, randomMapB.asJava).asJava)

// Don't use Scala Map and convert it by asJava when calling setxxxMap method in BulletRecord.
// It converts Scala Map to scala.collection.convert.Wrappers$MapWrapper which is not serializable in scala 2.11.x (https://issues.scala-lang.org/browse/SI-8911).
val booleanMap = new HashMap[java.lang.String, java.lang.Boolean](4)
booleanMap.put(uuid.substring(0, 8), Random.nextBoolean())
booleanMap.put(uuid.substring(9, 13), Random.nextBoolean())
booleanMap.put(uuid.substring(14, 18), Random.nextBoolean())
booleanMap.put(uuid.substring(19, 23), Random.nextBoolean())
record.setBooleanMap(RandomReceiver.BOOLEAN_MAP, booleanMap)

val statsMap = new HashMap[java.lang.String, java.lang.Long](4)
statsMap.put(RandomReceiver.PERIOD_COUNT, periodCount)
statsMap.put(RandomReceiver.RECORD_NUMBER, periodCount * maxPerPeriod + generatedThisPeriod)
statsMap.put(RandomReceiver.NANO_TIME, System.nanoTime())
statsMap.put(RandomReceiver.TIMESTAMP, System.nanoTime())
record.setLongMap(RandomReceiver.STATS_MAP, statsMap)

val randomMapA = new HashMap[java.lang.String, java.lang.String](2)
randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
randomMapA.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
val randomMapB = new HashMap[java.lang.String, java.lang.String](2)
randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_A, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
randomMapB.put(RandomReceiver.RANDOM_MAP_KEY_B, RandomReceiver.STRING_POOL(Random.nextInt(RandomReceiver.STRING_POOL.length)))
record.setListOfStringMap(RandomReceiver.LIST, asList(randomMapA, randomMapB))
record
}
}
Expand Down