2727import java .util .Arrays ;
2828import java .util .Collection ;
2929import java .util .Collections ;
30+ import java .util .EnumSet ;
3031import java .util .HashMap ;
3132import java .util .HashSet ;
3233import java .util .LinkedHashMap ;
3536import java .util .Map ;
3637import java .util .Objects ;
3738import java .util .Set ;
38- import java .util .Vector ;
3939import java .util .concurrent .BlockingQueue ;
4040import java .util .concurrent .CompletableFuture ;
4141import java .util .concurrent .ExecutionException ;
8787import org .hyperledger .fabric .protos .peer .Query .ChaincodeQueryResponse ;
8888import org .hyperledger .fabric .protos .peer .Query .ChannelQueryResponse ;
8989import org .hyperledger .fabric .sdk .BlockEvent .TransactionEvent ;
90+ import org .hyperledger .fabric .sdk .Peer .PeerRole ;
9091import org .hyperledger .fabric .sdk .exception .CryptoException ;
9192import org .hyperledger .fabric .sdk .exception .EventHubException ;
9293import org .hyperledger .fabric .sdk .exception .InvalidArgumentException ;
@@ -138,7 +139,18 @@ public class Channel implements Serializable {
138139 private final String name ;
139140
140141 // The peers on this channel to which the client can connect
141- final Collection <Peer > peers = new Vector <>();
142+ private final Collection <Peer > peers = Collections .synchronizedSet (new HashSet <>());
143+ // final Set<Peer> eventingPeers = Collections.synchronizedSet(new HashSet<>());
144+
145+ private final Map <PeerRole , Set <Peer >> peerRoleSetMap = Collections .synchronizedMap (new HashMap <>());
146+
147+ {
148+ for (Peer .PeerRole peerRole : PeerRole .ALL ) {
149+
150+ peerRoleSetMap .put (peerRole , Collections .synchronizedSet (new HashSet <>()));
151+
152+ }
153+ }
142154
143155 // Temporary variables to control how long to wait for deploy and invoke to complete before
144156 // emitting events. This will be removed when the SDK is able to receive events from the
@@ -507,6 +519,66 @@ public String getName() {
507519 */
508520 public Channel addPeer (Peer peer ) throws InvalidArgumentException {
509521
522+ return addPeer (peer , PeerOptions .create ());
523+
524+ }
525+
526+ /**
527+ * Options for the peer.
528+ * <p>
529+ * Note: This code pasted from: https://gerrit.hyperledger.org/r/#/c/13895/ - WIP FAB-6066 Channelservice for events
530+ */
531+ public static class PeerOptions { // allows for future options with less likelihood of breaking api.
532+
533+ private EnumSet <PeerRole > peerRoles ;
534+ private String blockType = "Filter" ; // not yet used.
535+
536+ private PeerOptions () {
537+
538+ }
539+
540+ EnumSet <PeerRole > getPeerRoles () {
541+ if (peerRoles == null ) {
542+ return PeerRole .ALL ;
543+ }
544+ return peerRoles ;
545+ }
546+
547+ String getBlockType () {
548+ return blockType ;
549+ }
550+
551+ public PeerOptions setPeerRoles (EnumSet <PeerRole > peerRoles ) {
552+ this .peerRoles = peerRoles ;
553+ return this ;
554+ }
555+
556+ public PeerOptions addPeerRole (PeerRole peerRole ) {
557+
558+ if (peerRoles == null ) {
559+ peerRoles = EnumSet .noneOf (PeerRole .class );
560+
561+ }
562+ peerRoles .add (peerRole );
563+ return this ;
564+ }
565+
566+ public static PeerOptions create () {
567+ return new PeerOptions ();
568+ }
569+
570+ }
571+
572+ /**
573+ * Add a peer to the channel
574+ *
575+ * @param peer The Peer to add.
576+ * @param peerOptions see {@link PeerRole}
577+ * @return Channel The current channel added.
578+ * @throws InvalidArgumentException
579+ */
580+ public Channel addPeer (Peer peer , PeerOptions peerOptions ) throws InvalidArgumentException {
581+
510582 if (shutdown ) {
511583 throw new InvalidArgumentException (format ("Channel %s has been shutdown." , name ));
512584 }
@@ -515,14 +587,32 @@ public Channel addPeer(Peer peer) throws InvalidArgumentException {
515587 throw new InvalidArgumentException ("Peer is invalid can not be null." );
516588 }
517589
590+ if (peer .getChannel () != null && peer .getChannel () != this ) {
591+ throw new InvalidArgumentException (format ("Peer already connected to channel %s" , peer .getChannel ().getName ()));
592+ }
593+
594+ if (null == peerOptions ) {
595+ throw new InvalidArgumentException ("Peer is invalid can not be null." );
596+ }
597+
518598 peer .setChannel (this );
519599
520600 peers .add (peer );
521601
602+ for (Map .Entry <PeerRole , Set <Peer >> peerRole : peerRoleSetMap .entrySet ()) {
603+ if (peerOptions .getPeerRoles ().contains (peerRole .getKey ())) {
604+ peerRole .getValue ().add (peer );
605+ }
606+ }
522607 return this ;
523608 }
524609
610+
525611 public Channel joinPeer (Peer peer ) throws ProposalException {
612+ return joinPeer (peer , PeerOptions .create ());
613+ }
614+
615+ public Channel joinPeer (Peer peer , PeerOptions peerOptions ) throws ProposalException {
526616
527617 logger .debug (format ("Channel %s joining peer %s, url: %s" , name , peer .getName (), peer .getUrl ()));
528618
@@ -558,7 +648,7 @@ public Channel joinPeer(Peer peer) throws ProposalException {
558648 SignedProposal signedProposal = getSignedProposal (transactionContext , joinProposal );
559649 logger .debug ("Got signed proposal." );
560650
561- addPeer (peer ); //need to add peer.
651+ addPeer (peer , peerOptions ); //need to add peer.
562652
563653 Collection <ProposalResponse > resp = sendProposalToPeers (new ArrayList <>(Collections .singletonList (peer )),
564654 signedProposal , transactionContext );
@@ -568,27 +658,33 @@ public Channel joinPeer(Peer peer) throws ProposalException {
568658 if (pro .getStatus () == ProposalResponse .Status .SUCCESS ) {
569659 logger .info (format ("Peer %s joined into channel %s" , peer .getName (), name ));
570660 } else {
571- peers .remove (peer );
572- peer .unsetChannel ();
661+ removePeer (peer );
573662 throw new ProposalException (format ("Join peer to channel %s failed. Status %s, details: %s" ,
574663 name , pro .getStatus ().toString (), pro .getMessage ()));
575664
576665 }
577666 } catch (ProposalException e ) {
578- peers .remove (peer );
579- peer .unsetChannel ();
667+ removePeer (peer );
580668 logger .error (e );
581669 throw e ;
582670 } catch (Exception e ) {
583671 peers .remove (peer );
584- peer .unsetChannel ();
585672 logger .error (e );
586673 throw new ProposalException (e .getMessage (), e );
587674 }
588675
589676 return this ;
590677 }
591678
679+ private void removePeer (Peer peer ) {
680+ peers .remove (peer );
681+
682+ for (Set <Peer > peerRoleSet : peerRoleSetMap .values ()) {
683+ peerRoleSet .remove (peer );
684+ }
685+ peer .unsetChannel ();
686+ }
687+
592688 /**
593689 * Add an Orderer to this channel.
594690 *
@@ -648,6 +744,21 @@ public Collection<Peer> getPeers() {
648744 return Collections .unmodifiableCollection (peers );
649745 }
650746
747+ /**
748+ * Get the peers for this channel.
749+ *
750+ * @return the peers.
751+ */
752+ public Collection <Peer > getPeers (EnumSet <PeerRole > roles ) {
753+
754+ Set <Peer > ret = new HashSet <>(getPeers ().size ());
755+
756+ for (PeerRole peerRole : roles ) {
757+ ret .addAll (peerRoleSetMap .get (peerRole ));
758+ }
759+ return Collections .unmodifiableCollection (ret );
760+ }
761+
651762 /**
652763 * Get the deploy wait time in seconds.
653764 *
@@ -2155,11 +2266,12 @@ public Collection<ProposalResponse> sendTransactionProposal(TransactionProposalR
21552266 * @throws InvalidArgumentException
21562267 * @throws ProposalException
21572268 */
2158-
21592269 public Collection <ProposalResponse > queryByChaincode (QueryByChaincodeRequest queryByChaincodeRequest ) throws InvalidArgumentException , ProposalException {
2160- return sendProposal (queryByChaincodeRequest , peers );
2270+ return queryByChaincode (queryByChaincodeRequest , peers );
21612271 }
21622272
2273+
2274+
21632275 /**
21642276 * Send Query proposal
21652277 *
0 commit comments