Skip to content

Commit

Permalink
fix OnsUtils createDefaultStreams lost message body (#278)
Browse files Browse the repository at this point in the history
fix OnsUtils createDefaultStreams lost message body
  • Loading branch information
wenxuanguan authored and uncleGen committed Feb 19, 2019
1 parent 73dc20a commit a60dda2
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.streaming.aliyun.ons

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeFilter
import com.aliyun.openservices.ons.api.Message
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.function.{Function => JFunction}
Expand Down Expand Up @@ -186,7 +188,13 @@ object OnsUtils {
(msg: Message) => func.call(msg))
}

private def extractMessage(msg: Message): Array[Byte] = msg.toString.getBytes
private def extractMessage(msg: Message): Array[Byte] = {
JSON.toJSONBytes(msg)
}

def toMessage(msgJson: Array[Byte]): Message = {
JSON.parseObject(new String(msgJson), classOf[Message])
}

@Experimental
def createDefaultStreams(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.aliyun.emr.example.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.aliyun.ons.OnsUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TestAliyunONS {
public static void main(String[] args) throws Exception{
if (args.length < 6) {
System.out.println("usage: TestAliyunONS <duration second> <consumer-id>"
+ " <topic> <tags> <access-key-id> <access-key>");
System.exit(1);
}

long duration = Long.parseLong(args[0]) * 1000;
String consumerId = args[1];
String topic = args[2];
String tags= args[3];
String accessId = args[4];
String accessKey = args[5];

JavaStreamingContext jssc = new JavaStreamingContext(new SparkConf().setAppName("test-aliyun-ons"),
new Duration(duration));
OnsUtils.createDefaultStreams(jssc, consumerId, topic, tags, accessId, accessKey, StorageLevel.MEMORY_AND_DISK_2())
.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {
@Override
public void call(JavaRDD<byte[]> javaRDD) throws Exception {
javaRDD.foreach(new VoidFunction<byte[]>() {
@Override
public void call(byte[] bytes) throws Exception {
System.out.println(new String(OnsUtils.toMessage(bytes).getBody()));
}
});
}
});
jssc.start();
jssc.awaitTermination();
}
}

0 comments on commit a60dda2

Please sign in to comment.