This repository has been archived by the owner on Apr 6, 2023. It is now read-only.
/
maybe_outfeed_queue.py
69 lines (56 loc) · 2.48 KB
/
maybe_outfeed_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Copyright (c) 2020 Graphcore Ltd. All rights reserved.
from tensorflow.python.ipu import ipu_outfeed_queue
class MaybeOutfeedQueue:
"""A wrapper for an IPUOutfeedQueue.
This class allows key-value pairs to be selectively added to a dictionary that
can then be enqueued. If a filter is supplied then one of the filter elements
must be contained within the key.
Not tested with replication (behaviour unknown).
"""
def __init__(self, outfeed_mode=None, filters=None):
"""Construct a MaybeOutfeedQueue.
Args:
outfeed_mode: The outfeed_mode for the wrapped IPUOutfeedQueue.
filters: Optional list of strings. If not None then one of these strings
must be contained within the key for the key,value pair to be added to
the dictionary that will be enqueued.
"""
self._queue = ipu_outfeed_queue.IPUOutfeedQueue(outfeed_mode=outfeed_mode)
self._vals = {}
if filters is not None:
self._filters = filters[:]
else:
self._filters = None
def maybe_outfeed(self, key, value):
"""Potentially add the key,value pair to the internal dictionary.
If no filters were supplied or if one of the filter strings is
contained within the key (assumed to be a string) then
add the key,value pair to the dictionary that will be enqueued.
"""
if self._filters is not None:
if any(f in key for f in self._filters):
self._vals[key] = value
else:
self._vals[key] = value
def maybe_enqueue(self):
"""Potentially enqueue the internal dictionary on the wrapped IPUOutfeedQueue.
If the dictionary of key,value pairs contains at least one item then
enqueue the dictionary on the wrapped IPUOutfeedQueue and return the
enqueue op. Otherwise return None.
"""
if len(self._vals) > 0:
return self._queue.enqueue(self._vals)
else:
return None
def maybe_dequeue(self):
"""Potentially return the dequeue op for the wrapped IPUOutfeedQueue.
If the wrapped IPUOutfeedQueue has been enqueued, return the
dequeue op. Otherwise return None.
"""
if self._queue.enqueued:
return self._queue.dequeue()
else:
return None
def dequeue(self):
"""Return the dequeue op for the wrapped IPUOutfeedQueue."""
return self._queue.dequeue()