In [1]:
%classpath add mvn org.apache.spark spark-sql_2.11 2.1.0
// Raise the root log4j logger's level to ERROR for the rest of the session.
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.ERROR);


null

In [2]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._ 


import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


In [3]:
// Obtain (or reuse) the SparkSession for the "NohupReader" application,
// running against the local master "local[*]".
val spark = SparkSession
  .builder()
  .appName("NohupReader")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .config("spark.executor.memory", "3g")
  .getOrCreate()


org.apache.spark.sql.SparkSession@1025fb6f

In [4]:
import spark.implicits._

// Root of the research checkout: $RESEARCH_HOME when set, otherwise the default below.
// Every path derived from it is built by plain string concatenation, so a missing
// trailing "/" is appended here (an empty value is left untouched so that it keeps
// meaning "relative to the working directory").
val research_home: String = {
  val home = scala.util.Properties.envOrElse("RESEARCH_HOME", "/home/acald013/Research/")
  if (home.isEmpty || home.endsWith("/")) home else s"$home/"
}
// Directory holding the log files read below and the CSV files written later.
val folder = s"${research_home}Scripts/Python/"
// File-name prefix of the log files to load.
val prefix = "nohup"

// Load every file whose path matches <folder><prefix>* as a Dataset[String],
// one element per text line.
val nohup = spark.read.textFile(folder + prefix + "*")


org.apache.spark.sql.SparkSession$implicits$@76553344

In [5]:
// Print the total number of lines loaded.
println(nohup.count())

22832


null

In [20]:
// Register this cell's enclosing object as an outer scope with Spark's encoder machinery
// (presumably required so the case classes declared in the notebook can be used with .as[...]).
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

/** One raw text line together with its numeric id `n`. */
case class Line(line: String, n: Long)

/** Summary of one run: its id, date, method name, numeric settings and total method time. */
case class Run(
    runID: Long,
    date: String,
    method: String,
    cores: Int,
    epsilon: Double,
    mu: Int,
    delta: Int,
    methodTime: Double
)

/** One stage record of a run: the id `n` of its source line, timestamp, stage label, time, load and unit. */
case class Stage(
    runID: Long,
    n: Long,
    timestamp: String,
    stage: String,
    stageTime: Double,
    load: Int,
    unit: String
)

/** Row shape keyed by `mdfID`; NOTE(review): not referenced by the cells visible here - confirm it is still needed. */
case class MDFrow(
    mdfID: Long,
    n: Long,
    method: String,
    epsilon: Double,
    mu: Int,
    delta: Int,
    stage: String,
    time: Double,
    load: Int,
    unit: String
)

/** Adds a `display` method to any Dataset so it can be rendered as a BeakerX table. */
implicit class DatasetOps(ds: org.apache.spark.sql.Dataset[_]) {
    /** Fetches up to `rows` rows to the driver and shows them in a `TableDisplay`.
      *
      * @param rows maximum number of rows to take from the dataset (default 20)
      */
    def display(rows: Int = 20) = {
        import com.twosigma.beakerx.scala.table.TableDisplay
        val header = ds.columns
        // Each fetched row becomes a columnName -> value map.
        val records = ds.toDF.take(rows).map { row => header.zip(row.toSeq).toMap }
        new TableDisplay(records).display()
    }
}

defined class Line
defined class Run
defined class Stage
defined class MDFrow
defined class DatasetOps


In [7]:
// Tag every log line with a numeric id "n" and cache the result, since later cells reuse it.
// monotonically_increasing_id() replaces the deprecated monotonicallyIncreasingId alias.
// NOTE(review): Spark guarantees these ids are unique and increasing but NOT consecutive
// across partitions; later cells expand id ranges with `start to end`, which assumes both
// ends of a range come from the same partition - confirm for multi-file inputs.
val lines = nohup.toDF("line")
    .withColumn("n", monotonically_increasing_id())
    .as[Line]
    .cache()
