Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #77 from CatalystCode/create-replace-site
Browse files Browse the repository at this point in the history
Added createSite, removeSite to settings and refactored executeQuery
  • Loading branch information
Smarker committed Jul 28, 2017
2 parents 8b7ddce + d5e1270 commit 87b82fe
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 29 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ jspm_packages

# IDEA config files
.idea/

# VS code config
.vscode
5 changes: 0 additions & 5 deletions .vscode/settings.json

This file was deleted.

43 changes: 29 additions & 14 deletions src/clients/cassandra/CassandraConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const chunk = require('lodash/chunk');
const flatten = require('lodash/flatten');
const trackDependency = require('../appinsights/AppInsightsClient').trackDependency;

const FETCH_SIZE = process.env.FETCH_SIZE || 1000;
const MAX_OPERATIONS_PER_BATCH = process.env.MAX_OPERATIONS_PER_BATCH || 10;
const MAX_CONCURRENT_BATCHES = process.env.MAX_CONCURRENT_BATCHES || 50;
const distance = cassandra.types.distance;
Expand All @@ -20,6 +21,11 @@ const options = {
[distance.local]: CORE_CONNECTIONS_PER_HOST_LOCAL,
[distance.remote]: CORE_CONNECTIONS_PER_HOST_REMOTE
}
},
queryOptions: {
autoPage: true,
prepare: true,
fetchSize: FETCH_SIZE
}
};
const client = new cassandra.Client(options);
Expand Down Expand Up @@ -49,29 +55,38 @@ function executeBatchMutations(mutations) {
}
},
(err) => {
if (err) {
reject(err);
} else {
resolve();
}
if (err) reject(err);
else resolve({numBatchMutations: chunkedMutations.length});
});
});
}

