-
Notifications
You must be signed in to change notification settings - Fork 23
/
entity-store.utils.ts
201 lines (186 loc) · 6.38 KB
/
entity-store.utils.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import { ObservableMap } from '../observable-map.utils';
import { Observable, ReplaySubject, of } from 'rxjs';
import { RemoteData, remoteData, isSuccess, success } from '@devexperts/remote-data-ts';
import { array } from 'fp-ts/lib/Array';
import { Predicate } from 'fp-ts/lib/function';
import { isNotNullable } from '@devexperts/utils/dist/object/object';
import { LiveData } from './live-data.utils';
import { filter, map, distinctUntilChanged, shareReplay, switchMap, tap, multicast, refCount } from 'rxjs/operators';
import { tapRD } from './operators/tapRD';
import { mapRD } from './operators/mapRD';
import { switchMapRD } from './operators/switchMapRD';
export class EntityStore<L = never, A = never> {
/**
* Returns all values of current store.
* By default returns Observable<RemoteData<L, A[]>>, but can be overwritten by "getAllValues$" setter.
*
*/
get getAllValues$(): any {
return this._getAllValues$;
}
/**
* Overwrites default return-value for "getAllValues$" getter.
*
*/
set getAllValues$(value: any) {
this._getAllValues$ = value;
}
private readonly cache = new ObservableMap<string, RemoteData<L, A>>();
private readonly cachedStreams = new Map<string, LiveData<L, A>>();
private hasLoadedAll = false;
private isLoadingAll = false;
private _getAllValues$ = this.cache.values$.pipe(
filter(() => !this.isLoadingAll && this.hasLoadedAll),
map(data => data.filter(item => isSuccess(item))),
map(array.sequence(remoteData)),
distinctUntilChanged(),
shareReplay(1),
);
readonly keys$ = this.cache.keys$;
/**
* Returns entity by a key.
* If there is no data by a key it triggers "get" argument to receive data and put it in a cache.
*
* @param key - Key (name) of an entity you want to receive.
*
* @param get - Describes how to receive data if cache by key is empty. Typically it's an API call.
*
* @returns - Returns a LiveData stream with requested entity.
*/
get(key: string, get: () => LiveData<L, A>): LiveData<L, A> {
let sharedGetter: Observable<RemoteData<L, A>> | undefined = this.cachedStreams.get(key);
if (!isNotNullable(sharedGetter)) {
const hasValue = this.cache.has(key);
const cachedValue = this.cache.getValue(key);
const valueIsResolved = isNotNullable(cachedValue) && isSuccess(cachedValue);
if (hasValue && valueIsResolved) {
return this.cache.get(key);
}
sharedGetter = new Observable<RemoteData<L, A>>(observer => {
const getterSubscription = get().subscribe(value => {
this.cache.set(key, value);
});
const cacheSubscription = this.cache.get(key).subscribe(value => {
observer.next(value);
});
return () => {
getterSubscription.unsubscribe();
cacheSubscription.unsubscribe();
this.cachedStreams.delete(key);
};
}).pipe(multicast(new ReplaySubject<RemoteData<L, A>>(1)), refCount());
this.cachedStreams.set(key, sharedGetter);
}
return sharedGetter;
}
/**
* Triggers receiving all entities using "partialGetAll" argument and put them in a cache.
*
* @param pk - Means "Primary Key". Describes how to get a key from an entity (required for "updateCache").
*
* @param partialGetAll - Describes how to get all values for current entity store. Typically it's an API call.
*
* @param predicate - Predicate to filter "partialGetAll" result. E.g. you want to filter invalid entities or you have a business case - use only data created before 01.01.2018.
*
* @returns - Returns a LiveData stream with requested entities.
*/
getAll(
pk: (value: A) => string,
partialGetAll: () => LiveData<L, A[]>,
predicate?: Predicate<A>,
): LiveData<L, A[]> {
this.isLoadingAll = false;
return partialGetAll().pipe(
tapRD(values => {
this.hasLoadedAll = true;
this.updateCache(values, pk);
}),
switchMap(data => (isSuccess(data) ? this._getAllValues$ : of(data))),
distinctUntilChanged(),
mapRD(entities => {
if (typeof predicate === 'undefined') {
return entities;
}
let hasChanges = false;
const filtered = entities.filter(value => {
const result = predicate(value);
if (!result) {
hasChanges = true;
}
return result;
});
return hasChanges ? filtered : entities;
}),
shareReplay(1),
);
}
/**
* Remove an entity from current entity store.
*
* @param key - Key of an entity you want to remove. Using only in optimistic scenario, but required all the time.
*
* @param pk - Means "Primary Key". Describes how to get a key from an entity (required for "updateCache").
*
* @param remove - Describes how to remove an entity. Typically it's an API call. Should returns an array of existing entities. Exception should be handled inside this stream.
*
* @param optimistic - The flag - is optimistic scenario or not, true by default. If true, entity will be removed from a cache before an API call.
*
* @returns - Returns a LiveData stream with existing entities.
*/
remove(
key: string,
pk: (value: A) => string,
remove: () => LiveData<L, A[]>,
optimistic: boolean = true,
): LiveData<L, A[]> {
if (optimistic) {
this.cache.delete(key);
}
return remove().pipe(
tapRD(values => {
this.updateCache(values, pk);
}),
switchMap(() => this._getAllValues$),
);
}
/**
* Create an entity
*
* @param pk - Means "Primary Key". Describes how to get a key from an entity (required for "updateCache").
*
* @param create - Describes how to create an entity. Returns a stream with a created entity.
*
* @returns - Returns a LiveData stream with created entity.
*/
create(pk: (value: A) => string, create: () => LiveData<L, A>): LiveData<L, A> {
return create().pipe(
switchMapRD(value => {
const key = pk(value);
this.cache.set(key, success(value));
return this.cache.get(key);
}),
);
}
/**
* Update an entity
*
* @param key - Key for an entity you want to update.
*
* @param update - Describes how to update an entity. Returns a stream with updated entity. Typically it's an API call.
*
* @returns - Returns a LiveData stream with updated entity.
*/
update(key: string, update: () => LiveData<L, A>): LiveData<L, A> {
return update().pipe(
tap(value => {
if (isSuccess(value)) {
this.cache.set(key, value);
}
}),
);
}
private updateCache(values: A[], pk: (value: A) => string): void {
const entries = values.map<[string, RemoteData<L, A>]>(item => [pk(item), success(item)]);
this.cache.setMany(entries);
}
}