In [1]:
import os
import sys
import time
from collections import deque
import math
from datetime import datetime
import findspark

os.environ['PYARROW_IGNORE_TIMEZONE'] = "1"

from functools import reduce as _reduce
from toposort import toposort, CircularDependencyError
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
from pyspark.sql import SparkSession
import databricks.koalas as ks
from pyspark.sql.types import StructType, StructField, LongType

import preserve

In [2]:
def wait_for_reservation(manager, timeout, reservation_id, quiet=True):
    """Poll a reservation until its state is "R" or the timeout expires.

    Args:
        manager: object exposing ``fetch_reservation(reservation_id)`` whose
            result has a ``state`` attribute.
        timeout: maximum number of seconds to wait (converted with ``int``).
        reservation_id: identifier handed to ``manager.fetch_reservation``.
        quiet: when False, print a progress line before every sleep.

    Returns:
        None once the reservation reports state "R".

    Raises:
        SystemExit: via ``sys.exit`` when the timeout is reached first.
    """
    # Number of completed sleeps -> new polling interval (seconds).
    backoff_schedule = {
        12: 10,  # after one minute of 5 s polls
        36: 15,  # after five minutes
        76: 30,  # after fifteen minutes
    }

    deadline = time.time() + int(timeout)
    poll_interval = 5
    sleeps_done = 0

    while True:
        state = manager.fetch_reservation(reservation_id).state
        if state == "R":
            return

        now = time.time()
        # Never sleep past the deadline; truncate to whole seconds.
        sleep_for = int(min(deadline - now, poll_interval))
        if sleep_for <= 0:
            print("[%.1f] Current state: %s. Reached timeout." % (now, state))
            sys.exit("wait-for-reservation timed out")

        if not quiet:
            print("[%.1f] Current state: %s. Waiting %u more seconds." % (now, state, sleep_for))
        time.sleep(sleep_for)

        sleeps_done += 1
        poll_interval = backoff_schedule.get(sleeps_done, poll_interval)

In [4]:
def lookahead_newer(df) -> pd.DataFrame["workflow_id": int, "task_id": int, "task_slack": int, "minimal_start_time": int]:
    """Compute the slack and earliest start time of every task of one workflow.

    The start time of a task is its submit time, pushed back to the finish time
    (start + runtime) of any parent that finishes later; the push-back is
    propagated in topological order. The slack of a task is the gap between its
    own finish and the earliest start among its children.

    The return annotation uses the Koalas DataFrame type-hint syntax; it only
    evaluates after ``databricks.koalas`` has been imported.

    Args:
        df: pandas DataFrame holding the tasks of a single workflow with the
            columns ``id``, ``workflow_id``, ``children``, ``parents``,
            ``ts_submit`` and ``runtime``. ``children`` / ``parents`` must be
            sized values offering ``.flatten()`` (e.g. numpy arrays of task ids).
            The frame is not modified.

    Returns:
        A DataFrame with the columns ``workflow_id``, ``task_id``,
        ``task_slack`` and ``minimal_start_time``, one row per task that has at
        least one parent or child. Tasks without a child inside ``df`` get a
        slack of 0. The frame is empty when no such task exists or when the
        dependency graph contains a cycle.

    Raises:
        Exception: if, after propagation, a child would start before one of
            its parents has finished (sanity check).
    """
    result_columns = ["workflow_id", "task_id", "task_slack", "minimal_start_time"]

    # Re-index a copy instead of mutating the caller's frame.
    df = df.set_index("id")

    # Tasks without any dependency edge are not part of the analysis.
    df = df[((df['children'].map(len) > 0) | (df['parents'].map(len) > 0))]

    if len(df) == 0:
        return pd.DataFrame(columns=result_columns)

    graph = dict()               # task id -> set of parent ids (toposort input)
    forward_dict = dict()        # task id -> set of child ids
    task_runtimes = dict()       # task id -> runtime
    task_arrival_times = dict()  # task id -> start time (initially the submit time)
    workflow_id = None

    # Select the columns by name so the positional layout used below does not
    # depend on the column order of the incoming frame.
    records = df[["workflow_id", "children", "parents", "ts_submit", "runtime"]].to_records()
    for row in records:  # 0: task id, 1: wf id, 2: children, 3: parents, 4: ts submit, 5: runtime
        graph[row[0]] = set(row[3].flatten())
        forward_dict[row[0]] = set(row[2].flatten())
        task_runtimes[row[0]] = row[5]
        task_arrival_times[row[0]] = row[4]
        workflow_id = row[1]

    # Drop the frame early; only the dictionaries are needed from here on.
    del df
    del records

    try:
        groups = list(toposort(graph))
    except CircularDependencyError:
        return pd.DataFrame(columns=result_columns)

    del graph

    for group in groups:
        for task_id in group:
            # toposort also yields ids that only occur as somebody's parent;
            # such tasks are not part of this frame and have no runtime.
            if task_id not in task_runtimes:
                continue

            task_done = task_runtimes[task_id] + task_arrival_times[task_id]

            for child in forward_dict[task_id]:
                # Given the runtime of our parent, check if the submit time of this task is dominant or
                # if our parent is dominant.
                if child not in task_runtimes:
                    continue  # Task was not in snapshot of trace
                if task_done > task_arrival_times[child]:
                    task_arrival_times[child] = task_done

    rows = deque()  # More memory efficient, see https://towardsdatascience.com/memory-efficiency-of-common-python-data-structures-88f0f720421
    for task_id, runtime in task_runtimes.items():
        start_time = task_arrival_times[task_id]
        finish_time = start_time + runtime

        # Earliest start among the children that are present in the frame.
        min_child = None
        for child in forward_dict[task_id]:
            if child not in task_arrival_times:
                continue
            if min_child is None:
                min_child = task_arrival_times[child]
            else:
                min_child = min(task_arrival_times[child], min_child)

        if min_child is None:
            # End task (no known children): no slack, report its start time.
            rows.append([workflow_id, task_id, 0, start_time])
        elif min_child >= finish_time:
            rows.append([workflow_id, task_id, min_child - finish_time, start_time])
        else:
            raise Exception("Error: A child should never have a start time lower than a parent")

    return pd.DataFrame(rows, columns=result_columns)

