Skip to content

Commit

Permalink
adds loopy scheduler v2
Browse files Browse the repository at this point in the history
  • Loading branch information
kaushikcfd authored and inducer committed Jun 2, 2023
1 parent 2c78a9c commit 0f478dd
Showing 1 changed file with 199 additions and 31 deletions.
230 changes: 199 additions & 31 deletions loopy/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,161 @@ def is_similar_to_template(insn):

# {{{ scheduling algorithm

def _generate_loop_schedules_internal(
sched_state, debug=None):
def _get_outermost_diverging_inames(tree, within1, within2):
    """
    Return the pair of inames at which the loop nests *within1* and *within2*
    first diverge in the loop nesting tree *tree*.

    The deepest node of *tree* shared by both nests (the root ``""`` is always
    considered shared) is located, and the child of that node lying in
    *within1* is returned together with the child lying in *within2*.

    :arg tree: A :class:`loopy.tools.Tree` of inames, denoting a loop nesting.
    :arg within1: A :class:`frozenset` of inames.
    :arg within2: A :class:`frozenset` of inames.
    :returns: A tuple ``(iname1, iname2)`` with *iname1* taken from *within1*
        and *iname2* taken from *within2*.
    :raises ValueError: if the deepest shared node does not have exactly one
        child in *within1* and exactly one child in *within2* (raised by the
        single-element unpacking below).
    """
    # The root "" is an ancestor of every loop nest, so it is always shared.
    shared_nodes = (within1 & within2) | {""}
    deepest_shared = max(shared_nodes, key=tree.depth)

    branches = tree.children(deepest_shared)

    # Exactly one branch below the shared node may belong to each nest.
    (iname1,) = branches & within1
    (iname2,) = branches & within2

    return iname1, iname2


class V2SchedulerNotImplementedException(RuntimeError):
    """
    Raised by :func:`generate_loop_schedules_v2` when it is handed a kernel
    using a feature that it does not support.
    """


