Skip to content

Commit

Permalink
CAMEL-12530 Refactoring, integration tests for web3j Ethereum client
Browse files Browse the repository at this point in the history
  • Loading branch information
bibryam committed Jun 3, 2018
1 parent 9d51f04 commit 9da3f5a
Show file tree
Hide file tree
Showing 30 changed files with 1,302 additions and 236 deletions.
4 changes: 2 additions & 2 deletions components/camel-web3j/src/main/docs/web3j-component.adoc
Expand Up @@ -80,6 +80,7 @@ with the following path and query parameters:
| *gasPrice* (producer) | Gas price used for each paid gas. | | BigInteger | *gasPrice* (producer) | Gas price used for each paid gas. | | BigInteger
| *hashrate* (producer) | A hexadecimal string representation (32 bytes) of the hash rate. | | String | *hashrate* (producer) | A hexadecimal string representation (32 bytes) of the hash rate. | | String
| *headerPowHash* (producer) | The header's pow-hash (256 bits) used for submitting a proof-of-work solution. | | String | *headerPowHash* (producer) | The header's pow-hash (256 bits) used for submitting a proof-of-work solution. | | String
| *index* (producer) | The transactions/uncle index position in the block. | | BigInteger
| *keyName* (producer) | The key name in the database. | | String | *keyName* (producer) | The key name in the database. | | String
| *mixDigest* (producer) | The mix digest (256 bits) used for submitting a proof-of-work solution. | | String | *mixDigest* (producer) | The mix digest (256 bits) used for submitting a proof-of-work solution. | | String
| *nonce* (producer) | The nonce found (64 bits) used for submitting a proof-of-work solution. | | String | *nonce* (producer) | The nonce found (64 bits) used for submitting a proof-of-work solution. | | String
Expand All @@ -90,9 +91,8 @@ with the following path and query parameters:
| *signedTransactionData* (producer) | The signed transaction data for a new message call transaction or a contract creation for signed transactions. | | String | *signedTransactionData* (producer) | The signed transaction data for a new message call transaction or a contract creation for signed transactions. | | String
| *sourceCode* (producer) | The source code to compile. | | String | *sourceCode* (producer) | The source code to compile. | | String
| *transactionHash* (producer) | The information about a transaction requested by transaction hash. | | String | *transactionHash* (producer) | The information about a transaction requested by transaction hash. | | String
| *transactionIndex* (producer) | The transactions index position in the block. | | BigInteger
| *ttl* (producer) | The time to live in seconds of a whisper message. | | BigInteger | *ttl* (producer) | The time to live in seconds of a whisper message. | | BigInteger
| *value* (producer) | The value sent within a transaction. | | BigInteger | *value* (producer) | The value sent within a transaction. | | BigInteger
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
|=== |===
// endpoint options: END // endpoint options: END
Expand Up @@ -63,7 +63,7 @@ public class Web3jConfiguration implements Cloneable {
private String signedTransactionData; private String signedTransactionData;


@UriParam(label = "producer") @UriParam(label = "producer")
private BigInteger transactionIndex; private BigInteger index;


@UriParam(label = "producer") @UriParam(label = "producer")
private BigInteger filterId; private BigInteger filterId;
Expand Down Expand Up @@ -334,15 +334,15 @@ public void setFilterId(BigInteger filterId) {
this.filterId = filterId; this.filterId = filterId;
} }


public BigInteger getTransactionIndex() { public BigInteger getIndex() {
return transactionIndex; return index;
} }


/** /**
* The transactions index position in the block. * The transactions/uncle index position in the block.
*/ */
public void setTransactionIndex(BigInteger transactionIndex) { public void setIndex(BigInteger index) {
this.transactionIndex = transactionIndex; this.index = index;
} }


