Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

调度定制化接口(0.9.4.1 及以下版本)

Longda edited this page Sep 1, 2014 · 1 revision

从JStorm 0.9.0 开始, JStorm 提供非常强大的调度功能, 基本上可以满足大部分的需求。

在学习如何使用新调度前, 麻烦先学习 JStorm 0.9.0介绍 提供哪些功能

概念

JStorm的资源不在是以前worker单独的一个端口, 而是以4个维度展现,CPU/Memory/Disk/Net

接口

申请更多的CPU slot

默认一个task,一个cpu slot 当task消耗更多的cpu时,可以申请更多cpu slot。 示例代码如下:


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        
        ConfigExtension.setCpuSlotsPerTask(totalBoltConf, 3);  // 申请每个task 3个cpu slot
        
        totalBolt.addConfigurations(totalBoltConf);

申请更多的Memory slot

默认一个task,一个memory slot 当task消耗更多的memory时,可以申请更多memory slot。 示例代码如下:


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        
        ConfigExtension.setMemSlotPerTask(totalBoltConf, 3); // 申请每个task 3个memory slot
        
        totalBolt.addConfigurations(totalBoltConf);

申请更多的Disk slot

默认task不申请磁盘slot,但当task 磁盘IO较重时,可以申请Disk slot 当task消耗更多的disk时,可以申请更多disk slot。 示例代码如下:


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        
        ConfigExtension.setTaskAllocDisk(totalBoltConf, true); // 申请磁盘slot
        
        totalBolt.addConfigurations(totalBoltConf);

一旦分配成功, 在task的prepare里面即可 获得分配的disk slot目录

public class TotalCount implements IRichBolt {
    ....

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //...
        
        String diskSlot = ConfigExtension.getTaskAssignDiskSlot(stormConf);

        //...
    }

强制相同的task 运行在不同的节点

可以强制某个component的task 运行在不同的节点上


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        
        ConfigExtension.setTaskOnDifferentNode(totalBoltConf, true); // 申请task 运行在不同的节点上
        
        totalBolt.addConfigurations(totalBoltConf);

强制所有task 运行在一个节点

对于一些小应用,可以强制这个topology的所有task运行在同一个节点上,从而节省网络开销


        Config.setNumAckers(conf, ackerParal);
        ConfigExtension.setUseSingleNode(conf, true); // 强制所有的task 运行在同一个节点上
        StormSubmitter.submitTopology(streamName, conf,
                builder.createTopology());

自定义任务分配

在某些情况下,可以自定义某个component的task分配到某些特定机器的特定端口,当指定的端口被占时或指定的机器资源不够时,nimbus会降级默认分配算法

Nimbus 分配算法如下:

  • 优先使用自定义任务分配, 当资源无法满足需求时,该任务放到下一级任务分配算法
  • 使用历史任务分配算法,如果打开使用历史任务属性开关后,则使用该算法, 当资源无法满足需求时,该任务放到下一级任务分配算法。
  • 使用默认资源平衡算法, 计算每个supervisor上剩余资源权值, 取权值最高的supervisor进行分配。

示例代码如下:


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        List<ResourceAssignment> userDefineAssignments = new ArrayList<ResourceAssignment>();
        
        for (int i = 0, base = 150; i < boltParal; i++, base++) {
            ResourceAssignment assign = new ResourceAssignment();
            
            
            assign.setCpuSlotNum(2);
            assign.setMemSlotNum(2);
            assign.setPort(6800 + i);
            assign.setHostname("free-56-151.shucang.alipay.net"); // 
            
            userDefineAssignments.add(assign);
        }
        ConfigExtension.setUserDefineAssignment(totalBoltConf, userDefineAssignments); //申请使用自定义资源
        totalBolt.addConfigurations(totalBoltConf);

使用历史任务

可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源。 如果上次的资源被占或不能满足时,使用默认分配算法。

可以对某个component 使用该特性


        int boltParal = get("bolt.parallel", 1);
        BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
        
        Map totalBoltConf = new HashMap();
        
        ConfigExtension.setUseOldAssignment(totalBoltConf, true); // 申请task 运行使用上一次资源
        
        totalBolt.addConfigurations(totalBoltConf);

可以对整个topology使用该特性


        Config.setNumAckers(conf, ackerParal);
        ConfigExtension.setUseOldAssignment(totalBoltConf, true); // 对整个topology所有task 优先使用上次资源
        StormSubmitter.submitTopology(streamName, conf,
                builder.createTopology());
Clone this wiki locally