Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

fix #106 Gearpump Redis Integration #11

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

darionyaphet
Copy link
Contributor

Redis is a hight performance in memory storage , and have widely used in a lot of project .

It's should support redis as DataSource and DataSink .

@kkasravi
Copy link
Contributor

kkasravi commented May 7, 2016

@darionyaphet we probably need some unit tests. Good job on splitting example and actual definitions into example and external. It doesn't look like the build failures are related

[info] StreamAppSpec:
[info] - should be able to generate multiple new streams
[info] - should plan the dsl to Processsor(TaskDescription) DAG *** FAILED ***
[info]   java.io.NotSerializableException: scala.collection.LinearSeqLike$$anon$1
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:235)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:235)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:235)
[info]   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info]   ...
[info] - should produce 3 messages *** FAILED ***
[info]   java.io.NotSerializableException: scala.collection.LinearSeqLike$$anon$1
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:235)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:235)
[info]   at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:235)
[info]   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info]   ...

@manuzhang
Copy link
Contributor

@darionyaphet thanks for your contributions. please add license headers

@darionyaphet
Copy link
Contributor Author

@manuzhang I have add license headers .

@kkasravi I will add some unit tests .

thank you :)

@huafengw
Copy link
Contributor

@darionyaphet we have changed Gearpump's package name, please rebase our latest code. Sorry for the inconvenient.

@manuzhang
Copy link
Contributor

@darionyaphet also please modify your commit comment to fix GEARPUMP-106, Redis integration which would make our pr merge script happier.

@darionyaphet
Copy link
Contributor Author

Hi @manuzhang what is the commit format ?

I found some rules on Contribution Guideline

For all commit log messages, they must contain issue id. Like this: "fix #issueId, comments".

@darionyaphet
Copy link
Contributor Author

Hi @huafengw sure I will rename packages from io.gearpump to org.apache.gearpump :D

@manuzhang
Copy link
Contributor

@darionyaphet yes, as we use jira now issue id should be GEARPUMP-XXX. You may our recent commits for examples

}
}

