# Google Traces Parser - clusterdata-2011-2

This Python notebook parses the Google trace "clusterdata-2011-2" and creates the trace files required to run the "cluster-scheduler-simulator".

Useful links:
<ul>
    <li>
        <a href="https://github.com/google/cluster-data"> GitHub </a>
    </li>
    <li>
        <a href="https://drive.google.com/open?id=0B5g07T_gRDg9Z0lsSTEtTWtpOW8&authuser=0"> Format + Schema Document </a>
    </li>
    <li>
        <a href="https://groups.google.com/forum/#!forum/googleclusterdata-discuss"> Mailing List </a>
    </li>
</ul>
Notes:
<ul>
    <li>
        All resource utilization values in the trace are normalized, as explained in the "Format + Schema Document" linked above.
        For this reason we rescale them to absolute values using the same machine size (CPU cores and memory) configured for the "cluster-scheduler-simulator".
        The file "init-cluster-state.log" expects absolute CPU and memory values to calculate the distribution.
    </li>
</ul>

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext 
from pyspark.sql.types import *
import pyspark.sql.functions as func

import os
import errno
from collections import OrderedDict

In [2]:
# Config values
# Capacity of one simulated machine. Resource usage in the trace is normalized,
# so it is multiplied by these numbers to obtain CPU cores and RAM bytes.
CLUSTER_INFO = dict(
    mem_machine=64 * 1024 ** 3,  # 64 GiB, in bytes
    cpu_machine=32,              # cores
)

# Output folders. The trailing "/" is significant: mkdir_p creates
# os.path.dirname(path), i.e. the folder itself when the path ends with "/".
SIMULATOR_TRACES_OUTPUT_FOLDER = "google-simulator-traces/"
JOB_TRACES_OUTPUT_FOLDER = "job-distribution-traces/"

In [3]:
def mkdir_p(path):
    """Create the parent directory of ``path`` (like ``mkdir -p``) and return the path.

    Spaces in ``path`` are replaced with underscores before anything is created.
    Only ``os.path.dirname(path)`` is created, so pass a trailing separator
    (e.g. ``"out/dir/"``) to create ``dir`` itself.

    :param path: file or directory path.
    :returns: ``path`` with spaces replaced by underscores.
    :raises OSError: if the directory cannot be created for any reason other
        than it already existing as a directory.
    """
    path = path.replace(" ", "_")
    dir_path = os.path.dirname(path)
    if not dir_path:
        # No directory component (e.g. "file.log"): nothing to create, and
        # os.makedirs("") would fail with ENOENT.
        return path
    try:
        os.makedirs(dir_path)
    except OSError as exc:  # Python >2.5; errno check instead of exist_ok keeps Python 2 support
        # An existing directory is fine (makes re-running the cell idempotent);
        # anything else (a file with that name, permission error, ...) is not.
        if not (exc.errno == errno.EEXIST and os.path.isdir(dir_path)):
            raise
    return path

# Create the output folder structure. Each configured name is replaced by the
# path mkdir_p returns (spaces turned into underscores), and the job folder is
# nested inside the simulator folder.
_simulator_folder = os.path.join(".", SIMULATOR_TRACES_OUTPUT_FOLDER)
SIMULATOR_TRACES_OUTPUT_FOLDER = mkdir_p(_simulator_folder)
_job_folder = os.path.join(SIMULATOR_TRACES_OUTPUT_FOLDER, JOB_TRACES_OUTPUT_FOLDER)
JOB_TRACES_OUTPUT_FOLDER = mkdir_p(_job_folder)

In [4]:
# Reuse the SparkContext already provided by the pyspark kernel/shell if there
# is one, otherwise create a default one, so the notebook does not depend on an
# externally injected `sc` and survives "Restart & Run All".
# To target a specific cluster, build a SparkConf, e.g.
#     SparkConf().set("spark.master", os.environ["SPARK_MASTER"]) \
#                .set("spark.executor.memory", os.environ["SPARK_EXECUTOR_RAM"])
# and pass it as SparkContext.getOrCreate(conf=...).
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Layout of clusterdata-2011-2/schema.csv: one row per (file pattern, field).
schema = StructType([
    StructField("file_pattern", StringType(), False),
    StructField("field_number", IntegerType(), False),
    StructField("content", StringType(), False),
    StructField("format", StringType(), False),
    StructField("mandatory", StringType(), False),
])
schema_all = (
    sqlContext.read.format("com.databricks.spark.csv")
    .options(header='true')
    .load("clusterdata-2011-2/schema.csv", schema=schema)
)
# Display every row of the schema description (not just the default 20).
schema_all.show(n=schema_all.count(), truncate=False)
# Spark SQL type used for each "format" value found in schema.csv.
# NOTE(review): INTEGER is read as DecimalType(32, 0) rather than IntegerType,
# presumably because values such as timestamps and job IDs can exceed the
# 32-bit range - confirm.
format_type = {
    "INTEGER": DecimalType(32,0),
    "STRING_HASH": StringType(),
    "FLOAT": FloatType(),
    "BOOLEAN": DecimalType(),
    "STRING_HASH_OR_INTEGER": StringType()
}

