Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #90 from CatalystCode/cleanup-settingsmutations
Browse files Browse the repository at this point in the history
Consistency fixes for settings mutations
  • Loading branch information
c-w committed Aug 7, 2017
2 parents 88315dc + 29d1f2d commit 0689a2e
Showing 1 changed file with 88 additions and 126 deletions.
214 changes: 88 additions & 126 deletions src/resolvers-cassandra/Settings/mutations.js
Expand Up @@ -5,7 +5,7 @@ const uuid = require('uuid/v4');
const cassandraConnector = require('../../clients/cassandra/CassandraConnector');
const blobStorageClient = require('../../clients/storage/BlobStorageClient');
const { withRunTime, limitForInClause } = require('../shared');
const trackEvent = require('../../clients/appinsights/AppInsightsClient').trackEvent;
const { trackEvent } = require('../../clients/appinsights/AppInsightsClient');
const apiUrlBase = process.env.FORTIS_CENTRAL_ASSETS_HOST || 'https://fortiscentral.blob.core.windows.net';
const STREAM_PIPELINE_TWITTER = 'twitter';
const STREAM_CONNECTOR_TWITTER = 'Twitter';
Expand All @@ -25,31 +25,27 @@ function createOrReplaceSite(args, res) { // eslint-disable-line no-unused-vars

function _insertTopics(siteType) {
return new Promise((resolve, reject) => {
if(!siteType || !siteType.length) {
return reject('insertTopics: siteType is not defined');
}
if (!siteType || !siteType.length) return reject('insertTopics: siteType is not defined');

const uri = `${apiUrlBase}/settings/siteTypes/${siteType}/topics/defaultTopics.json`;
blobStorageClient.fetchJson(uri)
.then(response => {
const mutations = response.map(topic => {
return {
query: `INSERT INTO fortis.watchlist (topicid,topic,lang_code,translations,insertion_time)
VALUES (?, ?, ?, ?, toTimestamp(now()));`,
params: [uuid(), topic.topic, topic.lang_code, topic.translations]
};
.then(response => {
return response.map(topic => ({
query: `INSERT INTO fortis.watchlist (topicid,topic,lang_code,translations,insertion_time)
VALUES (?, ?, ?, ?, toTimestamp(now()));`,
params: [uuid(), topic.topic, topic.lang_code, topic.translations]
}));
})
.then(mutations => {
cassandraConnector.executeBatchMutations(mutations)
.then(() => {
resolve({
numTopicsInserted: mutations.length
});
return mutations;
})
.then(mutations => {
cassandraConnector.executeBatchMutations(mutations)
.then(() => {
resolve({
numTopicsInserted: mutations.length
});
})
.catch(reject);
})
.catch(reject);
})
.catch(reject);
});
}

Expand All @@ -62,54 +58,49 @@ const insertTopics = trackEvent(_insertTopics, 'Settings.Topics.Insert', (respon
function createSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => {
const siteType = args && args.input && args.input.siteType;
if(!siteType || !siteType.length) {
return reject(`siteType for sitename ${args.input.name} is not defined`);
}
if (!siteType || !siteType.length) return reject(`siteType for sitename ${args.input.name} is not defined`);

cassandraConnector.executeQuery('SELECT * FROM fortis.sitesettings WHERE sitename = ?;', [args.input.name])
.then(rows => {
if (!rows || !rows.length) {
insertTopics(siteType)
.then(() => {
return cassandraConnector.executeBatchMutations([{
query: `INSERT INTO fortis.sitesettings (
geofence,
defaultzoom,
logo,
title,
sitename,
languages,
insertion_time
) VALUES (?,?,?,?,?,?,toTimestamp(now()))`,
params: [
args.input.targetBbox,
args.input.defaultZoomLevel,
args.input.logo,
args.input.title,
args.input.name,
args.input.supportedLanguages
]
}]);
})
.then(() => {
resolve({
name: args.input.name,
properties: {
targetBbox: args.input.targetBbox,
defaultZoomLevel: args.input.defaultZoomLevel,
logo: args.input.logo,
title: args.input.title,
defaultLocation: args.input.defaultLocation,
supportedLanguages:args.input.supportedLanguages
}
});
})
.catch(reject);
}
else if (rows.length == 1) return reject(`Site with sitename ${args.input.name} already exists.`);
else return reject(`(${rows.length}) number of sites with sitename ${args.input.name} already exist.`);
})
.catch(reject);
.then(rows => {
if (!rows || !rows.length) return insertTopics(siteType);
else if (rows.length == 1) return reject(`Site with sitename ${args.input.name} already exists.`);
else return reject(`(${rows.length}) number of sites with sitename ${args.input.name} already exist.`);
})
.then(() => {
return cassandraConnector.executeBatchMutations([{
query: `INSERT INTO fortis.sitesettings (
geofence,
defaultzoom,
logo,
title,
sitename,
languages,
insertion_time
) VALUES (?,?,?,?,?,?,toTimestamp(now()))`,
params: [
args.input.targetBbox,
args.input.defaultZoomLevel,
args.input.logo,
args.input.title,
args.input.name,
args.input.supportedLanguages
]
}]);
})
.then(() => {
resolve({
name: args.input.name,
properties: {
targetBbox: args.input.targetBbox,
defaultZoomLevel: args.input.defaultZoomLevel,
logo: args.input.logo,
title: args.input.title,
defaultLocation: args.input.defaultLocation,
supportedLanguages:args.input.supportedLanguages
}
});
})
.catch(reject);
});
}

Expand Down Expand Up @@ -141,14 +132,12 @@ function removeSite(args, res) { // eslint-disable-line no-unused-vars
}

function facebookPagePrimaryKeyValuesToRowKey(values) {
return [ TRUSTED_SOURCES_CONNECTOR_FACEBOOK, values[1], values[2] ];
return [TRUSTED_SOURCES_CONNECTOR_FACEBOOK, values[1], values[2]];
}

function facebookPageRowKeyToPrimaryKey(page) {
const params = page.RowKey.split(',');
if (params.length != 3) {
throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).');
}
if (params.length != 3) throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).');
return facebookPagePrimaryKeyValuesToRowKey(params);
}

Expand All @@ -168,11 +157,8 @@ function modifyFacebookPages(args, res) { // eslint-disable-line no-unused-vars
const pages = args && args.input && args.input.pages;
if (!pages || !pages.length) return reject('No pages specified');

const invalidPages = pages.filter(page=>!page.pageUrl);
if (invalidPages.length > 0) {
reject(`pageUrl required for ${JSON.stringify(invalidPages)}`);
return;
}
const invalidPages = pages.filter(page => !page.pageUrl);
if (invalidPages.length > 0) return reject(`pageUrl required for ${JSON.stringify(invalidPages)}`);

const mutations = [];
const expectedRecords = [];
Expand All @@ -195,9 +181,8 @@ function modifyFacebookPages(args, res) { // eslint-disable-line no-unused-vars
});

cassandraConnector.executeBatchMutations(mutations)
.then(() => { resolve({ pages: expectedRecords }); })
.catch(reject)
;
.then(() => resolve({ pages: expectedRecords }))
.catch(reject);
});
}

