
Missing data #57

Open
flo076 opened this issue Jul 7, 2016 · 4 comments
flo076 commented Jul 7, 2016

Hello,

I use this RabbitMQ receiver but I have some problems. I use a Java project (spring-amqp) to publish some Avro messages to RabbitMQ and consume them with a Scala/Spark project.

Let me take an example with simple 'String' messages, but in my real case I publish Avro messages, decode them, and save them on HDFS in Parquet format with PairRDDFunctions, and I lose ~1% of my data.

@Autowired
private RabbitTemplate rabbitTemplate;
...

for (int i = 0; i < 2000000; i++) {
    rabbitTemplate.convertAndSend(exchangeSessionActivity, routingKeySessionActivity,
        RandomStringUtils.randomAlphanumeric(255));
}

and I use the distributed receiver to read it, transform it, and write it in Parquet format on HDFS.

def main(args: Array[String]): Unit = {
    val appName: String = getClass.getSimpleName

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setIfMissing(SparkConstant.CONF_SPARK_MASTER, settings.sparkMaster)

    // Create the context with a 10 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(10))


    val rabbitMQConnection = Map(
      "hosts" -> "192.168.152.130",
      "queueName" -> "test",
      ConfigParameters.VirtualHostKey -> "/decisionnel",
      "userName" -> "admin",
      "password" -> "password"
    )

    val distributedKeysSa = Seq(
      RabbitMQDistributedKey(
        "test",
        new ExchangeAndRouting(),
        rabbitMQConnection
      )
    )

    val receiverStream = RabbitMQUtils.createDistributedStream[String](ssc, distributedKeysSa, rabbitMQConnection)


    val totalEvents = ssc.sparkContext.accumulator(0L, "Number of events received")
    val totalEventsPair = ssc.sparkContext.accumulator(0L, "Number of pair events received")

    receiverStream.foreachRDD((rdd, ms) => {
      val pairRdd = rdd.map((null, _)) // Transform RDD to RDD[(K, V)] to get access to PairRDDFunctions
          // .saveAsHadoopFile(....) save it on HDFS

      val rddCount = rdd.count()
      val pairRddCount = pairRdd.count()

      totalEvents += rddCount
      totalEventsPair += pairRddCount

      println("\n ---------------->")

      println(s"TOTAL EVENTS : \t $totalEvents")
      println(s"TOTAL EVENTS Pair: \t $totalEventsPair")

      if (rddCount != pairRddCount) {
        println("Missing Data")
      }

      println("\n <----------------")
    })

    ssc.start()
    ssc.awaitTermination()
  }

If you execute this code, sometimes you will see the "Missing Data" log without any error or warning (see missing-data.png: 67156 instead of 67157).

For me totalEvents is correct, but totalEventsPair is missing some data.
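One thing I also tried, as a sketch (the `cache()`/`unpersist()` calls are my own idea, not something the library documents): persisting the batch before counting, to rule out the RDD being recomputed from the receiver blocks between the two actions.

```scala
receiverStream.foreachRDD((rdd, ms) => {
  // Materialize the batch once so rdd.count() and pairRdd.count()
  // read the same cached data instead of re-evaluating the lineage twice.
  rdd.cache()

  val pairRdd = rdd.map((null, _))
  val rddCount = rdd.count()
  val pairRddCount = pairRdd.count()

  if (rddCount != pairRddCount) {
    println(s"Missing Data in batch $ms")
  }

  rdd.unpersist()
})
```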

I have been looking into this problem for 3 days but I did not find any solution.

For your information, I tried enabling Write Ahead Logs, checkpointing, back-pressure ...
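For reference, this is roughly how I enabled those options (a sketch; the checkpoint directory is a hypothetical path, the config keys are the standard Spark Streaming ones):

```scala
val sparkConf = new SparkConf()
  .setAppName(appName)
  // Write Ahead Log for receiver-based streams
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Back-pressure: let Spark adapt the ingestion rate to the processing rate
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(sparkConf, Seconds(10))
// Checkpointing is required for the WAL to be replayable after a failure
ssc.checkpoint("hdfs:///checkpoints/session-activity") // hypothetical path
```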
I use

  • Spark 1.6.1
  • Scala 2.10.5
  • com.stratio.receiver.spark-rabbitmq_1.6 v0.3.0
  • RabbitMq 3.6.0, Erlang R16B03

Thanks for your work 👍


flo076 commented Aug 24, 2016

Hello,

Does anyone have an idea?


nelsou commented Nov 10, 2016

@flo076 did you figure it out? I have the same problem, #80 ...


compae commented Dec 15, 2016

Hi @flo076 ,

probably this issue is solved by the PR #83

could you check it?


flo076 commented Mar 22, 2017

I checked it; I think it is another problem. I continue to lose some messages.
