From f6732c73635f5b7979203c29ccf4d654617122b0 Mon Sep 17 00:00:00 2001 From: Naoki Takezoe Date: Wed, 15 Mar 2017 09:54:15 +0900 Subject: [PATCH] Move storage depended tests to each storage project --- .../data/storage/hbase}/LEventsSpec.scala | 13 +- .../data/storage/hbase/PEventsSpec.scala | 192 +++++++++++++ .../storage/hbase}/StorageTestUtils.scala | 11 +- .../data/storage/hbase/TestEvents.scala | 266 ++++++++++++++++++ .../data/storage/jdbc/LEventsSpec.scala | 236 ++++++++++++++++ .../data/storage/jdbc}/PEventsSpec.scala | 28 +- .../data/storage/jdbc/StorageTestUtils.scala | 29 ++ .../data/storage/jdbc/TestEvents.scala | 266 ++++++++++++++++++ 8 files changed, 998 insertions(+), 43 deletions(-) rename {data/src/test/scala/org/apache/predictionio/data/storage => storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase}/LEventsSpec.scala (95%) create mode 100644 storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala rename {data/src/test/scala/org/apache/predictionio/data/storage => storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase}/StorageTestUtils.scala (79%) create mode 100644 storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala create mode 100644 storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala rename {data/src/test/scala/org/apache/predictionio/data/storage => storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc}/PEventsSpec.scala (91%) create mode 100644 storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala create mode 100644 storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala similarity index 95% rename from data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala rename to storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala index 3938072ae5..c813cedf3a 100644 --- a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala @@ -16,8 +16,9 @@ */ -package org.apache.predictionio.data.storage +package org.apache.predictionio.data.storage.hbase +import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage} import org.specs2._ import org.specs2.specification.Step @@ -28,7 +29,6 @@ class LEventsSpec extends Specification with TestEvents { Events can be implemented by: - HBLEvents ${hbEvents} - - JDBCLEvents ${jdbcLEvents} """ @@ -40,13 +40,6 @@ class LEventsSpec extends Specification with TestEvents { """ - def jdbcLEvents = sequential ^ s2""" - - JDBCLEvents should - - behave like any LEvents implementation ${events(jdbcDO)} - - """ - val appId = 1 def events(eventClient: LEvents) = sequential ^ s2""" @@ -75,8 +68,6 @@ class LEventsSpec extends Specification with TestEvents { dbName ) - def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName) - def initDefault(eventClient: LEvents) = { eventClient.init(appId) } diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala new file mode 100644 index 0000000000..d675e55e18 --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage._ +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.specs2._ +import org.specs2.specification.Step + +class PEventsSpec extends Specification with TestEvents { + + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + val sc = new SparkContext("local[4]", "PEventAggregatorSpec test") + + val appId = 1 + val channelId = 6 + val dbName = "test_pio_storage_events_" + hashCode + + def hbLocal = Storage.getDataObject[LEvents]( + StorageTestUtils.hbaseSourceName, + dbName + ) + + def hbPar = Storage.getDataObject[PEvents]( + StorageTestUtils.hbaseSourceName, + dbName + ) + + def stopSpark = { + sc.stop() + } + + def is = s2""" + + PredictionIO Storage PEvents Specification + + PEvents can be implemented by: + - HBPEvents ${hbPEvents} + - (stop Spark) ${Step(sc.stop())} + + """ + + def hbPEvents = sequential ^ s2""" + + HBPEvents should + - behave like any PEvents implementation ${events(hbLocal, hbPar)} + - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} + + """ + + def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2""" + + - (init test) ${initTest(localEventClient)} + - (insert test events) ${insertTestEvents(localEventClient)} + find in default ${find(parEventClient)} + find in channel ${findChannel(parEventClient)} + aggregate user properties in default ${aggregateUserProperties(parEventClient)} + aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)} + write to default ${write(parEventClient)} + write to channel ${writeChannel(parEventClient)} + + """ + + /* setup */ + + // events from TestEvents trait + val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2) + val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4) + + def initTest(localEventClient: LEvents) = { + localEventClient.init(appId) + localEventClient.init(appId, Some(channelId)) + } + + def insertTestEvents(localEventClient: LEvents) = { + listOfEvents.map( localEventClient.insert(_, appId) ) + // insert to channel + listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) ) + success + } + + /* following are tests */ + + def find(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must containTheSameElementsAs(listOfEvents) + } + + def findChannel(parEventClient: PEvents) = { + val resultRDD: RDD[Event] = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map {_.copy(eventId = None)} // ignore eventId + + results must containTheSameElementsAs(listOfEventsChannel) + } + + def aggregateUserProperties(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateUserPropertiesChannel(parEventClient: PEvents) = { + val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties( + appId = appId, + channelId = Some(channelId), + entityType = "user" + )(sc) + val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap + + val expected = Map( + "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime) + ) + + result must beEqualTo(expected) + } + + def write(parEventClient: PEvents) = { + val written = List(r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId)(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEvents ++ written + + results must containTheSameElementsAs(expected) + } + + def writeChannel(parEventClient: PEvents) = { + val written = List(r1, r5, r6) + val writtenRDD = sc.parallelize(written) + parEventClient.write(writtenRDD, appId, Some(channelId))(sc) + + // read back + val resultRDD = parEventClient.find( + appId = appId, + channelId = Some(channelId) + )(sc) + + val results = resultRDD.collect.toList + .map { _.copy(eventId = None)} // ignore eventId + + val expected = listOfEventsChannel ++ written + + results must containTheSameElementsAs(expected) + } + +} diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala similarity index 79% rename from data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala rename to storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala index 747076a8ce..161cf900a3 100644 --- a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala @@ -16,14 +16,12 @@ */ -package org.apache.predictionio.data.storage +package org.apache.predictionio.data.storage.hbase -import org.apache.predictionio.data.storage.hbase.HBLEvents -import scalikejdbc._ +import org.apache.predictionio.data.storage.{LEvents, Storage} object StorageTestUtils { val hbaseSourceName = "HBASE" - val jdbcSourceName = "PGSQL" def dropHBaseNamespace(namespace: String): Unit = { val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace) @@ -35,11 +33,8 @@ object StorageTestUtils { admin.deleteTable(name) } - //Only empty namespaces (no tables) can be removed. + // Only empty namespaces (no tables) can be removed. admin.deleteNamespace(namespace) } - def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s => - SQL(s"drop table $table").execute().apply() - } } diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala new file mode 100644 index 0000000000..2171864869 --- /dev/null +++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase + +import org.apache.predictionio.data.storage.{DataMap, Event} +import org.joda.time.{DateTime, DateTimeZone} + +trait TestEvents { + + val u1BaseTime = new DateTime(654321) + val u2BaseTime = new DateTime(6543210) + val u3BaseTime = new DateTime(6543410) + + // u1 events + val u1e1 = Event( + event = "$set", + entityType = "user", + entityId = "u1", + properties = DataMap( + """{ + "a" : 1, + "b" : "value2", + "d" : [1, 2, 3], + }"""), + eventTime = u1BaseTime + ) + + val u1e2 = u1e1.copy( + event = "$set", + properties = DataMap("""{"a" : 2}"""), + eventTime = u1BaseTime.plusDays(1) + ) + + val u1e3 = u1e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value4"}"""), + eventTime = u1BaseTime.plusDays(2) + ) + + val u1e4 = u1e1.copy( + event = "$unset", + properties = DataMap("""{"b" : null}"""), + eventTime = u1BaseTime.plusDays(3) + ) + + val u1e5 = u1e1.copy( + event = "$set", + properties = DataMap("""{"e" : "new"}"""), + eventTime = u1BaseTime.plusDays(4) + ) + + val u1LastTime = u1BaseTime.plusDays(4) + val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}""" + + // delete event for u1 + val u1ed = u1e1.copy( + event = "$delete", + properties = DataMap(), + eventTime = u1BaseTime.plusDays(5) + ) + + // u2 events + val u2e1 = Event( + event = "$set", + entityType = "user", + entityId = "u2", + properties = DataMap( + """{ + "a" : 21, + "b" : "value12", + "d" : [7, 5, 6], + }"""), + eventTime = u2BaseTime + ) + + val u2e2 = u2e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u2BaseTime.plusDays(1) + ) + + val u2e3 = u2e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value9", "g": "new11"}"""), + eventTime = u2BaseTime.plusDays(2) + ) + + val u2LastTime = u2BaseTime.plusDays(2) + val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}""" + + // u3 events + val u3e1 = Event( + event = "$set", + entityType = "user", + entityId = "u3", + properties = DataMap( + """{ + "a" : 22, + "b" : "value13", + "d" : [5, 6, 1], + }"""), + eventTime = u3BaseTime + ) + + val u3e2 = u3e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u3BaseTime.plusDays(1) + ) + + val u3e3 = u3e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""), + eventTime = u3BaseTime.plusDays(2) + ) + + val u3LastTime = u3BaseTime.plusDays(2) + val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}""" + + // some random events + val r1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now, + prId = Some("my_prid") + ) + val r2 = Event( + event = "my_event2", + entityType = "my_entity_type2", + entityId = "my_entity_id2" + ) + val r3 = Event( + event = "my_event3", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "propA" : 1.2345, + "propB" : "valueB", + }""" + ), + prId = Some("my_prid") + ) + val r4 = Event( + event = "my_event4", + entityType = "my_entity_type4", + entityId = "my_entity_id4", + targetEntityType = Some("my_target_entity_type4"), + targetEntityId = Some("my_target_entity_id4"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }"""), + eventTime = DateTime.now + ) + val r5 = Event( + event = "my_event5", + entityType = "my_entity_type5", + entityId = "my_entity_id5", + targetEntityType = Some("my_target_entity_type5"), + targetEntityId = Some("my_target_entity_id5"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + val r6 = Event( + event = "my_event6", + entityType = "my_entity_type6", + entityId = "my_entity_id6", + targetEntityType = Some("my_target_entity_type6"), + targetEntityId = Some("my_target_entity_id6"), + properties = DataMap( + """{ + "prop1" : 6, + "prop2" : "value2", + "prop3" : [6, 7, 8], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + + // timezone + val tz1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id0", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")), + prId = Some("my_prid") + ) + + val tz2 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id1", + eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")), + prId = Some("my_prid") + ) + + val tz3 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id2", + eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")), + prId = Some("my_prid") + ) + +} diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala new file mode 100644 index 0000000000..d723d07908 --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage} +import org.specs2._ +import org.specs2.specification.Step + +class LEventsSpec extends Specification with TestEvents { + def is = s2""" + + PredictionIO Storage LEvents Specification + + Events can be implemented by: + - JDBCLEvents ${jdbcLEvents} + + """ + + def jdbcLEvents = sequential ^ s2""" + + JDBCLEvents should + - behave like any LEvents implementation ${events(jdbcDO)} + + """ + + val appId = 1 + + def events(eventClient: LEvents) = sequential ^ s2""" + + init default ${initDefault(eventClient)} + insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)} + insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)} + insert and delete by ID ${insertAndDelete(eventClient)} + insert test user events ${insertTestUserEvents(eventClient)} + find user events ${findUserEvents(eventClient)} + aggregate user properties ${aggregateUserProperties(eventClient)} + aggregate one user properties ${aggregateOneUserProperties(eventClient)} + aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)} + init channel ${initChannel(eventClient)} + insert 2 events to channel ${insertChannel(eventClient)} + insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)} + find events from channel ${findChannel(eventClient)} + remove default ${removeDefault(eventClient)} + remove channel ${removeChannel(eventClient)} + + """ + + val dbName = "test_pio_storage_events_" + hashCode + + def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName) + + def initDefault(eventClient: LEvents) = { + eventClient.init(appId) + } + + def insertAndGetEvents(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r1,r2,r3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndGetTimezone(eventClient: LEvents) = { + val listOfEvents = List(tz1, tz2, tz3) + + val insertResp = listOfEvents.map { eventClient.insert(_, appId) } + + val insertedEventId: List[String] = insertResp + + val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) + .map { case (e, id) => Some(e.copy(eventId = Some(id))) } + + val getResp = insertedEventId.map { id => eventClient.get(id, appId) } + + val getEvents = getResp + + insertedEvent must containTheSameElementsAs(getEvents) + } + + def insertAndDelete(eventClient: LEvents) = { + val eventId = eventClient.insert(r2, appId) + + val resultBefore = eventClient.get(eventId, appId) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId) + + val resultAfter = eventClient.get(eventId, appId) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def insertTestUserEvents(eventClient: LEvents) = { + // events from TestEvents trait + val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + + listOfEvents.map{ eventClient.insert(_, appId) } + + success + } + + def findUserEvents(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + entityType = Some("user")) + .toList + .map(e => e.copy(eventId = None)) // ignore eventID + + // same events in insertTestUserEvents + val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + + results must containTheSameElementsAs(expected) + } + + def aggregateUserProperties(eventClient: LEvents) = { + + val result: Map[String, PropertyMap] = eventClient.aggregateProperties( + appId = appId, + entityType = "user") + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + def aggregateOneUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u1") + + val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime)) + + result must beEqualTo(expected) + } + + def aggregateNonExistentUserProperties(eventClient: LEvents) = { + val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( + appId = appId, + entityType = "user", + entityId = "u999999") + + result must beEqualTo(None) + } + + val channelId = 12 + + def initChannel(eventClient: LEvents) = { + eventClient.init(appId, Some(channelId)) + } + + def insertChannel(eventClient: LEvents) = { + + // events from TestEvents trait + val listOfEvents = List(r4,r5) + + listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) ) + + success + } + + def insertAndDeleteChannel(eventClient: LEvents) = { + + val eventId = eventClient.insert(r2, appId, Some(channelId)) + + val resultBefore = eventClient.get(eventId, appId, Some(channelId)) + + val expectedBefore = r2.copy(eventId = Some(eventId)) + + val deleteStatus = eventClient.delete(eventId, appId, Some(channelId)) + + val resultAfter = eventClient.get(eventId, appId, Some(channelId)) + + (resultBefore must beEqualTo(Some(expectedBefore))) and + (deleteStatus must beEqualTo(true)) and + (resultAfter must beEqualTo(None)) + } + + def findChannel(eventClient: LEvents) = { + + val results: List[Event] = eventClient.find( + appId = appId, + channelId = Some(channelId) + ) + .toList + .map(e => e.copy(eventId = None)) // ignore eventId + + // same events in insertChannel + val expected = List(r4, r5) + + results must containTheSameElementsAs(expected) + } + + def removeDefault(eventClient: LEvents) = { + eventClient.remove(appId) + } + + def removeChannel(eventClient: LEvents) = { + eventClient.remove(appId, Some(channelId)) + } +} diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala similarity index 91% rename from data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala rename to storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala index ccd71a4c87..71ebf5ff7a 100644 --- a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala @@ -16,14 +16,13 @@ */ -package org.apache.predictionio.data.storage - -import org.specs2._ -import org.specs2.specification.Step +package org.apache.predictionio.data.storage.jdbc +import org.apache.predictionio.data.storage._ import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD +import org.specs2._ +import org.specs2.specification.Step class PEventsSpec extends Specification with TestEvents { @@ -35,16 +34,6 @@ class PEventsSpec extends Specification with TestEvents { val channelId = 6 val dbName = "test_pio_storage_events_" + hashCode - def hbLocal = Storage.getDataObject[LEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - - def hbPar = Storage.getDataObject[PEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - def jdbcLocal = Storage.getDataObject[LEvents]( StorageTestUtils.jdbcSourceName, dbName @@ -64,20 +53,11 @@ class PEventsSpec extends Specification with TestEvents { PredictionIO Storage PEvents Specification PEvents can be implemented by: - - HBPEvents ${hbPEvents} - JDBCPEvents ${jdbcPEvents} - (stop Spark) ${Step(sc.stop())} """ - def hbPEvents = sequential ^ s2""" - - HBPEvents should - - behave like any PEvents implementation ${events(hbLocal, hbPar)} - - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} - - """ - def jdbcPEvents = sequential ^ s2""" JDBCPEvents should diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala new file mode 100644 index 0000000000..4bf90cfe30 --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import scalikejdbc._ + +object StorageTestUtils { + val jdbcSourceName = "PGSQL" + + def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s => + SQL(s"drop table $table").execute().apply() + } +} diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala new file mode 100644 index 0000000000..2cb08e5beb --- /dev/null +++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import org.apache.predictionio.data.storage.{DataMap, Event} +import org.joda.time.{DateTime, DateTimeZone} + +trait TestEvents { + + val u1BaseTime = new DateTime(654321) + val u2BaseTime = new DateTime(6543210) + val u3BaseTime = new DateTime(6543410) + + // u1 events + val u1e1 = Event( + event = "$set", + entityType = "user", + entityId = "u1", + properties = DataMap( + """{ + "a" : 1, + "b" : "value2", + "d" : [1, 2, 3], + }"""), + eventTime = u1BaseTime + ) + + val u1e2 = u1e1.copy( + event = "$set", + properties = DataMap("""{"a" : 2}"""), + eventTime = u1BaseTime.plusDays(1) + ) + + val u1e3 = u1e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value4"}"""), + eventTime = u1BaseTime.plusDays(2) + ) + + val u1e4 = u1e1.copy( + event = "$unset", + properties = DataMap("""{"b" : null}"""), + eventTime = u1BaseTime.plusDays(3) + ) + + val u1e5 = u1e1.copy( + event = "$set", + properties = DataMap("""{"e" : "new"}"""), + eventTime = u1BaseTime.plusDays(4) + ) + + val u1LastTime = u1BaseTime.plusDays(4) + val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}""" + + // delete event for u1 + val u1ed = u1e1.copy( + event = "$delete", + properties = DataMap(), + eventTime = u1BaseTime.plusDays(5) + ) + + // u2 events + val u2e1 = Event( + event = "$set", + entityType = "user", + entityId = "u2", + properties = DataMap( + """{ + "a" : 21, + "b" : "value12", + "d" : [7, 5, 6], + }"""), + eventTime = u2BaseTime + ) + + val u2e2 = u2e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u2BaseTime.plusDays(1) + ) + + val u2e3 = u2e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value9", "g": "new11"}"""), + eventTime = u2BaseTime.plusDays(2) + ) + + val u2LastTime = u2BaseTime.plusDays(2) + val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}""" + + // u3 events + val u3e1 = Event( + event = "$set", + entityType = "user", + entityId = "u3", + properties = DataMap( + """{ + "a" : 22, + "b" : "value13", + "d" : [5, 6, 1], + }"""), + eventTime = u3BaseTime + ) + + val u3e2 = u3e1.copy( + event = "$unset", + properties = DataMap("""{"a" : null}"""), + eventTime = u3BaseTime.plusDays(1) + ) + + val u3e3 = u3e1.copy( + event = "$set", + properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""), + eventTime = u3BaseTime.plusDays(2) + ) + + val u3LastTime = u3BaseTime.plusDays(2) + val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}""" + + // some random events + val r1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now, + prId = Some("my_prid") + ) + val r2 = Event( + event = "my_event2", + entityType = "my_entity_type2", + entityId = "my_entity_id2" + ) + val r3 = Event( + event = "my_event3", + entityType = "my_entity_type", + entityId = "my_entity_id", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "propA" : 1.2345, + "propB" : "valueB", + }""" + ), + prId = Some("my_prid") + ) + val r4 = Event( + event = "my_event4", + entityType = "my_entity_type4", + entityId = "my_entity_id4", + targetEntityType = Some("my_target_entity_type4"), + targetEntityId = Some("my_target_entity_id4"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }"""), + eventTime = DateTime.now + ) + val r5 = Event( + event = "my_event5", + entityType = "my_entity_type5", + entityId = "my_entity_id5", + targetEntityType = Some("my_target_entity_type5"), + targetEntityId = Some("my_target_entity_id5"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + val r6 = Event( + event = "my_event6", + entityType = "my_entity_type6", + entityId = "my_entity_id6", + targetEntityType = Some("my_target_entity_type6"), + targetEntityId = Some("my_target_entity_id6"), + properties = DataMap( + """{ + "prop1" : 6, + "prop2" : "value2", + "prop3" : [6, 7, 8], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = DateTime.now + ) + + // timezone + val tz1 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id0", + targetEntityType = Some("my_target_entity_type"), + targetEntityId = Some("my_target_entity_id"), + properties = DataMap( + """{ + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c"], + "prop6" : 4.56 + }""" + ), + eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")), + prId = Some("my_prid") + ) + + val tz2 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id1", + eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")), + prId = Some("my_prid") + ) + + val tz3 = Event( + event = "my_event", + entityType = "my_entity_type", + entityId = "my_entity_id2", + eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")), + prId = Some("my_prid") + ) + +}