Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bisynchronous methods POC #575

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 91 additions & 2 deletions src/Collection/index.js
@@ -1,6 +1,7 @@
// @flow

import type { Observable } from 'rxjs'
import { Observable } from 'rxjs/Observable'

import { Subject } from 'rxjs/Subject'
import { defer } from 'rxjs/observable/defer'
import { switchMap } from 'rxjs/operators'
Expand Down Expand Up @@ -48,10 +49,52 @@ export default class Collection<Record: Model> {
return cachedRecord || this._fetchRecord(id)
}

findBisync(id: RecordId, callback: any => void) {
if (!id) {
callback({ error: new Error(`Invalid record ID ${this.table}#${id}`) })
return
}

const cachedRecord = this._cache.get(id)
if (cachedRecord) {
callback({ value: cachedRecord })
return
}

this._fetchRecordBisync(id, callback)
}

// Finds the given record and starts observing it
// (with the same semantics as when calling `model.observe()`)
findAndObserve(id: RecordId): Observable<Record> {
return defer(() => this.find(id)).pipe(switchMap(model => model.observe()))
// return defer(() => this.find(id)).pipe(switchMap(model => model.observe()))
return Observable.create(observer => {
this.findBisync(id, result => {
if (result.value) {
observer.next(result.value)
} else {
observer.error(result.error)
}
})
}).pipe(switchMap(model => model.observe()))
}

findAndSubscribe(id: any, subscriber: any): any {
this.findBisync(id, result => {
if (result.value) {
const model = result.value
subscriber(model)
const subscription = model.subscribeToChanges(() => {
subscriber(model)
})
} else {
// ??
}
})

return () => {
// ??
}
}

// Query records of this type
Expand Down Expand Up @@ -98,6 +141,19 @@ export default class Collection<Record: Model> {
return this._cache.recordsFromQueryResult(rawRecords)
}

fetchQueryBisync(query: Query<Record>, callback: any => void): void {
this.database.adapter.queryBisync(query.serialize(), result => {
if (result.value) {
const rawRecords = result.value
const records = this._cache.recordsFromQueryResult(rawRecords)
// console.log('fetchQueryBisync', records.map(x => x._raw || x))
callback({ value: records })
} else {
callback(result)
}
})
}

