Skip to content

Commit

Permalink
Merge pull request #92 from kaleido-io/evmconnect-compatibility
Browse files Browse the repository at this point in the history
Add evmconnect compatibility
  • Loading branch information
peterbroadhurst committed Aug 19, 2022
2 parents 7393a4d + 6de7743 commit cbb45cf
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 51 deletions.
23 changes: 21 additions & 2 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';
import * as WebSocket from 'ws';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { basicAuth } from '../utils';
import {
Event,
Expand Down Expand Up @@ -233,18 +234,24 @@ export class EventStreamService {

async createSubscription(
instancePath: string,
eventABI: IAbiMethod,
streamId: string,
event: string,
name: string,
address: string,
methods: IAbiMethod[],
fromBlock = '0', // subscribe from the start of the chain by default
): Promise<EventStreamSubscription> {
const response = await lastValueFrom(
this.http.post<EventStreamSubscription>(
`${this.baseUrl}/${instancePath}/${event}`,
`${instancePath}/subscriptions`,
{
name,
stream: streamId,
fromBlock,
event: eventABI,
address,
methods,
},
{
...basicAuth(this.username, this.password),
Expand All @@ -257,9 +264,12 @@ export class EventStreamService {

async getOrCreateSubscription(
instancePath: string,
eventABI: IAbiMethod,
streamId: string,
event: string,
name: string,
contractAddress: string,
possibleABIs: IAbiMethod[],
fromBlock = '0', // subscribe from the start of the chain by default
): Promise<EventStreamSubscription> {
const existingSubscriptions = await this.getSubscriptions();
Expand All @@ -268,7 +278,16 @@ export class EventStreamService {
this.logger.log(`Existing subscription for ${event}: ${sub.id}`);
return sub;
}
return this.createSubscription(instancePath, streamId, event, name, fromBlock);
return this.createSubscription(
instancePath,
eventABI,
streamId,
event,
name,
contractAddress,
possibleABIs,
fromBlock,
);
}

connect(
Expand Down
171 changes: 122 additions & 49 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ const CUSTOM_URI_IID = '0xa1d87d57';

const sendTransactionHeader = 'SendTransaction';
const queryHeader = 'Query';
const tokenCreateFunctionName = 'create';
const tokenCreateEvent = 'TokenPoolCreation';
const tokenCreateEventSignatureOld = 'TokenCreate(address,uint256,bytes)';
const tokenCreateEventSignature = 'TokenPoolCreation(address,uint256,bytes)';
const safeTransferFromFunctionName = 'safeTransferFrom';
const transferSingleEvent = 'TransferSingle';
const transferSingleEventSignature = 'TransferSingle(address,address,address,uint256,uint256)';
const transferBatchEvent = 'TransferBatch';
const safeBatchTransferFromFunctionName = 'safeBatchTransferFrom';
const transferBatchEventSignature = 'TransferBatch(address,address,address,uint256[],uint256[])';
const approvalForAllEvent = 'ApprovalForAll';
const approvalForAllEventSignature = 'ApprovalForAll(address,address,bool)';
Expand Down Expand Up @@ -141,12 +144,22 @@ export class TokensService {
*/
async init() {
this.stream = await this.getStream();
await this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent),
);

const eventABI = ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateEvent);
const methodABI = ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateFunctionName);

if (eventABI !== undefined && methodABI !== undefined) {
await this.eventstream.getOrCreateSubscription(
this.baseUrl,
eventABI,
this.stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent),
this.contractAddress,
[methodABI],
'0',
);
}
}

private async getContractAddress() {
Expand Down Expand Up @@ -373,7 +386,7 @@ export class TokensService {
const response = await this.sendTransaction(
dto.signer,
dto.requestId,
ERC1155MixedFungibleAbi.find(m => m.name === 'create'),
ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateFunctionName),
[dto.type === TokenType.FUNGIBLE, encodeHex(dto.data ?? '')],
);
return { id: response.id };
Expand All @@ -382,38 +395,90 @@ export class TokensService {
async activatePool(dto: TokenPoolActivate) {
const stream = await this.getStream();
const poolLocator = unpackPoolLocator(dto.poolLocator);
await Promise.all([
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, tokenCreateEvent, dto.poolData),
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
transferSingleEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, transferSingleEvent, dto.poolData),
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
transferBatchEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, transferBatchEvent, dto.poolData),
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
approvalForAllEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, approvalForAllEvent, dto.poolData),
// Block number is 0 because it is important to receive all approval events,
// so existing approvals will be reflected in the newly created pool
'0',
),
]);

const tokenCreateEventABI = ERC1155MixedFungibleAbi.find(m => m.name === tokenCreateEvent);
const tokenCreateFunctionABI = ERC1155MixedFungibleAbi.find(
m => m.name === tokenCreateFunctionName,
);
const transferSingleEventABI = ERC1155MixedFungibleAbi.find(
m => m.name === transferSingleEvent,
);
const transferBatchEventABI = ERC1155MixedFungibleAbi.find(m => m.name === transferBatchEvent);
const transferFunctionABIs = ERC1155MixedFungibleAbi.filter(
m => m.name?.includes('mint') || m.name?.includes('transfer') || m.name?.includes('burn'),
);
const approvalForAllEventABI = ERC1155MixedFungibleAbi.find(
m => m.name === approvalForAllEvent,
);
const approvalFunctionABIs = ERC1155MixedFungibleAbi.filter(m => m.name?.includes('approval'));

if (
tokenCreateEventABI !== undefined &&
tokenCreateFunctionABI !== undefined &&
transferSingleEventABI !== undefined &&
transferBatchEventABI !== undefined &&
approvalForAllEventABI !== undefined
) {
await Promise.all([
this.eventstream.getOrCreateSubscription(
this.baseUrl,
tokenCreateEventABI,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.instancePath, dto.poolLocator, tokenCreateEvent, dto.poolData),
this.contractAddress,
[tokenCreateFunctionABI],
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.baseUrl,
transferSingleEventABI,
stream.id,
transferSingleEvent,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
transferSingleEvent,
dto.poolData,
),
this.contractAddress,
transferFunctionABIs,
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.baseUrl,
transferBatchEventABI,
stream.id,
transferBatchEvent,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
transferBatchEvent,
dto.poolData,
),
this.contractAddress,
transferFunctionABIs,
poolLocator.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.baseUrl,
approvalForAllEventABI,
stream.id,
approvalForAllEvent,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
approvalForAllEvent,
dto.poolData,
),
this.contractAddress,
approvalFunctionABIs,
// Block number is 0 because it is important to receive all approval events,
// so existing approvals will be reflected in the newly created pool
'0',
),
]);
}
}