# Build one list of StructFields per file pattern, in field_number order.
# Fields whose "mandatory" flag is not "YES" are nullable.
schemas = {}
for line in schema_all.orderBy("file_pattern", "field_number").collect():
    line = line.asDict()
    field_type = format_type.get(line["format"])
    if field_type is None:
        # Never drop a field: the CSV reader assigns columns to schema fields by
        # position, so a missing field would shift every column after it.
        # Fall back to a string column and report the unknown format instead.
        print("Unknown format {!r} for field {} of {}; reading it as a string".format(
            line["format"], line["field_number"], line["file_pattern"]))
        field_type = StringType()
    schemas.setdefault(line["file_pattern"], []).append(
        StructField(
            line["content"].replace(' ', '_'),
            field_type,
            line["mandatory"] != "YES"
        )
    )

# One DataFrame per trace table, keyed by the table's folder name
# (e.g. "job_events" for "job_events/part-?????-of-?????.csv.gz").
dataframes = dict(
    (
        os.path.dirname(pattern),
        sqlContext.read.format("com.databricks.spark.csv")
        .load("clusterdata-2011-2/" + pattern, schema=StructType(fields)),
    )
    for pattern, fields in schemas.items()
)

# Largest signed 64-bit integer; used below as the upper bound of event times.
max_time = 2 ** 63 - 1

+---------------------------------------------+------------+-------------------------------+----------------------+---------+
|file_pattern                                 |field_number|content                        |format                |mandatory|
+---------------------------------------------+------------+-------------------------------+----------------------+---------+
|job_events/part-?????-of-?????.csv.gz        |1           |time                           |INTEGER               |YES      |
|job_events/part-?????-of-?????.csv.gz        |2           |missing info                   |INTEGER               |NO       |
|job_events/part-?????-of-?????.csv.gz        |3           |job ID                         |INTEGER               |YES      |
|job_events/part-?????-of-?????.csv.gz        |4           |event type                     |INTEGER               |YES      |
|job_events/part-?????-of-?????.csv.gz        |5           |user                           |STRING_HASH           |NO 

In [5]:
# Short names for the trace tables used below.
# Call e.g. job_events.show(truncate=False) to inspect any of them.
job_events, task_events, task_usage, machine_events = [
    dataframes[table]
    for table in ("job_events", "task_events", "task_usage", "machine_events")
]

In [6]:
# Jobs with an event of type 1 at time 0, i.e. already present when the trace
# window starts. Only the columns needed for init-cluster-state.log are kept.
already_scheduled = (job_events.event_type == 1) & (job_events.time == 0)
jobs_already_running = job_events.where(already_scheduled).select(
    job_events.job_ID, job_events.scheduling_class
)
print("Jobs submitted before the event time window: {}".format(jobs_already_running.count()))

Jobs submitted before the event time window: 3984


In [7]:
# Map job_ID -> time of the event of type 4 for the jobs that were already
# running at the start of the window and finished inside it.
# Taking the minimum time per job keeps the result deterministic should a job
# ever have more than one such event; otherwise the kept value would depend on
# the order in which Spark returns the rows.
finished_jobs_already_running = dict(
    (row.job_ID, row.time)
    for row in job_events[job_events.event_type == 4]
    .join(jobs_already_running, on="job_ID")
    .groupBy("job_ID")
    .agg(func.min("time").alias("time"))
    .collect()
)
print("Jobs submitted before the events time window that ended during it: {}".format(
    len(finished_jobs_already_running)))

Jobs submitted before that endend during the events time window: 79


In [8]:
# Already-running jobs with at least one task event of type 0 at priority >= 9,
# which this notebook treats as "production". Stored as {job_ID: 1} and only
# used for membership tests.
production_task_events = task_events[(task_events.event_type == 0) & (task_events.priority >= 9)]
production_per_job = production_task_events.join(jobs_already_running, on="job_ID") \
    .groupBy(task_events.job_ID).count()
production_jobs_already_running = dict(
    (row.job_ID, 1) for row in production_per_job.collect()
)
print("Production jobs submitted before the events time window: {}".format(len(production_jobs_already_running)))

Production jobs submitted before the events time window: 2525


