Skip to content

Commit

Permalink
fix: generalize mirror channel in hopes of getting it working on web
Browse files Browse the repository at this point in the history
  • Loading branch information
mehcode committed Nov 12, 2020
1 parent 7f6add5 commit e75bd2a
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 94 deletions.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@
"test:integration:browser:safari": "vite -c vite.config.cjs serve --port 9013 test/ & sleep 2; mocha-webdriver-runner --safari http://localhost:9013/integration.html; kill %1"
},
"dependencies": {
"@hashgraph/protobufjs": "^6.10.1-hashgraph.2",
"@grpc/grpc-js": "^1.1.7",
"@hashgraph/cryptography": "^1.0.6",
"@hashgraph/proto": "^1.0.13",
"@hashgraph/proto": "^1.0.14",
"bignumber.js": "^9.0.1",
"long": "^4.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/account/AccountInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Timestamp from "../Timestamp.js";
import { keyFromProtobuf, keyToProtobuf } from "../cryptography/protobuf.js";
import Long from "long";
import TokenRelationshipMap from "./TokenRelationshipMap.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";
import Duration from "../Duration.js";

/**
Expand Down
33 changes: 8 additions & 25 deletions src/channel/MirrorChannel.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,25 @@
import { Client, credentials } from "@grpc/grpc-js";

/**
* @internal
* @abstract
*/
export default class MirrorChannel {
/**
* @internal
* @param {string} address
*/
constructor(address) {
/**
* @type {Client}
* @private
*/
this._client = new Client(address, credentials.createInsecure());
}

/**
* @abstract
* @returns {void}
*/
close() {
this._client.close();
throw new Error("not implemented");
}

/**
* @override
* @abstract
* @internal
* @param {Uint8Array} requestData
* @returns {import("@grpc/grpc-js").ClientReadableStream<Buffer>}
* @param {(error: number?, data: Uint8Array?) => void} callback
* @returns {() => void}
*/
makeServerStreamRequest(requestData) {
return this._client.makeServerStreamRequest(
// `/proto.ConsensusService/SubscribeTopic`,
"/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic",
(value) => value,
(value) => value,
Buffer.from(requestData)
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
makeServerStreamRequest(requestData, callback) {
throw new Error("not implemented");
}
}
60 changes: 60 additions & 0 deletions src/channel/NodeMirrorChannel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import * as grpc from "@grpc/grpc-js";
import MirrorChannel from "./MirrorChannel.js";

/**
* @internal
*/
export default class NodeMirrorChannel extends MirrorChannel {
/**
* @internal
* @param {string} address
*/
constructor(address) {
super();

/**
* @type {grpc.Client}
* @private
*/
this._client = new grpc.Client(
address,
grpc.credentials.createInsecure()
);
}

/**
* @override
* @returns {void}
*/
close() {
this._client.close();
}

/**
* @override
* @internal
* @param {Uint8Array} requestData
* @param {(error: number?, data: Uint8Array?) => void} callback
* @returns {() => void}
*/
makeServerStreamRequest(requestData, callback) {
let stream = this._client
.makeServerStreamRequest(
// `/proto.ConsensusService/SubscribeTopic`,
"/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic",
(value) => value,
(value) => value,
Buffer.from(requestData)
)
.on("data", (/** @type {Uint8Array} */ data) => {
callback(null, data);
})
.on("status", (/** @type {grpc.StatusObject} */ status) => {
callback(status.code, null);
});

return () => {
stream.cancel();
};
}
}
8 changes: 4 additions & 4 deletions src/client/NodeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from "fs";
import util from "util";
import Client from "./Client.js";
import NodeChannel from "../channel/NodeChannel.js";
import MirrorChannel from "../channel/MirrorChannel.js";
import NodeMirrorChannel from "../channel/NodeMirrorChannel.js";
import AccountId from "../account/AccountId.js";

const readFileAsync = util.promisify(fs.readFile);
Expand Down Expand Up @@ -88,7 +88,7 @@ export const MirrorNetwork = {
};

/**
* @augments {Client<NodeChannel, *>}
* @augments {Client<NodeChannel, NodeMirrorChannel>}
*/
export default class NodeClient extends Client {
/**
Expand Down Expand Up @@ -268,9 +268,9 @@ export default class NodeClient extends Client {

/**
* @override
* @returns {(address: string) => MirrorChannel}
* @returns {(address: string) => NodeMirrorChannel}
*/
_createMirrorNetworkChannel() {
return (address) => new MirrorChannel(address);
return (address) => new NodeMirrorChannel(address);
}
}
2 changes: 1 addition & 1 deletion src/contract/ContractInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Duration from "../Duration.js";
import Hbar from "../Hbar.js";
import { keyFromProtobuf, keyToProtobuf } from "../cryptography/protobuf.js";
import Long from "long";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";
import TokenRelationshipMap from "../account/TokenRelationshipMap.js";

/**
Expand Down
2 changes: 1 addition & 1 deletion src/file/FileInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
keyListFromProtobuf,
keyListToProtobuf,
} from "../cryptography/protobuf.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

/**
* Response when the client sends the node CryptoGetInfoQuery.
Expand Down
2 changes: 1 addition & 1 deletion src/network/NetworkVersionInfo.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import SemanticVersion from "./SemanticVersion.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

/**
* Response when the client sends the node CryptoGetVersionInfoQuery.
Expand Down
2 changes: 1 addition & 1 deletion src/network/SemanticVersion.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

export default class SemanticVersion {
/**
Expand Down
78 changes: 30 additions & 48 deletions src/topic/TopicMessageQuery.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import * as grpc from "@grpc/grpc-js";
import TransactionId from "../transaction/TransactionId.js";
import SubscriptionHandle from "./SubscriptionHandle.js";
import TopicMessage from "./TopicMessage.js";
Expand All @@ -10,7 +9,6 @@ import Timestamp from "../Timestamp.js";
/**
* @typedef {import("../channel/Channel.js").default} Channel
* @typedef {import("../channel/MirrorChannel.js").default} MirrorChannel
*
*/

/**
Expand Down Expand Up @@ -157,36 +155,44 @@ export default class TopicMessageQuery {
/**
* @param {SubscriptionHandle} handle
* @param {number} attempt
* @param {import("../client/Client.js").default<*, MirrorChannel>} client
* @param {import("../client/Client.js").default<Channel, MirrorChannel>} client
* @param {(message: TopicMessage) => void} listener
* @returns {void}
*/
_makeServerStreamRequest(handle, attempt, client, listener) {
/** @type {Map<string, proto.ConsensusTopicResponse[]>} */

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const list = new Map();

const response = client._mirrorNetwork
const request = proto.ConsensusTopicQuery.encode({
topicID: this._topicId != null ? this._topicId._toProtobuf() : null,
consensusStartTime:
this._startTime != null ? this._startTime._toProtobuf() : null,
consensusEndTime:
this._endTime != null ? this._endTime._toProtobuf() : null,
limit: this._limit != null ? this._limit : null,
}).finish();

const cancel = client._mirrorNetwork
.getNextMirrorNode()
.channel.makeServerStreamRequest(
proto.ConsensusTopicQuery.encode({
topicID:
this._topicId != null
? this._topicId._toProtobuf()
: null,
consensusStartTime:
this._startTime != null
? this._startTime._toProtobuf()
: null,
consensusEndTime:
this._endTime != null
? this._endTime._toProtobuf()
: null,
limit: this._limit != null ? this._limit : null,
}).finish()
)
.on("data", (/** @type {Uint8Array} */ bytes) => {
const message = proto.ConsensusTopicResponse.decode(bytes);
.channel.makeServerStreamRequest(request, (err, data) => {
if (data == null || err != null) {
// NOT_FOUND or UNAVAILABLE
if (attempt < 10 && (err === 5 || err === 14)) {
setTimeout(() => {
this._makeServerStreamRequest(
handle,
attempt + 1,
client,
listener
);
}, 250 * 2 ** attempt);
}
return;
}

const message = proto.ConsensusTopicResponse.decode(data);

if (message.chunkInfo == null) {
listener(TopicMessage._ofSingle(message));
Expand Down Expand Up @@ -215,32 +221,8 @@ export default class TopicMessageQuery {
listener(TopicMessage._ofMany(responses));
}
}
})
.on("status", (/** @type {grpc.StatusObject} */ status) => {
if (
attempt < 10 &&
(status.code === grpc.status.NOT_FOUND ||
status.code === grpc.status.UNAVAILABLE)
) {
setTimeout(() => {
this._makeServerStreamRequest(
handle,
attempt + 1,
client,
listener
);
}, 250 * 2 ** attempt);
}
})
// eslint-disable-next-line @typescript-eslint/no-unused-vars
.on("end", (/** @type {grpc.StatusObject} */ status) => {
// Do nothing
})
// eslint-disable-next-line @typescript-eslint/no-unused-vars
.on("error", (/** @type {grpc.StatusObject} */ status) => {
// Do nothing
});

handle._setCall(() => response.cancel());
handle._setCall(() => cancel());
}
}
4 changes: 2 additions & 2 deletions src/transaction/Transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import Long from "long";
import * as sha384 from "../cryptography/sha384.js";
import * as hex from "../encoding/hex.js";
import {
Reader,
Transaction as ProtoTransaction,
TransactionBody as ProtoTransactionBody,
} from "@hashgraph/proto";
import AccountId from "../account/AccountId.js";
import $protobuf from "@hashgraph/protobufjs/minimal.js";

/**
* @typedef {import("bignumber.js").default} BigNumber
Expand Down Expand Up @@ -144,7 +144,7 @@ export default class Transaction extends Executable {
/** @type {Map<string, Map<AccountId, proto.ITransaction>>} */
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const transactions = new Map();
const reader = new $protobuf.Reader(bytes);
const reader = new Reader(bytes);
let first;

// eslint-disable-next-line no-constant-condition
Expand Down
2 changes: 1 addition & 1 deletion src/transaction/TransactionHashMap.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import AccountId from "../account/AccountId.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";
import * as sha384 from "../cryptography/sha384.js";
import ObjectMap from "../ObjectMap.js";

Expand Down
2 changes: 1 addition & 1 deletion src/transaction/TransactionId.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import AccountId from "../account/AccountId.js";
import Timestamp from "../Timestamp.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

/**
* The client-generated ID for a transaction.
Expand Down
2 changes: 1 addition & 1 deletion src/transaction/TransactionReceipt.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import TokenId from "../token/TokenId.js";
import ExchangeRate from "../ExchangeRate.js";
import Status from "../Status.js";
import Long from "long";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

/**
* The consensus result for a transaction, which might not be currently known,
Expand Down
2 changes: 1 addition & 1 deletion src/transaction/TransactionRecord.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Timestamp from "../Timestamp.js";
import Hbar from "../Hbar.js";
import Transfer from "../Transfer.js";
import ContractFunctionResult from "../contract/ContractFunctionResult.js";
import proto from "@hashgraph/proto";
import * as proto from "@hashgraph/proto";

/**
* Response when the client sends the node TransactionGetRecordResponse.
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@
bignumber.js "^9.0.1"
tweetnacl "^1.0.3"

"@hashgraph/proto@^1.0.13":
version "1.0.13"
resolved "https://registry.yarnpkg.com/@hashgraph/proto/-/proto-1.0.13.tgz#98b6bb49eeeed552db3b577617ec88febb17199a"
integrity sha512-G0tap+hhigxJ1ni8E/JMirGB+RrnzAudm3QFFGUPoiKKFx+T+lBX0nO11BG+IjzSoDql6zdfR/sJQoch7Y+AvA==
"@hashgraph/proto@^1.0.14":
version "1.0.14"
resolved "https://registry.yarnpkg.com/@hashgraph/proto/-/proto-1.0.14.tgz#d47aabc60fdc0bda4c457d94e1d7f3ab054f8e5d"
integrity sha512-kBUQv5/0tBD6Wg9b4gqE88gv3s4SEOAeOayF3KthEwlXapA3QenwF2gpNXd0zslEuBKtqHVcybEKMBx6EBQJkA==
dependencies:
"@hashgraph/protobufjs" "^6.10.1-hashgraph.2"

Expand Down

0 comments on commit e75bd2a

Please sign in to comment.