async unsafeFetchRecordsWithSQL(sql: string): Promise<Record[]> {
const { adapter } = this.database
invariant(
Expand All @@ -115,6 +171,10 @@ export default class Collection<Record: Model> {
return this.database.adapter.count(query.serialize())
}

fetchCountBisync(query: Query<Record>, callback: any => void): void {
this.database.adapter.countBisync(query.serialize(), callback)
}

// *** Implementation details ***

get table(): TableName<Record> {
Expand All @@ -132,6 +192,21 @@ export default class Collection<Record: Model> {
return this._cache.recordFromQueryResult(raw)
}

_fetchRecordBisync(id: RecordId, callback: any => void): void {
this.database.adapter.findBisync(this.table, id, result => {
if (result.value) {
const raw = result.value
if (raw) {
callback({ value: this._cache.recordFromQueryResult(raw) })
} else {
callback({ error: new Error(`Record ${this.table}#${id} not found`) })
}
} else {
callback(result)
}
})
}

changeSet(operations: CollectionChangeSet<Record>): void {
operations.forEach(({ record, type }) => {
if (type === CollectionChangeTypes.created) {
Expand All @@ -144,6 +219,10 @@ export default class Collection<Record: Model> {

this.changes.next(operations)

this._subscribers.forEach(subscriber => {
subscriber(operations)
})

operations.forEach(({ record, type }) => {
if (type === CollectionChangeTypes.updated) {
record._notifyChanged()
Expand All @@ -153,6 +232,16 @@ export default class Collection<Record: Model> {
})
}

_subscribers: any[] = []

subscribeToChanges(subscriber: Function): any {
this._subscribers.push(subscriber)

return () => {
// todo
}
}

// See: Database.unsafeClearCaches
unsafeClearCache(): void {
this._cache.unsafeClear()
Expand Down
1 change: 1 addition & 0 deletions src/Database/ActionQueue.js
Expand Up @@ -84,6 +84,7 @@ export default class ActionQueue implements ActionInterface {
this._queue.shift()

if (this._queue.length) {
// TODO: Race condition between this and enqueue() ?
setTimeout(() => this._executeNext(), 0)
}
}
Expand Down
21 changes: 21 additions & 0 deletions src/Database/index.js
Expand Up @@ -107,6 +107,8 @@ export default class Database {
const [table, changeSet]: [TableName<any>, CollectionChangeSet<any>] = (notification: any)
this.collections.get(table).changeSet(changeSet)
})

this._emitChanges(changeNotifications)
}

// Enqueues an Action -- a block of code that, when its ran, has a guarantee that no other Action
Expand All @@ -126,6 +128,25 @@ export default class Database {
return merge$(...changesSignals).pipe(startWith(null))
}

_subscribers: any[] = []

subscribeToChanges(tables: any, subscriber: Function): any {
this._subscribers.push({ tables, subscriber })

return () => {
// todo
}
}

_emitChanges(changes: { [TableName<any>]: CollectionChangeSet<any> }): void {
const changedTables = Object.keys(changes)
this._subscribers.forEach(({ tables, subscriber }) => {
if (changedTables.some(t => tables.includes(t))) {
subscriber()
}
})
}

_resetCount: number = 0

// Resets database - permanently destroys ALL records stored in the database, and sets up empty database
Expand Down
14 changes: 14 additions & 0 deletions src/Model/index.js
Expand Up @@ -260,12 +260,26 @@ export default class Model {

_notifyChanged(): void {
this._getChanges().next(this)

this._subscribers.forEach(subscriber => {
subscriber()
})
}

_notifyDestroyed(): void {
this._getChanges().complete()
}

_subscribers: any[] = []

subscribeToChanges(subscriber: Function): any {
this._subscribers.push(subscriber)

return () => {
// todo
}
}

_getRaw(rawFieldName: ColumnName): Value {
return this._raw[(rawFieldName: string)]
}
Expand Down
14 changes: 9 additions & 5 deletions src/Query/index.js
Expand Up @@ -35,15 +35,15 @@ export default class Query<Record: Model> {
_rawDescription: QueryDescription

@lazy
_cachedObservable: Observable<Record[]> = observeQuery(this).pipe(cacheWhileConnected)
_cachedObservable: Observable<Record[]> = observeQuery(this) //.pipe(cacheWhileConnected)

@lazy
_cachedCountObservable: Observable<number> = observeCount(this, false).pipe(cacheWhileConnected)
_cachedCountObservable: Observable<number> = observeCount(this, false) //.pipe(cacheWhileConnected)

@lazy
_cachedCountThrottledObservable: Observable<number> = observeCount(this, true).pipe(
cacheWhileConnected,
)
_cachedCountThrottledObservable: Observable<number> = observeCount(this, true) //.pipe(
// cacheWhileConnected,
// )

// Note: Don't use this directly, use Collection.query(...)
constructor(collection: Collection<Record>, conditions: Condition[]): void {
Expand All @@ -69,6 +69,10 @@ export default class Query<Record: Model> {
return this.collection.fetchQuery(this)
}

fetchBisync(callback: any => void): void {
this.collection.fetchQueryBisync(this, callback)
}

// Emits an array of matching records, then emits a new array every time it changes
observe(): Observable<Record[]> {
return this._cachedObservable
Expand Down
63 changes: 48 additions & 15 deletions src/Relation/helpers.js
@@ -1,6 +1,6 @@
// @flow

import type { Observable } from 'rxjs'
import { Observable } from 'rxjs/Observable'
import { of as of$ } from 'rxjs/observable/of'
import { map as map$, switchMap, distinctUntilChanged } from 'rxjs/operators'

Expand All @@ -13,20 +13,53 @@ const getImmutableObservable = <T: ?Model>(relation: Relation<T>): Observable<T>
// $FlowFixMe
.findAndObserve(relation.id)

const getObservable = <T: ?Model>(relation: Relation<T>): Observable<T> =>
relation._model
.observe()
// $FlowFixMe
.pipe(
map$(model => model._getRaw(relation._columnName)),
distinctUntilChanged(),
switchMap(
id =>
id ?
relation._model.collections.get(relation._relationTableName).findAndObserve(id) :
of$(null),
),
)
const getObservable = <T: ?Model>(relation: Relation<T>): Observable<T> => {
return Observable.create(observer => {
const { _model: model, _columnName: columnName } = relation
let relatedId = model._getRaw(columnName)

let relationSubscription = null
const relationStartObservation = () => {
relationSubscription && relationSubscription()
if (relatedId) {
relationSubscription = model.collections
.get(relation._relationTableName)
.findAndSubscribe(relatedId, m2 => {
observer.next(m2)
})
} else {
observer.next(null)
}
}
relationStartObservation()

const modelSubscription = model.subscribeToChanges(() => {
const oldId = relatedId
relatedId = model._getRaw(columnName)
if (relatedId !== oldId) {
relationStartObservation()
}
})

return () => {
modelSubscription()
relationSubscription && relationSubscription()
}
})
}
// relation._model
// .observe()
// // $FlowFixMe
// .pipe(
// map$(model => model._getRaw(relation._columnName)),
// distinctUntilChanged(),
// switchMap(
// id =>
// id ?
// relation._model.collections.get(relation._relationTableName).findAndObserve(id) :
// of$(null),
// ),
// )

// eslint-disable-next-line
export const createObservable = <T: ?Model>(relation: Relation<T>): Observable<T> =>
Expand Down
4 changes: 2 additions & 2 deletions src/Relation/index.js
Expand Up @@ -31,8 +31,8 @@ export default class Relation<T: ?Model> {

@lazy
_cachedObservable: Observable<T> = createObservable(this)
.pipe(publishReplayLatestWhileConnected)
.refCount()
// .pipe(publishReplayLatestWhileConnected)
// .refCount()

constructor(
model: Model,
Expand Down
26 changes: 26 additions & 0 deletions src/adapters/lokijs/WorkerBridge.js
Expand Up @@ -52,6 +52,7 @@ class WorkerBridge {
payload: WorkerExecutorPayload = [],
cloneMethod: 'shallowCloneDeepObjects' | 'immutable' | 'deep' = 'deep',
): Promise<any> {
// console.warn('Non-bisync send:', type, payload)
return new Promise((resolve, reject) => {
this._pendingRequests.push({ resolve, reject })

Expand All @@ -62,6 +63,31 @@ class WorkerBridge {
})
})
}

sendBisync(
type: WorkerExecutorType,
payload: WorkerExecutorPayload = [],
cloneMethod: 'shallowCloneDeepObjects' | 'immutable' | 'deep' = 'deep',
callback,
): void {
// console.log('sendBisync')
let calledBack = false
this._pendingRequests.push({
resolve: value => {
calledBack = true
callback({ value })
},
reject: error => callback({ error }),
})

this._worker.postMessage({
type,
payload,
cloneMethod,
})

// console[calledBack ? 'log' : 'error'](`Did call back synchronously:`, calledBack, type, payload)
}
}

export default WorkerBridge