In [9]:
# Number of tasks of each already-running job: the highest task_index seen in
# events of type 0, plus one because task indexes start from 0.
highest_task_index = task_events[task_events.event_type == 0] \
    .join(jobs_already_running, on="job_ID") \
    .groupBy(task_events.job_ID) \
    .agg(func.max("task_index").alias("tasks"))
tasks_jobs_already_running = dict(
    (row.job_ID, row.tasks + 1) for row in highest_task_index.collect()
)
print("Jobs submitted before the event time window that have at least 1 task: {}".format(len(tasks_jobs_already_running)))

Jobs submitted before the event time window that have at least 1 task: 3979


In [10]:
# Aggregate CPU / memory usage of every job already running at the start of the
# window: average the usage samples of each task, sum those averages per job,
# then scale the normalized values to cores / bytes with CLUSTER_INFO.
usage_per_task = task_usage.join(jobs_already_running, on="job_ID") \
    .groupBy(task_usage.job_ID, task_usage.task_index) \
    .agg(func.avg("CPU_rate").alias("cpu"),
         func.avg("canonical_memory_usage").alias("memory"))
usage_per_job = usage_per_task.groupBy(task_usage.job_ID) \
    .agg(func.sum("cpu").alias("cpu"),
         func.sum("memory").alias("memory"))

resources_jobs_already_running = {}
for row in usage_per_job.collect():
    if row.job_ID in resources_jobs_already_running:
        continue
    resources_jobs_already_running[row.job_ID] = {
        'cpu': CLUSTER_INFO["cpu_machine"] * row.cpu,
        'memory': CLUSTER_INFO["mem_machine"] * row.memory
    }
print("Jobs processed: {}".format(len(resources_jobs_already_running)))

Jobs processed: 3976


In [18]:
# Write init-cluster-state.log: one "11" line per job already running when the
# trace window starts, plus a "12" line when that job also ended inside it.
#
# Common columns:
#   0: record type - 11 (8-column line: the job was there at the beginning of
#      the time window) or 12 (6-column line: the job was there at the
#      beginning and ended at the timestamp in column 1)
#   1: timestamp (0 on "11" lines)
#   2: unique job ID
#   3: prod_job - 1 if the job has "production" priority, else 0
#   4: sched_class - 0, 1, 2 or 3 ("Scheduling Class" in the schema document)
# "11" lines add: 5 = number of tasks, 6 = aggregate CPU usage (cores),
#   7 = aggregate RAM usage (bytes).
# "12" lines: column 5 is unspecified/unused (the task count is written there).
job_count = 0
init_state_path = os.path.join(SIMULATOR_TRACES_OUTPUT_FOLDER, "init-cluster-state.log")
with open(init_state_path, "w") as init_cluster_state:
    for running in jobs_already_running.collect():
        job_id = running.job_ID
        task_total = tasks_jobs_already_running.get(job_id, 0)
        # Jobs without tasks add nothing to the trace and make the simulator fail.
        if task_total == 0:
            continue

        prod_flag = 1 if job_id in production_jobs_already_running else 0
        sched_class = running.scheduling_class

        cpu_cores, ram_bytes = 0, 0
        if job_id in resources_jobs_already_running:
            usage = resources_jobs_already_running[job_id]
            cpu_cores, ram_bytes = usage["cpu"], int(usage["memory"])

        init_cluster_state.write("{} {} {} {} {} {} {} {}\n".format(
            11, 0, job_id, prod_flag, sched_class, task_total, cpu_cores, ram_bytes))

        if job_id in finished_jobs_already_running:
            end_time = finished_jobs_already_running[job_id]
            init_cluster_state.write("{} {} {} {} {} {}\n".format(
                12, end_time, job_id, prod_flag, sched_class, task_total))

        job_count += 1
print("Jobs processed: {}".format(job_count))

Jobs processed: 3979


In [12]:
# Jobs with exactly one event of type 1 and exactly one event of type 4 strictly
# inside the trace window (0 < time < max_time).
# Each event type is counted separately. Requiring "any 2 events" would also
# accept a job with two type-1 events and no type-4 event; such a job would get
# no end time (end = 0, negative runtime) in the cells below.
# The "count" column (always 2 here) is kept so the frame's columns stay the same.
in_window = (job_events.time > 0) & (job_events.time < max_time)
is_schedule = job_events.event_type == 1
is_finish = job_events.event_type == 4
job_that_finished = (
    job_events[(is_schedule | is_finish) & in_window]
    .groupBy(job_events.job_ID)
    .agg(
        func.count(job_events.event_type).alias("count"),
        func.sum(func.when(is_schedule, 1).otherwise(0)).alias("schedules"),
        func.sum(func.when(is_finish, 1).otherwise(0)).alias("finishes")
    )
    .where("schedules = 1 AND finishes = 1")
    .select("job_ID", "count")
)