In [7]:
# Root folder that holds one sub-folder per trace (in parquet format).
location = "./WTA/parquet"
if 'DAS5' in os.environ:  # If we want to execute it on the DAS-5 super computer
    if 'DEPLOYER_HOME' not in os.environ:
        print("NEED TO SET $DEPLOYER_HOME - see Tim Hegeman's DAS deploy script")
        # sys.exit raises SystemExit, which reliably aborts the cell; the bare
        # `exit` helper is meant for interactive shells only.
        sys.exit(-2)

    os.environ['JAVA_HOME'] = "/usr/lib/jvm/jre-11-openjdk"

    hadoop_version = "3.2.2"
    spark_version = "3.1.1"

    import pyspark
    if str(pyspark.__version__) != spark_version:
        print(str(pyspark.__version__), spark_version)
        print("Version mismatch between spark and pyspark. Update one or downgrade the other.")
        sys.exit(-1)

    num_machines = 11
    reservation_manager = preserve.get_PreserveManager()
    reservation_id = reservation_manager.create_reservation(num_machines, "72:00:00")

    # Everything between creating the reservation and having a Spark session is
    # guarded, so the reservation is released when the wait times out
    # (SystemExit), the user interrupts, or a setup step raises.
    try:
        wait_for_reservation(reservation_manager, 12000, str(reservation_id), False)

        master_node = reservation_manager.get_own_reservations()[reservation_id].assigned_machines[0]
        print("We are on DAS5, {0} is master.".format(master_node))

        # Now start Hadoop and Spark
        os.system("cd {}; ./deployer deploy --preserve-id {} -s env/das5-spark.settings spark {}".format(os.environ['DEPLOYER_HOME'], reservation_id, spark_version))
        os.system("cd {}; ./deployer deploy --preserve-id {} -s env/das5-hadoop.settings hadoop {} yarn_enable=false".format(os.environ['DEPLOYER_HOME'], reservation_id, hadoop_version))

        findspark.init(f'./big-data-frameworks/spark-{spark_version}')
        # NOTE(review): "spark.local.dir" contains a space after the comma;
        # confirm that Spark trims the entries of this list.
        spark = (
            SparkSession.builder
            .master("spark://" + master_node + ":7077")
            .appName("Energy Efficiency Data Analysis")
            .config("spark.executor.memory", "60G")
            .config("spark.executor.cores", "16")
            .config("spark.executor.instances", "1")
            .config("spark.driver.memory", "60G")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
            .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
            .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
            .config("spark.local.dir", "/localspark, /tmp")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryoserializer.buffer.max", "128m")
            .config('spark.memory.storageFraction', 0.6)
            .config('spark.memory.fraction', 0.4)
            .config('spark.rdd.compress', 'true')
            .config('spark.checkpoint.compress', 'true')
            # 4 partitions per core: (num_machines - 1) machines x 16 executor cores.
            .config('spark.sql.shuffle.partitions', 4*(num_machines-1)*16)
            .getOrCreate()
        )
    except BaseException:
        print("Spark could not be started!!")
        reservation_manager.kill_reservation(reservation_id)
        raise

    print("Loading data into HDFS")
    folders = next(os.walk(location))[1]
    for folder in folders:
        # Skip too large datasets or those without task dependencies
        folder_lc = str(folder).lower()
        if "alibaba" in folder_lc and "100k" not in folder_lc: continue
        if "google" in folder_lc: continue
        if "lanl" in folder_lc: continue
        if "two_sigma" in folder_lc: continue

        data_folder = os.path.join(location, folder, "tasks", "schema-1.0")
        if not os.path.exists(data_folder): continue
        hdfs_path = os.path.join("/WTA/parquet/", folder, "tasks", "schema-1.0")
        os.system("cd {}; ./frameworks/hadoop-{}/bin/hdfs dfs -mkdir -p {}".format(os.environ['DEPLOYER_HOME'], hadoop_version, hdfs_path))
        os.system("cd {}; ./frameworks/hadoop-{}/bin/hdfs dfs -copyFromLocal {} {}".format(os.environ['DEPLOYER_HOME'], hadoop_version, os.path.join(data_folder, "*"), hdfs_path))
    print("Done! Starting...")