public String getSignedTransactionData() { public String getSignedTransactionData() {
Expand Down
Expand Up @@ -104,17 +104,17 @@ public interface Web3jConstants {
String POSITION = "POSITION"; String POSITION = "POSITION";
String BLOCK_HASH = "BLOCK_HASH"; String BLOCK_HASH = "BLOCK_HASH";
String TRANSACTION_HASH = "TRANSACTION_HASH"; String TRANSACTION_HASH = "TRANSACTION_HASH";
String SHA3_HASH_OF_DATA_TO_SIGN = "sha3HashOfDataToSign"; String SHA3_HASH_OF_DATA_TO_SIGN = "SHA3_HASH_OF_DATA_TO_SIGN";
String SIGNED_TRANSACTION_DATA = "signedTransactionData"; String SIGNED_TRANSACTION_DATA = "SIGNED_TRANSACTION_DATA";
String FULL_TRANSACTION_OBJECTS = "fullTransactionObjects"; String FULL_TRANSACTION_OBJECTS = "FULL_TRANSACTION_OBJECTS";
String TRANSACTION_INDEX = "fullTransactionObjects"; String INDEX = "INDEX";
String SOURCE_CODE = "SOURCE_CODE"; String SOURCE_CODE = "SOURCE_CODE";
String FILTER_ID = "FILTER_ID"; String FILTER_ID = "FILTER_ID";
String DATABASE_NAME = "DATABASE_NAME"; String DATABASE_NAME = "DATABASE_NAME";
String KEY_NAME = "KEY_NAME"; String KEY_NAME = "KEY_NAME";
String NONCE = "NONCE"; String NONCE = "NONCE";
String HEADER_POW_HASH = "headerPowHash"; String HEADER_POW_HASH = "HEADER_POW_HASH";
String MIX_DIGEST = "mixDigest"; String MIX_DIGEST = "MIX_DIGEST";
String CLIENT_ID = "CLIENT_ID"; String CLIENT_ID = "CLIENT_ID";
String GAS_PRICE = "GAS_PRICE"; String GAS_PRICE = "GAS_PRICE";
String GAS_LIMIT = "GAS_LIMIT"; String GAS_LIMIT = "GAS_LIMIT";
Expand Down
Expand Up @@ -53,106 +53,105 @@ public Web3jEndpoint getEndpoint() {
@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
super.doStart(); super.doStart();
LOG.info("Subscribing: " + this.configuration); LOG.info("Subscribing to: " + endpoint.getNodeAddress());
switch (configuration.getOperation()) { switch (configuration.getOperation()) {
case Web3jConstants.ETH_LOG_OBSERVABLE: case Web3jConstants.ETH_LOG_OBSERVABLE:
EthFilter ethFilter = endpoint.buildEthFilter(configuration.getFromBlock(), configuration.getToBlock(), configuration.getAddresses(), configuration.getTopics()); EthFilter ethFilter = endpoint.buildEthFilter(configuration.getFromBlock(), configuration.getToBlock(), configuration.getAddresses(), configuration.getTopics());
subscription = web3j.ethLogObservable(ethFilter).subscribe( subscription = web3j.ethLogObservable(ethFilter).subscribe(
x -> ethLogObservable(x), x -> ethLogObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.ETH_LOG_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.ETH_LOG_OBSERVABLE)
); );
break; break;


case Web3jConstants.ETH_BLOCK_HASH_OBSERVABLE: case Web3jConstants.ETH_BLOCK_HASH_OBSERVABLE:
subscription = web3j.ethBlockHashObservable().subscribe( subscription = web3j.ethBlockHashObservable().subscribe(
x -> ethBlockHashObservable(x), x -> ethBlockHashObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.ETH_BLOCK_HASH_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.ETH_BLOCK_HASH_OBSERVABLE)
); );
break; break;


case Web3jConstants.ETH_PENDING_TRANSACTION_HASH_OBSERVABLE: case Web3jConstants.ETH_PENDING_TRANSACTION_HASH_OBSERVABLE:
subscription = web3j.ethPendingTransactionHashObservable().subscribe( subscription = web3j.ethPendingTransactionHashObservable().subscribe(
x -> ethPendingTransactionHashObservable(x), x -> ethPendingTransactionHashObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.ETH_PENDING_TRANSACTION_HASH_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.ETH_PENDING_TRANSACTION_HASH_OBSERVABLE)
); );
break; break;


case Web3jConstants.TRANSACTION_OBSERVABLE: case Web3jConstants.TRANSACTION_OBSERVABLE:
subscription = web3j.transactionObservable().subscribe( subscription = web3j.transactionObservable().subscribe(
x -> processTransaction(x), x -> processTransaction(x),
t -> processError(t), t -> processError(t, Web3jConstants.TRANSACTION_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.TRANSACTION_OBSERVABLE)
); );
break; break;


case Web3jConstants.PENDING_TRANSACTION_OBSERVABLE: case Web3jConstants.PENDING_TRANSACTION_OBSERVABLE:
subscription = web3j.pendingTransactionObservable().subscribe( subscription = web3j.pendingTransactionObservable().subscribe(
x -> processTransaction(x), x -> processTransaction(x),
t -> processError(t), t -> processError(t, Web3jConstants.PENDING_TRANSACTION_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.PENDING_TRANSACTION_OBSERVABLE)
); );
break; break;


case Web3jConstants.BLOCK_OBSERVABLE: case Web3jConstants.BLOCK_OBSERVABLE:
subscription = web3j.blockObservable(configuration.isFullTransactionObjects()).subscribe( subscription = web3j.blockObservable(configuration.isFullTransactionObjects()).subscribe(
x -> blockObservable(x), x -> blockObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.BLOCK_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.BLOCK_OBSERVABLE)
); );
break; break;


case Web3jConstants.REPLAY_BLOCKS_OBSERVABLE: case Web3jConstants.REPLAY_BLOCKS_OBSERVABLE:
subscription = web3j.replayBlocksObservable(configuration.getFromBlock(), configuration.getToBlock(), configuration.isFullTransactionObjects()).subscribe( subscription = web3j.replayBlocksObservable(configuration.getFromBlock(), configuration.getToBlock(), configuration.isFullTransactionObjects()).subscribe(
x -> blockObservable(x), x -> blockObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.REPLAY_BLOCKS_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.REPLAY_BLOCKS_OBSERVABLE)
); );
break; break;


case Web3jConstants.REPLAY_TRANSACTIONS_OBSERVABLE: case Web3jConstants.REPLAY_TRANSACTIONS_OBSERVABLE:
subscription = web3j.replayTransactionsObservable(configuration.getFromBlock(), configuration.getToBlock()).subscribe( subscription = web3j.replayTransactionsObservable(configuration.getFromBlock(), configuration.getToBlock()).subscribe(
x -> processTransaction(x), x -> processTransaction(x),
t -> processError(t), t -> processError(t, Web3jConstants.REPLAY_TRANSACTIONS_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.REPLAY_TRANSACTIONS_OBSERVABLE)
); );
break; break;


case Web3jConstants.CATCH_UP_TO_LATEST_BLOCK_OBSERVABLE: case Web3jConstants.CATCH_UP_TO_LATEST_BLOCK_OBSERVABLE:
subscription = web3j.catchUpToLatestBlockObservable(configuration.getFromBlock(), configuration.isFullTransactionObjects()).subscribe( subscription = web3j.catchUpToLatestBlockObservable(configuration.getFromBlock(), configuration.isFullTransactionObjects()).subscribe(
x -> blockObservable(x), x -> blockObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.CATCH_UP_TO_LATEST_BLOCK_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.CATCH_UP_TO_LATEST_BLOCK_OBSERVABLE)
); );
break; break;