// Counting forces evaluation of the cached dataset.
val nLines = lines.count()
lines.display(10)

null

In [53]:
// Collect the ids of all marker lines (a "Start" banner or a "method=<name>," line for
// either method), sort them, and pair them up two by two as (first, last).
// A trailing unpaired id is discarded; each surviving pair gets a sequential index.
val indicesRun = lines.filter{ l =>
        val text = l.line
        Seq(
            "=== MergeLast Start ===", "method=MergeLast,",
            "=== SpatialJoin Start ===", "method=SpatialJoin,"
        ).exists(marker => text.contains(marker))
    }
    .orderBy("n")
    .select("n")
    .collect()
    .map(_.getLong(0))
    .toList
    .grouped(2)
    .collect { case List(first, last) if first != last => (first, last) }
    .toList
    .zipWithIndex
// Expand every (first, last) pair into one (runID, n) row per id in the closed range.
val indexRun = spark.createDataset(indicesRun)
    .flatMap{ entry =>
        val ((first, last), id) = entry
        (first to last).toList.map(n => (id, n))
    }
    .toDF("runID","n")
    .cache()
indexRun.display(100)

null

In [52]:
// Build one Run per runID from the line with the highest id "n" inside that run's range.
// That line is split as "<date> -> f0,f1,f2,f3,f4,f5", where every f_i is "key=value";
// the values are read positionally as method, cores, epsilon, mu, delta and time.
val runs = indexRun.join(lines, "n").
    groupBy("runID").
    agg(max($"n").alias("n")).
    join(lines, "n").
    select("runID", "line").
    orderBy("runID").
    map{ row =>
        val runID  = row.getInt(0)
        val line   = row.getString(1)
        val arr1   = line.split(" -> ")
        val date   = arr1(0)
        val fields = arr1(1).split(",")
        // Value part of the i-th "key=value" field.
        def value(i: Int): String = fields(i).split("=")(1)
        Run(
            runID,
            date,
            value(0),          // method
            value(1).toInt,    // cores
            value(2).toDouble, // epsilon
            value(3).toInt,    // mu
            value(4).toInt,    // delta
            value(5).toDouble  // time
        )
    }.
    cache
runs.display(10)

null

In [10]:
// Parse every line containing "|" that falls inside a run into a Stage.
// The line is split as "<timestamp> -> <stage> | <time>? | <load> <unit>", where "?" is a
// single trailing character dropped from the time field before it is parsed as a Double.
val stages = lines.filter(_.line.contains("|")).
    join(indexRun, "n").
    map{ m =>
        // Column order after the join on "n": n, line, runID.
        val n         = m.getLong(0)
        val line      = m.getString(1)
        val runID     = m.getInt(2)
        val arr1      = line.split(" -> ")
        val timestamp = arr1(0).trim
        val arr2      = arr1(1).split("\\|")
        val stage     = arr2(0).trim
        val time      = arr2(1).trim.dropRight(1).toDouble
        val arr3      = arr2(2).trim.split(" ")
        val load      = arr3(0).toInt
        val unit      = arr3(1)
        Stage(runID, n, timestamp, stage, time, load, unit)
    }.cache

// Counting forces evaluation of the cached dataset.
stages.count()
stages.display(10)

null

In [11]:
// Stage timings of the MergeLast runs with normalised stage labels.
val ml_stages = {
    // Label rewrite, applied innermost first:
    //   "Reporting locations at t=<digits>" -> "0.Reporting locations"
    //   "Checking internal timestamps"      -> "4.Checking internals"
    //   "..."                               -> removed
    val label = regexp_replace(
        regexp_replace(
            regexp_replace($"stage", "Reporting locations at t=\\d+", "0.Reporting locations"),
            "Checking internal timestamps", "4.Checking internals"),
        "\\.\\.\\.", "")
    stages.join(runs, "runID")
        .filter($"method" === "MergeLast")
        .select($"runID", $"n", $"method", $"epsilon", $"mu", $"delta", label.alias("stage"), $"stageTime")
        // Drop rows whose rewritten label matches either of these patterns.
        .filter(!$"stage".rlike("4.Distance Join phase"))
        .filter(!$"stage".rlike("5.Getting candidates"))
}
ml_stages.show(truncate = false)
//ml_stages.display(20)