object RedisSourceSink extends AkkaApp with ArgumentsParser {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisSourceSink doesn't sound like an example and I can't get what the example does from the name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisSourceSink which is mean reading from RedisSource and write messages to RedisSink :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe name RedisSourceSinkExample

@manuzhang
Copy link
Contributor

@darionyaphet Could you add this module to the experiments ? That will be low risk before the module is mature. Plus, the experiments doesn't require UT coverage. Also, please add a README for how to use this module.

import taskContext.output

override def onNext(message: Message): Unit = {
val msg = message.msg.asInstanceOf[Option[String]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be better as

Option(message.msg) match {
  case Some(msg) =>
    val upper = msg.asInstanceOf[String].toUpperCase
    LOG.info(s"to Upper $upper")
    outer(new Message(upper, message.timestamp))
  case None =>
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good ! I will update this :)

@huafengw
Copy link
Contributor

Though the new package name is right, you still need to rebase latest master branch to correct the import.
Also, please add some comment to explain the interfaces and classes.

@kkasravi
Copy link
Contributor

@darionyaphet if you are time constrained please let us know and we can make the changes noted above. We would like to get this into the next release if possible (we're targeting in about 2 weeks).
thanks
Kam

@darionyaphet
Copy link
Contributor Author

Sorry to reply it with so long .

@manuzhang @huafengw redis examples class name have rename to RedisSourceSinkExample and RedisSourceStorageExample , also add into BuildExample.scala . It seems better ?

external-redis have move into experiments I will add unit test later :)

@kkasravi I will fix and add some comment at weekend :)

CrossVersion.binaryScalaVersion(scalaVersion.value)
) ++
Seq(
mainClass in (Compile, packageBin) := Some("org.apache.gearpump.streaming.example.redis.RedisSourceStorageExample"),
Copy link
Contributor

@huafengw huafengw May 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not correct, one manifest can only have one main class.

Seq(
         mainClass in (Compile, packageBin) := Some("org.apache.gearpump.streaming.example.redis.RedisSourceSinkExample"),
         target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
       )

is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HI @huafengw when I package using SBT , the main class will specifies into META-INF ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's what sbt-pack did.

def this(key: String, value: String) = this(toBytes(key), toBytes(value))
}

case class LPushMessage(key: Array[Byte], value: Array[Byte]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear how this message type can be used. LPushMessage looks the same as RPushMessage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LPushMessage and RPushMessage are difference message to control redis client add element at list's head or tail

@manuzhang
Copy link
Contributor

@darionyaphet What's the difference between RedisStorage and RedisSink here since they both extend DataSink ?

@karol-brejna-i
Copy link
Member

karol-brejna-i commented Jun 16, 2016

I think I see the idea behind RedisSink and RedisStorage.

Redis is advertised as "data structure store". It supports structures (like strings, lists, sets, hashes, etc.) and operation on them (add element to a set, remove element; add element at the beginning of a list, at the end; remove element from head/tail of the list, etc.).
It also pub/sub server (http://redis.io/topics/pubsub)

RedisSource and RedisSink work on "classic" message stream utilizing Redis' pub/sub mechanizm. The source subscribes to a channel. The sink publishes to a channel.

RedisStorage reads a message and let you make a Redis command (add to list, set, etc.). The subset of commands is defined in RedisMessage. (I would think about changing the name, though. For example RedisCommandSink or similar).

@manuzhang what do you think?

@codecov-io
Copy link

Current coverage is 64.10%

Sunburst

No coverage report found for master at 25d1fce.

Powered by Codecov. Last updated by 25d1fce...c3686fd

@darionyaphet
Copy link
Contributor Author

@manuzhang actually storm-redis act as a ORM framework and put the message into a redis instance . So it don't have a full command support . I think we can support more command such as delete , ttl and exists .

@karol-brejna-i
Copy link
Member

We are developing under 'experiments' here. I'd be happy to have something decent here. Maybe not full-blown, but functional.
Can we sum up what we still need to do with this PR?
Putting the examples in experiments folder?
Tests?
Updating names?

My point is: Let us release first version of the redis connector and improve it in next releases.

@manuzhang
Copy link
Contributor

@karol-brejna-i @darionyaphet I understand the difference between RedisSink and RedisStorage now but I don't think RedisStorage should extend the DataSink interface which is intended for write. Probably we need a new interface for such usage as RedisStorage. For now, we can add RedisStorage without extending any interface. I totally agree we have a functional version first and improve later as long as all are put in the experiments folder with reasonable naming. Tests are not hard requirement for a experiments module.

@darionyaphet
Copy link
Contributor Author

@karol-brejna-i It's seems have a lot things to do .

  1. Support more command such as : time to live , exist and delete .
  2. Support Redis Cluster and Redis Sharding .
  3. Add some Test Case

@darionyaphet
Copy link
Contributor Author

@manuzhang

HBaseSink seems have the same question .

Sink and Source are support to the data streaming , Storage is meaning to put data into a place .

@karol-brejna-i
Copy link
Member

@darionyaphet

Support more command such as : time to live , exist and delete .
This is the reason I tend to agree @manuzhang. Let's have the "basic" source/sink. Let's make RedisStorage a helper class (not a DataSink). This will let us release a functional version of the connector and improve later.

Support Redis Cluster and Redis Sharding .
Looks like Jedis has cluster support already (https://github.com/xetorthio/jedis#jedis-cluster). Sharding, too. So it would be a matter of using the features. Again, I would try to incorporate the features in next releases.

Add some Test Case
As Manu wrote, test are not the blocker for an experiment module.

My message is: let's release first version of Redis Connector soon and give ourselves time to improve ;-)

@darionyaphet
Copy link
Contributor Author

@kkasravi @manuzhang RedisStorage maybe need more discuss so I have remove it .

After we have a discuss , I will repush it :)

@manuzhang
Copy link
Contributor

@darionyaphet this is partially done in #93. Please rebase or, close and fire a new PR for follow-ups.

manuzhang pushed a commit to manuzhang/incubator-gearpump that referenced this pull request Jan 24, 2017
manuzhang pushed a commit to manuzhang/incubator-gearpump that referenced this pull request Jan 24, 2017
Author: Kam Kasravi <kamkasravi@yahoo.com>

Closes apache#11 from kkasravi/asf-site.
manuzhang pushed a commit to manuzhang/incubator-gearpump that referenced this pull request Jan 24, 2017
Author: Kam Kasravi <kamkasravi@yahoo.com>

Closes apache#11 from kkasravi/asf-site.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
6 participants