Expand All @@ -210,31 +195,23 @@ function removeFacebookPages(args, res) { // eslint-disable-line no-unused-vars
const pages = args && args.input && args.input.pages;
if (!pages || !pages.length) return reject('No pages specified');

const invalidPages = pages.filter(page=>!page.RowKey);
if (invalidPages.length > 0) {
reject(`RowKey required for ${JSON.stringify(invalidPages)}`);
return;
}

const mutations = pages.map(page => {
return {
query: 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?',
params: facebookPageRowKeyToPrimaryKey(page)
};
});
const invalidPages = pages.filter(page => !page.RowKey);
if (invalidPages.length > 0) return reject(`RowKey required for ${JSON.stringify(invalidPages)}`);

const mutations = pages.map(page => ({
query: 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?',
params: facebookPageRowKeyToPrimaryKey(page)
}));

cassandraConnector.executeBatchMutations(mutations)
.then(() => { resolve({ pages: pages }); })
.catch(reject)
;
.then(() => resolve({ pages: pages }))
.catch(reject);
});
}

function trustedTwitterAccountRowKeyToPrimaryKey(account) {
const params = account.RowKey.split(',');
if (params.length != 3) {
throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).');
}
if (params.length != 3) throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).');
return trustedTwitterAccountPrimaryKeyValuesToRowKey(params);
}

