-
Notifications
You must be signed in to change notification settings - Fork 10
/
Lazy.kt
162 lines (140 loc) · 6.19 KB
/
Lazy.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package net.aquadc.persistence.sql.blocking
import net.aquadc.persistence.CloseableIterator
import net.aquadc.persistence.CloseableStruct
import net.aquadc.persistence.IteratorAndTransientStruct
import net.aquadc.persistence.NullSchema
import net.aquadc.persistence.sql.BindBy
import net.aquadc.persistence.sql.Fetch
import net.aquadc.persistence.sql.Table
import net.aquadc.persistence.struct.FieldDef
import net.aquadc.persistence.struct.Schema
import net.aquadc.persistence.struct.Struct
import net.aquadc.persistence.struct.StructSnapshot
import net.aquadc.persistence.type.DataType
import net.aquadc.persistence.type.Ilk
import java.io.FilterInputStream
import java.io.InputStream
import java.sql.ResultSet
import java.sql.SQLFeatureNotSupportedException
/**
 * [Fetch] strategy producing a [Lazy] whose initializer reads a single cell via `Blocking.cell`.
 * Nothing is read until the returned [Lazy] is evaluated for the first time.
 *
 * @param rt type descriptor passed to `cell` to read the value
 * @param orElse supplier passed through to `cell`; presumably used when the query yields no value —
 *   confirm against `Blocking.cell`
 */
@PublishedApi internal class FetchCellLazily<CUR, R>(
    private val rt: Ilk<R, *>,
    private val orElse: () -> R
) : Fetch<Blocking<CUR>, Lazy<R>> {

    override fun fetch(
        from: Blocking<CUR>,
        query: String,
        argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
        arguments: Array<out Any>
    ): Lazy<R> {
        // Copy the properties into locals so that the lambda below closes over them
        // and not over this Fetch instance.
        val cellType = rt
        val fallback = orElse
        return lazy {
            from.cell(query, argumentTypes, arguments, cellType, fallback)
        }
    }
}
/**
 * [Fetch] strategy exposing a single-column result as a [CloseableIterator],
 * reading one cell per row on demand.
 *
 * @param rt type descriptor used to read the cell at index 0 of every row
 */
@PublishedApi internal class FetchColLazily<CUR, R>(
    private val rt: Ilk<R, *>
) : Fetch<Blocking<CUR>, CloseableIterator<R>> {

    override fun fetch(
        from: Blocking<CUR>,
        query: String,
        argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
        arguments: Array<out Any>
    ): CloseableIterator<R> {
        // Local copy: the anonymous iterator must not hold a reference to this Fetch instance.
        val rt = rt
        return object : CurIterator<CUR, NullSchema, R>(
            from,
            query,
            argumentTypes,
            arguments,
            null,        // no table: rows are never viewed as structs here
            BindBy.Name, // whatever: irrelevant when there is no table
            NullSchema
        ) {
            /** Reads the only column of the row [cur] is positioned at. */
            override fun row(cur: CUR): R {
                return from.cellAt(cur, 0, rt)
            }
        }
    }
}
/**
 * [Fetch] strategy returning the first row of a query as a [CloseableStruct] backed by an open cursor.
 * When the query yields no rows, the result of [orElse] is exposed through this very object instead.
 *
 * NOTE(review): the fallback struct is stored in a mutable property of this instance and `fetch`
 * returns `this`, so every fallback result handed out by the same instance shares that property.
 *
 * @param table table whose schema and column delegates are used to read fields
 * @param bindBy passed through to the cursor iterator
 * @param orElse supplies the struct used when the query returns no rows
 */
@PublishedApi internal class FetchStructLazily<SCH : Schema<SCH>, CUR>(
    private val table: Table<SCH, *>,
    private val bindBy: BindBy,
    private val orElse: () -> Struct<SCH>
) : Fetch<Blocking<CUR>, CloseableStruct<SCH>>, CloseableStruct<SCH> {

    /** Struct served by [get] and [schema]; assigned only when a query came back empty. */
    private var fallback: Struct<SCH>? = null

    override fun fetch(
        from: Blocking<CUR>,
        query: String,
        argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
        arguments: Array<out Any>
    ): CloseableStruct<SCH> {
        val rows = CurIterator<CUR, SCH, CloseableStruct<SCH>>(
            from, query, argumentTypes, arguments, table, bindBy, table.schema
        )
        // hasNext() doubles as "move to first": on success the cursor already points at the row.
        if (rows.hasNext()) {
            return rows
        }
        // Empty result: the iterator has closed itself, so serve the fallback through this object.
        fallback = orElse()
        return this
    }

    override fun <T> get(field: FieldDef<SCH, T, *>): T {
        return fallback!![field]
    }

    override val schema: SCH
        get() = fallback!!.schema

    override fun close() {
        // nothing to do here: the fallback holds no cursor
    }
}
/**
 * [Fetch] strategy exposing query rows as a [CloseableIterator] of structs read from an open cursor.
 *
 * @param table table whose schema and column delegates are used to read fields
 * @param bindBy passed through to the cursor iterator
 * @param transient when `true`, every `next()` returns the iterator itself as a view of the current row;
 *   when `false`, every row is copied into a [StructSnapshot]
 */
