In [1]:
import ipaddress

def ip_range_from_stream(stream):
    stream = stream.lstrip("spead://")
    ip_range, port = stream.split(":")
    port = int(port)
    try:
        base_ip, ip_count = ip_range.split("+")
        ip_count = int(ip_count)
    except ValueError:
        base_ip, ip_count = ip_range, 1
    return [ipaddress.ip_address(unicode(base_ip))+i for i in range(ip_count)], port

In [2]:
ips,_ = ip_range_from_stream("spead://123.1.1.1+10:5000")

In [3]:
ips

[IPv4Address(u'123.1.1.1'),
 IPv4Address(u'123.1.1.2'),
 IPv4Address(u'123.1.1.3'),
 IPv4Address(u'123.1.1.4'),
 IPv4Address(u'123.1.1.5'),
 IPv4Address(u'123.1.1.6'),
 IPv4Address(u'123.1.1.7'),
 IPv4Address(u'123.1.1.8'),
 IPv4Address(u'123.1.1.9'),
 IPv4Address(u'123.1.1.10')]

In [4]:
ips.reverse()

In [36]:
def ip_range_from_stream(stream):
    stream = stream.lstrip("spead://")
    ip_range, port = stream.split(":")
    port = int(port)
    try:
        base_ip, ip_count = ip_range.split("+")
        ip_count = int(ip_count)
    except ValueError:
        base_ip, ip_count = ip_range, 1
    return ContiguousIpRange(base_ip, port, ip_count)

class IpRangeAllocationError(Exception):
    pass

class ContiguousIpRange(object):
    def __init__(self, base_ip, port, count):
        self._base_ip = ipaddress.ip_address(unicode(base_ip))
        self._ips = [self._base_ip+ii for ii in range(count)]
        self._port = port
        self._count = count

    @property
    def count(self):
        return self._count

    @property
    def port(self):
        return self._port

    @property
    def base_ip(self):
        return self._base_ip

    def index(self, ip):
        return self._ips.index(ip)

    def __hash__(self):
        return hash(self.format_katcp())

    def __iter__(self):
        return self._ips.__iter__()

    def __repr__(self):
        return "<{} {}>".format(self.__class__.__name__, self.format_katcp())

    def format_katcp(self):
        return "spead://{}+{}:{}".format(str(self._base_ip), self._count, self._port)


class IpRangeManager(object):
    def __init__(self, ip_range):
        self._ip_range = ip_range
        self._allocated = [False for _ in ip_range]
        self._allocated_ranges = set()

    def __repr__(self):
        return "<{} {}>".format(self.__class__.__name__, self._ip_range.format_katcp())

    def _free_ranges(self):
        state_ranges = {True:[], False:[]}
        def find_state_range(idx, state):
            start_idx = idx
            while idx < len(self._allocated):
                if self._allocated[idx] == state:
                    idx+=1
                else:
                    state_ranges[state].append((start_idx, idx-start_idx))
                    return find_state_range(idx, not state)
            else:
                state_ranges[state].append((start_idx, idx-start_idx))
        find_state_range(0, self._allocated[0])
        return state_ranges[False]

    def allocate(self, n):
        ranges = self._free_ranges()
        best_fit = None
        for start,span in ranges:
            if span<n:
                continue
            elif best_fit is None:
                best_fit = (start, span)
            elif (span-n) < (best_fit[1]-n):
                best_fit = (start, span)
        if best_fit is None:
            raise IpRangeAllocationError("Could not allocate contiguous range of {} addresses".format(n))
        else:
            start,span = best_fit
            for ii in range(n):
                offset = start+ii
                self._allocated[offset] = True
            allocated_range = ContiguousIpRange(str(self._ip_range.base_ip + start), self._ip_range.port, n)
            self._allocated_ranges.add(allocated_range)
            return allocated_range

    def free(self, ip_range):
        self._allocated_ranges.remove(ip_range)
        for ip in ip_range:
            self._allocated[self._ip_range.index(ip)] = False

        
        

In [37]:
x = IpRangeManager(ip_range_from_stream('spead://239.1.1.150+128:7147'))
print x

<IpRangeManager spead://239.1.1.150+128:7147>


In [38]:
x._allocated[5:9] = [True for _ in range(4)]
x._allocated[10:11] = [True for _ in range(1)]
x._allocated[56:77] = [True for _ in range(77-56)]