Expand Down Expand Up @@ -268,9 +245,8 @@ function modifyTrustedTwitterAccounts(args, res) { // eslint-disable-line no-unu
});

cassandraConnector.executeBatchMutations(queries)
.then(() => { resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) }); })
.catch(reject)
;
.then(() => resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) }))
.catch(reject);
});
}

Expand All @@ -284,15 +260,11 @@ function removeTrustedTwitterAccounts(args, res) { // eslint-disable-line no-unu
if (!accounts || !accounts.length) return reject('No accounts specified');

const deleteByPrimaryKey = 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?';
const queries = accounts.map(account => {
const params = trustedTwitterAccountRowKeyToPrimaryKey(account);
return {query: deleteByPrimaryKey, params: params};
});
const queries = accounts.map(trustedTwitterAccountRowKeyToPrimaryKey).map(params => ({query: deleteByPrimaryKey, params}));

cassandraConnector.executeBatchMutations(queries)
.then(() => { resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) }); })
.catch(reject)
;
.then(() => resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) }))
.catch(reject);
});
}

Expand All @@ -314,19 +286,16 @@ function modifyTwitterAccounts(args, res) { // eslint-disable-line no-unused-var
const updatedAccount = account;
if (account.RowKey) {
queries.push({ query: updateStatement, params: [STREAM_CONNECTOR_TWITTER, account, STREAM_PIPELINE_TWITTER, account.RowKey] });
}
else {
} else {
updatedAccount.RowKey = uuid();
queries.push({ query: insertStatement, params: [STREAM_PIPELINE_TWITTER, updatedAccount.RowKey, STREAM_CONNECTOR_TWITTER, account] });
}
expectedRecords.push(updatedAccount);
});

cassandraConnector.executeBatchMutations(queries)
.then(() => { resolve({ accounts: expectedRecords }); })
.catch(reject)
;

.then(() => resolve({ accounts: expectedRecords }))
.catch(reject);
});
}

Expand All @@ -340,19 +309,14 @@ function removeTwitterAccounts(args, res) { // eslint-disable-line no-unused-var
if (!accounts || !accounts.length) return reject('No accounts specified');

const invalidAccounts = accounts.filter(account=>!account.RowKey);
if (invalidAccounts.length > 0) {
reject(`RowKey required for ${JSON.stringify(invalidAccounts)}`);
return;
}
if (invalidAccounts.length > 0) return reject(`RowKey required for ${JSON.stringify(invalidAccounts)}`);

const statement = 'DELETE FROM fortis.streams WHERE streamid = ?';
const queries = accounts.map( account => { return { query: statement, params: [ account.RowKey ] }; } );
const queries = accounts.map(account => ({query: statement, params: [account.RowKey]}));

cassandraConnector.executeBatchMutations(queries)
.then(() => { resolve({ accounts: accounts }); })
.catch(reject)
;

.then(() => resolve({ accounts }))
.catch(reject);
});
}

Expand Down Expand Up @@ -384,10 +348,8 @@ function modifyBlacklist(args, res) { // eslint-disable-line no-unused-vars
});

cassandraConnector.executeBatchMutations(mutations)
.then(() => { resolve({ filters: filterRecords }); })
.catch(reject)
;

.then(() => resolve({ filters: filterRecords }))
.catch(reject);
});
}

Expand Down

0 comments on commit 0689a2e

Please sign in to comment.