@PublishedApi internal class FetchStructListLazily<CUR, SCH : Schema<SCH>>(
    private val table: Table<SCH, *>,
    private val bindBy: BindBy,
    private val transient: Boolean
) : Fetch<Blocking<CUR>, CloseableIterator<Struct<SCH>>> {

    override fun fetch(
        from: Blocking<CUR>,
        query: String,
        argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
        arguments: Array<out Any>
    ): CloseableIterator<Struct<SCH>> {
        // Local copy: the anonymous iterator must not hold a reference to this Fetch instance.
        val transient = transient
        return object : CurIterator<CUR, SCH, Struct<SCH>>(
            from, query, argumentTypes, arguments, table, bindBy, table.schema
        ) {
            /** Returns either the live row view (this iterator) or a detached copy of it. */
            override fun row(cur: CUR): Struct<SCH> = when {
                transient -> this
                else -> StructSnapshot(this)
            }
        }
    }
}
/**
 * [Fetch] strategy returning the first column of the first row of a JDBC query as an [InputStream].
 * Closing the returned stream also closes the underlying [ResultSet].
 *
 * @throws IllegalStateException if the query returns no rows
 */
@PublishedApi internal object InputStreamFromResultSet : Fetch<Blocking<ResultSet>, InputStream> {

    override fun fetch(
        from: Blocking<ResultSet>,
        query: String,
        argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
        arguments: Array<out Any>
    ): InputStream {
        val rs = from.select(query, argumentTypes, arguments, 1)
        val stream = try {
            check(rs.next()) { "ResultSet is empty." }
            // JDBC column indices are 1-based: the first (and only) column is 1, not 0.
            try { rs.getBlob(1).binaryStream } // Postgres-JDBC supports this, SQLite-JDBC doesn't
            catch (e: SQLFeatureNotSupportedException) { rs.getBinaryStream(1) } // this is typically just in-memory :'(
        } catch (e: Throwable) {
            // Whatever went wrong, nobody will receive a stream to close, so release the ResultSet here.
            try { rs.close() } catch (closeFailure: Throwable) { e.addSuppressed(closeFailure) }
            throw e
        }
        return object : FilterInputStream(stream) {
            override fun close() {
                // Release the ResultSet even when closing the stream itself fails.
                try { super.close() } finally { rs.close() }
            }
        }
    }
}
// Deprecated aliases declared in this package for the same-named types imported from
// net.aquadc.persistence at the top of this file; presumably kept so that code referring to
// the old package-local names still compiles.
@Deprecated("moved") typealias CloseableIterator<T> = CloseableIterator<T>
@Deprecated("moved") typealias CloseableStruct<SCH> = CloseableStruct<SCH>
/**
 * Cursor-backed iterator which is also a struct view of the row its cursor is positioned at.
 * The cursor is opened on first use and closed when rows run out or [close] is called.
 *
 * Values of [state]:
 * - `0` — no row is pre-fetched; `hasNext()`/`next()` must advance the cursor;
 * - `1` — `hasNext()` has advanced the cursor to a row which `next()` has not returned yet;
 * - `2` — closed, either explicitly or because the cursor ran out of rows.
 *
 * @param from database access object used to run the query, advance and close the cursor
 * @param query SQL text passed to `select`
 * @param argumentTypes types of [arguments], passed to `select`
 * @param arguments query arguments, passed to `select`
 * @param table table used to read struct fields; `null` when rows are not viewed as structs
 * @param bindBy passed to the column delegate when reading a field
 * @param schema schema handed to the superclass
 */
private open class CurIterator<CUR, SCH : Schema<SCH>, R>(
    protected val from: Blocking<CUR>,
    private val query: String,
    private val argumentTypes: Array<out Ilk<*, DataType.NotNull<*>>>,
    private val arguments: Array<out Any>,
    private val table: Table<SCH, *>?,
    private val bindBy: BindBy,
    schema: SCH
) : IteratorAndTransientStruct<SCH, R>(schema) {

    /** The open cursor, or `null` when it is not opened yet or already closed. */
    private var _cur: CUR? = null

    /** Returns the open cursor, running the query on first access; refuses to reopen once closed. */
    private val cur get() = _cur ?: run {
        check(state == 0) { "Iterator is closed." }
        from.select(query, argumentTypes, arguments, table?.managedColNames?.size ?: 1).also { _cur = it }
    }

    var state = 0

    /**
     * Advances to the next row (unless `hasNext()` already did) and returns [row] for it.
     *
     * @throws NoSuchElementException if there are no more rows or the iterator is closed
     */
    final override fun next(): R {
        // Check for exhaustion before touching `cur`: its getter refuses to reopen a closed cursor
        // with IllegalStateException, which would mask the NoSuchElementException Iterator promises.
        if (state == 2) throw NoSuchElementException()
        val cur = cur
        when (state) {
            0 -> if (!move(cur, toState = 0)) throw NoSuchElementException()
            1 -> state = 0 // consume the row pre-fetched by hasNext()
            else -> throw AssertionError()
        }
        return row(cur)
    }

    /** Reports whether another row exists, advancing the cursor to it when nothing is pre-fetched. */
    final override fun hasNext(): Boolean =
        when (state) {
            0 -> move(cur, 1)
            1 -> true
            2 -> false
            else -> throw AssertionError()
        }

    /** Closes the cursor if it is open and marks the iterator as closed. */
    final override fun close() {
        _cur?.let { from.close(it); _cur = null }
        state = 2
    }

    /** Advances [cur]; switches to [toState] on success, closes the iterator when rows run out. */
    private fun move(cur: CUR, toState: Int): Boolean = from.next(cur).also {
        if (it) state = toState
        else close()
    }

    /** Builds the value returned by `next()` for the current row; subclasses override this. */
    protected open fun row(cur: CUR): R =
        throw UnsupportedOperationException()

    /**
     * Reads [field] from the row the cursor is positioned at, through the table's column delegate.
     * Requires a non-null [table]; throws [UnsupportedOperationException] once the iterator is closed.
     */
    final override fun <T> get(field: FieldDef<SCH, T, *>): T = when (state) {
        0,
        1 -> table!!.let { it.delegateFor(field).get(from, it, field, cur, bindBy) }
        2 -> throw UnsupportedOperationException()
        else -> throw AssertionError()
    }
}