else:
    # Local execution: replace the placeholder with the path of a Spark installation.
    findspark.init(spark_home="<path to spark>")
    spark = (
        SparkSession.builder
        .master("local[8]")
        .appName("WTA parser")
        .config("spark.executor.memory", "20G")
        .config("spark.driver.memory", "8G")
        .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
        .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
        .getOrCreate()
    )

[1617278042.0] Current state: PD. Waiting 5 more seconds.
We are on DAS5, node053.ib.cluster is master.
Loading data into HDFS
Done! Starting...


In [None]:
output_location = "./ic2e-wta-output"

# master_node / reservation_manager / reservation_id only exist when the
# cluster was set up on DAS-5; in local mode the traces are read from `location`.
on_das5 = 'DAS5' in os.environ
hdfs_path = f"hdfs://{master_node}:9000/WTA/parquet/" if on_das5 else location

try:
    folders = next(os.walk(location))[1]

    for folder in folders:
        # Skip too large datasets or those without task dependencies
        folder_lc = str(folder).lower()
        if "alibaba" in folder_lc and "100k" not in folder_lc: continue
        if "google" in folder_lc: continue
        if "lanl" in folder_lc: continue
        if "two_sigma" in folder_lc: continue
        data_folder = os.path.join(location, folder)
        output_location_look_ahead = os.path.join(output_location, "look_ahead", folder.replace("_parquet", "") + "_slack.parquet")

        if not os.path.exists(os.path.join(data_folder, "tasks", "schema-1.0")): continue

        tasks_path = os.path.join(hdfs_path, folder, "tasks", "schema-1.0")
        print(tasks_path)

        # Both "id" and "task_id" are requested; whichever is loaded ends up as "id" below.
        task_columns = ["workflow_id", "id", "task_id", "children", "parents", "ts_submit", "runtime"]
        try:
            kdf = ks.read_parquet(tasks_path, columns=task_columns, pandas_metadata=False, engine='pyarrow')
        except Exception:
            # Retry with pandas metadata enabled; anything that is not an
            # ordinary error (e.g. KeyboardInterrupt) is no longer swallowed.
            kdf = ks.read_parquet(tasks_path, columns=task_columns, pandas_metadata=True, engine='pyarrow')

        os.makedirs(output_location_look_ahead, exist_ok=True)

        if "task_id" in kdf.columns:
            kdf = kdf.rename(columns={"task_id": "id"})

        print("Removing NAs")
        kdf = kdf.dropna()

        def parse_kdf(kdf, append=False):
            """Run lookahead_newer per workflow and write the result as parquet.

            The output goes to the current ``output_location_look_ahead``;
            ``append`` selects the write mode 'append' instead of 'overwrite'.
            """
            print("Grouping based on workflow ID")
            grouped_df = kdf.groupby("workflow_id")

            print("Running look ahead")
            grouped_df.apply(lookahead_newer) \
                .to_parquet(output_location_look_ahead, compression='snappy', engine='pyarrow',
                           mode='append' if append else 'overwrite')

        parse_kdf(kdf)

except Exception:
    print("Exception")
    raise
finally:
    print("Stopping...")
    try:
        spark.stop()
    finally:
        # Release the cluster reservation even when stopping Spark fails;
        # there is no reservation in local mode.
        if on_das5:
            reservation_manager.kill_reservation(reservation_id)