Skip to content

Commit

Permalink
Travis
Browse files Browse the repository at this point in the history
  • Loading branch information
sonsoleslp committed Nov 29, 2018
1 parent 2c20a80 commit 8546627
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 0 deletions.
31 changes: 31 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,37 @@ under the License.
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--test dependencies-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.16</version>
<scope>test</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.httpcomponents</groupId>-->
<!--<artifactId>httpclient</artifactId>-->
<!--<version>4.5.2</version>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-table_${scala.binary.version}</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class OrionHttpHandler(
throw new Exception("Only POST requests are allowed")
}

println("Something received")

// Retrieve headers
val headerEntries = req.headers().entries()
val service = headerEntries.get(4).getValue()
Expand Down
3 changes: 3 additions & 0 deletions src/test/.travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
language: scala
scala:
- 2.11.11
25 changes: 25 additions & 0 deletions src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

log4j.rootLogger=INFO, console
#log4j.rootLogger= console


log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fiware.cosmos.orion.flink.connector.tests

import java.util
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

import com.alibaba.fastjson.JSONObject
import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.impl.client.HttpClients
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}

/** base test util */
class BaseTest {
lazy val logger: Logger = LoggerFactory.getLogger(getClass)
private lazy val httpclient = HttpClients.createDefault()
private lazy val schedule = Executors.newScheduledThreadPool(20)
private lazy val pool = Executors.newCachedThreadPool()


def schedule(period: Int, f: () => Unit): Unit = {
schedule.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
f.apply()
}
}, 3, period, TimeUnit.SECONDS)
}

def run(f: () => Unit): Unit = {
pool.submit(new Runnable {
override def run(): Unit = {
f.apply()
}
})
}

def sendGetRequest(url: String): String = {
val httpGet = new HttpGet(url)
val response1 = httpclient.execute(httpGet)
try {
logger.info(s"response: ${response1.getStatusLine}, url:$url")
val entity = response1.getEntity
EntityUtils.toString(entity)
} finally {
response1.close()
}
}

def sendPostRequest(url: String, map: Map[String, String]): String = {
val httpPost = new HttpPost(url)
val nvps = new util.ArrayList[NameValuePair]()
map.foreach { kv =>
nvps.add(new BasicNameValuePair(kv._1, kv._2))
}
httpPost.setEntity(new UrlEncodedFormEntity(nvps))
val response = httpclient.execute(httpPost)
try {
logger.info("response status line:" + response.getStatusLine)
val entity2 = response.getEntity
EntityUtils.toString(entity2)
} finally {
response.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fiware.cosmos.orion.flink.connector.tests

import java.net.URLEncoder
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import com.alibaba.fastjson.JSONObject
import com.fasterxml.jackson.databind.ObjectMapper

import scala.util.Random

/**
* http client
*/
object HttpSourceExample extends BaseTest {

def main(args: Array[String]): Unit = {
val queue = new LinkedBlockingQueue[JSONObject]()

run(() => StreamSqlExample.main(Array("--http", "true")))

Thread.sleep(5000)
while (true) {
// val json = queue.poll(Int.MaxValue, TimeUnit.SECONDS)
// logger.info("====request register from netty tcp source: " + json)
// val url = s"http://${json.getString("ip")}:${json.getString("port")}/payload?msg="
val url = s"http://localhost:7070"
schedule(5, () => {
val line = s"${Random.nextInt(5)},abc,${Random.nextInt(100)}"
val mapper = new ObjectMapper()
// val json = mapper.readTree(jsonString)
/* new JSONObject(Map("id" -> "R1",
"co" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
"co2" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
"humidity" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
"pressure" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
"temperature" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
"wind_speed" -> Map("type" -> "Float", "value" -> Random.nextInt(100), "metadata" -> Map()),
))*/
val jsonr =Map("data" -> "{}","subscriptionId" -> "57458eb60962ef754e7c0998")

sendPostRequest(url,jsonr)
})
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fiware.cosmos.orion.flink.connector.tests

import io.netty.bootstrap.Bootstrap
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
import io.netty.handler.logging.{LogLevel, LoggingHandler}
import org.slf4j.LoggerFactory

private class NettyClient(host: String, port: Int) extends Thread {
private lazy val logger = LoggerFactory.getLogger(getClass)

private lazy val group: EventLoopGroup = new NioEventLoopGroup
private var ch: Channel = _

def shutdown(): Unit = {
group.shutdownGracefully()
}

def send(line: String): Unit = {
if (ch.isActive && ch != null) {
ch.writeAndFlush(line + "\n")
logger.info("client send msg: "
+ s"${ch.isActive} ${ch.isOpen} ${ch.isRegistered} ${ch.isWritable}")
} else {
logger.info("client fail send msg, "
+ s"${ch.isActive} ${ch.isOpen} ${ch.isRegistered} ${ch.isWritable}")
}
}

override def run(): Unit = {

val b: Bootstrap = new Bootstrap
b.group(group)
.channel(classOf[NioSocketChannel])
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer[SocketChannel]() {
def initChannel(ch: SocketChannel) {
val p: ChannelPipeline = ch.pipeline
p.addLast(new LoggingHandler(LogLevel.INFO))
p.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
p.addLast(new StringEncoder())
p.addLast(new StringDecoder())
p.addLast(new NettyClientHandler)
}
})
// Start the client.
val f: ChannelFuture = b.connect(host, port).sync()
ch = f.channel()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fiware.cosmos.orion.flink.connector.tests

import io.netty.channel._
import org.slf4j.LoggerFactory

import scala.util.Random


final class NettyClientHandler extends SimpleChannelInboundHandler[String] {
private lazy val logger = LoggerFactory.getLogger(getClass)

override def channelActive(ctx: ChannelHandlerContext): Unit = {
val ch = ctx.channel()
logger.info(s"active channel: $ch")
}

override def channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
logger.info(s"inactive channel: $ch")
}

override def channelRead0(ctx: ChannelHandlerContext, msg: String) {
logger.info("receive message:" + msg)
ctx.writeAndFlush(Random.nextLong() + ",sjk," + Random.nextInt())
}

override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush

override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
cause.printStackTrace()
ctx.close
}
}

0 comments on commit 8546627

Please sign in to comment.