Skip to content

Commit

Permalink
Improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
sonsoleslp committed May 8, 2019
1 parent 5b8c266 commit 27ef826
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@ class OrionHttpHandler(
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
msg match {
case req : FullHttpRequest =>

if (req.method() != HttpMethod.POST) {
throw new Exception("Only POST requests are allowed")
}
val ngsiEvent = parseMessage(req)
if (sc != null && ngsiEvent != null) {
logger.info(write(ngsiEvent))
sc.collect(ngsiEvent)
} else {
exceptionCaught(ctx, new Exception("Request body is not an NgsiEvent"))
}
if (HttpUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE))
Expand Down Expand Up @@ -110,12 +113,12 @@ class OrionHttpHandler(
//Convert attributes to Attribute objects
.transform((k,v) => MapToAttributeConverter
.unapply(v.asInstanceOf[Map[String,Any]]))
new Entity(entityId,entityType,attrs)
Entity(entityId, entityType, attrs)
})
// Generate timestamp
val creationTime = System.currentTimeMillis
// Generate NgsiEvent
val ngsiEvent = new NgsiEvent(creationTime, service, servicePath, entities, subscriptionId)
val ngsiEvent = NgsiEvent(creationTime, service, servicePath, entities, subscriptionId)
ngsiEvent
} catch {
case e: Exception => null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object OrionSink {

try {
val response = client.execute(httpEntity)
logger.info("POST to " + msg.url)
logger.info("Sent to " + msg.url)
} catch {
case e: Exception => {
logger.error(e.toString)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package org.fiware.cosmos.orion.flink.connector.test

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.OrionSource
import org.fiware.cosmos.orion.flink.connector._
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write


object Constants {
Expand All @@ -18,6 +19,7 @@ object Constants {
* @author @sonsoleslp
*/
object FlinkJobTest{
implicit val formats = DefaultFormats

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
Expand All @@ -29,15 +31,18 @@ object FlinkJobTest{
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
val pres = entity.attrs("pressure").value.asInstanceOf[Number].floatValue()
new EntityNode( entity.id, temp, pres)
EntityNode(entity.id, temp, pres)
})
.keyBy("id")
.timeWindow(Time.seconds(Constants.MaxWindow), Time.seconds(Constants.MinWindow))

processedDataStream.max("temperature").map(max=> {
simulatedNotification.maxTempVal = max.temperature})
processedDataStream .max("pressure").map(max=> {
simulatedNotification.maxPresVal = max.pressure})
processedDataStream.max("temperature").map(max=> {
simulatedNotification.maxTempVal = max.temperature})
val sinkStream = processedDataStream .max("pressure").map(max=> {
simulatedNotification.maxPresVal = max.pressure
OrionSinkObject(write(max),"http://localhost:3000",ContentType.JSON,HTTPMethod.POST)
})
OrionSink.addSink(sinkStream)

env.execute("Socket Window NgsiEvent")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class OrionConnectorTest extends BaseTest{
}

@Test def buildHttpPostSinkEntity : Unit = {
val os = new OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.Plain, HTTPMethod.POST)
val os = OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.Plain, HTTPMethod.POST)
val httpMsg = OrionSink.createHttpMsg(os)
val content = scala.io.Source.fromInputStream(httpMsg.getEntity.getContent).mkString

Expand All @@ -112,7 +112,7 @@ class OrionConnectorTest extends BaseTest{
}

@Test def buildHttpPutSinkEntity : Unit = {
val os = new OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.JSON, HTTPMethod.PUT)
val os = OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.JSON, HTTPMethod.PUT)
val httpMsg = OrionSink.createHttpMsg(os)
val content = scala.io.Source.fromInputStream(httpMsg.getEntity.getContent).mkString

Expand All @@ -122,7 +122,7 @@ class OrionConnectorTest extends BaseTest{
}

@Test def buildHttpPatchSinkEntity : Unit = {
val os = new OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.JSON, HTTPMethod.PATCH)
val os = OrionSinkObject(Utils.Content, Utils.OrionAddress, ContentType.JSON, HTTPMethod.PATCH)
val httpMsg = OrionSink.createHttpMsg(os)
val content = scala.io.Source.fromInputStream(httpMsg.getEntity.getContent).mkString

Expand Down Expand Up @@ -181,6 +181,5 @@ class OrionConnectorTest extends BaseTest{
Thread.sleep(Utils.SleepTimeShort)
Assert.assertEquals(simulatedNotification.maxTempVal,originalValue,0)


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ final val notificationVal =
],
"subscriptionId": "57458eb60962ef754e7c0998"
}""".stripMargin
final val defaultTemperature = 20;
final val defaultPressure = 40;
def notification(temperature: Float = defaultTemperature, pressure: Float = defaultPressure) : String = {
final val defaultTemperature = 20
final val defaultPressure = 40

def notification(temperature: Float = defaultTemperature, pressure: Float = defaultPressure) : String = {
s"""{
"data": [
{
Expand Down

0 comments on commit 27ef826

Please sign in to comment.