Skip to content
This repository has been archived by the owner on Jul 30, 2021. It is now read-only.

fix(task): merge map with 1 only #187

Merged
merged 32 commits into from Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b28c650
fix(task): merge map with 2 only
tglatt Apr 29, 2020
fe2e5ee
haxx: delete kinto collection before test
douglasduteil Apr 29, 2020
910d877
limit ds_configs init concurrency
douglasduteil Apr 29, 2020
98420db
do now throw when task update error
douglasduteil Apr 29, 2020
b558601
do now throw when task update error (2)
douglasduteil Apr 29, 2020
6e69652
fix(task): merge map with 1 only
douglasduteil Apr 29, 2020
44d9e0d
fix(task): merge map with 1 only (2)
douglasduteil Apr 29, 2020
03a6a4a
fix(task): update init script
douglasduteil Apr 29, 2020
19c1f9f
fix(task): update init script (2)
douglasduteil Apr 29, 2020
e4668f8
fix(task): merge map with 1 only (3)
douglasduteil Apr 29, 2020
f753b9e
fix(task): merge map with 1 only (4)
douglasduteil Apr 29, 2020
3485c6b
build(k8s): add init container to init kinto job
douglasduteil Apr 29, 2020
05df79f
build(k8s): add init container to init kinto job (2)
douglasduteil Apr 29, 2020
17c7a8a
build(k8s): add init container to init kinto job (3)
douglasduteil Apr 29, 2020
eaeb482
build(k8s): add init container to init kinto job (4)
douglasduteil Apr 29, 2020
e822ad0
build(k8s): add init container to init kinto job (5)
douglasduteil Apr 29, 2020
07dbe8c
fix(task): merge map with 1 only (5)
douglasduteil Apr 30, 2020
35c4d13
debug(task): log the number of task to update when scheduled
douglasduteil Apr 30, 2020
9c20a70
debug(dossier): log the number of procedure to update when scheduled
douglasduteil Apr 30, 2020
aefa945
build(k8s): add init container to init kinto job (6)
douglasduteil Apr 30, 2020
8ea072e
refactor(dossier): separate streams
douglasduteil Apr 30, 2020
5b36af2
refactor(dossier): separate streams (2)
douglasduteil Apr 30, 2020
32f124c
refactor(dossier): separate streams (3)
douglasduteil Apr 30, 2020
0ebd6a5
refactor(dossier): separate streams (4)
douglasduteil Apr 30, 2020
6436fc8
refactor(dossier): separate streams (5)
douglasduteil Apr 30, 2020
b5f2bfc
revert: haxx: delete kinto collection before test
douglasduteil Apr 30, 2020
6e3d7ea
debug(restapi): trace rest calls
douglasduteil Apr 30, 2020
9424e4c
revert: very verbose change -_-
douglasduteil Apr 30, 2020
8435ef7
perf: do 50 by 50 tasks
douglasduteil Apr 30, 2020
131b8da
debug(dossier): log procedure id from dossier
douglasduteil Apr 30, 2020
56d8b3c
debug(dossier): log on task complete
douglasduteil Apr 30, 2020
1a03472
flag the cleanner
douglasduteil Apr 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.env
.eslintcache
node_modules
yarn-error.log
15 changes: 15 additions & 0 deletions k8s/kinto/job-init-kinto.yml
Expand Up @@ -30,3 +30,18 @@ spec:
cpu: ${INIT_KINTO_RESOURCE_CPU_LIMIT}
memory: ${INIT_KINTO_RESOURCE_MEMORY_LIMIT}
restartPolicy: Never
initContainers:
- name: wait-for-kinto
image: registry.gitlab.factory.social.gouv.fr/socialgouv/docker/curl:1.12.0
imagePullPolicy: Always
command:
- sh
- -c
- |
retry=300; # (60 x 1s) x 5min
while
! curl -sSL "http://kinto:8888/__lbheartbeat__" &&
[[ $(( retry-- )) -gt 0 ]];
do echo "Waiting for ${API_URL} to be available ($(( retry )))" ; sleep 1s ; done ;
[[ $(( retry )) -lt 1 ]] && exit 128;
echo Ready;
2 changes: 1 addition & 1 deletion packages/api/src/collector/service/dossier.service.ts
Expand Up @@ -106,7 +106,7 @@ class DossierService {
);
return dossierRepository.update(loadedRecord.id || "0", loadedRecord);
}
})
}, 1)
);
}
}
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/collector/service/task.service.ts
Expand Up @@ -30,6 +30,9 @@ class TaskService {
}