def generate_loop_schedules_v2(kernel):
    """
    Compute a linearization of *kernel* by topologically sorting a DAG of
    schedule items.

    The nodes of the DAG are :class:`EnterLoop`, :class:`LeaveLoop` and
    :class:`RunInstruction` items. An edge from item *A* to item *B* means
    that *B* must come after *A* in the linearized result. Edges are derived
    from the loop nest tree of *kernel* and from the instruction dependencies.

    :arg kernel: the kernel to be scheduled.
    :returns: the result of :func:`pytools.graph.compute_topological_order`
        applied to the DAG described above.
    :raises V2SchedulerNotImplementedException: if any instruction has a
        non-empty ``conflicts_with_groups`` or a non-zero ``priority``, if
        ``kernel.schedule`` is not *None*, or if any iname is tagged with
        :class:`~loopy.kernel.data.IlpBaseTag` or
        :class:`~loopy.kernel.data.VectorizeTag`.
    """
    from loopy.schedule.tools import get_loop_nest_tree
    from functools import reduce
    from pytools.graph import compute_topological_order
    from loopy.kernel.data import ConcurrentTag, IlpBaseTag, VectorizeTag

    def inames_tagged_with(tag_type):
        # All inames of the kernel carrying at least one tag of *tag_type*.
        return {iname
                for iname in kernel.all_inames()
                if kernel.iname_tags_of_type(iname, tag_type)}

    concurrent_inames = inames_tagged_with(ConcurrentTag)
    ilp_inames = inames_tagged_with(IlpBaseTag)
    vec_inames = inames_tagged_with(VectorizeTag)
    parallel_inames = concurrent_inames - ilp_inames - vec_inames

    # {{{ can the v2 scheduler handle this kernel?

    if any(len(insn.conflicts_with_groups) != 0 for insn in kernel.instructions):
        raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
                " kernels with instruction having conflicts with groups.")

    if any(insn.priority != 0 for insn in kernel.instructions):
        raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
                " kernels with instruction priorities set.")

    # NOTE(review): postprocess_schedule stores its result under
    # 'linearization'; confirm that 'schedule' is the intended attribute here.
    if kernel.schedule is not None:
        # cannot handle preschedule yet
        raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
                " prescheduled kernels.")

    if ilp_inames or vec_inames:
        raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
                " loops tagged with 'ilp'/'vec' as they are not guaranteed to"
                " be single entry loops.")

    # }}}

    loop_nest_tree = get_loop_nest_tree(kernel)

    # loop_inames: inames that are realized as loops. Concurrent inames aren't
    # realized as a loop in the generated code for a loopy.TargetBase.
    inames_around_insns = reduce(
            frozenset.union,
            (insn.within_inames for insn in kernel.instructions),
            frozenset())
    loop_inames = inames_around_insns - parallel_inames

    # The idea here is to build a DAG, where nodes are schedule items and if
    # there exists an edge from schedule item A to schedule item B in the DAG =>
    # B *must* come after A in the linearized result.

    dag = {}

    def must_precede(earlier, later):
        # Record the edge earlier -> later. Successor sets are frozensets, so
        # the entry is rebound rather than mutated in place.
        dag[earlier] = dag[earlier] | {later}

    # LeaveLoop(i) *must* follow EnterLoop(i)
    for iname in loop_inames:
        dag[EnterLoop(iname=iname)] = frozenset({LeaveLoop(iname=iname)})
    for iname in loop_inames:
        dag[LeaveLoop(iname=iname)] = frozenset()
    for insn in kernel.instructions:
        dag[RunInstruction(insn_id=insn.id)] = frozenset()

    # {{{ add constraints imposed by the loop nesting

    for outer_loop in loop_nest_tree.nodes():
        if outer_loop == "":
            # The root of the tree does not correspond to a loop.
            continue

        for inner_loop in loop_nest_tree.children(outer_loop):
            # An inner loop is entered after, and left before, its parent.
            must_precede(EnterLoop(iname=outer_loop), EnterLoop(iname=inner_loop))
            must_precede(LeaveLoop(iname=inner_loop), LeaveLoop(iname=outer_loop))

    # }}}

    # {{{ add deps. b/w schedule items coming from insn. dependencies

    for insn in kernel.instructions:
        run_insn = RunInstruction(insn_id=insn.id)
        insn_loop_inames = insn.within_inames & loop_inames

        for dep_id in insn.depends_on:
            dep = kernel.id_to_insn[dep_id]
            dep_loop_inames = dep.within_inames & loop_inames

            # Enforce instruction dep:
            must_precede(RunInstruction(insn_id=dep_id), run_insn)

            # {{{ register deps on loop entry/leave because of insn. deps

            if dep_loop_inames < insn_loop_inames:
                # 'dep' runs before entering the loops that only 'insn' is in.
                for iname in insn_loop_inames - dep_loop_inames:
                    must_precede(RunInstruction(insn_id=dep.id),
                                 EnterLoop(iname=iname))
            elif insn_loop_inames < dep_loop_inames:
                # 'insn' runs after leaving the loops that only 'dep' is in.
                for iname in dep_loop_inames - insn_loop_inames:
                    must_precede(LeaveLoop(iname=iname), run_insn)
            elif dep_loop_inames != insn_loop_inames:
                # Neither nest contains the other: order the two loops at
                # which the nests diverge.
                insn_iname, dep_iname = _get_outermost_diverging_inames(
                        loop_nest_tree, insn_loop_inames, dep_loop_inames)
                must_precede(LeaveLoop(iname=dep_iname),
                             EnterLoop(iname=insn_iname))

            # (Identical loop nests need no extra loop-level constraint.)

            # }}}

        for iname in insn_loop_inames:
            # For an insn within a loop nest 'i'
            # for i
            #   insn
            # end i
            # 'insn' *must* come b/w 'for i' and 'end i'
            must_precede(EnterLoop(iname=iname), run_insn)
            must_precede(run_insn, LeaveLoop(iname=iname))

    # }}}

    def iname_key(iname):
        # Comma-joined path from the outermost ancestor down to *iname*.
        path = sorted(loop_nest_tree.ancestors(iname),
                      key=loop_nest_tree.depth)
        path.append(iname)
        return ",".join(path)

    def key(x):
        # Sort key handed to compute_topological_order.
        if isinstance(x, RunInstruction):
            # Innermost loop around the instruction ("" if there is none).
            innermost = max(
                    kernel.id_to_insn[x.insn_id].within_inames & loop_inames,
                    key=loop_nest_tree.depth,
                    default="")
            return (iname_key(innermost), x.insn_id)

        if isinstance(x, (EnterLoop, LeaveLoop)):
            return (iname_key(x.iname),)

        raise NotImplementedError

    return compute_topological_order(dag, key=key)


def _generate_loop_schedules_internal(sched_state, debug=None):
# allow_insn is set to False initially and after entering each loop
# to give loops containing high-priority instructions a chance.
kernel = sched_state.kernel
Expand Down Expand Up @@ -2019,6 +2172,41 @@ def generate_loop_schedules(
callables_table, debug_args=debug_args)


