Skip to content

Commit

Permalink
Improve: Support search and adding federated users through regular en…
Browse files Browse the repository at this point in the history
…dpoints (#13936)
  • Loading branch information
alansikora authored and sampaiodiego committed Mar 28, 2019
1 parent be8ae64 commit 65b5be3
Show file tree
Hide file tree
Showing 25 changed files with 330 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import { callbacks } from '../../callbacks';
import { settings } from '../../settings';
import { FederationEvents, FederationKeys, Messages, Rooms, Subscriptions, Users } from '../../models';

import { Federation } from '.';
import peerDNS from './peerDNS';
import peerHTTP from './peerHTTP';
import { updateStatus } from './settingsUpdater';
import { logger } from './logger';
import { FederatedMessage, FederatedRoom, FederatedUser } from './federatedResources';
import { Federation } from './';

class PeerClient {
export class PeerClient {
constructor() {
this.config = {};

Expand Down Expand Up @@ -69,7 +67,7 @@ class PeerClient {
if (this.config.hub.active) {
updateStatus('Registering with Hub...');

return peerDNS.register(this.peer);
return Federation.peerDNS.register(this.peer);
}

return true;
Expand Down Expand Up @@ -143,7 +141,7 @@ class PeerClient {

const { peer: domain } = e;

const peer = peerDNS.searchPeer(domain);
const peer = Federation.peerDNS.searchPeer(domain);

if (!peer || !peer.public_key) {
this.log(`Could not find valid peer:${ domain }`);
Expand All @@ -159,7 +157,7 @@ class PeerClient {
// Encrypt with the local private key
payload = Federation.privateKey.encryptPrivate(payload);

peerHTTP.request(peer, 'POST', '/api/v1/federation.events', { payload }, { total: 5, stepSize: 500, stepMultiplier: 10 });
Federation.peerHTTP.request(peer, 'POST', '/api/v1/federation.events', { payload }, { total: 5, stepSize: 500, stepMultiplier: 10 });

FederationEvents.setEventAsFullfilled(e);
} catch (err) {
Expand Down Expand Up @@ -242,22 +240,22 @@ class PeerClient {
// Users
//
// #####
findUsers(email, options = {}) {
const [username, domain] = email.split('@');
findUsers(identifier, options = {}) {
const [username, domain] = identifier.split('@');

const { peer: { domain: localPeerDomain } } = this;

let peer = null;

try {
peer = peerDNS.searchPeer(options.domainOverride || domain);
peer = Federation.peerDNS.searchPeer(options.domainOverride || domain);
} catch (err) {
this.log(`Could not find peer using domain:${ domain }`);
throw new Meteor.Error('federation-peer-does-not-exist', `Could not find peer using domain:${ domain }`);
}

try {
const { data: { federatedUsers: remoteFederatedUsers } } = peerHTTP.request(peer, 'GET', `/api/v1/federation.users?${ qs.stringify({ username, domain, emailOnly: options.emailOnly }) }`);
const { data: { federatedUsers: remoteFederatedUsers } } = Federation.peerHTTP.request(peer, 'GET', `/api/v1/federation.users?${ qs.stringify({ username, domain, usernameOnly: options.usernameOnly }) }`);

const federatedUsers = [];

Expand All @@ -268,7 +266,7 @@ class PeerClient {
return federatedUsers;
} catch (err) {
this.log(`Could not find user:${ username } at ${ peer.domain }`);
throw new Meteor.Error('federation-user-does-not-exist', `Could not find user:${ email } at ${ peer.domain }`);
throw new Meteor.Error('federation-user-does-not-exist', `Could not find user:${ identifier } at ${ peer.domain }`);
}
}

Expand All @@ -283,13 +281,13 @@ class PeerClient {
let peer = null;

try {
peer = peerDNS.searchPeer(domain);
peer = Federation.peerDNS.searchPeer(domain);
} catch (err) {
this.log(`Could not find peer using domain:${ domain }`);
throw new Meteor.Error('federation-peer-does-not-exist', `Could not find peer using domain:${ domain }`);
}

const { data: { upload, buffer } } = peerHTTP.request(peer, 'GET', `/api/v1/federation.uploads?${ qs.stringify({ upload_id: fileId }) }`);
const { data: { upload, buffer } } = Federation.peerHTTP.request(peer, 'GET', `/api/v1/federation.uploads?${ qs.stringify({ upload_id: fileId }) }`);

return { upload, buffer: Buffer.from(buffer) };
}
Expand Down Expand Up @@ -611,5 +609,3 @@ class PeerClient {
FederationEvents.userUnmuted(federatedRoom, federatedUnmutedUser, federatedUserWhoUnmuted, { skipPeers: [localPeerDomain] });
}
}

export default new PeerClient();
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { Meteor } from 'meteor/meteor';
import { FederationDNSCache } from '../../models';

import { logger } from './logger';
import peerHTTP from './peerHTTP';
import { updateStatus } from './settingsUpdater';
import { Federation } from './';

const dnsResolveSRV = Meteor.wrapAsync(dns.resolveSrv);
const dnsResolveTXT = Meteor.wrapAsync(dns.resolveTxt);

class PeerDNS {
export class PeerDNS {
constructor() {
this.config = {};
}
Expand Down Expand Up @@ -44,7 +44,7 @@ class PeerDNS {

// Attempt to register peer
try {
peerHTTP.request(this.HubPeer, 'POST', '/api/v1/peers', { uniqueId, domain, url, public_key }, { total: 5, stepSize: 1000, tryToUpdateDNS: false }, headers);
Federation.peerHTTP.request(this.HubPeer, 'POST', '/api/v1/peers', { uniqueId, domain, url, public_key }, { total: 5, stepSize: 1000, tryToUpdateDNS: false }, headers);

this.log('Peer registered!');

Expand Down Expand Up @@ -114,7 +114,7 @@ class PeerDNS {
this.log(`getPeerUsingHub: ${ domain }`);

// If there is no DNS entry for that, get from the Hub
const { data: { peer } } = peerHTTP.simpleRequest(this.HubPeer, 'GET', `/api/v1/peers?search=${ domain }`);
const { data: { peer } } = Federation.peerHTTP.simpleRequest(this.HubPeer, 'GET', `/api/v1/peers?search=${ domain }`);

return peer;
}
Expand Down Expand Up @@ -169,5 +169,3 @@ class PeerDNS {
}
}
}

export default new PeerDNS();
100 changes: 100 additions & 0 deletions app/federation/server/PeerHTTP/PeerHTTP.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { Meteor } from 'meteor/meteor';
import { HTTP } from 'meteor/http';

import { logger } from '../logger';
import { Federation } from '../';

import { skipRetryOnSpecificError, delay } from './utils';

export class PeerHTTP {
constructor() {
this.config = {};
}

setConfig(config) {
// General
this.config = config;
}

log(message) {
logger.http.info(message);
}

//
// Direct request
simpleRequest(peer, method, uri, body, headers) {
this.log(`Request: ${ method } ${ uri }`);

const { url: serverBaseURL } = peer;

const url = `${ serverBaseURL }${ uri }`;

let data = null;

if (method === 'POST' || method === 'PUT') {
data = body;
}

this.log(`Sending request: ${ method } - ${ uri }`);

return HTTP.call(method, url, { data, timeout: 2000, headers: { ...headers, 'x-federation-domain': this.config.peer.domain } });
}

//
// Request trying to find DNS entries
request(peer, method, uri, body, retryInfo = {}, headers = {}) {
// Normalize retry info
retryInfo = {
total: retryInfo.total || 1,
stepSize: retryInfo.stepSize || 100,
stepMultiplier: retryInfo.stepMultiplier || 1,
tryToUpdateDNS: retryInfo.tryToUpdateDNS === undefined ? true : retryInfo.tryToUpdateDNS,
DNSUpdated: false,
};

for (let i = 0; i <= retryInfo.total; i++) {
try {
return this.simpleRequest(peer, method, uri, body, headers);
} catch (err) {
try {
if (retryInfo.tryToUpdateDNS && !retryInfo.DNSUpdated) {
i--;

retryInfo.DNSUpdated = true;

this.log(`Trying to update local DNS cache for peer:${ peer.domain }`);

peer = Federation.peerDNS.updatePeerDNS(peer.domain);

continue;
}
} catch (err) {
if (err.response && err.response.statusCode === 404) {
throw new Meteor.Error('federation-peer-does-not-exist', 'Peer does not exist');
}
}

// Check if we need to skip due to specific error
if (skipRetryOnSpecificError(err)) {
this.log('Retry: skipping due to specific error');

throw err;
}

if (i === retryInfo.total - 1) {
// Throw the error, as we could not fulfill the request
this.log('Retry: could not fulfill the request');

throw err;
}

const timeToRetry = retryInfo.stepSize * (i + 1) * retryInfo.stepMultiplier;

this.log(`Trying again in ${ timeToRetry / 1000 }s: ${ method } - ${ uri }`);

// Otherwise, wait and try again
delay(timeToRetry);
}
}
}
}
1 change: 1 addition & 0 deletions app/federation/server/PeerHTTP/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { PeerHTTP } from './PeerHTTP';
19 changes: 19 additions & 0 deletions app/federation/server/PeerHTTP/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Meteor } from 'meteor/meteor';

// Should skip the retry if the error is one of the below?
const errorsToSkipRetrying = [
'error-app-prevented-sending',
];

export function skipRetryOnSpecificError(err) {
err = err && err.response && err.response.data;
return errorsToSkipRetrying.includes(err && err.errorType);
}

// Delay method to wait a little bit before retrying
export const delay = Meteor.wrapAsync(function(ms, callback) {
Meteor.setTimeout(function() {
callback(null);
}, ms);
});

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { Rooms, Subscriptions } from '../../../models';

import { FederatedMessage, FederatedRoom, FederatedUser } from '../federatedResources';
import { logger } from '../logger.js';
import peerClient from '../peerClient';
import { Federation } from '../';

class PeerServer {
export class PeerServer {
constructor() {
this.config = {};
this.enabled = false;
Expand Down Expand Up @@ -75,7 +75,7 @@ class PeerServer {
federatedRoom.createUsers();

// Then, create the room, if needed
federatedRoom.create();
federatedRoom.create(true);
}

handleUserJoinedEvent(e) {
Expand All @@ -93,7 +93,7 @@ class PeerServer {
const localUser = federatedUser.create();

// Callback management
peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId());
Federation.peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId());

// Add the user to the room
addUserToRoom(federatedRoom.room._id, localUser, null, false);
Expand Down Expand Up @@ -129,7 +129,7 @@ class PeerServer {
const localUser = federatedUser.create();

// Callback management
peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId());
Federation.peerClient.addCallbackToSkip('afterAddedToRoom', federatedUser.getFederationId());

// Add the user to the room
addUserToRoom(federatedRoom.room._id, localUser, localInviter, false);
Expand All @@ -156,7 +156,7 @@ class PeerServer {
const localUser = federatedUser.getLocalUser();

// Callback management
peerClient.addCallbackToSkip('beforeLeaveRoom', federatedUser.getFederationId());
Federation.peerClient.addCallbackToSkip('beforeLeaveRoom', federatedUser.getFederationId());

// Remove the user from the room
removeUserFromRoom(federatedRoom.room._id, localUser);
Expand Down Expand Up @@ -187,7 +187,7 @@ class PeerServer {
const localUserWhoRemoved = federatedUserWhoRemoved.getLocalUser();

// Callback management
peerClient.addCallbackToSkip('beforeRemoveFromRoom', federatedUser.getFederationId());
Federation.peerClient.addCallbackToSkip('beforeRemoveFromRoom', federatedUser.getFederationId());

// Remove the user from the room
removeUserFromRoom(federatedRoom.room._id, localUser, { byUser: localUserWhoRemoved });
Expand Down Expand Up @@ -260,7 +260,7 @@ class PeerServer {
const federatedMessage = new FederatedMessage(localPeerDomain, message);

// Callback management
peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId());
Federation.peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId());

// Create the federated message
federatedMessage.create();
Expand All @@ -280,7 +280,7 @@ class PeerServer {
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id);

// Callback management
peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId());
Federation.peerClient.addCallbackToSkip('afterSaveMessage', federatedMessage.getFederationId());

// Update the federated message
federatedMessage.update(federatedUser);
Expand All @@ -302,7 +302,7 @@ class PeerServer {
const localAuthor = federatedMessage.federatedAuthor.getLocalUser();

// Callback management
peerClient.addCallbackToSkip('afterDeleteMessage', federatedMessage.getFederationId());
Federation.peerClient.addCallbackToSkip('afterDeleteMessage', federatedMessage.getFederationId());

// Create the federated message
deleteMessage(localMessage, localAuthor);
Expand All @@ -318,7 +318,7 @@ class PeerServer {
// Load the federated room
const federatedRoom = FederatedRoom.loadByFederationId(localPeerDomain, federated_room_id);

peerClient.addCallbackToSkip('afterReadMessages', federatedRoom.getFederationId());
Federation.peerClient.addCallbackToSkip('afterReadMessages', federatedRoom.getFederationId());

// Load the user who left
const federatedUser = FederatedUser.loadByFederationId(localPeerDomain, federated_user_id);
Expand Down Expand Up @@ -352,7 +352,7 @@ class PeerServer {
const localMessage = federatedMessage.getLocalMessage();

// Callback management
peerClient.addCallbackToSkip('afterSetReaction', federatedMessage.getFederationId());
Federation.peerClient.addCallbackToSkip('afterSetReaction', federatedMessage.getFederationId());

// Set message reaction
setReaction(localRoom, localUser, localMessage, reaction, shouldReact);
Expand All @@ -378,11 +378,9 @@ class PeerServer {
const localMessage = federatedMessage.getLocalMessage();

// Callback management
peerClient.addCallbackToSkip('afterUnsetReaction', federatedMessage.getFederationId());
Federation.peerClient.addCallbackToSkip('afterUnsetReaction', federatedMessage.getFederationId());

// Unset message reaction
setReaction(localRoom, localUser, localMessage, reaction, shouldReact);
}
}

export default new PeerServer();
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import peerServer from './peerServer';

// Setup routes
import './routes/events';
import './routes/uploads';
import './routes/users';

export default peerServer;
export { PeerServer } from './PeerServer';
Loading

0 comments on commit 65b5be3

Please sign in to comment.