async mint(dto: TokenMint): Promise<AsyncResponse> {
Expand Down Expand Up @@ -517,7 +582,7 @@ class TokenListener implements EventListener {
constructor(private readonly service: TokensService) {}

async onEvent(subName: string, event: Event, process: EventProcessor) {
switch (event.signature) {
switch (this.trimEventSignature(event.signature)) {
case tokenCreateEventSignatureOld:
case tokenCreateEventSignature:
process(await this.transformTokenPoolCreationEvent(subName, event));
Expand Down Expand Up @@ -558,6 +623,14 @@ class TokenListener implements EventListener {
return signature.substring(0, signature.indexOf('('));
}

private trimEventSignature(signature: string) {
let firstColon = signature.indexOf(':');
if (firstColon > 0) {
return signature.substring(firstColon + 1);
}
return signature;
}

private async transformTokenPoolCreationEvent(
subName: string,
event: TokenPoolCreationEvent,
Expand Down Expand Up @@ -597,9 +670,9 @@ class TokenListener implements EventListener {
info: eventInfo,
blockchain: {
id: this.formatBlockchainEventId(event),
name: this.stripParamsFromSignature(event.signature),
name: this.stripParamsFromSignature(this.trimEventSignature(event.signature)),
location: 'address=' + event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
timestamp: event.timestamp,
output,
info: {
Expand All @@ -608,7 +681,7 @@ class TokenListener implements EventListener {
transactionHash: event.transactionHash,
logIndex: event.logIndex,
address: event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
},
},
},
Expand Down Expand Up @@ -656,9 +729,9 @@ class TokenListener implements EventListener {
data: decodedData,
blockchain: {
id: eventId,
name: this.stripParamsFromSignature(event.signature),
name: this.stripParamsFromSignature(this.trimEventSignature(event.signature)),
location: 'address=' + event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
timestamp: event.timestamp,
output,
info: {
Expand All @@ -667,7 +740,7 @@ class TokenListener implements EventListener {
transactionHash: event.transactionHash,
logIndex: event.logIndex,
address: event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
},
},
};
Expand Down Expand Up @@ -749,9 +822,9 @@ class TokenListener implements EventListener {
data: decodedData,
blockchain: {
id: eventId,
name: this.stripParamsFromSignature(event.signature),
name: this.stripParamsFromSignature(this.trimEventSignature(event.signature)),
location: 'address=' + event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
timestamp: event.timestamp,
output,
info: {
Expand All @@ -760,7 +833,7 @@ class TokenListener implements EventListener {
transactionHash: event.transactionHash,
logIndex: event.logIndex,
address: event.address,
signature: event.signature,
signature: this.trimEventSignature(event.signature),
},
},
},
Expand Down

0 comments on commit cbb45cf

Please sign in to comment.