-
Notifications
You must be signed in to change notification settings - Fork 3
/
CachePostgresSlickStore.scala
107 lines (76 loc) · 2.88 KB
/
CachePostgresSlickStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package test.db
import javax.cache.Cache
import com.typesafe.config.ConfigFactory
import org.apache.ignite.cache.store.CacheStoreAdapter
import org.apache.ignite.lang.IgniteBiInClosure
import org.slf4j.LoggerFactory
import slick.jdbc.PostgresProfile
import slick.jdbc.PostgresProfile.api._
import slick.lifted
import test.nodes.Device
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}
// Mixin supplying the shared Slick/Postgres handles used by cache-store implementations.
trait PostgresSlickConnection {
// Postgres-flavoured Slick API; implementors `import pgProfile._` for column ops, ===, etc.
val pgProfile = PostgresProfile.api
// Database handle built from the "ignitePostgres" section of the loaded config.
// NOTE(review): connection-pool implementation is whatever the config declares — not visible here.
val pgDatabase = Database.forConfig("ignitePostgres")
// Concrete stores provide the name of the table they persist to.
val tableName: String
}
/**
 * Apache Ignite `CacheStore` backed by PostgreSQL via Slick.
 *
 * Keys are device ids (`String`), values are [[Device]] rows persisted in
 * `public.device_ignite_slick_table`. `load` is a blocking read-through;
 * `write`/`delete` submit their DBIO actions without awaiting completion.
 */
class CachePostgresSlickStore extends CacheStoreAdapter[String, Device] with PostgresSlickConnection with Serializable {
  // NOTE(review): the global ExecutionContext backs all DB callbacks here;
  // blocking JDBC work is usually given a dedicated pool — confirm intent.
  import scala.concurrent.ExecutionContext.Implicits.global

  val config = ConfigFactory.load("application.conf")
  val log = LoggerFactory.getLogger("IgniteLog")
  val tableName = "device_ignite_slick_table"

  import pgProfile._

  /** Slick mapping for the device table; `id` is the primary key. */
  class DeviceTable(tag: Tag) extends Table[Device](tag, Some("public"), tableName) {
    def id = column[String]("id")
    def metadata = column[String]("metadata")
    def lat = column[Double]("lat")
    def lon = column[Double]("lon")
    def * = (id, metadata, lat, lon) <> (Device.tupled, Device.unapply)
    def pk = primaryKey(s"pk_$tableName", id)
    // BUG FIX: was s"idx_$table", which interpolated the outer TableQuery
    // val's toString (a forward reference during construction) into the index
    // name. Now consistent with the pk_$tableName naming above.
    def uniqueIndex = index(s"idx_$tableName", id, unique = true)
  }

  val table = lifted.TableQuery[DeviceTable]

  // Eagerly kick off schema creation when the store is constructed.
  // (Explicit parens: `createSchema` has a side effect; auto-application is deprecated.)
  val startup = createSchema()

  /**
   * Creates the backing table when querying it fails.
   *
   * NOTE(review): `table.exists` asks whether the table contains ROWS, not
   * whether it exists in the catalog. Creation only triggers because the
   * SELECT itself fails when the table is missing — fragile but behaviorally
   * what the original relied on; confirm before changing.
   */
  private def createSchema(): Unit = {
    pgDatabase.run(table.exists.result) onComplete {
      case Success(_) =>
        log.info("Schema already exists")
      case Failure(_) =>
        log.info(s"Creating schema for $tableName")
        val dbioAction = (
          for {
            _ <- table.schema.create
          } yield ()
        ).transactionally
        // Fire-and-forget: a failed create is only observable via driver logging.
        pgDatabase.run(dbioAction)
    }
  }

  /** Feeds every persisted device into the Ignite cache (warm-up path). */
  override def loadCache(clo: IgniteBiInClosure[String, Device], args: AnyRef*): Unit = {
    for {
      // `table.result` replaces the no-op `table.map(u => u).result`;
      // on any DB failure we fall back to an empty batch rather than crash.
      devices <- pgDatabase.run(table.result) recoverWith { case _ => Future.successful(Seq.empty[Device]) }
    } yield {
      log.info(s"Loading cache $tableName")
      devices.foreach(device => clo.apply(device.id, device))
    }
  }

  /** Deletes the row whose id matches `key`. Fire-and-forget: the Future is not awaited. */
  override def delete(key: Any) = Try {
    log.info(s"Delete from $tableName value $key")
    val dbioAction = DBIO.seq(
      table.filter(_.id === key.toString).delete
    ).transactionally
    pgDatabase.run(dbioAction)
  }

  /** Upserts the cache entry's value. Fire-and-forget, same caveat as `delete`. */
  override def write(entry: Cache.Entry[_ <: String, _ <: Device]): Unit = Try {
    log.info(s"Insert into $tableName value ${entry.getValue.toString}")
    val dbioAction = DBIO.seq(table.insertOrUpdate(entry.getValue)).transactionally
    pgDatabase.run(dbioAction)
  }

  /**
   * Blocking read-through load.
   *
   * @return the stored device, or `null` on a miss — `null` is the value the
   *         Ignite CacheStore contract uses to signal "not found".
   */
  override def load(key: String): Device = {
    val loadedDevice = pgDatabase.run(table.filter(_.id === key).result.headOption)
    // `10.seconds` replaces the deprecated postfix form `10 second`;
    // `.orNull` is the idiomatic spelling of getOrElse(null).
    Await.result(loadedDevice, 10.seconds).orNull
  }
}