In [39]:
x = IpRangeManager(ip_range_from_stream('spead://239.1.1.150+128:7147'))
x._allocated[5:9] = [True for _ in range(4)]
x._allocated[10:11] = [True for _ in range(1)]
x._allocated[56:77] = [True for _ in range(77-56)]
print x._free_ranges()
a = x.allocate(40)
print x._allocated_ranges
print x

[(0, 5), (9, 1), (11, 45), (77, 51)]
set([<ContiguousIpRange spead://239.1.1.161+40:7147>])
<IpRangeManager spead://239.1.1.150+128:7147>


In [40]:
print a.format_katcp()

spead://239.1.1.161+40:7147


In [41]:
print x._free_ranges()
x.free(a)

[(0, 5), (9, 1), (51, 5), (77, 51)]


In [42]:
print x._free_ranges()
print x._allocated_ranges

[(0, 5), (9, 1), (11, 45), (77, 51)]
set([])


In [12]:
x = [1,2,3]
x.index(2)
a = x.__iter__()

In [13]:
dir(a)

['__class__',
 '__delattr__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__iter__',
 '__length_hint__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 'next']

In [32]:
import numpy as np
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging

class Config(object):
    def __init__(self):
        self.n_servers = 64
        self.n_mcast_groups = 128
        
    def _sanitize_sb_configuration(self,  tscrunch, fscrunch, desired_nbeams, beam_granularity=None):

        # What are the key constraints:
        # 1. The data rate per multicast group
        # 2. The aggregate data rate out of the instrument (not as important)
        # 3. The processing limitations (has to be determined empirically)
        # 4. The number of multicast groups available
        # 5. The possible numbers of beams per multicast group (such that TUSE can receive N per node)
        # 6. Need to use at least 16 multicast groups
        # 7. Should have even flow across multicast groups, so same number of beams in each
        # 8. Multicast groups should be contiguous


        # Constants for data rates and bandwidths
        # these are hardcoded here for the moment but ultimately
        # they should be moved to a higher level or even dynamically
        # specified
        MAX_RATE_PER_MCAST = 6.8e9 # bits/s
        MAX_RATE_PER_SERVER = 4.375e9 # bits/s, equivalent to 280 Gb/s over 64 (virtual) nodes
        MAX_OUTPUT_RATE = 280.0e9 # bits/s
        BANDWIDTH = 856e6 # MHz

        # Calculate the data rate for each beam assuming 8-bit packing and
        # no metadata overheads
        data_rate_per_beam = BANDWIDTH / tscrunch / fscrunch * 8 # bits/s
        log.debug("Data rate per coherent beam: {} Gb/s".format(data_rate_per_beam/1e9))

        if data_rate_per_beam > MAX_RATE_PER_MCAST:
            raise Exception("Data rate per beam is greater than the data rate per multicast group")
            
        if data_rate_per_beam * desired_nbeams > MAX_OUTPUT_RATE:
            desired_nbeams = MAX_OUTPUT_RATE // data_rate_per_beam
            log.warning("Aggregate data rate larger than supported, reducing nbeams to {}".format(desired_nbeams))
             
        # Calculate the maximum number of beams that will fit in one multicast
        # group assuming. Each multicast group must be receivable on a 10 GbE
        # connection so the max rate must be < 8 Gb/s
        max_beams_per_mcast = MAX_RATE_PER_MCAST // data_rate_per_beam
        log.debug("Maximum number of beams per multicast group: {}".format(int(max_beams_per_mcast)))

        # For instuments such as TUSE, they require a fixed number of beams per node. For their
        # case we assume that they will only acquire one multicast group per node and as such
        # the minimum number of beams per multicast group should be whatever TUSE requires.
        # Multicast groups can contain more beams than this but only in integer multiples of
        # the minimum
        if beam_granularity:
            if max_beams_per_mcast < beam_granularity:
                log.warning("Cannot fit {} beams into one multicast group, updating number of beams per multicast group to {}".format(
                    beam_granularity, max_beams_per_mcast))
                while np.modf(beam_granularity/max_beams_per_mcast)[0] != 0.0:
                    max_beams_per_mcast -= 1
                beam_granularity = max_beams_per_mcast
            beams_per_mcast = beam_granularity * (max_beams_per_mcast // beam_granularity)
            log.debug("Number of beams per multicast group, accounting for granularity: {}".format(int(beams_per_mcast)))
        else:
            beams_per_mcast = max_beams_per_mcast

        # Calculate the total number of beams that could be produced assuming the only
        # rate limit was that limit per multicast groups
        max_beams = self.n_mcast_groups * beams_per_mcast
        log.debug("Maximum possible beams (assuming on multicast group rate limit): {}".format(max_beams))

        if desired_nbeams > max_beams:
            log.warning("Requested number of beams is greater than theoretical maximum, "
                "updating setting the number of beams of beams to {}".format(max_beams))
            desired_nbeams = max_beams

        # Calculate the total number of multicast groups that are required to satisfy
        # the requested number of beams
        num_mcast_groups_required = round(desired_nbeams / beams_per_mcast + 0.5)
        log.debug("Number of multicast groups required for {} beams: {}".format(desired_nbeams, num_mcast_groups_required))
        actual_nbeams = num_mcast_groups_required * beams_per_mcast
        nmcast_groups = num_mcast_groups_required

        # Now we need to check the server rate limits
        if (actual_nbeams * data_rate_per_beam)/self.n_servers > MAX_RATE_PER_SERVER:
            log.warning("Number of beams limited by output data rate per server")
        actual_nbeams = MAX_RATE_PER_SERVER*self.n_servers // data_rate_per_beam
        log.info("Number of beams that can be generated: {}".format(actual_nbeams))
        return actual_nbeams

In [33]:
x = Config()
x._sanitize_sb_configuration(5,1,16,6)

DEBUG:root:Data rate per coherent beam: 1.3696 Gb/s
DEBUG:root:Maximum number of beams per multicast group: 4
DEBUG:root:Number of beams per multicast group, accounting for granularity: 3
DEBUG:root:Maximum possible beams (assuming on multicast group rate limit): 384.0
DEBUG:root:Number of multicast groups required for 16 beams: 6.0
INFO:root:Number of beams that can be generated: 204.0


204.0

In [35]:
class MulticastGroupConfig(object):
    def __init__(self, data_rate_per_beam, granularity):
        self.data_rate_per_beam = data_rate_per_beam
        self.granularity = granularity
        self.group_count = 16
    
    
    
    def add_group(self):
        prior_nbeams = self.group_count * self.beam_count
        self.group_count += 1
        self.beam_count = round(prior_nbeams / self.group_count + 0.5)
    
    def nbeams(self):
        return self.group_count * self.beam_count
    
    def add_beam(self):
        self.beam_count+=1
        
    def remove_beam(self):
        self.beam_count-=1
        
    def matches_granularity(self, granularity):
        if self.beam_count % granularity == 0:
            return True
        elif granularity % self.beam_count == 0:
            return True
        else:
            return False
        

def test():
    group = MulticastGroup()
        
class MulticastConfiguration(object):
    def __init__(self, nservers, max_groups):
        self.nservers = nservers
        self.max_groups = max_groups
        self.base_groups = [[] for _ in range(16)]
        
    def config(self, tscrunch, fscrunch, desired_nbeams, beam_granularity):
        MAX_RATE_PER_MCAST = 6.8e9 # bits/s
        MAX_RATE_PER_SERVER = 4.375e9 # bits/s, equivalent to 280 Gb/s over 64 (virtual) nodes
        BANDWIDTH = 856e6 # MHz

        # Calculate the data rate for each beam assuming 8-bit packing and
        # no metadata overheads
        data_rate_per_beam = BANDWIDTH / tscrunch / fscrunch * 8 # bits/s
        log.debug("Data rate per coherent beam: {} Gb/s".format(data_rate_per_beam/1e9))
        # Calculate the maximum number of beams that will fit in one multicast
        # group assuming. Each multicast group must be receivable on a 10 GbE
        # connection so the max rate must be < 8 Gb/s
        max_beams_per_mcast = MAX_RATE_PER_MCAST // data_rate_per_beam
        log.debug("Maximum number of beams per multicast group: {}".format(int(max_beams_per_mcast)))

        if max_beams_per_mcast == 0:
            raise Exception("Data rate per beam is greater than the data rate per multicast group")
        
        
    