/**
* @param {string} query
* @param {string[]} params
* @param {{fetchSize: int}} [options]
* @returns {Promise.<object[]>}
*/
function executeQuery(query, params) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
client.execute(query, params, { prepare: true }, (err, result) => {
if (err) {
return reject(err);
} else {
return resolve(result.rows);
}
});
function executeQuery(query, params, options) {
return new Promise((resolve, reject) => {
try {
const rows = [];
const stream = client.stream(query, params, options)
.on('readable', () => {
let row;
while((row = stream.read()) != undefined) {
rows.push(row);
}
})
.on('error', err => {
if (err) reject(err);
})
.on('end', () => {
resolve(rows);
});
} catch (exception) {
reject(exception);
}
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/clients/storage/BlobStorageClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Promise = require('promise');
const request = require('request');
const trackDependency = require('../appinsights/AppInsightsClient').trackDependency;

/**
* @param {string} uri
Expand All @@ -27,5 +28,5 @@ function fetchJson(uri) {
}

module.exports = {
fetchJson: fetchJson
fetchJson: trackDependency(fetchJson, 'BlobStorage', 'fetchJson')
};
3 changes: 2 additions & 1 deletion src/resolvers-cassandra/Settings/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ const mutations = require('./mutations');
const queries = require('./queries');

module.exports = {
createOrReplaceSite: mutations.createOrReplaceSite,
createSite: mutations.createSite,
removeSite: mutations.removeSite,
modifyFacebookPages: mutations.modifyFacebookPages,
removeFacebookPages: mutations.removeFacebookPages,
modifyTrustedTwitterAccounts: mutations.modifyTrustedTwitterAccounts,
Expand Down
132 changes: 127 additions & 5 deletions src/resolvers-cassandra/Settings/mutations.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,141 @@
const Promise = require('promise');
const uuid = require('uuid/v4');
const cassandraConnector = require('../../clients/cassandra/CassandraConnector');
const blobStorageClient = require('../../clients/storage/BlobStorageClient');
const withRunTime = require('../shared').withRunTime;
const trackEvent = require('../../clients/appinsights/AppInsightsClient').trackEvent;

const apiUrlBase = process.env.FORTIS_CENTRAL_ASSETS_HOST || 'https://fortiscentral.blob.core.windows.net';
const STREAM_PIPELINE_TWITTER = 'twitter';
const STREAM_CONNECTOR_TWITTER = 'Twitter';

const TRUSTED_SOURCES_CONNECTOR_TWITTER = 'Twitter';
const TRUSTED_SOURCES_CONNECTOR_FACEBOOK = 'FacebookPage';
const TRUSTED_SOURCES_RANK_DEFAULT = 10;

function createOrReplaceSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
return reject(
'This API call is no longer supported. ' +
'Its functionality has been separated into createSite and removeSite.'
);
});
}

function _insertTopics(siteType) {
return new Promise((resolve, reject) => {
if(!siteType || !siteType.length) {
return reject('insertTopics: siteType is not defined');
}
const uri = `${apiUrlBase}/settings/siteTypes/${siteType}/topics/defaultTopics.json`;
blobStorageClient.fetchJson(uri)
.then(response => {
const mutations = response.map(topic => {
return {
query: `INSERT INTO fortis.watchlist (topicid,topic,lang_code,translations,insertion_time)
VALUES (?, ?, ?, ?, toTimestamp(now()));`,
params: [uuid(), topic.topic, topic.lang_code, topic.translations]
};
});
return mutations;
})
.then(mutations => {
cassandraConnector.executeBatchMutations(mutations)
.then(() => {
resolve({
numTopicsInserted: mutations.length
});
})
.catch(reject);
})
.catch(reject);
});
}

const insertTopics = trackEvent(_insertTopics, 'Settings.Topics.Insert', (response, err) => ({numTopicsInserted: err ? 0 : response.numTopicsInserted}));

/**
* @param {{input: {targetBbox: number[], defaultZoomLevel: number, logo: string, title: string, name: string, defaultLocation: number[], storageConnectionString: string, featuresConnectionString: string, mapzenApiKey: string, fbToken: string, supportedLanguages: string[]}}} args
* @returns {Promise.<{name: string, properties: {targetBBox: number[], defaultZoomLevel: number, logo: string, title: string, defaultLocation: number[], storageConnectionString: string, featuresConnectionString: string, mapzenApiKey: string, fbToken: string, supportedLanguages: string[]}}>}
* @param {{input: {siteType: string, targetBbox: number[], defaultZoomLevel: number, logo: string, title: string, name: string, defaultLocation: number[], storageConnectionString: string, featuresConnectionString: string, mapzenApiKey: string, fbToken: string, supportedLanguages: string[]}}} args
* @returns {Promise}
*/
function createOrReplaceSite(args, res) { // eslint-disable-line no-unused-vars
function createSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => {
const siteType = args && args.input && args.input.siteType;
if(!siteType || !siteType.length) {
return reject(`siteType for sitename ${args.input.name} is not defined`);
}

cassandraConnector.executeQuery('SELECT * FROM fortis.sitesettings WHERE sitename = ?;', [args.input.name])
.then(rows => {
if (!rows || !rows.length) {
insertTopics(siteType)
.then(() => {
return cassandraConnector.executeBatchMutations([{
query: `INSERT INTO fortis.sitesettings (
geofence,
defaultzoom,
logo,
title,
sitename,
languages,
insertion_time
) VALUES (?,?,?,?,?,?,toTimestamp(now()))`,
params: [
args.input.targetBbox,
args.input.defaultZoomLevel,
args.input.logo,
args.input.title,
args.input.name,
args.input.supportedLanguages
]
}]);
})
.then(() => {
resolve({
name: args.input.name,
properties: {
targetBbox: args.input.targetBbox,
defaultZoomLevel: args.input.defaultZoomLevel,
logo: args.input.logo,
title: args.input.title,
defaultLocation: args.input.defaultLocation,
supportedLanguages:args.input.supportedLanguages
}
});
})
.catch(reject);
}
else if (rows.length == 1) return reject(`Site with sitename ${args.input.name} already exists.`);
else return reject(`(${rows.length}) number of sites with sitename ${args.input.name} already exist.`);
})
.catch(reject);
});
}

/**
* @param {{input: {siteType: string, targetBbox: number[], defaultZoomLevel: number, logo: string, title: string, name: string, defaultLocation: number[], storageConnectionString: string, featuresConnectionString: string, mapzenApiKey: string, fbToken: string, supportedLanguages: string[]}}} args
* @returns {Promise}
*/
function removeSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => {
cassandraConnector.executeBatchMutations([{
query: 'DELETE FROM fortis.sitesettings WHERE sitename = ?;',
params: [args.input.name]
}])
.then(() => {
resolve({
name: args.input.name,
properties: {
targetBbox: args.input.targetBbox,
defaultZoomLevel: args.input.defaultZoomLevel,
logo: args.input.logo,
title: args.input.title,
defaultLocation: args.input.defaultLocation,
supportedLanguages: args.input.supportedLanguages
}
});
})
.catch(reject);
});
}

