Skip to content

Commit

Permalink
Merge pull request #49 from shorsher/auth
Browse files Browse the repository at this point in the history
add basic auth to ethconnect connectons
  • Loading branch information
shorsher committed Dec 17, 2021
2 parents 8f9470d + 8797dc0 commit f15a533
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 25 deletions.
73 changes: 60 additions & 13 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';
import * as WebSocket from 'ws';
import { basicAuth } from '../utils';
import {
Event,
EventStream,
Expand All @@ -40,6 +41,8 @@ export class EventStreamSocket {
constructor(
private url: string,
private topic: string,
private username: string,
private password: string,
private handleEvents: (events: Event[]) => void,
private handleReceipt: (receipt: EventStreamReply) => void,
) {
Expand All @@ -50,7 +53,9 @@ export class EventStreamSocket {
this.disconnectDetected = false;
this.closeRequested = false;

this.ws = new WebSocket(this.url);
const auth =
this.username && this.password ? { auth: `${this.username}:${this.password}` } : undefined;
this.ws = new WebSocket(this.url, auth);
this.ws
.on('open', () => {
if (this.disconnectDetected) {
Expand Down Expand Up @@ -131,16 +136,22 @@ export class EventStreamService {
private readonly logger = new Logger(EventStreamService.name);

private baseUrl: string;
private username: string;
private password: string;

constructor(private http: HttpService) {}

configure(baseUrl: string) {
configure(baseUrl: string, username: string, password: string) {
this.baseUrl = baseUrl;
this.username = username;
this.password = password;
}

async getStreams(): Promise<EventStream[]> {
const response = await lastValueFrom(
this.http.get<EventStream[]>(`${this.baseUrl}/eventstreams`),
this.http.get<EventStream[]>(`${this.baseUrl}/eventstreams`, {
...basicAuth(this.username, this.password),
}),
);
return response.data;
}
Expand All @@ -161,25 +172,47 @@ export class EventStreamService {
const stream = existingStreams.find(s => s.name === streamDetails.name);
if (stream) {
const patchedStreamRes = await lastValueFrom(
this.http.patch<EventStream>(`${this.baseUrl}/eventstreams/${stream.id}`, streamDetails),
this.http.patch<EventStream>(
`${this.baseUrl}/eventstreams/${stream.id}`,
{
...streamDetails,
},
{
...basicAuth(this.username, this.password),
},
),
);
this.logger.log(`Event stream for ${topic}: ${stream.id}`);
return patchedStreamRes.data;
}
const newStreamRes = await lastValueFrom(
this.http.post<EventStream>(`${this.baseUrl}/eventstreams`, streamDetails),
this.http.post<EventStream>(
`${this.baseUrl}/eventstreams`,
{
...streamDetails,
},
{
...basicAuth(this.username, this.password),
},
),
);
this.logger.log(`Event stream for ${topic}: ${newStreamRes.data.id}`);
return newStreamRes.data;
}

async deleteStream(id: string) {
await lastValueFrom(this.http.delete(`${this.baseUrl}/eventstreams/${id}`));
await lastValueFrom(
this.http.delete(`${this.baseUrl}/eventstreams/${id}`, {
...basicAuth(this.username, this.password),
}),
);
}

async getSubscriptions(): Promise<EventStreamSubscription[]> {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription[]>(`${this.baseUrl}/subscriptions`),
this.http.get<EventStreamSubscription[]>(`${this.baseUrl}/subscriptions`, {
...basicAuth(this.username, this.password),
}),
);
return response.data;
}
Expand All @@ -188,6 +221,7 @@ export class EventStreamService {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription>(`${this.baseUrl}/subscriptions/${subId}`, {
validateStatus: status => status < 300 || status === 404,
...basicAuth(this.username, this.password),
}),
);
if (response.status === 404) {
Expand All @@ -204,11 +238,17 @@ export class EventStreamService {
fromBlock = '0', // subscribe from the start of the chain by default
): Promise<EventStreamSubscription> {
const response = await lastValueFrom(
this.http.post<EventStreamSubscription>(`${this.baseUrl}/${instancePath}/${event}`, {
name,
stream: streamId,
fromBlock,
}),
this.http.post<EventStreamSubscription>(
`${this.baseUrl}/${instancePath}/${event}`,
{
name,
stream: streamId,
fromBlock,
},
{
...basicAuth(this.username, this.password),
},
),
);
this.logger.log(`Created subscription ${event}: ${response.data.id}`);
return response.data;
Expand Down Expand Up @@ -236,6 +276,13 @@ export class EventStreamService {
handleEvents: (events: Event[]) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
return new EventStreamSocket(url, topic, handleEvents, handleReceipt);
return new EventStreamSocket(
url,
topic,
this.username,
this.password,
handleEvents,
handleReceipt,
);
}
}
8 changes: 6 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,16 @@ async function bootstrap() {
const topic = config.get<string>('ETHCONNECT_TOPIC', 'token');
const shortPrefix = config.get<string>('ETHCONNECT_PREFIX', 'fly');
const autoInit = config.get<string>('AUTO_INIT', 'true');
const username = config.get<string>('ETHCONNECT_USERNAME', '');
const password = config.get<string>('ETHCONNECT_PASSWORD', '');

const wsUrl = ethConnectUrl.replace('http', 'ws') + '/ws';

app.get(EventStreamService).configure(ethConnectUrl);
app.get(EventStreamService).configure(ethConnectUrl, username, password);
app.get(EventStreamProxyGateway).configure(wsUrl, topic);
app.get(TokensService).configure(ethConnectUrl, instancePath, topic, shortPrefix);
app
.get(TokensService)
.configure(ethConnectUrl, instancePath, topic, shortPrefix, username, password);

try {
await app.get(TokensService).migrate();
Expand Down
39 changes: 34 additions & 5 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
// limitations under the License.

import { HttpService } from '@nestjs/axios';
import { AxiosRequestConfig } from 'axios';
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';
import { EventStreamService } from '../event-stream/event-stream.service';
import { Event, EventStream, EventStreamReply } from '../event-stream/event-stream.interfaces';
import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway';
import { EventListener, EventProcessor } from '../eventstream-proxy/eventstream-proxy.interfaces';
import { WebSocketMessage } from '../websocket-events/websocket-events.base';
import { basicAuth } from '../utils';
import {
AsyncResponse,
EthConnectAsyncResponse,
Expand Down Expand Up @@ -76,20 +78,33 @@ export class TokensService {
topic: string;
shortPrefix: string;
stream: EventStream;
username: string;
password: string;

constructor(
private http: HttpService,
private eventstream: EventStreamService,
private proxy: EventStreamProxyGateway,
) {}

configure(baseUrl: string, instancePath: string, topic: string, shortPrefix: string) {
configure(
baseUrl: string,
instancePath: string,
topic: string,
shortPrefix: string,
username: string,
password: string,
) {
this.baseUrl = baseUrl;
this.instancePath = instancePath;
this.instanceUrl = baseUrl + instancePath;
this.topic = topic;
this.shortPrefix = shortPrefix;
this.proxy.addListener(new TokenListener(this.http, this.instanceUrl, this.topic));
this.username = username;
this.password = password;
this.proxy.addListener(
new TokenListener(this.http, this.instanceUrl, this.topic, this.username, this.password),
);
}

/**
Expand Down Expand Up @@ -153,19 +168,24 @@ export class TokensService {
const from = `${this.shortPrefix}-from`;
const sync = `${this.shortPrefix}-sync`;
const id = `${this.shortPrefix}-id`;
return {

const requestOptions: AxiosRequestConfig = {
params: {
[from]: operator,
[sync]: 'false',
[id]: requestId,
},
...basicAuth(this.username, this.password),
};

return requestOptions;
}

async getReceipt(id: string): Promise<EventStreamReply> {
const response = await lastValueFrom(
this.http.get<EventStreamReply>(`${this.baseUrl}/reply/${id}`, {
validateStatus: status => status < 300 || status === 404,
...basicAuth(this.username, this.password),
}),
);
if (response.status === 404) {
Expand Down Expand Up @@ -295,6 +315,7 @@ export class TokensService {
account: dto.account,
id: packTokenId(dto.poolId, dto.tokenIndex),
},
...basicAuth(this.username, this.password),
}),
);
return { balance: response.data.output };
Expand All @@ -306,7 +327,13 @@ class TokenListener implements EventListener {

private uriPattern: string | undefined;

constructor(private http: HttpService, private instanceUrl: string, private topic: string) {}
constructor(
private http: HttpService,
private instanceUrl: string,
private topic: string,
private username: string,
private password: string,
) {}

async onEvent(subName: string, event: Event, process: EventProcessor) {
switch (event.signature) {
Expand Down Expand Up @@ -449,7 +476,9 @@ class TokenListener implements EventListener {
// Fetch and cache the URI pattern (assume it is the same for all tokens)
try {
const response = await lastValueFrom(
this.http.get<EthConnectReturn>(`${this.instanceUrl}/uri?input=0`),
this.http.get<EthConnectReturn>(`${this.instanceUrl}/uri?input=0`, {
...basicAuth(this.username, this.password),
}),
);
this.uriPattern = response.data.output;
} catch (err) {
Expand Down
12 changes: 12 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { AxiosRequestConfig } from 'axios';

export const basicAuth = (username: string, password: string) => {
const requestOptions: AxiosRequestConfig = {};
if (username !== '' && password !== '') {
requestOptions.auth = {
username: username,
password: password,
};
}
return requestOptions;
};
10 changes: 5 additions & 5 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ describe('AppController (e2e)', () => {
await app.init();

app.get(EventStreamProxyGateway).configure('url', TOPIC);
app.get(TokensService).configure(BASE_URL, INSTANCE_PATH, TOPIC, PREFIX);
app.get(TokensService).configure(BASE_URL, INSTANCE_PATH, TOPIC, PREFIX, '', '');

(app.getHttpServer() as Server).listen();
server = request(app.getHttpServer());
Expand Down Expand Up @@ -523,7 +523,7 @@ describe('AppController (e2e)', () => {
});

expect(http.get).toHaveBeenCalledTimes(1);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`, {});
});

it('Websocket: token burn event', async () => {
Expand Down Expand Up @@ -596,7 +596,7 @@ describe('AppController (e2e)', () => {
});

expect(http.get).toHaveBeenCalledTimes(1);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`, {});
});

it('Websocket: token transfer event', async () => {
Expand Down Expand Up @@ -661,7 +661,7 @@ describe('AppController (e2e)', () => {
});

expect(http.get).toHaveBeenCalledTimes(1);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`, {});
});

it('Websocket: token transfer event from wrong pool', () => {
Expand Down Expand Up @@ -805,7 +805,7 @@ describe('AppController (e2e)', () => {
});

expect(http.get).toHaveBeenCalledTimes(1);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`);
expect(http.get).toHaveBeenCalledWith(`${BASE_URL}${INSTANCE_PATH}/uri?input=0`, {});
});

it('Websocket: success receipt', () => {
Expand Down

0 comments on commit f15a533

Please sign in to comment.