Skip to content

Commit

Permalink
Merge pull request #2114, enable configuration of Consumer thread pool.
Browse files Browse the repository at this point in the history
Fixes #2013
  • Loading branch information
tswstarplanet authored and chickenlj committed Jul 25, 2018
1 parent eed3db3 commit ed4384a
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
Expand Up @@ -31,6 +31,18 @@ public class ConsumerConfig extends AbstractReferenceConfig {
// networking framework client uses: netty, mina, etc.
private String client;

// consumer thread pool type: cached, fixed, limit, eager
private String threadpool;

// consumer threadpool core thread size
private Integer corethreads;

// consumer threadpool thread size
private Integer threads;

// consumer threadpool queue size
private Integer queues;

@Override
public void setTimeout(Integer timeout) {
super.setTimeout(timeout);
Expand All @@ -56,4 +68,40 @@ public String getClient() {
public void setClient(String client) {
this.client = client;
}

public String getThreadpool() {
return threadpool;
}

public void setThreadpool(String threadpool) {
this.threadpool = threadpool;
}

public Boolean getDefault() {
return isDefault;
}

public Integer getCorethreads() {
return corethreads;
}

public void setCorethreads(Integer corethreads) {
this.corethreads = corethreads;
}

public Integer getThreads() {
return threads;
}

public void setThreads(Integer threads) {
this.threads = threads;
}

public Integer getQueues() {
return queues;
}

public void setQueues(Integer queues) {
this.queues = queues;
}
}
Expand Up @@ -50,4 +50,32 @@ public void testClient() throws Exception {
consumer.setClient("client");
assertThat(consumer.getClient(), equalTo("client"));
}

@Test
public void testThreadpool() throws Exception {
ConsumerConfig consumer = new ConsumerConfig();
consumer.setThreadpool("fixed");
assertThat(consumer.getThreadpool(), equalTo("fixed"));
}

@Test
public void testCorethreads() throws Exception {
ConsumerConfig consumer = new ConsumerConfig();
consumer.setCorethreads(10);
assertThat(consumer.getCorethreads(), equalTo(10));
}

@Test
public void testThreads() throws Exception {
ConsumerConfig consumer = new ConsumerConfig();
consumer.setThreads(20);
assertThat(consumer.getThreads(), equalTo(20));
}

@Test
public void testQueues() throws Exception {
ConsumerConfig consumer = new ConsumerConfig();
consumer.setQueues(5);
assertThat(consumer.getQueues(), equalTo(5));
}
}
Expand Up @@ -708,6 +708,26 @@
<xsd:documentation><![CDATA[ Transporter layer framework: netty mina.... ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="threadpool" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ Consumer threadpool: cached, fixed, limited, eager]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="corethreads" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool core threads size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="threads" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queues" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool queue size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
Expand Down
Expand Up @@ -708,6 +708,26 @@
<xsd:documentation><![CDATA[ Transporter layer framework: netty mina.... ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="threadpool" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ Consumer threadpool: cached, fixed, limited, eager]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="corethreads" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool core threads size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="threads" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queues" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool queue size. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
Expand Down

0 comments on commit ed4384a

Please sign in to comment.