public markAsCompleted(task: Task): Observable<Task> {
logger.info(
`[TaskService] task ${task.procedure_id}-${task.dossier_id} ${task.action} completed`
);
task.task_completed_date = new Date().toISOString();
task.task_state = "completed";
return taskRepository.update(task);
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/routes.ts
Expand Up @@ -63,7 +63,7 @@ router.post(
.pipe(
map(() => dsConfigs),
flatMap((x: ProcedureConfig[]) => x),
mergeMap((x: ProcedureConfig) => dsProcedureConfigRepository.add(x)),
mergeMap((x: ProcedureConfig) => dsProcedureConfigRepository.add(x), 1),
tap((res: ProcedureConfig) =>
logger.info(`ds config ${res.group.label} added`)
)
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/scheduler/dossier-synchro.service.ts
Expand Up @@ -32,7 +32,7 @@ export const dossierSynchroService = {
return dossierService.save(config.group, procId, dossier);
}
return dossierService.update(dossierRecord, dossier);
}),
}, 1),
tap((dossier: DossierRecord | null) => {
if (dossier) {
logger.info(
Expand Down
50 changes: 32 additions & 18 deletions packages/api/src/scheduler/dossier.scheduler.ts
@@ -1,5 +1,5 @@
import { combineLatest, Observable } from "rxjs";
import { concatMap, flatMap, map, mergeMap, tap } from "rxjs/operators";
import { combineLatest, Observable, forkJoin } from "rxjs";
import { concatMap, flatMap, map, mergeMap, tap, share } from "rxjs/operators";
import {
apiResultService,
dsProcedureConfigService,
Expand Down Expand Up @@ -27,35 +27,44 @@ interface SyncContext {
export const dossierScheduler = {
start: () => {
handleScheduler(configuration.schedulerCronDS, "dossier-synchro", () => {
return syncProcedures().pipe(
const apiResult$ = syncProcedures().pipe(
mergeMap((x: ProcedureRecord) => buildSyncContext(x)),
map((x: SyncContext) => {
const actions = getSynchroActions(x.items, x.apiResult);
x.apiResult.items = x.items;
x.apiResult.actions = actions;
return x.apiResult;
}),
share()
);

const addAllTasks$ = apiResult$.pipe(
tap(apiResult =>
logger.info(
`[dossier.scheduler] procedure#${apiResult.procedure} - add ${apiResult.actions.length} actions`
)
),
flatMap((x: APIResult) =>
x.actions.map((a: SynchroAction) => ({ action: a, apiResult: x }))
),
mergeMap(
(input: { action: SynchroAction; apiResult: APIResult }) => {
return taskService.addTask(
input.action.action,
input.action.procedure,
input.action.item.id,
input.action.item.state,
input.action.item.updated_at
);
},
(input: { action: SynchroAction; apiResult: APIResult }) =>
input.apiResult,
5
),
mergeMap(({ action }) => {
return taskService.addTask(
action.action,
action.procedure,
action.item.id,
action.item.state,
action.item.updated_at
);
}, 1)
);

const updateApiResult$ = apiResult$.pipe(
mergeMap((apiResult: APIResult) => {
return apiResultService.update(apiResult);
})
}, 1)
);

return forkJoin(addAllTasks$, updateApiResult$);
});
}
};
Expand Down Expand Up @@ -111,6 +120,11 @@ function syncProcedures(): Observable<ProcedureRecord> {

function allDemarcheSimlifieeProcedures(): Observable<DSProcedure> {
return dsProcedureConfigService.all().pipe(
tap(procedureConfigs =>
logger.info(
`[SyncService.allDemarcheSimlifieeProcedures] process ${procedureConfigs.length} procedures`
)
),
flatMap((x: ProcedureConfig[]) => x),
flatMap((x: ProcedureConfig) => x.procedures),
concatMap(demarcheSimplifieeService.getDSProcedure),
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/scheduler/scheduler.service.ts
Expand Up @@ -34,7 +34,7 @@ export const handleScheduler = (
runScheduler$
.pipe(
takeLast(1),
mergeMap(() => synchroHistoryService.update(scheduler, end))
mergeMap(() => synchroHistoryService.update(scheduler, end), 1)
)
.subscribe({
complete: () => completeProcess(scheduler),
Expand Down
40 changes: 34 additions & 6 deletions packages/api/src/scheduler/task.scheduler.ts
@@ -1,5 +1,13 @@
import { combineLatest, Observable, of } from "rxjs";
import { exhaustMap, flatMap, mergeMap, reduce } from "rxjs/operators";
import { combineLatest, Observable, of, EMPTY } from "rxjs";
import {
exhaustMap,
flatMap,
mergeMap,
reduce,
catchError,
tap,
take
} from "rxjs/operators";
import {
dossierService,
dsProcedureConfigService,
Expand All @@ -11,6 +19,7 @@ import { statisticService } from "../collector/service/statistic.service";
import { configuration } from "../config";
import { dossierSynchroService } from "./dossier-synchro.service";
import { handleScheduler } from "./scheduler.service";
import { logger } from "../util";

export const taskScheduler = {
start: () => {
Expand All @@ -19,10 +28,11 @@ export const taskScheduler = {
allTasksToComplete(),
dsProcedureConfigService.all()
).pipe(
take(50),
mergeMap(
([task, procedures]) => processTask(task, procedures),
undefined,
5
1
),
reduce((acc: Task[], record: Task) => {
acc.push(record);
Expand Down Expand Up @@ -61,12 +71,30 @@ function processTask(taskToTreat: Task, procedures: ProcedureConfig[]) {
);
}
},
(task: Task) => task
(task: Task) => task,
1
),
mergeMap((task: Task) => taskService.markAsCompleted(task))
mergeMap(
(task: Task) =>
taskService.markAsCompleted(task).pipe(
catchError(err => {
logger.error(
`[task.scheduler] cannot update as completed task ${task.id}`,
err
);
return EMPTY;
})
),
1
)
);
}

function allTasksToComplete(): Observable<Task> {
return taskService.getTasksToComplete().pipe(flatMap((x: Task[]) => x));
return taskService.getTasksToComplete().pipe(
tap(tasks => {
logger.debug(`[task.scheduler] process ${tasks.length} to complete`);
}),
flatMap((x: Task[]) => x)
);
}
2 changes: 2 additions & 0 deletions packages/api/src/util/logger/logger.ts
Expand Up @@ -59,6 +59,8 @@ const wLogger = createLogger({

const logger = {
debug: (message: string) => wLogger.debug(message),
profile: (id: string) => wLogger.profile(id),
startTimer: () => wLogger.startTimer(),
error: (message: string, err: Error) => {
wLogger.error(message, err);
if (configuration.sentryEnabled) {
Expand Down
62 changes: 46 additions & 16 deletions packages/kinto/src/index.js
Expand Up @@ -42,28 +42,58 @@ const addDSConfig = async () => {
},
procedures: [6274, 6286]
};
const res = await api.createRecord("ds_collector", "ds_configs", dsConfig);
console.log("[init kinto] add ds_configs record: ", res);
{
const res = await api.createRecord(
"ds_collector",
"ds_configs",
dsConfig
);
console.log("[init kinto] add ds_configs record: ", res);
}
{
const res = await api.createRecord("ds_collector", "synchro_histories", {
scheduler: "task",
last_synchro: 0
});
console.log("[init kinto] add synchro_histories record: ", res);
}
} else {
console.log(
`[init kinto] ENVIRONMENT_TYPE= "${
configs.environmentType
}, no 'ds_configs' record has been created.`
`[init kinto] ENVIRONMENT_TYPE= "${configs.environmentType}, no 'ds_configs' record has been created.`
);
}
};
const init = async () => {
const cleanIt = () => {
console.log("[haxxx] delete the admin account");
await api
.deleteAdmin(configs.adminLogin, configs.adminPassword)
.catch(console.error);
const isNewUser = await api
.createAdmin(configs.adminLogin, configs.adminPassword)
.then(async res => {
await addCollections();
if (res.data) {
console.log("[init kinto] admin created", res);
await addDSConfig();
} else {
console.log("[init kinto] kinto already initialised... ", res);
}
});
.then(
res => Boolean(res.data),
() => false
);

console.log("[haxxx] delete the ds_collector bucket");
try {
await api.deleteBucket("ds_collector");
} catch {}
}

const init = async () => {
if (process.env.CLEAN_DB) {
cleanIt();
}
console.log("[init kinto] addCollections");
await addCollections();

if (isNewUser) {
console.log("[init kinto] admin created");
await addDSConfig();
} else {
console.log("[init kinto] kinto already initialised... ");
}
};

init();
init().catch(console.error);
44 changes: 39 additions & 5 deletions packages/kinto/src/kinto-api.js
Expand Up @@ -27,15 +27,27 @@ const header = auth => {

const api = async (url, options) => {
const response = await fetch(url, options);
if (!response.ok) throw response;
return response.json();
};

module.exports.createAdmin = async function(login, password) {
exports.createAdmin = async function(login, password) {
const body = { data: { password: password } };
return api(_account(login), _requestOptions("PUT", undefined, body));
};

module.exports.createUser = async function(login, password) {
exports.deleteAdmin = async function(login, password) {
return api(
_account(login),
_requestOptions(
"DELETE",
`${configs.adminLogin}:${configs.adminPassword}`,
{}
)
);
};

exports.createUser = async function(login, password) {
const body = { data: { password: password } };
return api(
_account(login),
Expand All @@ -47,7 +59,7 @@ module.exports.createUser = async function(login, password) {
);
};

module.exports.createBucket = async function(name) {
exports.createBucket = async function(name) {
const body = { data: { id: name } };
return api(
_buckets(),
Expand All @@ -59,7 +71,18 @@ module.exports.createBucket = async function(name) {
);
};

module.exports.createCollection = async function(bucket, collection) {
exports.deleteBucket = async function(name) {
return api(
_bucket(name),
_requestOptions(
"DELETE",
`${configs.adminLogin}:${configs.adminPassword}`,
{}
)
);
};

exports.createCollection = async function(bucket, collection) {
const body = { data: { id: collection } };
return api(
_collections(bucket),
Expand All @@ -71,7 +94,18 @@ module.exports.createCollection = async function(bucket, collection) {
);
};

module.exports.createRecord = async function(bucket, collection, data) {
exports.deleteCollection = async function(bucket, name) {
return api(
`${_collections(bucket)}/${name}`,
_requestOptions(
"DELETE",
`${configs.adminLogin}:${configs.adminPassword}`,
{}
)
);
};

exports.createRecord = async function(bucket, collection, data) {
const body = { data };
return api(
_records(bucket, collection),
Expand Down