
Questions and answers regarding code in spark_env/job_dag.py #7

Open
jiaweiguo1019 opened this issue Dec 20, 2019 · 1 comment

jiaweiguo1019 commented Dec 20, 2019

Here are some questions I ran into while reading the code in /decima-sim/spark_env/.
I asked Hongzi and he replied clearly and concisely.
It's an honor to post Hongzi's answers here, so that anyone with the same questions can get some inspiration from them.



There are some questions about the fundamental assumption of Spark, that every job ends with a single final stage, which I can't really figure out.

Reply from Hongzi

It’s just every Spark job (at least the ones we studied) ends with a single leaf node. You can see this in the DAG visualization that you have found in spark_env/tpch/dag_visualization.
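
To make the assumption concrete, here is a minimal sketch (not code from the repo) that checks a DAG's adjacency matrix for a single leaf node, assuming the same convention as merge_job_dags below, where adj_mat[i, j] == 1 means node i is a parent of node j:

import numpy as np

def has_single_leaf(adj_mat):
    # a column summing to zero marks a root (no parents);
    # a row summing to zero marks a leaf (no children)
    leaf_idx = np.where(adj_mat.sum(axis=1) == 0)[0]
    return len(leaf_idx) == 1

# toy DAG: 0 -> 2 and 1 -> 2, so node 2 is the single final stage
adj_mat = np.zeros([3, 3])
adj_mat[0, 2] = 1
adj_mat[1, 2] = 1
assert has_single_leaf(adj_mat)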


first question

My first question is about the code

import numpy as np  # needed for np.zeros and np.sum below

def merge_job_dags(job_dags):
    # merge all DAGs into a general big DAG
    # this function will modify the original data structure
    # 1. take nodes from the natural order
    # 2. wire the parent and children across DAGs
    # 3. reconstruct adj_mat by properly connecting
    # the new edges among individual adj_mats

    total_num_nodes = sum([d.num_nodes for d in job_dags])
    nodes = []
    adj_mat = np.zeros([total_num_nodes, total_num_nodes])

    base = 0  # for figuring out new node index
    leaf_nodes = []  # leaf nodes in the current job_dag

    for job_dag in job_dags:

        num_nodes = job_dag.num_nodes

        for n in job_dag.nodes:
            n.idx += base
            nodes.append(n)

        # update the adj matrix
        adj_mat[base : base + num_nodes, \
            base : base + num_nodes] = job_dag.adj_mat

        # fundamental assumption of spark --
        # every job ends with a single final stage
        if base != 0:  # at least second job
            for i in range(num_nodes):
                if np.sum(job_dag.adj_mat[:, i]) == 0:
                    assert len(job_dag.nodes[i].parent_nodes) == 0
                    adj_mat[base - 1, base + i] = 1

        # store a set of new root nodes
        root_nodes = []
        for n in job_dag.nodes:
            if len(n.parent_nodes) == 0:
                root_nodes.append(n)

        # connect the root nodes with leaf nodes
        for root_node in root_nodes:
            for leaf_node in leaf_nodes:
                leaf_node.child_nodes.append(root_node)
                root_node.parent_nodes.append(leaf_node)

        # store a set of new leaf nodes
        leaf_nodes = []
        for n in job_dag.nodes:
            if len(n.child_nodes) == 0:
                leaf_nodes.append(n)

        # update base
        base += num_nodes

    assert len(nodes) == adj_mat.shape[0]

    merged_job_dag = JobDAG(nodes, adj_mat)

    return merged_job_dag

Specifically, about this part:

        # fundamental assumption of spark --
        # every job ends with a single final stage
        if base != 0:  # at least second job
            for i in range(num_nodes):
                if np.sum(job_dag.adj_mat[:, i]) == 0:
                    assert len(job_dag.nodes[i].parent_nodes) == 0
                    adj_mat[base - 1, base + i] = 1

I guess the base - 1 here is because every job_dag ends with the node that has the last idx?
So base - 1 just means the final stage of the previous job_dag?

And about this part:

        # store a set of new leaf nodes
        leaf_nodes = []
        for n in job_dag.nodes:
            if len(n.child_nodes) == 0:
                leaf_nodes.append(n)

Following my previous guess and the assumption that every job ends with a single final stage, I think there is only one leaf node, which is base + job_dag.num_nodes - 1.

Reply from Hongzi

I think your understanding is correct. This function is connecting multiple DAGs sequentially to make them a bigger one. However, I don’t think this function is used anywhere else in the repo. At some point we wanted to study Decima’s behavior of scheduling a single DAG and we used functions similar to this one to create big DAGs.
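
As a small worked illustration (my own sketch, not from the repo): with two toy DAGs that each end in a single final stage stored at their last index, base - 1 is exactly the previous DAG's final stage, and adj_mat[base - 1, base + i] = 1 wires it to every root i of the next DAG.

import numpy as np

# DAG A: 0 -> 2, 1 -> 2 (node 2 is A's single final stage)
adj_a = np.zeros([3, 3])
adj_a[0, 2] = 1
adj_a[1, 2] = 1

# DAG B: 0 -> 1 (node 1 is B's single final stage)
adj_b = np.zeros([2, 2])
adj_b[0, 1] = 1

merged = np.zeros([5, 5])
merged[0:3, 0:3] = adj_a
base = 3
merged[base:base + 2, base:base + 2] = adj_b

# chain the DAGs: A's final stage (global index base - 1 == 2)
# becomes a parent of every root of B (here only B's node 0)
for i in range(2):
    if np.sum(adj_b[:, i]) == 0:
        merged[base - 1, base + i] = 1

assert merged[2, 3] == 1  # edge from A's final stage to B's root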

There was still some confusion, so I asked Hongzi again.

I didn't express myself clearly last time. What I wanted to say is that, in my understanding, there are not multiple leaf nodes but only one leaf node, base + job_dag.num_nodes - 1.
So I'm confused about the leaf_nodes setup.

Reply from Hongzi

Hmm, I don't think we explicitly put the leaf node at the end of the list of nodes. We just pick the leaf node based on its definition, namely that it doesn't have any child node (len(n.child_nodes) == 0).
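
For what it's worth, here is a tiny sketch (with a hypothetical Node class, not the repo's) of picking leaf nodes by definition rather than by their position in the node list:

class Node:
    def __init__(self, idx):
        self.idx = idx
        self.parent_nodes = []
        self.child_nodes = []

nodes = [Node(i) for i in range(3)]
nodes[0].child_nodes.append(nodes[2])  # edge 0 -> 2
nodes[1].child_nodes.append(nodes[2])  # edge 1 -> 2

# a leaf is simply any node with no children, wherever it sits in the list
leaf_nodes = [n for n in nodes if len(n.child_nodes) == 0]
assert [n.idx for n in leaf_nodes] == [2]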


second question

My second question is about the pictures in decima-sim/spark_env/tpch/dag_visualization and decima-sim/spark_env/tpch/task_durations.

[Screenshot: a DAG from dag_visualization, with each node labeled by its idx, number of tasks, and average task duration]

I understand that the 0 1 2 3 means the idx of the node, but I'm not sure whether the 200 means 200 tasks and the 0.30 sec means the duration of a single task?

Reply from Hongzi

Your assumption is correct. Here's the code snippet that generates the text on each node:

        # a *very* rough average of task duration
        for n in task_durations:
            task_duration = task_durations[n]

            e = next(iter(task_duration['first_wave']))
            num_tasks = len(task_duration['first_wave'][e]) + \
                        len(task_duration['rest_wave'][e])

            avg_task_duration = np.mean(
                [i for l in task_duration['first_wave'].values() for i in l] + \
                [i for l in task_duration['rest_wave'].values() for i in l] + \
                [i for l in task_duration['fresh_durations'].values() for i in l])

            nodes_mat[n, 0] = num_tasks
            nodes_mat[n, 1] = '{0:.2f}'.format(avg_task_duration / 1000.0) + 'sec'
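
If I read the snippet right, task_duration is assumed to be a dict with keys 'first_wave', 'rest_wave' and 'fresh_durations', each mapping an executor count to a list of per-task durations in milliseconds. A hedged toy example (made-up numbers) of the same computation:

import numpy as np

# hypothetical durations for one node, in milliseconds, keyed by executor count
task_duration = {
    'first_wave':      {4: [310.0, 295.0]},
    'rest_wave':       {4: [280.0, 305.0, 300.0]},
    'fresh_durations': {4: [320.0]},
}

e = next(iter(task_duration['first_wave']))
num_tasks = len(task_duration['first_wave'][e]) + \
            len(task_duration['rest_wave'][e])

avg_task_duration = np.mean(
    [i for l in task_duration['first_wave'].values() for i in l] +
    [i for l in task_duration['rest_wave'].values() for i in l] +
    [i for l in task_duration['fresh_durations'].values() for i in l])

print(num_tasks)                                              # 5 tasks
print('{0:.2f}'.format(avg_task_duration / 1000.0) + 'sec')   # 0.30sec

So the 200 on a node would be num_tasks, and the 0.30 sec would be avg_task_duration converted from milliseconds to seconds.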

[Screenshot: a per-node task duration scatter plot from task_durations, with points in several colors]

And I also don’t understand what the different colors mean here.

Reply from Hongzi

The colors mean different “waves” of task durations under different numbers of executors. More details on the wave behavior can be found in the Decima paper, Section 3 and Section 6.2(1). The code snippet for the visualization is:

    # (fragment of the visualization script; it assumes matplotlib.pyplot
    # imported as plt and a plt_idx counter initialized earlier)
    for exec_num in sorted(duration_info.keys()):

        fw = duration_info[exec_num]['first_wave']
        rw = duration_info[exec_num]['rest_wave']
        fd = duration_info[exec_num]['fresh_durations']

        plt.plot([plt_idx] * len(fw), fw, 'rx')
        plt.plot([plt_idx] * len(rw), rw, 'bx')
        plt.plot([plt_idx] * len(fd), fd, 'gx')

        plt_idx += 1
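
For reference, the matplotlib format strings map markers to colors: 'rx' draws red crosses for the first_wave durations, 'bx' blue crosses for the rest_wave durations, and 'gx' green crosses for the fresh_durations, with one column of points per executor count (plt_idx).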
@hongzimao (Owner) commented
Thanks for posting!
