Skip to content
This repository has been archived by the owner on Aug 20, 2021. It is now read-only.

Commit

Permalink
新建 ThreadUtil.execute(List<T>, PartitionThreadConfig, Map<String, ?>,
Browse files Browse the repository at this point in the history
PartitionPerHandler<T>) fix #803

新建 ThreadUtil.execute(List<T>, int, Map<String, ?>,
PartitionPerHandler<T>) fix #802

新建 ThreadUtil.execute(List<T>, int, PartitionPerHandler<T>) fix #801
  • Loading branch information
venusdrogon committed Sep 6, 2019
1 parent bf45176 commit 6d184c4
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -5,7 +5,7 @@ feilong core 让Java开发更简便的工具包
![JDK 1.7](https://img.shields.io/badge/JDK-1.7-green.svg "JDK 1.7")
[![jar size 110K](https://img.shields.io/badge/size-110K-green.svg "size 110K")](https://github.com/venusdrogon/feilong-platform/tree/repository/com/feilong/platform/feilong-core/1.14.0)
[![javadoc 83%](http://progressed.io/bar/83?title=javadoc "javadoc 83%")](http://venusdrogon.github.io/feilong-platform/javadocs/feilong-core/)
[![tests 2164](https://img.shields.io/badge/tests-2164%20%2F%202164-green.svg "tests 2164")](https://github.com/venusdrogon/feilong-core/tree/master/src/test/java/com/feilong/core)
[![tests 2192](https://img.shields.io/badge/tests-2192%20%2F%202192-green.svg "tests 2192")](https://github.com/venusdrogon/feilong-core/tree/master/src/test/java/com/feilong/core)
![Coverage 91%](http://progressed.io/bar/91?title=Coverage "Coverage 91%")

![sonar](http://venusdrogon.github.io/feilong-platform/mysource/sonar/feilong-core-summary2.jpg)
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/com/feilong/core/lang/ThreadUtil.java
Expand Up @@ -15,7 +15,6 @@
*/
package com.feilong.core.lang;

import static com.feilong.core.Validator.isNullOrEmpty;
import static com.feilong.core.date.DateExtensionUtil.formatDuration;
import static com.feilong.core.date.DateUtil.now;

Expand All @@ -31,10 +30,10 @@
import com.feilong.core.TimeInterval;
import com.feilong.core.lang.thread.DefaultPartitionRunnableBuilder;
import com.feilong.core.lang.thread.DefaultPartitionThreadExecutor;
import com.feilong.core.lang.thread.PartitionConfig;
import com.feilong.core.lang.thread.PartitionEachSizeEntityUtil;
import com.feilong.core.lang.thread.PartitionEachSizeThreadConfigBuilder;
import com.feilong.core.lang.thread.PartitionPerHandler;
import com.feilong.core.lang.thread.PartitionRunnableBuilder;
import com.feilong.core.lang.thread.PartitionThreadConfig;

/**
* 线程相关工具类.
Expand Down Expand Up @@ -504,7 +503,7 @@ public static <T> void execute(List<T> list,int eachSize,PartitionRunnableBuilde
* 如果 <code>list</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>list</code> 是empty,抛出 {@link IllegalArgumentException}<br>
* 如果 {@code eachSize <=0} ,抛出 {@link IllegalArgumentException}<br>
* 如果 <code>partitionRunnableBuilder</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>partitionPerHandler</code> 是null,抛出 {@link NullPointerException}<br>
* </p>
* </blockquote>
*
Expand Down Expand Up @@ -780,7 +779,7 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa

/**
* 给定一个待解析的 <code>list</code>,设定每个线程执行多少条 <code>eachSize</code>,传入一些额外的参数 <code>paramsMap</code>,使用自定义的
* <code>partitionRunnableBuilder</code>,自动<span style="color:green">构造多条线程</span>并运行.
* <code>partitionPerHandler</code>,自动<span style="color:green">构造多条线程</span>并运行.
*
* <h3>适用场景:</h3>
* <blockquote>
Expand Down Expand Up @@ -985,7 +984,7 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa
* 如果 <code>list</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>list</code> 是empty,抛出 {@link IllegalArgumentException}<br>
* 如果 {@code eachSize <=0} ,抛出 {@link IllegalArgumentException}<br>
* 如果 <code>partitionRunnableBuilder</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>partitionPerHandler</code> 是null,抛出 {@link NullPointerException}<br>
* </p>
* </blockquote>
*
Expand Down Expand Up @@ -1018,6 +1017,7 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa
* @since 2.0.0
*/
public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMap,PartitionPerHandler<T> partitionPerHandler){
Validate.notNull(partitionPerHandler, "partitionPerHandler can't be null!");
execute(list, eachSize, paramsMap, new DefaultPartitionRunnableBuilder<T>(partitionPerHandler));
}

Expand Down Expand Up @@ -1223,7 +1223,7 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa
* <p>
* 如果 <code>list</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>list</code> 是empty,抛出 {@link IllegalArgumentException}<br>
* 如果 {@code eachSize <=0} ,抛出 {@link IllegalArgumentException}<br>
* 如果 <code>partitionThreadConfig</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>partitionPerHandler</code> 是null,抛出 {@link NullPointerException}<br>
* </p>
* </blockquote>
Expand All @@ -1236,7 +1236,7 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa
* <p>
* 比如 100000个 User,不能为null或者empty
* </p>
* @param partitionConfig
* @param partitionThreadConfig
* the partition config
* @param paramsMap
* 自定义的相关参数
Expand All @@ -1247,21 +1247,19 @@ public static <T> void execute(List<T> list,int eachSize,Map<String, ?> paramsMa
* </p>
* @param partitionPerHandler
* the partition per handler
* @see com.feilong.core.lang.thread.DefaultPartitionThreadExecutor
* @see com.feilong.core.lang.thread.DefaultPartitionThreadExecutor#INSTANCE
* @since 2.0.0
*/
public static <T> void execute(
List<T> list,
PartitionConfig partitionConfig,
PartitionThreadConfig partitionThreadConfig,
Map<String, ?> paramsMap,
PartitionPerHandler<T> partitionPerHandler){
if (isNullOrEmpty(list)){
return;
}
Validate.notNull(partitionConfig, "partitionConfig can't be null!");
Validate.notEmpty(list, "list can't be null/empty!");
Validate.notNull(partitionThreadConfig, "partitionConfig can't be null!");
Validate.notNull(partitionPerHandler, "partitionPerHandler can't be null!");
//---------------------------------------------------------------
int eachSize = PartitionEachSizeEntityUtil.build(list.size(), partitionConfig);
int eachSize = new PartitionEachSizeThreadConfigBuilder(partitionThreadConfig).build(list.size());
execute(list, eachSize, paramsMap, new DefaultPartitionRunnableBuilder<T>(partitionPerHandler));
}

Expand All @@ -1286,6 +1284,8 @@ public static <T> void execute(
public static void startAndJoin(Thread[] threads){
Validate.notEmpty(threads, "threads can't be null/empty!");

//---------------------------------------------------------------

for (Thread thread : threads){
thread.start();// 使该线程开始执行;Java 虚拟机调用该线程的 run 方法。
LOGGER.debug("thread [{}] start", thread.getName());
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2008 feilong
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.feilong.core.lang.thread;

/**
* 用来计算 each size 大小的.
*
* @author <a href="http://feitianbenyue.iteye.com/">feilong</a>
* @since 2.0.0
*/
public interface PartitionEachSizeBuilder{

/**
* Builds the.
*
* @param totalSize
* the total size
* @return 如果 <code>totalSize<=0</code> 是empty,抛出 {@link IllegalArgumentException}<br>
*/
int build(int totalSize);
}
@@ -0,0 +1,115 @@
/*
* Copyright (C) 2008 feilong
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.feilong.core.lang.thread;

import static com.feilong.core.bean.ConvertUtil.toInteger;

import java.math.RoundingMode;

import org.apache.commons.lang3.Validate;

import com.feilong.core.lang.NumberUtil;

/**
* 用来计算 each size 大小的.
*
* @author <a href="http://feitianbenyue.iteye.com/">feilong</a>
* @since 2.0.0
*/
public class PartitionEachSizeThreadConfigBuilder implements PartitionEachSizeBuilder{

/** The partition thread config. */
private PartitionThreadConfig partitionThreadConfig;

//---------------------------------------------------------------

/**
* Instantiates a new partition each size thread config builder.
*/
public PartitionEachSizeThreadConfigBuilder(){
super();
}

/**
* Instantiates a new partition each size thread config builder.
*
* @param partitionThreadConfig
* the partition thread config
*/
public PartitionEachSizeThreadConfigBuilder(PartitionThreadConfig partitionThreadConfig){
super();
this.partitionThreadConfig = partitionThreadConfig;
}

//---------------------------------------------------------------
/**
* 构造每个分区大小.
*
* @param totalSize
* the total size
* @return 如果 <code>totalSize<=0</code> 是empty,抛出 {@link IllegalArgumentException}<br>
* 如果 <code>partitionConfig</code> 是null,抛出 {@link NullPointerException}<br>
* 如果 <code>partitionConfig.maxThreadCount<=0</code> 是empty,抛出 {@link IllegalArgumentException}<br>
* 如果 <code>partitionConfig.minPerThreadHandlerCount<=0</code> 是empty,抛出 {@link IllegalArgumentException}<br>
*
* 如果 <code>totalSize</code> 小于等于 <code>minPerThreadHandlerCount</code>(每个线程最少处理数量),那么直接返回<code>totalSize</code>,也就是说接下来开 1
* 个线程就足够了<br>
*/
@Override
public int build(int totalSize){
Validate.isTrue(totalSize > 0, "totalSize must >0,totalSize:%s", totalSize);
Validate.notNull(partitionThreadConfig, "partitionConfig can't be null!");
//---------------------------------------------------------------
//启动最大线程数
int maxThreadCount = partitionThreadConfig.getMaxThreadCount();
// 每个线程最少处理数量
int minPerThreadHandlerCount = partitionThreadConfig.getMinPerThreadHandlerCount();

Validate.isTrue(maxThreadCount > 0, "maxThreadCount must >0,totalSize:%s", maxThreadCount);
Validate.isTrue(minPerThreadHandlerCount > 0, "minPerThreadHandlerCount must >0,totalSize:%s", minPerThreadHandlerCount);
//---------------------------------------------------------------
//如果 totalSize 小于等于 minPerThreadHandlerCount(每个线程最少处理数量), 那么直接返回totalSize ,也就是说接下来开 1 个线程就足够了
if (totalSize <= minPerThreadHandlerCount){
return totalSize;
}

//---------------------------------------------------------------
//用总数 totalSize 除以 minPerThreadHandlerCount (每个线程最少处理数量),向上取整
int threadCount = toInteger(NumberUtil.getDivideValue(totalSize, minPerThreadHandlerCount, 0, RoundingMode.UP));
//如果 算出来的线程数 要超过 maxThreadCount,比如 总数 100 条, 但是设置的minPerThreadHandlerCount (每个线程最少处理数量) 是 20, 最大线程数是 4 ,此时 100/20 是 5 ,大约最大线程数 4

//那么最大只能开启 4 个线程, 返回的每个线程处理的数量是 100/4 =25
if (threadCount >= maxThreadCount){
return toInteger(NumberUtil.getDivideValue(totalSize, maxThreadCount, 0, RoundingMode.UP));
}

//否则返回 threadCount
return threadCount;
}

//---------------------------------------------------------------

/**
* Sets the partition thread config.
*
* @param partitionThreadConfig
* the partitionThreadConfig to set
*/
public void setPartitionThreadConfig(PartitionThreadConfig partitionThreadConfig){
this.partitionThreadConfig = partitionThreadConfig;
}

}
126 changes: 126 additions & 0 deletions src/main/java/com/feilong/core/lang/thread/PartitionThreadConfig.java
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2008 feilong
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.feilong.core.lang.thread;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

/**
* 分区控制.
*
* @author <a href="http://feitianbenyue.iteye.com/">feilong</a>
* @since 2.0.0
*/
public class PartitionThreadConfig{

/** Static instance. */
// the static instance works for all types
public static final PartitionThreadConfig INSTANCE = new PartitionThreadConfig();

//---------------------------------------------------------------

/** 启动最大线程数. */
private int maxThreadCount = 100;

/** 每个线程最少处理数量. */
private int minPerThreadHandlerCount = 20;

//---------------------------------------------------------------

/**
* Instantiates a new partition each size entity.
*/
public PartitionThreadConfig(){
super();
}

/**
* Instantiates a new partition each size entity.
*
* @param minPerThreadHandlerCount
* 每个线程最少处理数量
*/
public PartitionThreadConfig(int minPerThreadHandlerCount){
super();
this.minPerThreadHandlerCount = minPerThreadHandlerCount;
}

/**
* Instantiates a new partition each size entity.
*
* @param maxThreadCount
* 启动最大线程数
* @param minPerThreadHandlerCount
* 每个线程最少处理数量
*/
public PartitionThreadConfig(int maxThreadCount, int minPerThreadHandlerCount){
super();
this.maxThreadCount = maxThreadCount;
this.minPerThreadHandlerCount = minPerThreadHandlerCount;
}

//---------------------------------------------------------------

/**
* 获得 启动最大线程数.
*
* @return the maxThreadCount
*/
public int getMaxThreadCount(){
return maxThreadCount;
}

/**
* 设置 启动最大线程数.
*
* @param maxThreadCount
* the maxThreadCount to set
*/
public void setMaxThreadCount(int maxThreadCount){
this.maxThreadCount = maxThreadCount;
}

/**
* 获得 每个线程最少处理数量.
*
* @return the minPerThreadHandlerCount
*/
public int getMinPerThreadHandlerCount(){
return minPerThreadHandlerCount;
}

/**
* 设置 每个线程最少处理数量.
*
* @param minPerThreadHandlerCount
* the minPerThreadHandlerCount to set
*/
public void setMinPerThreadHandlerCount(int minPerThreadHandlerCount){
this.minPerThreadHandlerCount = minPerThreadHandlerCount;
}

//---------------------------------------------------------------
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString(){
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}

}

0 comments on commit 6d184c4

Please sign in to comment.