+-----+---+---------+-------+---+-----+----------------------------+---------+
|runID|n  |method   |epsilon|mu |delta|stage                       |stageTime|
+-----+---+---------+-------+---+-----+----------------------------+---------+
|0    |19 |MergeLast|10.0   |4  |5    |0.Reporting locations       |5.3      |
|0    |35 |MergeLast|10.0   |4  |5    |1.Set of disks for t_i      |29.4     |
|0    |36 |MergeLast|10.0   |4  |5    |0.Reporting locations       |5.01     |
|0    |52 |MergeLast|10.0   |4  |5    |2.Set of disks for t_i+delta|32.2     |
|0    |53 |MergeLast|10.0   |4  |5    |3.Joining timestams         |3.89     |
|0    |117|MergeLast|10.0   |4  |5    |4.Checking internals        |57.76    |
|0    |118|MergeLast|10.0   |4  |5    |0.Reporting locations       |4.76     |
|0    |134|MergeLast|10.0   |4  |5    |1.Set of disks for t_i      |18.14    |
|0    |135|MergeLast|10.0   |4  |5    |0.Reporting locations       |4.78     |
|0    |151|MergeLast|10.0   |4  |5    |2.Set of disk

null

In [12]:
// Stage timings of the SpatialJoin runs with normalised stage labels.
val sj_stages = {
    // Label rewrite: first strip "...", then turn "Reporting" into "0.Reporting".
    val label = regexp_replace(
        regexp_replace($"stage", "\\.\\.\\.", ""),
        "Reporting", "0.Reporting")
    stages.join(runs, "runID")
        .filter($"method" === "SpatialJoin")
        .select($"runID", $"n", $"method", $"epsilon", $"mu", $"delta", label.alias("stage"), $"stageTime")
        // Drop rows whose rewritten label matches either of these patterns.
        .filter(!$"stage".rlike("4.Distance Join phase"))
        .filter(!$"stage".rlike("5.Getting candidates"))
}
sj_stages.show(truncate = false)
//sj_stages.display(20)


+-----+----+-----------+-------+---+-----+----------------------+---------+
|runID|n   |method     |epsilon|mu |delta|stage                 |stageTime|
+-----+----+-----------+-------+---+-----+----------------------+---------+
|5    |3325|SpatialJoin|10.0   |4  |5    |0.Reporting locations |27.57    |
|5    |3341|SpatialJoin|10.0   |4  |5    |1.Set of disks for t_i|141.36   |
|5    |3342|SpatialJoin|10.0   |4  |5    |4.Found flocks        |0.41     |
|5    |3343|SpatialJoin|10.0   |4  |5    |5.Updating times      |0.42     |
|5    |3344|SpatialJoin|10.0   |4  |5    |6.Filter phase        |0.86     |
|5    |3345|SpatialJoin|10.0   |4  |5    |0.Reporting locations |5.19     |
|5    |3361|SpatialJoin|10.0   |4  |5    |1.Set of disks for t_i|20.51    |
|5    |3362|SpatialJoin|10.0   |4  |5    |2.Distance Join phase |2.95     |
|5    |3363|SpatialJoin|10.0   |4  |5    |3.Getting candidates  |1.55     |
|5    |3364|SpatialJoin|10.0   |4  |5    |4.Found flocks        |0.2      |
|5    |3365|

null

