Skip to content

Commit

Permalink
CSEN-2 add timestamps to events
Browse files Browse the repository at this point in the history
  • Loading branch information
tomskikh committed Aug 27, 2018
1 parent eb9d8d2 commit 5c76306
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.bwsw.cloudstack.entities.events

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.TestEntities
Expand All @@ -26,9 +27,15 @@ import com.bwsw.cloudstack.entities.requests.account.AccountCreateRequest.RootAd
import com.bwsw.cloudstack.entities.requests.account.{AccountCreateRequest, AccountDeleteRequest}
import com.bwsw.cloudstack.entities.util.events.RecordToEventDeserializer
import com.bwsw.cloudstack.entities.util.kafka.Consumer
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

class AccountEventsRetrievingTest extends FlatSpec with TestEntities with BeforeAndAfterAll {
class AccountEventsRetrievingTest
extends FlatSpec
with TestEntities
with BeforeAndAfterAll
with Matchers {

private val eventDateTime = OffsetDateTime.now()
val accountId: UUID = UUID.randomUUID()
val sleepInterval = 10000
val pollTimeout = 1000
Expand All @@ -48,33 +55,35 @@ class AccountEventsRetrievingTest extends FlatSpec with TestEntities with Before
val consumer = new Consumer(kafkaEndpoint, kafkaTopic)
consumer.assignToEnd()

private val beforeCreation = OffsetDateTime.now().minusSeconds(1)
executor.executeRequest(accountCreateRequest.getRequest)
private val beforeDeletion = OffsetDateTime.now().minusSeconds(1)
executor.executeRequest(accountDeleteRequest.getRequest)

Thread.sleep(sleepInterval)

val records: List[String] = consumer.poll(pollTimeout)

it should "retrieve AccountCreateEvent with status 'Completed' from Kafka records" in {
val expectedAccountCreateEvents = List(AccountCreateEvent(Constants.Statuses.COMPLETED, accountId))

val afterCreation = OffsetDateTime.now()
val actualAccountCreateEvents = records.map(RecordToEventDeserializer.deserializeRecord).filter {
case AccountCreateEvent(Constants.Statuses.COMPLETED, `accountId`) => true
case AccountCreateEvent(Constants.Statuses.COMPLETED, `accountId`, dateTime) =>
dateTime.isAfter(beforeCreation) && dateTime.isBefore(afterCreation)
case _ => false
}

assert(actualAccountCreateEvents == expectedAccountCreateEvents, s"records count: ${records.size}")
actualAccountCreateEvents.length shouldBe 1
}

it should "retrieve AccountDeleteEvent with status 'Completed' from Kafka records" in {
val expectedAccountDeleteEvents = List(AccountDeleteEvent(Constants.Statuses.COMPLETED, accountId))

val afterDeletion = OffsetDateTime.now()
val actualAccountDeleteEvents = records.map(RecordToEventDeserializer.deserializeRecord).filter {
case AccountDeleteEvent(Constants.Statuses.COMPLETED, `accountId`) => true
case AccountDeleteEvent(Constants.Statuses.COMPLETED, `accountId`, dateTime) =>
dateTime.isAfter(beforeDeletion) && dateTime.isBefore(afterDeletion)
case _ => false
}

assert(expectedAccountDeleteEvents == actualAccountDeleteEvents, s"records count: ${records.size}")
actualAccountDeleteEvents.length shouldBe 1
}

override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
*/
package com.bwsw.cloudstack.entities.events

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.TestEntities
import com.bwsw.cloudstack.entities.events.user.UserCreateEvent
import com.bwsw.cloudstack.entities.requests.user.UserCreateRequest
import com.bwsw.cloudstack.entities.util.events.RecordToEventDeserializer
import com.bwsw.cloudstack.entities.util.kafka.Consumer
import org.scalatest.FlatSpec
import org.scalatest.{FlatSpec, Matchers}

class UserEventsRetrievingTest
extends FlatSpec
with TestEntities
with Matchers {

class UserEventsRetrievingTest extends FlatSpec with TestEntities {
val userId: UUID = UUID.randomUUID()
val sleepInterval = 5000
val pollTimeout = 1000
Expand All @@ -46,20 +51,21 @@ class UserEventsRetrievingTest extends FlatSpec with TestEntities {
val consumer = new Consumer(kafkaEndpoint, kafkaTopic)
consumer.assignToEnd()

private val beforeCreation = OffsetDateTime.now().minusSeconds(1)
executor.executeRequest(userCreateRequest.getRequest)

Thread.sleep(sleepInterval)

val records: List[String] = consumer.poll(pollTimeout)

it should "retrieve UserCreateEvent with status 'Completed' from Kafka records" in {
val expectedUserCreateEvents = List(UserCreateEvent(Constants.Statuses.COMPLETED, userId))

val afterCreation = OffsetDateTime.now()
val actualUserCreateEvents = records.map(RecordToEventDeserializer.deserializeRecord).filter {
case UserCreateEvent(Constants.Statuses.COMPLETED, `userId`) => true
case UserCreateEvent(Constants.Statuses.COMPLETED, `userId`, dateTime) =>
dateTime.isAfter(beforeCreation) && dateTime.isBefore(afterCreation)
case _ => false
}

assert(actualUserCreateEvents == expectedUserCreateEvents, s"records count: ${records.size}")
actualUserCreateEvents.length shouldBe 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.bwsw.cloudstack.entities.events

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.TestEntities
Expand All @@ -26,12 +27,13 @@ import com.bwsw.cloudstack.entities.requests.vm.{VmCreateRequest, VmDeleteReques
import com.bwsw.cloudstack.entities.responses.vm.VirtualMachineCreateResponse
import com.bwsw.cloudstack.entities.util.events.RecordToEventDeserializer
import com.bwsw.cloudstack.entities.util.kafka.Consumer
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

class VmEventsRetrievingTest
extends FlatSpec
with TestEntities
with BeforeAndAfterAll {
with BeforeAndAfterAll
with Matchers {

val serviceOfferingId: UUID = retrievedServiceOfferingId
val templateId: UUID = retrievedTemplateId
Expand All @@ -45,34 +47,37 @@ class VmEventsRetrievingTest
val consumer = new Consumer(kafkaEndpoint, kafkaTopic)
consumer.assignToEnd()

private val beforeCreation = OffsetDateTime.now().minusSeconds(1)
val vmId: UUID = mapper.deserialize[VirtualMachineCreateResponse](executor.executeRequest(vmCreateRequest.getRequest)).vm.id

val vmDeleteRequest = new VmDeleteRequest(vmId)
private val beforeDeletion = OffsetDateTime.now().minusSeconds(1)
executor.executeRequest(vmDeleteRequest.getRequest)

Thread.sleep(sleepInterval)

val records: List[String] = consumer.poll(pollTimeout)

it should "retrieve VirtualMachineCreateEvent with status 'Completed' from Kafka records" in {
val expectedVmCreateEvents = List(VirtualMachineCreateEvent(Constants.Statuses.COMPLETED, vmId))

val afterCreation = OffsetDateTime.now()
val actualVmCreateEvents = records.map(RecordToEventDeserializer.deserializeRecord).filter {
case VirtualMachineCreateEvent(Constants.Statuses.COMPLETED, `vmId`) => true
case VirtualMachineCreateEvent(Constants.Statuses.COMPLETED, `vmId`, dateTime) =>
dateTime.isAfter(beforeCreation) && dateTime.isBefore(afterCreation)
case _ => false
}

assert(actualVmCreateEvents == expectedVmCreateEvents, s"records count: ${records.size}")
actualVmCreateEvents.length shouldBe 1
}

it should "retrieve VirtualMachineDestroyEvent with status 'Completed' from Kafka records" in {
val expectedVmDestroyEvents = List(VirtualMachineDestroyEvent(Constants.Statuses.COMPLETED, vmId))

val afterDeletion = OffsetDateTime.now()
val actualVmDestroyEvents = records.map(RecordToEventDeserializer.deserializeRecord).filter {
case VirtualMachineDestroyEvent(Constants.Statuses.COMPLETED, `vmId`) => true
case VirtualMachineDestroyEvent(Constants.Statuses.COMPLETED, `vmId`, dateTime) =>
dateTime.isAfter(beforeDeletion) && dateTime.isBefore(afterDeletion)
case _ => false
}

assert(actualVmDestroyEvents == expectedVmDestroyEvents, s"records count: ${records.size}")
actualVmDestroyEvents.length shouldBe 1
}

override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,80 @@
*/
package com.bwsw.cloudstack.entities.common

import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID

import com.bwsw.cloudstack.entities.events.Constants.Events
import com.bwsw.cloudstack.entities.events.Constants.{Events, FieldNames}
import com.bwsw.cloudstack.entities.events.account.{AccountCreateEvent, AccountDeleteEvent}
import com.bwsw.cloudstack.entities.events.user.UserCreateEvent
import com.bwsw.cloudstack.entities.events.vm.{VirtualMachineCreateEvent, VirtualMachineDestroyEvent}
import com.bwsw.cloudstack.entities.events.{CloudStackEvent, UnknownEvent}
import org.slf4j.LoggerFactory
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.util.{Failure, Success, Try}

trait JsonFormats {

private val logger = LoggerFactory.getLogger(classOf[JsonFormats])
val dateTimeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd['T'][ ]HH:mm:ss[ ]X")

implicit val uuidJsonFormat: JsonFormat[UUID] = new JsonFormat[UUID] {
override def read(json: JsValue): UUID =
UUID.fromString(StringJsonFormat.read(json))
override def read(json: JsValue): UUID = {
json match {
case JsString(x) => UUID.fromString(x)
case x => deserializationError("Expected UUID as JsString, but got " + x)
}
}

override def write(obj: UUID): JsValue = JsString(obj.toString)
}

override def write(obj: UUID): JsValue =
StringJsonFormat.write(obj.toString)
implicit val offsetDateTimeJsonFormat: JsonFormat[OffsetDateTime] = new JsonFormat[OffsetDateTime] {
override def read(json: JsValue): OffsetDateTime = {
json match {
case JsString(x) => OffsetDateTime.parse(x, dateTimeFormatter)
case x => deserializationError("Expected OffsetDateTime as JsString, but got " + x)
}
}

override def write(obj: OffsetDateTime): JsValue =
JsString(obj.format(dateTimeFormatter))
}

implicit val accountCreateEventJsonFormat: RootJsonFormat[AccountCreateEvent] =
jsonFormat2(AccountCreateEvent)
jsonFormat3(AccountCreateEvent)
implicit val accountDeleteEventJsonFormat: RootJsonFormat[AccountDeleteEvent] =
jsonFormat2(AccountDeleteEvent)
jsonFormat3(AccountDeleteEvent)
implicit val userCreateEventJsonFormat: RootJsonFormat[UserCreateEvent] =
jsonFormat2(UserCreateEvent)
jsonFormat3(UserCreateEvent)
implicit val virtualMachineCreateEventJsonFormat: RootJsonFormat[VirtualMachineCreateEvent] =
jsonFormat2(VirtualMachineCreateEvent)
jsonFormat3(VirtualMachineCreateEvent)
implicit val virtualMachineDestroyEventJsonFormat: RootJsonFormat[VirtualMachineDestroyEvent] =
jsonFormat2(VirtualMachineDestroyEvent)
jsonFormat3(VirtualMachineDestroyEvent)

implicit val cloudStackEventJsonFormat: RootJsonFormat[CloudStackEvent] = new RootJsonFormat[CloudStackEvent] {
override def read(json: JsValue): CloudStackEvent = {
Try {
json.asJsObject.fields("event") match {
case JsString(Events.ACCOUNT_CREATE) => json.convertTo[AccountCreateEvent]
case JsString(Events.ACCOUNT_DELETE) => json.convertTo[AccountDeleteEvent]
case JsString(Events.USER_CREATE) => json.convertTo[UserCreateEvent]
case JsString(Events.VM_CREATE) => json.convertTo[VirtualMachineCreateEvent]
case JsString(Events.VM_DESTROY) => json.convertTo[VirtualMachineDestroyEvent]
val fields = json.asJsObject.fields
val eventField = fields.get(FieldNames.Event)
val eventDateTimeField = fields.get(FieldNames.EventDateTime)

(eventField, eventDateTimeField) match {
case (Some(JsString(Events.ACCOUNT_CREATE)), Some(_)) => json.convertTo[AccountCreateEvent]
case (Some(JsString(Events.ACCOUNT_DELETE)), Some(_)) => json.convertTo[AccountDeleteEvent]
case (Some(JsString(Events.USER_CREATE)), Some(_)) => json.convertTo[UserCreateEvent]
case (Some(JsString(Events.VM_CREATE)), Some(_)) => json.convertTo[VirtualMachineCreateEvent]
case (Some(JsString(Events.VM_DESTROY)), Some(_)) => json.convertTo[VirtualMachineDestroyEvent]
case _ => UnknownEvent(json)
}
} match {
case Success(event) => event
case Failure(_) => UnknownEvent(json)
case Failure(exception) =>
logger.warn(s"Cannot parse event: $json", exception)
UnknownEvent(json)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ object Constants {
object Statuses {
val COMPLETED = "Completed"
}

object FieldNames {
val Event: String = "event"
val EventDateTime: String = "eventDateTime"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package com.bwsw.cloudstack.entities.events.account

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.events.CloudStackEvent

final case class AccountCreateEvent(status: String,
entityuuid: UUID)
entityuuid: UUID,
eventDateTime: OffsetDateTime)
extends CloudStackEvent
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package com.bwsw.cloudstack.entities.events.account

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.events.CloudStackEvent

final case class AccountDeleteEvent(status: String,
entityuuid: UUID)
entityuuid: UUID,
eventDateTime: OffsetDateTime)
extends CloudStackEvent
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package com.bwsw.cloudstack.entities.events.user

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.events.CloudStackEvent

final case class UserCreateEvent(status: String,
entityuuid: UUID)
entityuuid: UUID,
eventDateTime: OffsetDateTime)
extends CloudStackEvent
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package com.bwsw.cloudstack.entities.events.vm

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.events.CloudStackEvent

final case class VirtualMachineCreateEvent(status: String,
entityuuid: UUID)
entityuuid: UUID,
eventDateTime: OffsetDateTime)
extends CloudStackEvent
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package com.bwsw.cloudstack.entities.events.vm

import java.time.OffsetDateTime
import java.util.UUID

import com.bwsw.cloudstack.entities.events.CloudStackEvent

final case class VirtualMachineDestroyEvent(status: String,
entityuuid: UUID)
entityuuid: UUID,
eventDateTime: OffsetDateTime)
extends CloudStackEvent
Loading

0 comments on commit 5c76306

Please sign in to comment.