Skip to content

Commit

Permalink
add logo
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew committed Jan 19, 2018
1 parent cf65249 commit 765856a
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -12,13 +12,13 @@ Lenses offers SQL (for data browsing and Kafka Streams), Kafka Connect connector
You can find more on [landoop.com!](http://www.landoop.com/kafka-lenses/)

<p align="center">
<a href="http://www.landoop.com/kafka-lenses/" target="_blank"><img src="http://www.landoop.com/docs/lenses/_images/lenses-data-management-platform.png"/></a>
<a href="http://www.landoop.com/kafka-lenses/" target="_blank"><img src="https://www.landoop.com/images/new/bg-12.gif"/></a>
</p>

# Stream Reactor
Streaming reference architecture built around Kafka.

![Alt text](https://datamountaineer.com/wp-content/uploads/2016/01/stream-reactor-1.jpg)

![Alt text](images/streamreactor-logo.png)

A collection of components to build a real time ingestion pipeline.

Expand Down
Binary file removed images/DM-logo.jpg
Binary file not shown.
Binary file added images/streamreactor-logo.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Expand Up @@ -39,17 +39,14 @@ class PulsarWriter(client: PulsarClient, settings: PulsarSinkSettings) extends S
// Enable compression
//TODO: pick it up from settings
conf.setCompressionType(CompressionType.LZ4)
private var producersMap = scala.collection.mutable.Map.empty[String, Producer]
private val producersMap = scala.collection.mutable.Map.empty[String, Producer]

//initialize error tracker
initialize(settings.maxRetries, settings.errorPolicy)
private val mappings: Map[String, Set[Kcql]] = settings.kcql.groupBy(k => k.getSource)
private val kcql = settings.kcql

def write(records: Iterable[SinkRecord]) = {

val grouped = records.groupBy(r => r.topic())

val t = Try {
records.foreach { record =>
val topic = record.topic()
Expand Down Expand Up @@ -86,6 +83,7 @@ class PulsarWriter(client: PulsarClient, settings: PulsarSinkSettings) extends S
def flush = {}

def close = {
logger.info("Closing client")
client.close()
}
}

0 comments on commit 765856a

Please sign in to comment.