In [13]:
# Jobs with an event of type 1 inside the window that also finished in it,
# keyed by job_ID in order of that event's time. The remaining fields are
# filled in by the following cells.
jobs = OrderedDict()
scheduled_in_window = job_events[job_events.event_type == 1] \
    .join(job_that_finished, on="job_ID") \
    .orderBy(job_events.time) \
    .select(job_events.job_ID, job_events.time, job_events.scheduling_class)
for event in scheduled_in_window.collect():
    entry = jobs.setdefault(event.job_ID, {
        "production": False,
        "tasks": 0,
        "start": 0,
        "end": 0,
        "scheduling_class": None
    })
    entry["start"] = event.time
    entry["scheduling_class"] = event.scheduling_class
print("Jobs submitted during the event time window: {}".format(len(jobs)))

Jobs submitted during the event time window: 385502


In [14]:
# Record the time of the type-4 event of every job collected above as its end.
job_count = 0
finish_events = job_events[job_events.event_type == 4] \
    .join(job_that_finished, on="job_ID") \
    .select(job_events.job_ID, job_events.time)
for event in finish_events.collect():
    if event.job_ID not in jobs:
        continue
    jobs[event.job_ID]["end"] = event.time
    job_count += 1
print("Jobs processed: {}".format(job_count))

Jobs processed: 385502


In [15]:
# Number of tasks of each collected job: the highest task_index seen in events
# of type 0, plus one because task indexes start from 0.
job_count = 0
highest_task_index = task_events[task_events.event_type == 0] \
    .join(job_that_finished, on="job_ID") \
    .groupBy(task_events.job_ID) \
    .agg(func.max("task_index").alias("tasks"))
for row in highest_task_index.collect():
    if row.job_ID not in jobs:
        continue
    jobs[row.job_ID]["tasks"] = row.tasks + 1
    job_count += 1
print("Jobs processed: {}".format(job_count))

Jobs processed: 385482


In [16]:
# Flag as production every collected job that has at least one task event of
# type 0 at priority >= 9.
job_count = 0
production_task_events = task_events[(task_events.event_type == 0) & (task_events.priority >= 9)]
production_per_job = production_task_events.join(job_that_finished, on="job_ID") \
    .groupBy(task_events.job_ID).count()
for row in production_per_job.collect():
    if row.job_ID not in jobs:
        continue
    jobs[row.job_ID]["production"] = True
    job_count += 1
print("Jobs processed: {}".format(job_count))

Jobs processed: 35838


In [19]:
# Write the job distribution traces, one line per job in scheduling order:
#   interarrival_cmb.log - seconds since the previous job arrival
#   runtimes_cmb.log     - job runtime in seconds
#   csizes_cmb.log       - number of tasks in the job
# Columns of every line:
#   0: cluster_name
#   1: assignment policy ("cmb-new" = "CMB_PBB")
#   2: scheduler id - 1 = service (production job whose scheduling class is
#      neither 0 nor 1), 0 = batch
#   3: the per-file value listed above
# The files are opened in a `with` block so they are flushed and closed even if
# the loop stops early (unordered data) or raises.
MICROSECONDS_PER_SECOND = 1000000  # trace times are divided by this to get seconds

with open(os.path.join(JOB_TRACES_OUTPUT_FOLDER, "interarrival_cmb.log"), "w") as interarrival_cmb, \
        open(os.path.join(JOB_TRACES_OUTPUT_FOLDER, "runtimes_cmb.log"), "w") as runtimes_cmb, \
        open(os.path.join(JOB_TRACES_OUTPUT_FOLDER, "csizes_cmb.log"), "w") as csizes_cmb:
    previous_arrival = 0
    for job in jobs.values():
        # Jobs with 0 tasks add nothing to the trace and make the simulator fail.
        if job["tasks"] == 0:
            continue

        column_0 = "test"
        column_1 = "cmb-new"
        is_service = job["production"] and job["scheduling_class"] not in (0, 1)
        column_2 = 1 if is_service else 0

        interarrival = job["start"] - previous_arrival
        if interarrival < 0:
            print("Error. Dataset is not ordered by arrival time!")
            break
        interarrival_cmb.write("{} {} {} {} \n".format(column_0, column_1, column_2,
                                                       interarrival / MICROSECONDS_PER_SECOND))
        runtimes_cmb.write("{} {} {} {} \n".format(column_0, column_1, column_2,
                                                   (job["end"] - job["start"]) / MICROSECONDS_PER_SECOND))
        csizes_cmb.write("{} {} {} {} \n".format(column_0, column_1, column_2,
                                                 job["tasks"]))

        previous_arrival = job["start"]