case Web3jConstants.CATCH_UP_TO_LATEST_TRANSACTION_OBSERVABLE: case Web3jConstants.CATCH_UP_TO_LATEST_TRANSACTION_OBSERVABLE:
subscription = web3j.catchUpToLatestTransactionObservable(configuration.getFromBlock()).subscribe( subscription = web3j.catchUpToLatestTransactionObservable(configuration.getFromBlock()).subscribe(
x -> processTransaction(x), x -> processTransaction(x),
t -> processError(t), t -> processError(t, Web3jConstants.CATCH_UP_TO_LATEST_TRANSACTION_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.CATCH_UP_TO_LATEST_TRANSACTION_OBSERVABLE)
); );
break; break;


case Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_BLOCKS_OBSERVABLE: case Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_BLOCKS_OBSERVABLE:
subscription = web3j.catchUpToLatestAndSubscribeToNewBlocksObservable(configuration.getFromBlock(), configuration.isFullTransactionObjects()).subscribe( subscription = web3j.catchUpToLatestAndSubscribeToNewBlocksObservable(configuration.getFromBlock(), configuration.isFullTransactionObjects()).subscribe(
x -> blockObservable(x), x -> blockObservable(x),
t -> processError(t), t -> processError(t, Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_BLOCKS_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_BLOCKS_OBSERVABLE)
); );
break; break;


case Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_TRANSACTIONS_OBSERVABLE: case Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_TRANSACTIONS_OBSERVABLE:
subscription = web3j.catchUpToLatestAndSubscribeToNewTransactionsObservable(configuration.getFromBlock()).subscribe( subscription = web3j.catchUpToLatestAndSubscribeToNewTransactionsObservable(configuration.getFromBlock()).subscribe(
x -> processTransaction(x), x -> processTransaction(x),
t -> processError(t), t -> processError(t, Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_TRANSACTIONS_OBSERVABLE),
() -> processDone() () -> processDone(Web3jConstants.CATCH_UP_TO_LATEST_AND_SUBSCRIBE_TO_NEW_TRANSACTIONS_OBSERVABLE)
); );
break; break;



default: default:
throw new IllegalArgumentException("Unsupported operation " + configuration.getOperation()); throw new IllegalArgumentException("Unsupported operation " + configuration.getOperation());
} }
Expand All @@ -174,13 +173,6 @@ private EthFilter buildEthFilter() {
return ethFilter; return ethFilter;
} }


private void ethLogObservable(Log x) {
LOG.debug("processLogObservable " + x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}

private void ethBlockHashObservable(String x) { private void ethBlockHashObservable(String x) {
LOG.debug("processEthBlock " + x); LOG.debug("processEthBlock " + x);
Exchange exchange = this.getEndpoint().createExchange(); Exchange exchange = this.getEndpoint().createExchange();
Expand All @@ -196,9 +188,10 @@ private void ethPendingTransactionHashObservable(String x) {
} }


private void blockObservable(EthBlock x) { private void blockObservable(EthBlock x) {
LOG.debug("processEthBlock " + x); EthBlock.Block block = x.getBlock();
LOG.debug("processEthBlock " + block);
Exchange exchange = this.getEndpoint().createExchange(); Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x); exchange.getIn().setBody(block);
processEvent(exchange); processEvent(exchange);
} }


Expand All @@ -209,6 +202,13 @@ private void processTransaction(Transaction x) {
processEvent(exchange); processEvent(exchange);
} }


private void ethLogObservable(Log x) {
LOG.debug("processLogObservable " + x);
Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setBody(x);
processEvent(exchange);
}

public void processEvent(Exchange exchange) { public void processEvent(Exchange exchange) {
LOG.debug("processEvent " + exchange); LOG.debug("processEvent " + exchange);
try { try {
Expand All @@ -218,15 +218,16 @@ public void processEvent(Exchange exchange) {
} }
} }


private void processDone() { private void processDone(String operation) {
LOG.debug("processDone "); LOG.debug("processDone for operation: " + operation);
Exchange exchange = this.getEndpoint().createExchange(); Exchange exchange = this.getEndpoint().createExchange();
exchange.getIn().setHeader("status", "done"); exchange.getIn().setHeader("status", "done");
exchange.getIn().setHeader("operation", operation);
processEvent(exchange); processEvent(exchange);
} }


private void processError(Throwable throwable) { private void processError(Throwable throwable, String operation) {
LOG.debug("processError " + throwable); LOG.debug("processError for operation: " + operation + " " + throwable);
Exchange exchange = this.getEndpoint().createExchange(); Exchange exchange = this.getEndpoint().createExchange();
exchange.setException(throwable); exchange.setException(throwable);
processEvent(exchange); processEvent(exchange);
Expand Down

0 comments on commit 9da3f5a

Please sign in to comment.