From dc550c296d5d18b89b4fe0d3a2accc88daf2030b Mon Sep 17 00:00:00 2001 From: Smarker Date: Tue, 10 Oct 2017 11:47:49 -0400 Subject: [PATCH 1/7] Add, remove, and query trustedsources. --- src/resolvers-cassandra/Settings/index.js | 2 + src/resolvers-cassandra/Settings/mutations.js | 72 ++++++++++++++++++- src/resolvers-cassandra/Settings/queries.js | 28 ++++++-- src/schemas/SettingsSchema.js | 28 ++++++-- 4 files changed, 118 insertions(+), 12 deletions(-) diff --git a/src/resolvers-cassandra/Settings/index.js b/src/resolvers-cassandra/Settings/index.js index fbacae2f..70935a03 100644 --- a/src/resolvers-cassandra/Settings/index.js +++ b/src/resolvers-cassandra/Settings/index.js @@ -19,6 +19,8 @@ module.exports = { removeBlacklist: mutations.removeBlacklist, removeKeywords: mutations.removeKeywords, addKeywords: mutations.addKeywords, + addTrustedSources: mutations.addTrustedSources, + removeTrustedSources: mutations.removeTrustedSources, siteTerms: queries.siteTerms, sites: queries.sites, diff --git a/src/resolvers-cassandra/Settings/mutations.js b/src/resolvers-cassandra/Settings/mutations.js index ed159dd7..0cb42dd0 100644 --- a/src/resolvers-cassandra/Settings/mutations.js +++ b/src/resolvers-cassandra/Settings/mutations.js @@ -175,6 +175,74 @@ function createSite(args, res) { // eslint-disable-line no-unused-vars }); } +function addTrustedSources(args, res) { // eslint-disable-line no-unused-vars + return new Promise((resolve, reject) => { + if (!args || !args.input || !args.input.sources || !args.input.sources.length) { + //loggingClient.logNoKeywordsToAdd(); + return reject('No trustedsources to add specified.'); + } + + let mutations = []; + args.input.sources.forEach(source => { + mutations.push({ + query: `INSERT INTO fortis.trustedsources ( + pipelinekey, + externalsourceid, + sourcetype, + rank, + displayname, + insertiontime, + reportingcategory + ) VALUES (?,?,?,?,?,dateof(now()),?)`, + params: [ + source.pipelinekey, + source.externalsourceid, + source.sourcetype, + source.rank, + source.displayname, + source.reportingcategory + ] + }); + }); + + cassandraConnector.executeBatchMutations(mutations) + .then(_ => { // eslint-disable-line no-unused-vars + resolve({ + sources: args.input.sources + }); + }) + .catch(error => { + trackException(error); + reject(error); + }); + }); +} + +function removeTrustedSources(args, res) { // eslint-disable-line no-unused-vars + return new Promise((resolve, reject) => { + if (!args || !args.input || !args.input.sources || !args.input.sources.length) { + //loggingClient.logNoKeywordsToRemove(); + return reject('No trusted sources to remove specified.'); + } + + const mutations = args.input.sources.map(source => ({ + query: 'DELETE FROM fortis.trustedsources WHERE pipelinekey = ? AND externalsourceid = ? AND sourcetype = ? AND rank = ?', + params: [source.pipelinekey, source.externalsourceid, source.sourcetype, source.rank] + })); + + cassandraConnector.executeBatchMutations(mutations) + .then(_ => { // eslint-disable-line no-unused-vars + resolve({ + sources: args.input.sources + }); + }) + .catch(error => { + trackException(error); + reject(error); + }); + }); +} + function removeKeywords(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { if (!args || !args.input || !args.input.edges || !args.input.edges.length) { @@ -579,5 +647,7 @@ module.exports = { modifyTwitterAccounts: trackEvent(withRunTime(modifyTwitterAccounts), 'modifyTwitterAccounts'), removeTwitterAccounts: trackEvent(withRunTime(removeTwitterAccounts), 'removeTwitterAccounts'), modifyBlacklist: trackEvent(withRunTime(modifyBlacklist), 'modifyBlacklist'), - removeBlacklist: trackEvent(withRunTime(removeBlacklist), 'removeBlacklist') + removeBlacklist: trackEvent(withRunTime(removeBlacklist), 'removeBlacklist'), + addTrustedSources: trackEvent(withRunTime(addTrustedSources), 'addTrustedSources'), + removeTrustedSources: trackEvent(withRunTime(removeTrustedSources), 'removeTrustedSources') }; diff --git a/src/resolvers-cassandra/Settings/queries.js b/src/resolvers-cassandra/Settings/queries.js index 3deb5b51..550aadcd 100644 --- a/src/resolvers-cassandra/Settings/queries.js +++ b/src/resolvers-cassandra/Settings/queries.js @@ -43,9 +43,10 @@ function streams(args, res) { // eslint-disable-line no-unused-vars function trustedSources(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { - const query = 'SELECT * FROM fortis.trustedsources where pipelinekey IN ?'; + const pipelineKeysPlaceholder = getPlaceholderString(args.pipelinekeys); + const query = `SELECT * FROM fortis.trustedsources where pipelinekey IN (${pipelineKeysPlaceholder})`; const params = [ - args.pipelinekeys + ...args.pipelinekeys ]; cassandraConnector.executeQuery(query, params) @@ -57,6 +58,19 @@ function trustedSources(args, res) { // eslint-disable-line no-unused-vars }); } +function getPlaceholderString(items) { + if (!items || !items.length) return ''; + let placeholder = ''; + for (let i = 0; i < items.length; i++) { + if (i === items.length - 1) { + placeholder += '?'; + } else { + placeholder += '?,'; + } + } + return placeholder; +} + function cassandraRowToStream(row) { if (row.enabled == null) row.enabled = false; return { @@ -72,17 +86,19 @@ function cassandraRowToStream(row) { function cassandraRowToSource(row) { return { + rowKey: row.pipelinekey + ',' + row.externalsourceid + ',' + row.sourcetype + ',' + row.rank, externalsourceid: row.externalsourceid, sourcetype: row.sourcetype, displayname: row.displayname || row.externalsourceid, pipelinekey: row.pipelinekey, - rank: row.rank + rank: row.rank, + reportingcategory: row.reportingcategory }; } -function trustedSourceFilter(row, namequery) { - if (namequery) { - return row.externalsourceid.toLowerCase().indexOf(namequery.toLowerCase()) > -1; +function trustedSourceFilter(row, pipelineKey) { + if (pipelineKey) { + return row.pipelinekey.toLowerCase().indexOf(pipelineKey.toLowerCase()) > -1; } return true; diff --git a/src/schemas/SettingsSchema.js b/src/schemas/SettingsSchema.js index 266c1b17..b5755519 100644 --- a/src/schemas/SettingsSchema.js +++ b/src/schemas/SettingsSchema.js @@ -30,6 +30,8 @@ module.exports = graphql.buildSchema(` removeTrustedTwitterAccounts(input: TrustedTwitterAccountDefintion!): TrustedTwitterAccountCollection modifyBlacklist(input: BlacklistTermDefintion!): BlacklistCollection removeBlacklist(input: BlacklistTermDefintion!): BlacklistCollection + addTrustedSources(input: SourceListInput): SourceCollection + removeTrustedSources(input: SourceListInput): SourceCollection } type SiteProperties { @@ -84,12 +86,28 @@ module.exports = graphql.buildSchema(` sources: [Source]! } + input SourceListInput { + sources: [SourceInput]! + } + + input SourceInput { + rowKey: String, + externalsourceid: String!, + sourcetype: String!, + displayname: String, + pipelinekey: String!, + rank: String!, + reportingcategory: String + } + type Source { - externalsourceid: String - sourcetype: String - displayname: String - pipelinekey: String - rank: String + rowKey: String, + externalsourceid: String!, + sourcetype: String, + displayname: String, + pipelinekey: String, + rank: String, + reportingcategory: String } type Site { From b80375f706ed8e48d147c18cf3d36012c9405856 Mon Sep 17 00:00:00 2001 From: Smarker Date: Wed, 11 Oct 2017 14:40:26 -0400 Subject: [PATCH 2/7] Add restartPipeline service. --- src/resolvers-cassandra/Messages/index.js | 1 + src/resolvers-cassandra/Messages/mutations.js | 6 ++++++ src/schemas/MessageSchema.js | 1 + 3 files changed, 8 insertions(+) diff --git a/src/resolvers-cassandra/Messages/index.js b/src/resolvers-cassandra/Messages/index.js index 3ac14104..329192cf 100644 --- a/src/resolvers-cassandra/Messages/index.js +++ b/src/resolvers-cassandra/Messages/index.js @@ -5,6 +5,7 @@ const queries = require('./queries'); module.exports = { publishEvents: mutations.publishEvents, + restartPipeline: mutations.restartPipeline, byLocation: queries.byLocation, byBbox: queries.byBbox, diff --git a/src/resolvers-cassandra/Messages/mutations.js b/src/resolvers-cassandra/Messages/mutations.js index 05e299c8..2e90a773 100644 --- a/src/resolvers-cassandra/Messages/mutations.js +++ b/src/resolvers-cassandra/Messages/mutations.js @@ -1,12 +1,18 @@ 'use strict'; +const streamingController = require('../../clients/streaming/StreamingController'); const eventHubSender = require('../../clients/eventhub/EventHubSender'); const trackEvent = require('../../clients/appinsights/AppInsightsClient').trackEvent; +function restartPipeline(args, res) { // eslint-disable-line no-unused-vars + return streamingController.restartPipeline(); +} + function publishEvents(args, res) { // eslint-disable-line no-unused-vars return eventHubSender.sendMessages(args && args.input && args.input.messages); } module.exports = { + restartPipeline: trackEvent(restartPipeline, 'restartPipeline'), publishEvents: trackEvent(publishEvents, 'publishEvents') }; diff --git a/src/schemas/MessageSchema.js b/src/schemas/MessageSchema.js index 543378bf..b5a887e3 100644 --- a/src/schemas/MessageSchema.js +++ b/src/schemas/MessageSchema.js @@ -17,6 +17,7 @@ module.exports = graphql.buildSchema(` type Mutation { publishEvents(input: NewMessages): [String] + restartPipeline: Boolean } input NewMessages { From 73a167a97d5bcc20e05cdcc517cd054434ff801a Mon Sep 17 00:00:00 2001 From: Smarker Date: Wed, 11 Oct 2017 14:41:07 -0400 Subject: [PATCH 3/7] Add trustedSources and restartPipeline logging. --- src/clients/appinsights/LoggingClient.js | 28 +++++++++++++++++++- src/clients/streaming/StreamingController.js | 12 ++++++--- src/resolvers-cassandra/Settings/queries.js | 9 ++++--- 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/src/clients/appinsights/LoggingClient.js b/src/clients/appinsights/LoggingClient.js index 09d3a582..1fe452f3 100644 --- a/src/clients/appinsights/LoggingClient.js +++ b/src/clients/appinsights/LoggingClient.js @@ -29,6 +29,12 @@ function logExecuteQueryError() { }); } +function restartPipelineExtraProps() { + return () => ({ + success: 'true' + }); +} + function termsExtraProps() { return () => ({ operation: 'query', @@ -37,6 +43,23 @@ function termsExtraProps() { }); } +function trustedSourcesExtraProps() { + return () => ({ + operation: 'query', + table: 'trustedsources', + success: 'true' + }); +} + +function trustedSourcesExtraMetrics() { + return (graphqlResult) => { + const totalRows = graphqlResult.sources.length; + return { + totalRows + }; + }; +} + function addKeywordsExtraProps() { return () => ({ operation: 'modify', @@ -55,7 +78,7 @@ function removeKeywordsExtraProps() { function keywordsExtraMetrics() { return (graphqlResult) => { - const totalRows = graphqlResult.edges.length; //TODO + const totalRows = graphqlResult.edges.length; return { totalRows }; @@ -113,7 +136,10 @@ module.exports = { logCassandraClientUndefined, logNoMutationsDefined, logExecuteQueryError, + restartPipelineExtraProps, termsExtraProps, + trustedSourcesExtraProps, + trustedSourcesExtraMetrics, addKeywordsExtraProps, removeKeywordsExtraProps, keywordsExtraMetrics, diff --git a/src/clients/streaming/StreamingController.js b/src/clients/streaming/StreamingController.js index 9413d4f5..5139a2aa 100644 --- a/src/clients/streaming/StreamingController.js +++ b/src/clients/streaming/StreamingController.js @@ -2,12 +2,17 @@ const Promise = require('promise'); const azure = require('azure-sb'); -const trackDependency = require('../appinsights/AppInsightsClient').trackDependency; +const { trackEvent, trackDependency } = require('../appinsights/AppInsightsClient'); +const loggingClient = require('../appinsights/LoggingClient'); const SERVICE_BUS_CONNECTION_STRING = process.env.FORTIS_SB_CONN_STR; const SERVICE_BUS_CONFIG_QUEUE = process.env.FORTIS_SB_CONFIG_QUEUE || 'configuration'; const SERVICE_BUS_COMMAND_QUEUE = process.env.FORTIS_SB_COMMAND_QUEUE || 'command'; +function restartPipeline() { + return notifyUpdate(SERVICE_BUS_COMMAND_QUEUE, { 'dirty': 'streams' }); +} + function restartStreaming() { return notifyUpdate(SERVICE_BUS_COMMAND_QUEUE, { 'dirty': 'streams' }); } @@ -31,8 +36,8 @@ function notifyUpdate(queue, properties) { }; sendQueueMessage(queue, serviceBusMessage) - .then(resolve) - .catch(reject); + .then(resolve(true)) + .catch(reject(false)); }); } @@ -56,6 +61,7 @@ function sendQueueMessage(queue, serviceBusMessage) { } module.exports = { + restartPipeline: trackEvent(restartPipeline, 'restartPipeline', loggingClient.restartPipelineExtraProps()), restartStreaming: trackDependency(restartStreaming, 'ServiceBus', 'send'), notifyWatchlistUpdate: trackDependency(notifyWatchlistUpdate, 'ServiceBus', 'send'), notifyBlacklistUpdate: trackDependency(notifyBlacklistUpdate, 'ServiceBus', 'send'), diff --git a/src/resolvers-cassandra/Settings/queries.js b/src/resolvers-cassandra/Settings/queries.js index 550aadcd..008065a6 100644 --- a/src/resolvers-cassandra/Settings/queries.js +++ b/src/resolvers-cassandra/Settings/queries.js @@ -4,7 +4,7 @@ const Promise = require('promise'); const facebookAnalyticsClient = require('../../clients/facebook/FacebookAnalyticsClient'); const cassandraConnector = require('../../clients/cassandra/CassandraConnector'); const { withRunTime, getTermsByCategory, getSiteDefintion } = require('../shared'); -const trackEvent = require('../../clients/appinsights/AppInsightsClient').trackEvent; +const { trackException, trackEvent } = require('../../clients/appinsights/AppInsightsClient'); const loggingClient = require('../../clients/appinsights/LoggingClient'); const PIPELINE_KEY_TWITTER = 'twitter'; @@ -54,7 +54,10 @@ function trustedSources(args, res) { // eslint-disable-line no-unused-vars sources: rows.map(cassandraRowToSource) .filter(source => trustedSourceFilter(source, args.sourcename)) })) - .catch(reject); + .catch(error => { + trackException(error); + reject(error); + }); }); } @@ -220,7 +223,7 @@ module.exports = { sites: trackEvent(withRunTime(sites), 'sites'), streams: trackEvent(withRunTime(streams), 'streams'), siteTerms: trackEvent(withRunTime(terms), 'terms', loggingClient.termsExtraProps(), loggingClient.keywordsExtraMetrics()), - trustedSources: trackEvent(withRunTime(trustedSources), 'trustedsources'), + trustedSources: trackEvent(withRunTime(trustedSources), 'trustedsources', loggingClient.trustedSourcesExtraProps(), loggingClient.trustedSourcesExtraMetrics()), twitterAccounts: trackEvent(withRunTime(twitterAccounts), 'twitterAccounts'), trustedTwitterAccounts: trackEvent(withRunTime(trustedTwitterAccounts), 'trustedTwitterAccounts'), facebookPages: trackEvent(withRunTime(facebookPages), 'facebookPages'), From b018f916953ccace237a10047d97c794ea5624e2 Mon Sep 17 00:00:00 2001 From: Smarker Date: Mon, 16 Oct 2017 11:22:41 -0400 Subject: [PATCH 4/7] Small changes. --- src/clients/facebook/FacebookAnalyticsClient.js | 4 +++- src/resolvers-cassandra/Settings/queries.js | 14 ++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/clients/facebook/FacebookAnalyticsClient.js b/src/clients/facebook/FacebookAnalyticsClient.js index c04cba19..80ce3035 100644 --- a/src/clients/facebook/FacebookAnalyticsClient.js +++ b/src/clients/facebook/FacebookAnalyticsClient.js @@ -6,9 +6,10 @@ const trackDependency = require('../appinsights/AppInsightsClient').trackDepende const accessToken = process.env.FACEBOOK_AUTH_TOKEN; const apiUrlBase = process.env.FACEBOOK_API_HOST || 'https://graph.facebook.com'; +const facebookApiVersion = 'v2.9'; function buildFeedUri(pageId) { - return `${apiUrlBase}/v2.9/${pageId}/feed` + return `${apiUrlBase}/${facebookApiVersion}/${pageId}/feed` + `?access_token=${accessToken}` + '&format=json'; } @@ -37,6 +38,7 @@ function fetchPageLastUpdatedAt(pageId) { }); } + module.exports = { fetchPageLastUpdatedAt: trackDependency(fetchPageLastUpdatedAt, 'Facebook', 'pageLastUpdatedAt') }; \ No newline at end of file diff --git a/src/resolvers-cassandra/Settings/queries.js b/src/resolvers-cassandra/Settings/queries.js index 008065a6..216e8ffc 100644 --- a/src/resolvers-cassandra/Settings/queries.js +++ b/src/resolvers-cassandra/Settings/queries.js @@ -74,6 +74,12 @@ function getPlaceholderString(items) { return placeholder; } +function trustedSourceFilter(row, pipelineKey) { + if (pipelineKey) { + return row.pipelinekey.toLowerCase().indexOf(pipelineKey.toLowerCase()) > -1; + } +} + function cassandraRowToStream(row) { if (row.enabled == null) row.enabled = false; return { @@ -99,14 +105,6 @@ function cassandraRowToSource(row) { }; } -function trustedSourceFilter(row, pipelineKey) { - if (pipelineKey) { - return row.pipelinekey.toLowerCase().indexOf(pipelineKey.toLowerCase()) > -1; - } - - return true; -} - function paramsToParamsEntries(params) { const paramsEntries = []; for (const key of Object.keys(params)) { From 5e29c63791db32dc77bea166e39e7532a50f0c2e Mon Sep 17 00:00:00 2001 From: Smarker Date: Wed, 18 Oct 2017 10:45:54 -0400 Subject: [PATCH 5/7] Add restart pipeline logging. --- src/clients/streaming/StreamingController.js | 5 ++--- src/resolvers-cassandra/Messages/mutations.js | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/clients/streaming/StreamingController.js b/src/clients/streaming/StreamingController.js index 5139a2aa..8ea1c062 100644 --- a/src/clients/streaming/StreamingController.js +++ b/src/clients/streaming/StreamingController.js @@ -2,8 +2,7 @@ const Promise = require('promise'); const azure = require('azure-sb'); -const { trackEvent, trackDependency } = require('../appinsights/AppInsightsClient'); -const loggingClient = require('../appinsights/LoggingClient'); +const { trackDependency } = require('../appinsights/AppInsightsClient'); const SERVICE_BUS_CONNECTION_STRING = process.env.FORTIS_SB_CONN_STR; const SERVICE_BUS_CONFIG_QUEUE = process.env.FORTIS_SB_CONFIG_QUEUE || 'configuration'; @@ -61,7 +60,7 @@ function sendQueueMessage(queue, serviceBusMessage) { } module.exports = { - restartPipeline: trackEvent(restartPipeline, 'restartPipeline', loggingClient.restartPipelineExtraProps()), + restartPipeline: trackDependency(restartPipeline, 'ServiceBus', 'send'), restartStreaming: trackDependency(restartStreaming, 'ServiceBus', 'send'), notifyWatchlistUpdate: trackDependency(notifyWatchlistUpdate, 'ServiceBus', 'send'), notifyBlacklistUpdate: trackDependency(notifyBlacklistUpdate, 'ServiceBus', 'send'), diff --git a/src/resolvers-cassandra/Messages/mutations.js b/src/resolvers-cassandra/Messages/mutations.js index 2e90a773..3931b491 100644 --- a/src/resolvers-cassandra/Messages/mutations.js +++ b/src/resolvers-cassandra/Messages/mutations.js @@ -3,6 +3,7 @@ const streamingController = require('../../clients/streaming/StreamingController'); const eventHubSender = require('../../clients/eventhub/EventHubSender'); const trackEvent = require('../../clients/appinsights/AppInsightsClient').trackEvent; +const restartPipelineExtraProps = require('../../clients/appinsights/LoggingClient').restartPipelineExtraProps; function restartPipeline(args, res) { // eslint-disable-line no-unused-vars return streamingController.restartPipeline(); @@ -13,6 +14,6 @@ function publishEvents(args, res) { // eslint-disable-line no-unused-vars } module.exports = { - restartPipeline: trackEvent(restartPipeline, 'restartPipeline'), + restartPipeline: trackEvent(restartPipeline, 'restartPipeline', restartPipelineExtraProps()), publishEvents: trackEvent(publishEvents, 'publishEvents') }; From 9bffab006c4798525ea4ab345c5c57e326efcae2 Mon Sep 17 00:00:00 2001 From: Smarker Date: Wed, 18 Oct 2017 10:46:23 -0400 Subject: [PATCH 6/7] Add trustedsources logging. --- src/clients/appinsights/LoggingClient.js | 58 ++++++++++++++++++++---- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/src/clients/appinsights/LoggingClient.js b/src/clients/appinsights/LoggingClient.js index 5f969022..fa0b4919 100644 --- a/src/clients/appinsights/LoggingClient.js +++ b/src/clients/appinsights/LoggingClient.js @@ -51,14 +51,6 @@ function trustedSourcesExtraProps() { }); } -function streamsExtraProps() { - return () => ({ - operation: 'query', - table: 'streams', - success: 'true' - }); -} - function trustedSourcesExtraMetrics() { return (graphqlResult) => { const totalRows = graphqlResult.sources.length; @@ -68,6 +60,36 @@ function trustedSourcesExtraMetrics() { }; } +function logNoTrustedSourcesToAdd() { + trackSyncEvent('cassandra', { + client: constants.CLIENTS.cassandra, + operation: 'modify', + table: 'trustedsources', + success: false + }, { + numToMutate: 0 + }); +} + +function logNoTrustedSourcesToRemove() { + trackSyncEvent('cassandra', { + client: constants.CLIENTS.cassandra, + operation: 'remove', + table: 'trustedsources', + success: false + }, { + numToMutate: 0 + }); +} + +function streamsExtraProps() { + return () => ({ + operation: 'query', + table: 'streams', + success: 'true' + }); +} + function modifyStreamsExtraProps() { return () => ({ operation: 'modify', @@ -85,6 +107,22 @@ function streamsExtraMetrics() { }; } +function addTrustedSourcesExtraProps() { + return () => ({ + operation: 'modify', + table: 'trustedsources', + success: 'true' + }); +} + +function removeTrustedSourcesExtraProps() { + return () => ({ + operation: 'remove', + table: 'trustedsources', + success: 'true' + }); +} + function addKeywordsExtraProps() { return () => ({ operation: 'modify', @@ -176,9 +214,13 @@ module.exports = { termsExtraProps, trustedSourcesExtraProps, trustedSourcesExtraMetrics, + logNoTrustedSourcesToAdd, + logNoTrustedSourcesToRemove, streamsExtraProps, modifyStreamsExtraProps, streamsExtraMetrics, + addTrustedSourcesExtraProps, + removeTrustedSourcesExtraProps, addKeywordsExtraProps, removeKeywordsExtraProps, keywordsExtraMetrics, From 9e7d3205489b00d07df1b5617175e4f8e725f70e Mon Sep 17 00:00:00 2001 From: Smarker Date: Wed, 18 Oct 2017 10:46:49 -0400 Subject: [PATCH 7/7] Remove unused fb, twitter, trusted twitter services. --- src/resolvers-cassandra/Settings/index.js | 10 - src/resolvers-cassandra/Settings/mutations.js | 185 +----------------- src/resolvers-cassandra/Settings/queries.js | 117 +---------- src/schemas/SettingsSchema.js | 2 +- 4 files changed, 8 insertions(+), 306 deletions(-) diff --git a/src/resolvers-cassandra/Settings/index.js b/src/resolvers-cassandra/Settings/index.js index aa6aee95..c86f95ba 100644 --- a/src/resolvers-cassandra/Settings/index.js +++ b/src/resolvers-cassandra/Settings/index.js @@ -8,12 +8,6 @@ module.exports = { removeSite: mutations.removeSite, editSite: mutations.editSite, modifyStreams: mutations.modifyStreams, - modifyFacebookPages: mutations.modifyFacebookPages, - removeFacebookPages: mutations.removeFacebookPages, - modifyTrustedTwitterAccounts: mutations.modifyTrustedTwitterAccounts, - removeTrustedTwitterAccounts: mutations.removeTrustedTwitterAccounts, - modifyTwitterAccounts: mutations.modifyTwitterAccounts, - removeTwitterAccounts: mutations.removeTwitterAccounts, modifyBlacklist: mutations.modifyBlacklist, removeBlacklist: mutations.removeBlacklist, removeKeywords: mutations.removeKeywords, @@ -25,9 +19,5 @@ module.exports = { sites: queries.sites, trustedSources: queries.trustedSources, streams: queries.streams, - twitterAccounts: queries.twitterAccounts, - trustedTwitterAccounts: queries.trustedTwitterAccounts, - facebookPages: queries.facebookPages, - facebookAnalytics: queries.facebookAnalytics, termBlacklist: queries.termBlacklist }; diff --git a/src/resolvers-cassandra/Settings/mutations.js b/src/resolvers-cassandra/Settings/mutations.js index ca50bb80..684df455 100644 --- a/src/resolvers-cassandra/Settings/mutations.js +++ b/src/resolvers-cassandra/Settings/mutations.js @@ -9,12 +9,6 @@ const { withRunTime, limitForInClause } = require('../shared'); const { trackEvent, trackException } = require('../../clients/appinsights/AppInsightsClient'); const loggingClient = require('../../clients/appinsights/LoggingClient'); const apiUrlBase = process.env.FORTIS_CENTRAL_ASSETS_HOST || 'https://fortiscentral.blob.core.windows.net'; -const STREAM_PIPELINE_TWITTER = 'twitter'; -const STREAM_CONNECTOR_TWITTER = 'Twitter'; - -const TRUSTED_SOURCES_CONNECTOR_TWITTER = 'Twitter'; -const TRUSTED_SOURCES_CONNECTOR_FACEBOOK = 'FacebookPage'; -const TRUSTED_SOURCES_RANK_DEFAULT = 10; function createOrReplaceSite(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars @@ -178,7 +172,7 @@ function createSite(args, res) { // eslint-disable-line no-unused-vars function addTrustedSources(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { if (!args || !args.input || !args.input.sources || !args.input.sources.length) { - //loggingClient.logNoKeywordsToAdd(); + loggingClient.logNoTrustedSourcesToAdd(); return reject('No trustedsources to add specified.'); } @@ -221,7 +215,7 @@ function addTrustedSources(args, res) { // eslint-disable-line no-unused-vars function removeTrustedSources(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { if (!args || !args.input || !args.input.sources || !args.input.sources.length) { - //loggingClient.logNoKeywordsToRemove(); + loggingClient.logNoTrustedSourcesToRemove(); return reject('No trusted sources to remove specified.'); } @@ -384,171 +378,6 @@ function modifyStreams(args, res) { // eslint-disable-line no-unused-vars }); } -function facebookPagePrimaryKeyValuesToRowKey(values) { - return [TRUSTED_SOURCES_CONNECTOR_FACEBOOK, values[1], values[2]]; -} - -function facebookPageRowKeyToPrimaryKey(page) { - const params = page.RowKey.split(','); - if (params.length != 3) throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).'); - return facebookPagePrimaryKeyValuesToRowKey(params); -} - -function normalizedFacebookPage(primaryKeyValues) { - return { - RowKey: facebookPagePrimaryKeyValuesToRowKey(primaryKeyValues), - pageUrl: primaryKeyValues[1] - }; -} - -function modifyFacebookPages(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const pages = args && args.input && args.input.pages; - if (!pages || !pages.length) return reject('No pages specified'); - - const invalidPages = pages.filter(page => !page.pageUrl); - if (invalidPages.length > 0) return reject(`pageUrl required for ${JSON.stringify(invalidPages)}`); - - const mutations = []; - const expectedRecords = []; - pages.forEach(page => { - const isUpdate = page.RowKey && page.pageUrl != facebookPageRowKeyToPrimaryKey(page)[1]; - if (isUpdate) { - mutations.push({ - query: 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?', - params: facebookPageRowKeyToPrimaryKey(page) - }); - } - - const params = facebookPagePrimaryKeyValuesToRowKey([TRUSTED_SOURCES_CONNECTOR_FACEBOOK, page.pageUrl, 'FacebookPost']); - params.push(TRUSTED_SOURCES_RANK_DEFAULT); - mutations.push({ - query: 'INSERT INTO fortis.trustedsources (connector, sourceid, sourcetype, insertiontime, rank) VALUES (?, ?, ?, dateof(now()), ?)', - params: params - }); - expectedRecords.push(normalizedFacebookPage(params)); - }); - - cassandraConnector.executeBatchMutations(mutations) - .then(() => resolve({ pages: expectedRecords })) - .catch(reject); - }); -} - -function removeFacebookPages(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const pages = args && args.input && args.input.pages; - if (!pages || !pages.length) return reject('No pages specified'); - - const invalidPages = pages.filter(page => !page.RowKey); - if (invalidPages.length > 0) return reject(`RowKey required for ${JSON.stringify(invalidPages)}`); - - const mutations = pages.map(page => ({ - query: 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?', - params: facebookPageRowKeyToPrimaryKey(page) - })); - - cassandraConnector.executeBatchMutations(mutations) - .then(() => resolve({ pages: pages })) - .catch(reject); - }); -} - -function trustedTwitterAccountRowKeyToPrimaryKey(account) { - const params = account.RowKey.split(','); - if (params.length != 3) throw('Expecting three element comma-delimited RowKey representing (connector, sourceid, sourcetype).'); - return trustedTwitterAccountPrimaryKeyValuesToRowKey(params); -} - -function trustedTwitterAccountPrimaryKeyValuesToRowKey(values) { - return [ TRUSTED_SOURCES_CONNECTOR_TWITTER, values[1], values[2] ]; -} - -function normalizedTrustedTwitterAccount(account) { - const keyValues = trustedTwitterAccountRowKeyToPrimaryKey(account); - return { - RowKey: trustedTwitterAccountPrimaryKeyValuesToRowKey(keyValues), - acctUrl: keyValues[1] - }; -} - -function modifyTrustedTwitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const accounts = args && args.input && args.input.accounts; - if (!accounts || !accounts.length) return reject('No accounts specified'); - - const statement = 'INSERT INTO fortis.trustedsources (connector, sourceid, sourcetype, insertiontime, rank) VALUES (?, ?, ?, dateof(now()), ?)'; - const queries = accounts.map(account => { - const params = trustedTwitterAccountRowKeyToPrimaryKey(account); - params.push(TRUSTED_SOURCES_RANK_DEFAULT); - - return {query: statement, params: params}; - }); - - cassandraConnector.executeBatchMutations(queries) - .then(() => resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) })) - .catch(reject); - }); -} - -function removeTrustedTwitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const accounts = args && args.input && args.input.accounts; - if (!accounts || !accounts.length) return reject('No accounts specified'); - - const deleteByPrimaryKey = 'DELETE FROM fortis.trustedsources WHERE connector = ? AND sourceid = ? AND sourcetype = ?'; - const queries = accounts.map(trustedTwitterAccountRowKeyToPrimaryKey).map(params => ({query: deleteByPrimaryKey, params})); - - cassandraConnector.executeBatchMutations(queries) - .then(() => resolve({ accounts: accounts.map(normalizedTrustedTwitterAccount) })) - .catch(reject); - }); -} - -function modifyTwitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const accounts = args && args.input && args.input.accounts; - if (!accounts || !accounts.length) return reject('No accounts specified'); - - const updateStatement = 'UPDATE fortis.streams set connector = ?, params = ? WHERE pipeline= ? AND streamid = ?'; - const insertStatement = 'INSERT INTO fortis.streams (pipeline, streamid, connector, params) VALUES (?, ?, ?, ?)'; - const queries = []; - const expectedRecords = []; - accounts.forEach( account => { - // TODO: Arrive at a consensus as to what a canonical account should be in order to create a proper copy of the incoming record. - const updatedAccount = account; - if (account.RowKey) { - queries.push({ query: updateStatement, params: [STREAM_CONNECTOR_TWITTER, account, STREAM_PIPELINE_TWITTER, account.RowKey] }); - } else { - updatedAccount.RowKey = uuid(); - queries.push({ query: insertStatement, params: [STREAM_PIPELINE_TWITTER, updatedAccount.RowKey, STREAM_CONNECTOR_TWITTER, account] }); - } - expectedRecords.push(updatedAccount); - }); - - cassandraConnector.executeBatchMutations(queries) - .then(() => resolve({ accounts: expectedRecords })) - .catch(reject); - }); -} - -function removeTwitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const accounts = args && args.input && args.input.accounts; - if (!accounts || !accounts.length) return reject('No accounts specified'); - - const invalidAccounts = accounts.filter(account=>!account.RowKey); - if (invalidAccounts.length > 0) return reject(`RowKey required for ${JSON.stringify(invalidAccounts)}`); - - const statement = 'DELETE FROM fortis.streams WHERE streamid = ?'; - const queries = accounts.map(account => ({query: statement, params: [account.RowKey]})); - - cassandraConnector.executeBatchMutations(queries) - .then(() => resolve({ accounts })) - .catch(reject); - }); -} - function modifyBlacklist(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { const termFilters = args && args.input && args.input.filters; @@ -619,14 +448,8 @@ module.exports = { removeKeywords: trackEvent(withRunTime(removeKeywords), 'removeKeywords', loggingClient.removeKeywordsExtraProps(), loggingClient.keywordsExtraMetrics()), addKeywords: trackEvent(withRunTime(addKeywords), 'addKeywords', loggingClient.addKeywordsExtraProps(), loggingClient.keywordsExtraMetrics()), editSite: trackEvent(withRunTime(editSite), 'editSite'), - modifyFacebookPages: trackEvent(withRunTime(modifyFacebookPages), 'modifyFacebookPages'), - removeFacebookPages: trackEvent(withRunTime(removeFacebookPages), 'removeFacebookPages'), - modifyTrustedTwitterAccounts: trackEvent(withRunTime(modifyTrustedTwitterAccounts), 'modifyTrustedTwitterAccounts'), - removeTrustedTwitterAccounts: trackEvent(withRunTime(removeTrustedTwitterAccounts), 'removeTrustedTwitterAccounts'), - modifyTwitterAccounts: trackEvent(withRunTime(modifyTwitterAccounts), 'modifyTwitterAccounts'), - removeTwitterAccounts: trackEvent(withRunTime(removeTwitterAccounts), 'removeTwitterAccounts'), modifyBlacklist: trackEvent(withRunTime(modifyBlacklist), 'modifyBlacklist'), removeBlacklist: trackEvent(withRunTime(removeBlacklist), 'removeBlacklist'), - addTrustedSources: trackEvent(withRunTime(addTrustedSources), 'addTrustedSources'), - removeTrustedSources: trackEvent(withRunTime(removeTrustedSources), 'removeTrustedSources') + addTrustedSources: trackEvent(withRunTime(addTrustedSources), 'addTrustedSources', loggingClient.addTrustedSourcesExtraProps(), loggingClient.trustedSourcesExtraMetrics()), + removeTrustedSources: trackEvent(withRunTime(removeTrustedSources), 'removeTrustedSources', loggingClient.removeTrustedSourcesExtraProps(), loggingClient.trustedSourcesExtraMetrics()) }; diff --git a/src/resolvers-cassandra/Settings/queries.js b/src/resolvers-cassandra/Settings/queries.js index 02fb2da3..f6795a23 100644 --- a/src/resolvers-cassandra/Settings/queries.js +++ b/src/resolvers-cassandra/Settings/queries.js @@ -1,15 +1,11 @@ 'use strict'; const Promise = require('promise'); -const facebookAnalyticsClient = require('../../clients/facebook/FacebookAnalyticsClient'); const cassandraConnector = require('../../clients/cassandra/CassandraConnector'); const { withRunTime, getTermsByCategory, getSiteDefintion } = require('../shared'); const { trackException, trackEvent } = require('../../clients/appinsights/AppInsightsClient'); const loggingClient = require('../../clients/appinsights/LoggingClient'); -const PIPELINE_KEY_TWITTER = 'twitter'; -const CONNECTOR_FACEBOOK = 'Facebook'; - function terms(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { const { translationLanguage, category } = args; @@ -50,16 +46,12 @@ function streams(args, res) { // eslint-disable-line no-unused-vars function trustedSources(args, res) { // eslint-disable-line no-unused-vars return new Promise((resolve, reject) => { - const pipelineKeysPlaceholder = getPlaceholderString(args.pipelinekeys); - const query = `SELECT * FROM fortis.trustedsources where pipelinekey IN (${pipelineKeysPlaceholder})`; - const params = [ - ...args.pipelinekeys - ]; + const query = 'SELECT * FROM fortis.trustedsources'; + const params = []; cassandraConnector.executeQuery(query, params) .then(rows => resolve({ sources: rows.map(cassandraRowToSource) - .filter(source => trustedSourceFilter(source, args.sourcename)) })) .catch(error => { trackException(error); @@ -68,25 +60,6 @@ function trustedSources(args, res) { // eslint-disable-line no-unused-vars }); } -function getPlaceholderString(items) { - if (!items || !items.length) return ''; - let placeholder = ''; - for (let i = 0; i < items.length; i++) { - if (i === items.length - 1) { - placeholder += '?'; - } else { - placeholder += '?,'; - } - } - return placeholder; -} - -function trustedSourceFilter(row, pipelineKey) { - if (pipelineKey) { - return row.pipelinekey.toLowerCase().indexOf(pipelineKey.toLowerCase()) > -1; - } -} - function cassandraRowToStream(row) { if (row.enabled == null) row.enabled = false; return { @@ -125,86 +98,6 @@ function paramsToParamsEntries(params) { return paramsEntries; } -function cassandraRowToTwitterAccount(row) { - return { - userIds: row.params.userIds, - consumerKey: row.params.consumerKey, - consumerSecret: row.params.consumerSecret, - accessToken: row.params.accessToken, - accessTokenSecret: row.params.accessTokenSecret - }; -} - -function twitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const sourcesByPipelineKey = 'SELECT params FROM fortis.streams WHERE pipelinekey = ?'; - cassandraConnector.executeQuery(sourcesByPipelineKey, [PIPELINE_KEY_TWITTER]) - .then(result => { - const accounts = result.map(cassandraRowToTwitterAccount); - resolve({ accounts: accounts }); - }) - .catch(reject) - ; - }); -} - -function cassandraRowToTrustedTwitterAccount(row) { - return { - RowKey: `${row.connector},${row.sourceid},${row.sourcetype}`, - acctUrl: row.sourceid - }; -} - -function trustedTwitterAccounts(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const sourcesByConnector = 'SELECT connector, sourceid, sourcetype FROM fortis.trustedsources WHERE pipelinekey = ? ALLOW FILTERING'; - cassandraConnector.executeQuery(sourcesByConnector, [PIPELINE_KEY_TWITTER]) - .then(rows => { - const accounts = rows.map(cassandraRowToTrustedTwitterAccount); - resolve({ accounts: accounts }); - }) - .catch(reject) - ; - }); -} - -function cassandraRowToFacebookPage(row) { - return { - RowKey: `${row.connector},${row.sourceid},${row.sourcetype}`, - pageUrl: row.sourceid - }; -} - -function facebookPages(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - const sourcesByConnector = 'SELECT connector, sourceid, sourcetype FROM fortis.trustedsources WHERE connector = ? ALLOW FILTERING'; - cassandraConnector.executeQuery(sourcesByConnector, [CONNECTOR_FACEBOOK]) - .then(rows => { - const pages = rows.map(cassandraRowToFacebookPage); - resolve({ pages: pages }); - }) - .catch(reject) - ; - }); -} - -function facebookPageToId(page) { - const match = page && page.pageUrl && page.pageUrl.match(/facebook.com\/([^/]+)/); - return match && match.length >= 1 && match[1]; -} - -function facebookAnalytics(args, res) { // eslint-disable-line no-unused-vars - return new Promise((resolve, reject) => { - facebookPages({ siteId: args.siteId }) - .then(response => { - const pageIds = response.pages.map(facebookPageToId).filter(pageId => !!pageId); - Promise.all(pageIds.map(pageId => ({ Name: pageId, LastUpdated: facebookAnalyticsClient.fetchPageLastUpdatedAt(pageId), Count: -1 }))) - .then(analytics => resolve({ analytics })) - .catch(reject); - }); - }); -} - function cassandraRowToTermFilter(row) { return { id: row.id, @@ -228,10 +121,6 @@ module.exports = { sites: trackEvent(withRunTime(sites), 'sites'), streams: trackEvent(withRunTime(streams), 'streams', loggingClient.streamsExtraProps(), loggingClient.streamsExtraMetrics()), siteTerms: trackEvent(withRunTime(terms), 'terms', loggingClient.termsExtraProps(), loggingClient.keywordsExtraMetrics()), - trustedSources: trackEvent(withRunTime(trustedSources), 'trustedsources', loggingClient.trustedSourcesExtraProps(), loggingClient.trustedSourcesExtraMetrics()), - twitterAccounts: trackEvent(withRunTime(twitterAccounts), 'twitterAccounts'), - trustedTwitterAccounts: trackEvent(withRunTime(trustedTwitterAccounts), 'trustedTwitterAccounts'), - facebookPages: trackEvent(withRunTime(facebookPages), 'facebookPages'), - facebookAnalytics: trackEvent(facebookAnalytics, 'facebookAnalytics'), + trustedSources: trackEvent(withRunTime(trustedSources), 'trustedSources', loggingClient.trustedSourcesExtraProps(), loggingClient.trustedSourcesExtraMetrics()), termBlacklist: trackEvent(withRunTime(termBlacklist), 'termBlacklist') }; \ No newline at end of file diff --git a/src/schemas/SettingsSchema.js b/src/schemas/SettingsSchema.js index 7ac50962..938fdc8a 100644 --- a/src/schemas/SettingsSchema.js +++ b/src/schemas/SettingsSchema.js @@ -4,7 +4,7 @@ module.exports = graphql.buildSchema(` type Query { sites: SiteCollection streams: StreamCollection - trustedSources(pipelinekeys: [String]!, sourcename: String): SourceCollection + trustedSources: SourceCollection siteTerms(translationLanguage: String, category: String): TermCollection twitterAccounts: TwitterAccountCollection trustedTwitterAccounts(siteId: String!): TrustedTwitterAccountCollection