def postprocess_schedule(kernel, callables_table, gen_sched):
    """
    Turn the raw linearization *gen_sched* of *kernel* into a kernel in the
    :attr:`loopy.kernel.KernelState.LINEARIZED` state.

    Barrier instructions in *gen_sched* are first converted to barriers. If
    the kernel has a non-empty global or local grid size, global barriers are
    inserted (or only verified, when ``kernel.options.insert_gbarriers`` is
    off) unless ``kernel.options.disable_global_barriers`` is set, and local
    barriers are inserted afterwards.

    :arg kernel: the kernel that *gen_sched* was generated for.
    :arg callables_table: forwarded to ``kernel.get_grid_size_upper_bounds``
        and ``insert_barriers``.
    :arg gen_sched: the schedule items making up the linearization.
    :returns: a copy of *kernel* carrying the processed linearization. If
        *kernel* was not already in the ``LINEARIZED`` state, the copy is also
        passed through ``map_schedule_onto_host_or_device``.
    """
    from loopy.kernel import KernelState

    gen_sched = convert_barrier_instructions_to_barriers(
            kernel, gen_sched)

    gsize, lsize = kernel.get_grid_size_upper_bounds(callables_table,
                                                     return_dict=True)

    if gsize or lsize:
        if not kernel.options.disable_global_barriers:
            # Pass the name as a logging argument so that the message is only
            # formatted when debug logging is actually enabled.
            logger.debug("%s: barrier insertion: global", kernel.name)
            gen_sched = insert_barriers(
                    kernel, callables_table, gen_sched,
                    synchronization_kind="global",
                    verify_only=not kernel.options.insert_gbarriers)

        logger.debug("%s: barrier insertion: local", kernel.name)
        gen_sched = insert_barriers(
                kernel, callables_table, gen_sched,
                synchronization_kind="local", verify_only=False)
        logger.debug("%s: barrier insertion: done", kernel.name)

    new_kernel = kernel.copy(
            linearization=gen_sched,
            state=KernelState.LINEARIZED)

    from loopy.schedule.device_mapping import (
            map_schedule_onto_host_or_device)
    if kernel.state != KernelState.LINEARIZED:
        # Device mapper only gets run once.
        new_kernel = map_schedule_onto_host_or_device(new_kernel)

    return new_kernel


def _generate_loop_schedules_inner(
kernel: LoopKernel,
callables_table: Mapping[str, InKernelCallable],
Expand All @@ -2031,6 +2219,14 @@ def _generate_loop_schedules_inner(
raise LoopyError("cannot schedule a kernel that has not been "
"preprocessed")

try:
gen_sched = generate_loop_schedules_v2(kernel)
yield postprocess_schedule(kernel, callables_table, gen_sched)
return
except V2SchedulerNotImplementedException as e:
from warnings import warn
warn(f"Falling back to a slow scheduler implementation due to: {e}")

schedule_count = 0

debug = ScheduleDebugger(**debug_args)
Expand Down Expand Up @@ -2141,35 +2337,7 @@ def print_longest_dead_end():
sched_state, debug=debug, **schedule_gen_kwargs):
debug.stop()

gen_sched = convert_barrier_instructions_to_barriers(
kernel, gen_sched)

gsize, lsize = kernel.get_grid_size_upper_bounds(callables_table,
return_dict=True)

if (gsize or lsize):
if not kernel.options.disable_global_barriers:
logger.debug("%s: barrier insertion: global" % kernel.name)
gen_sched = insert_barriers(kernel, callables_table, gen_sched,
synchronization_kind="global",
verify_only=(not
kernel.options.insert_gbarriers))

logger.debug("%s: barrier insertion: local" % kernel.name)
gen_sched = insert_barriers(kernel, callables_table, gen_sched,
synchronization_kind="local", verify_only=False)
logger.debug("%s: barrier insertion: done" % kernel.name)

new_kernel = kernel.copy(
linearization=gen_sched,
state=KernelState.LINEARIZED)

from loopy.schedule.device_mapping import \
map_schedule_onto_host_or_device
if kernel.state != KernelState.LINEARIZED:
# Device mapper only gets run once.
new_kernel = map_schedule_onto_host_or_device(new_kernel)

new_kernel = postprocess_schedule(kernel, callables_table, gen_sched)
yield new_kernel

debug.start()
Expand Down

0 comments on commit 0f478dd

Please sign in to comment.