Skip to content

Commit

Permalink
upgrade rxjs
Browse files Browse the repository at this point in the history
  • Loading branch information
nataly87s committed Apr 7, 2019
1 parent c48bf1b commit 01f9e5d
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 39 deletions.
2 changes: 1 addition & 1 deletion services/authoring/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"passport-jwt": "^4.0.0",
"passport-strategy": "^1.0.0",
"ramda": "^0.26.1",
"rxjs": "^5.5.3",
"rxjs": "^6.4.0",
"simple-git": "^1.84.0",
"sshpk": "^1.14.1",
"typescript-ioc": "^1.0.2",
Expand Down
26 changes: 16 additions & 10 deletions services/authoring/src/repositories/git-continuous-updater.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import { Observable } from 'rxjs';
import { concat, defer, EMPTY } from 'rxjs';
import { catchError, delay, distinctUntilChanged, repeat, tap } from 'rxjs/operators';
import logger from '../utils/logger';
const { CONTINUOUS_UPDATER_INTERVAL } = require('../constants');

export default {
onUpdate(gitTransactionManager) {
const updateRepo$ = Observable.defer(async () => {
const updateRepo$ = defer(async () => {
await gitTransactionManager.read(async (gitRepo) => await gitRepo.fetch());
return await gitTransactionManager.write(async (gitRepo) => await gitRepo.mergeMaster());
});
}).pipe(
catchError((err) => {
logger.error({ err }, 'Error pulling changes in git repo');
return EMPTY;
}),
);

return updateRepo$
.do(null, (err) => logger.error({ err }, 'Error pulling changes in git repo'))
.catch((_) => Observable.empty())
.concat(Observable.empty().delay(CONTINUOUS_UPDATER_INTERVAL))
.repeat()
.distinctUntilChanged()
.do((sha) => logger.info({ sha }, 'Updated git repo'));
const delayComplete$ = EMPTY.pipe(delay(CONTINUOUS_UPDATER_INTERVAL));

return concat(updateRepo$, delayComplete$).pipe(
repeat(),
distinctUntilChanged(),
tap((sha) => logger.info({ sha }, 'Updated git repo')),
);
},
};
15 changes: 9 additions & 6 deletions services/authoring/src/search-index/build/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module.exports = function createIndex(repoDir) {
const promisify = require('util').promisify;
const glob = promisify(require('glob'));
const { Observable } = require('rxjs');
const { map, tap, toArray } = require('rxjs/operators');
const _ = require('highland');
const readFile = _.wrapCallback(require('fs').readFile);
const lunr = require('lunr');
Expand All @@ -26,7 +27,7 @@ module.exports = function createIndex(repoDir) {
return glob(path.join(repoDir, 'manifests/**/*.json'))
.catch(console.error)
.then((fileNames) =>
Observable.create((observer) => {
new Observable((observer) => {
_(fileNames)
.map(readFile)
.parallel(10)
Expand All @@ -36,11 +37,13 @@ module.exports = function createIndex(repoDir) {
.on('data', (x) => observer.next(x))
.on('end', () => observer.complete());
})
.map(({ key_path: id, meta }) => Object.assign({}, meta, { id }))
.map(mapToLower)
.do((obj) => builder.add(obj))
.toArray()
.map((manifests) => ({ manifests, index: builder.build() }))
.pipe(
map(({ key_path: id, meta }) => Object.assign({}, meta, { id })),
map(mapToLower),
tap((obj) => builder.add(obj)),
toArray(),
map((manifests) => ({ manifests, index: builder.build() })),
)
.toPromise(),
);
};
10 changes: 7 additions & 3 deletions services/authoring/src/search-index/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ async function refreshIndex(repoDir) {
'node',
[path.join(__dirname, 'build/cli.js'), repoDir, indexFile],
(error, stdout, stderr) => {
logger.info(stdout);
if (error) reject(error);
else resolve();
logger.info({ stdout, stderr }, 'finished indexing');

if (error) {
reject(error);
} else {
resolve();
}
},
);
});
Expand Down
21 changes: 13 additions & 8 deletions services/authoring/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import path from 'path';
import BluebirdPromise from 'bluebird';
import express from 'express';
import { Observable } from 'rxjs';
import { defer } from 'rxjs';
import { retry, share, switchMapTo, tap } from 'rxjs/operators';
import fs from 'fs-extra';
import passport from 'passport';
import Transactor from './utils/transactor';
Expand Down Expand Up @@ -122,18 +123,22 @@ async function startServer() {
app.listen(PORT, () => logger.info({ port: PORT }, 'server started'));
}

const onUpdate$ = GitContinuousUpdater.onUpdate(gitTransactionManager).share();
const onUpdate$ = GitContinuousUpdater.onUpdate(gitTransactionManager).pipe(share());

onUpdate$
.switchMapTo(Observable.defer(() => searchIndex.refreshIndex(gitRepositoryConfig.localPath)))
.do(null, (err: any) => logger.error(err, 'Error refreshing index'))
.retry()
.pipe(
switchMapTo(defer(() => searchIndex.refreshIndex(gitRepositoryConfig.localPath))),
tap({ error: (err) => logger.error({ err }, 'Error refreshing search index') }),
retry(),
)
.subscribe();

onUpdate$
.switchMapTo(Observable.defer(() => appsRepository.refresh()))
.do(null, (err: any) => logger.error(err, 'Error refersing apps index'))
.retry()
.pipe(
switchMapTo(defer(() => appsRepository.refresh())),
tap({ error: (err) => logger.error({ err }, 'Error refreshing apps index') }),
retry(),
)
.subscribe();

gitRepoCreationPromiseWithTimeout.then(startServer).catch((reason: any) => {
Expand Down
17 changes: 6 additions & 11 deletions services/authoring/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3207,12 +3207,12 @@ rimraf@~2.4.0:
dependencies:
glob "^6.0.1"

rxjs@^5.5.3:
version "5.5.12"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.12.tgz#6fa61b8a77c3d793dbaf270bee2f43f652d741cc"
integrity sha512-xx2itnL5sBbqeeiVgNPVuQQ1nC8Jp2WfNJhXWHmElW9YmrpS9UVnNzhP3EH3HFqexO5Tlp8GhYY+WEcqcVMvGw==
rxjs@^6.4.0:
version "6.4.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.4.0.tgz#f3bb0fe7bda7fb69deac0c16f17b50b0b8790504"
integrity sha512-Z9Yfa11F6B9Sg/BK9MnqnQ+aQYicPLtilXBp2yUtDt2JRCE0h26d33EnfO3ZxoNxG0T92OUucP3Ct7cpfkdFfw==
dependencies:
symbol-observable "1.0.1"
tslib "^1.9.0"

safe-buffer@5.1.2, safe-buffer@^5.0.1, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.2"
Expand Down Expand Up @@ -3559,11 +3559,6 @@ swagger-ui-express@^4.0.1:
dependencies:
swagger-ui-dist "^3.18.1"

symbol-observable@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.1.tgz#8340fc4702c3122df5d22288f88283f513d3fdd4"
integrity sha1-g0D8RwLDEi310iKI+IKD9RPT/dQ=

tar-fs@^1.16.3:
version "1.16.3"
resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-1.16.3.tgz#966a628841da2c4010406a82167cbd5e0c72d509"
Expand Down Expand Up @@ -3685,7 +3680,7 @@ ts-node@^8.0.3:
source-map-support "^0.5.6"
yn "^3.0.0"

tslib@^1.8.0, tslib@^1.8.1:
tslib@^1.8.0, tslib@^1.8.1, tslib@^1.9.0:
version "1.9.3"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.3.tgz#d7e4dd79245d85428c4d7e4822a79917954ca286"
integrity sha512-4krF8scpejhaOgqzBEcGM7yDIEfi0/8+8zDRZhNZZ2kjmHJ4hv3zCbQWxoJGz1iw5U0Jl0nma13xzHXcncMavQ==
Expand Down

0 comments on commit 01f9e5d

Please sign in to comment.