function facebookPagePrimaryKeyValuesToRowKey(values) {
Expand Down Expand Up @@ -296,7 +416,9 @@ function removeBlacklist(args, res) { // eslint-disable-line no-unused-vars
}

module.exports = {
createOrReplaceSite: trackEvent(createOrReplaceSite, 'createOrReplaceSite'),
createOrReplaceSite: createOrReplaceSite,
createSite: trackEvent(createSite, 'createSite'),
removeSite: trackEvent(removeSite, 'removeSite'),
modifyFacebookPages: trackEvent(withRunTime(modifyFacebookPages), 'modifyFacebookPages'),
removeFacebookPages: trackEvent(withRunTime(removeFacebookPages), 'removeFacebookPages'),
modifyTrustedTwitterAccounts: trackEvent(withRunTime(modifyTrustedTwitterAccounts), 'modifyTrustedTwitterAccounts'),
Expand Down
4 changes: 2 additions & 2 deletions src/resolvers-cassandra/Settings/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ function twitterAccounts(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => {
const sourcesByConnector = 'SELECT params FROM fortis.streams WHERE connector = ? ALLOW FILTERING';
cassandraConnector.executeQuery(sourcesByConnector, [CONNECTOR_TWITTER])
.then(rows => {
const accounts = rows.map(cassandraRowToTwitterAccount);
.then(result => {
const accounts = result.map(cassandraRowToTwitterAccount);
resolve({accounts: accounts});
})
.catch(reject)
Expand Down
18 changes: 18 additions & 0 deletions src/resolvers/Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ function sites(args, res) { // eslint-disable-line no-unused-vars
});
}

function createSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
return reject(
'This API call is supported in v2. '
);
});
}

function removeSite(args, res) { // eslint-disable-line no-unused-vars
return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
return reject(
'This API call is supported in v2. '
);
});
}

function createOrReplaceSite(args, res) { // eslint-disable-line no-unused-vars
const siteDefintion = args.input;
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -268,6 +284,8 @@ function removeBlacklist(args, res) { // eslint-disable-line no-unused-vars

module.exports = {
sites: sites,
createSite: createSite,
removeSite: removeSite,
createOrReplaceSite: createOrReplaceSite,
modifyFacebookPages: modifyFacebookPages,
modifyTrustedTwitterAccounts: modifyTrustedTwitterAccounts,
Expand Down
5 changes: 4 additions & 1 deletion src/schemas/SettingsSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module.exports = graphql.buildSchema(`
}
type Mutation {
createSite(input: SiteDefinition!): Site
removeSite(input: SiteDefinition!): Site
removeFacebookPages(input: FacebookPageListInput!): FacebookPageCollection
modifyFacebookPages(input: FacebookPageListInput!): FacebookPageCollection
createOrReplaceSite(input: SiteDefinition!): Site
Expand Down Expand Up @@ -47,7 +49,7 @@ module.exports = graphql.buildSchema(`
type TwitterAccountCollection {
runTime: String,
accounts: [TwitterAccount],
accounts: [TwitterAccount]
}
type TrustedTwitterAccountCollection {
Expand Down Expand Up @@ -101,6 +103,7 @@ module.exports = graphql.buildSchema(`
}
input SiteDefinition {
siteType: String,
targetBbox: [Float],
defaultZoomLevel: Int,
logo: String,
Expand Down

0 comments on commit 87b82fe

Please sign in to comment.