In [13]:
// Per-run summary (methodTime exposed as "time"), sorted by runID, epsilon and method.
val data = runs
    .select($"runID", $"method", $"epsilon", $"mu", $"delta", $"methodTime".as("time"))
    .orderBy($"runID", $"epsilon", $"method")
    .cache()
// Counting forces evaluation of the cached dataset.
data.count()
data.display(10)

null

In [14]:
// Bring all rows of `data` to the driver and render them as ';'-separated lines (no header).
val d = data.collect.map(_.mkString(";")).mkString("\n")

import java.io._
// Write the text to <folder>methods.csv; try/finally guarantees the writer is closed
// even if the write fails.
val pw = new PrintWriter(new File(s"${folder}methods.csv" ))
try pw.write(s"$d\n") finally pw.close()

null

In [15]:
// Concatenate the MergeLast and SpatialJoin stage tables, bring the rows to the driver
// and render them as ';'-separated lines (no header).
val d = ml_stages.union(sj_stages).collect.map(_.mkString(";")).mkString("\n")

import java.io._
// Write the text to <folder>stages.csv; try/finally guarantees the writer is closed
// even if the write fails.
val pw = new PrintWriter(new File(s"${folder}stages.csv" ))
try pw.write(s"$d\n") finally pw.close()

null

In [23]:
// Collect the ids of all lines containing either delimiter text, sort them, and pair them
// up two by two as (first, last). A trailing unpaired id is discarded; each surviving pair
// gets a sequential index.
val indicesMdf = lines.filter{ l =>
        val text = l.line
        Seq(" -> Setting mu=", " ->   berlin0-10,").exists(marker => text.contains(marker))
    }
    .orderBy("n")
    .select("n")
    .collect()
    .map(_.getLong(0))
    .toList
    .grouped(2)
    .collect { case List(first, last) if first != last => (first, last) }
    .toList
    .zipWithIndex
// Expand every (first, last) pair into one (mdfID, n) row per id in the closed range.
val indexMdf = spark.createDataset(indicesMdf)
    .flatMap{ entry =>
        val ((first, last), id) = entry
        (first to last).toList.map(n => (id, n))
    }
    .toDF("mdfID","n")
    .cache()
indexMdf.display(15)

                                                                                

null

In [41]:
// Highest line id "m" of every mdf block, ordered by that id.
val mdfInfo = indexMdf
    .groupBy($"mdfID")
    .agg(max($"n").alias("m"))
    .orderBy($"m")
// For each block, read the last comma-separated field of its final line as an Int "t"
// and keep only the blocks where t is negative.
val mdfInternalIDs = mdfInfo
    .join(lines, $"m" === $"n")
    .map{ row =>
        // Column order after the join: mdfID, m, line, n.
        val id      = row.getInt(0)
        val payload = row.getString(2).split(" -> ")(1)
        (id, payload.split(",").last.trim.toInt)
    }
    .toDF("mdfID", "t")
    .filter($"t" < 0)
    .cache()
//mdfInternalIDs.show(truncate = false)

                                                                                

[mdfID: int, t: int]

In [70]:
// Stage records found inside the mdf blocks selected in mdfInternalIDs, joined with the
// parameters of the run they belong to.
val mdfInternals = lines
    .join(indexMdf, "n")
    .join(mdfInternalIDs, "mdfID")
    // Keep only lines containing a capital letter A-K followed by a dot.
    .filter($"line".rlike("[A-K]\\."))
    .map{ row =>
        // Column order after the joins: mdfID, n, line, t.
        val id      = row.getInt(0)
        val lineId  = row.getLong(1)
        val payload = row.getString(2).split(" -> ")(1)
        // The payload is cut at each "[": label, then time, then load.
        val parts   = payload.split("\\[")
        val label   = parts(0).trim
        val seconds = parts(1).trim.split("s")(0).toDouble
        val count   = parts(2).trim.split(" ")(0).toInt
        (id, lineId, label, seconds, count)
    }
    .toDF("mdfID", "n", "stage", "time", "load")
    .orderBy("n")
    .join(indexRun, "n")
    .join(runs, "runID")
    .select($"runID", $"mdfID", $"n", $"epsilon", $"mu", $"delta", $"method",
        regexp_replace($"stage", "\\.\\.\\.", "").alias("stage"), $"load", $"time")
    .cache()
mdfInternals.show(truncate = false)

                                                                                

+-----+-----+---+-------+---+-----+---------+---------------------------------+----+-----+
|runID|mdfID|n  |epsilon|mu |delta|method   |stage                            |load|time |
+-----+-----+---+-------+---+-----+---------+---------------------------------+----+-----+
|0    |2    |57 |10.0   |4  |5    |MergeLast|A.Indexing points                |34  |3.311|
|0    |2    |58 |10.0   |4  |5    |MergeLast|B.Getting pairs                  |56  |0.852|
|0    |2    |59 |10.0   |4  |5    |MergeLast|C.Computing centers              |112 |0.483|
|0    |2    |60 |10.0   |4  |5    |MergeLast|D.Indexing centers               |112 |0.706|
|0    |2    |61 |10.0   |4  |5    |MergeLast|E.Getting disks                  |112 |1.364|
|0    |2    |62 |10.0   |4  |5    |MergeLast|F.Filtering less-than-mu disks   |29  |0.261|
|0    |2    |63 |10.0   |4  |5    |MergeLast|G.Prunning duplicate candidates  |13  |0.692|
|0    |2    |64 |10.0   |4  |5    |MergeLast|H.Indexing candidates            |13  |1.149|

null

In [71]:
// Bring all rows of `mdfInternals` to the driver and render them as ';'-separated lines (no header).
val d = mdfInternals.collect.map(_.mkString(";")).mkString("\n")

import java.io._
// Write the text to <folder>mdfInternals.csv; try/finally guarantees the writer is closed
// even if the write fails.
val pw = new PrintWriter(new File(s"${folder}mdfInternals.csv" ))
try pw.write(s"$d\n") finally pw.close()

                                                                                

null

In [81]:
// For the final line of every mdf block: the parameters of the enclosing run plus a
// "timestamp" string cut out of the line text.
// NOTE(review): substring takes (pos, len), so this reads up to 124 characters starting at
// position 122 - confirm that a fixed offset is intended.
val mdfInfo = indexMdf
    .groupBy($"mdfID")
    .agg(max($"n").alias("n"))
    .orderBy($"n")
    .join(indexRun, "n")
    .join(runs, "runID")
    .join(lines, "n")
    .select($"method", $"epsilon", $"mu", $"delta", $"line")
    .select($"method", $"epsilon", $"mu", $"delta", substring($"line", 122, 124).alias("timestamp"))

mdfInfo.show(truncate = false)

                                                                                

+---------+-------+---+-----+---------+
|method   |epsilon|mu |delta|timestamp|
+---------+-------+---+-----+---------+
|MergeLast|10.0   |4  |5    |  0      |
|MergeLast|10.0   |4  |5    |  4      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    |  1      |
|MergeLast|10.0   |4  |5    |  5      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    |  2      |
|MergeLast|10.0   |4  |5    |  6      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    |  3      |
|MergeLast|10.0   |4  |5    |  7      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
|MergeLast|10.0   |4  |5    | -1      |
+---------+-------+---+-----+---------+
only showing top 20 rows



null

In [82]:
// Bring all rows of `mdfInfo` to the driver and render them as ';'-separated lines (no header).
val d = mdfInfo.collect.map(_.mkString(";")).mkString("\n")

import java.io._
// Write the text to <folder>mdfInfo.csv; try/finally guarantees the writer is closed
// even if the write fails.
val pw = new PrintWriter(new File(s"${folder}mdfInfo.csv" ))
try pw.write(s"$d\n") finally pw.close()

null