Skip to content

Commit

Permalink
[LABY] Add Labyrinth compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
lmadaitahy authored and ggevay committed Jul 20, 2018
1 parent a428dcb commit 5836fdb
Show file tree
Hide file tree
Showing 35 changed files with 4,860 additions and 140 deletions.
Expand Up @@ -17,20 +17,13 @@ package org.emmalanguage
package cli

import api._
import examples.graphs.ConnectedComponents
import examples.graphs.EnumerateTriangles
import examples.graphs.model.Edge
import util.Iso

import breeze.linalg.{Vector => Vec}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.reflect.ClassTag
//import examples.graphs._
//import examples.graphs.model._
import examples.text._
//import util.Iso

object LabyrinthExamplesRunner extends LabyrinthAware {
// Text
Expand Down Expand Up @@ -71,24 +64,6 @@ object LabyrinthExamplesRunner extends LabyrinthAware {
c
})

section("Graph Analytics")
cmd("connected-components")
.text("Label undirected graph vertices with component IDs")
.children(
arg[String]("input")
.text("edges path")
.action((x, c) => c.copy(input = x)),
arg[String]("output")
.text("labeled vertices path")
.action((x, c) => c.copy(output = x)))
note("")
cmd("triangle-count")
.text("Count the number of triangle cliques in a graph")
.children(
arg[String]("input")
.text("edges path")
.action((x, c) => c.copy(input = x)))

section("Text Analytics")
cmd("word-count")
.text("Word Count Example")
Expand All @@ -106,11 +81,6 @@ object LabyrinthExamplesRunner extends LabyrinthAware {
cfg <- parser.parse(args, Config())
cmd <- cfg.command
res <- cmd match {
// Graphs
case "connected-components" =>
Some(connectedComponents(cfg)(flinkEnv(cfg)))
case "triangle-count" =>
Some(triangleCount(cfg)(flinkEnv(cfg)))
// Text
case "word-count" =>
Some(wordCount(cfg)(flinkEnv(cfg)))
Expand All @@ -126,33 +96,8 @@ object LabyrinthExamplesRunner extends LabyrinthAware {
implicit def breezeVectorCSVConverter[V: CSVColumn : ClassTag]: CSVConverter[Vec[V]] =
CSVConverter.iso[Array[V], Vec[V]](Iso.make(Vec.apply, _.toArray), implicitly)

// Graphs

def connectedComponents(c: Config)(implicit flink: StreamExecutionEnvironment): Unit =
emma.onLabyrinth {
// read in set of edges to be used as input
val edges = DataBag.readCSV[Edge[Long]](c.input, c.csv)
// build the connected components
val paths = ConnectedComponents(edges)
// write the results into a file
paths.writeCSV(c.output, c.csv)
}

def triangleCount(c: Config)(implicit flink: StreamExecutionEnvironment): Unit =
emma.onLabyrinth {
// convert a bag of directed edges into an undirected set
val incoming = DataBag.readCSV[Edge[Long]](c.input, c.csv)
val outgoing = incoming.map(e => Edge(e.dst, e.src))
val edges = (incoming union outgoing).distinct
// compute all triangles
val triangles = EnumerateTriangles(edges)
// count the number of enumerated triangles
val triangleCount = triangles.size
// print the result to the console
println(s"The number of triangles in the graph is $triangleCount")
}

def wordCount(c: Config)(implicit flink: StreamExecutionEnvironment): Unit =

emma.onLabyrinth {
// read the input files and split them into lowercased words
val docs = DataBag.readText(c.input)
Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright © 2014 TU Berlin (emma@dima.tu-berlin.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.emmalanguage
package examples

import api._

/**
 * Labyrinth variant of the click-count-diffs integration spec.
 *
 * Overrides the `clickCountDiffs` hook of `BaseClickCountDiffsIntegrationSpec`
 * so that the example program is evaluated through `emma.onLabyrinth`.
 */
class ClickCountDiffsIntegrationSpec
  extends BaseClickCountDiffsIntegrationSpec
  with LabyrinthAware {

  /**
   * Runs [[ClickCountDiffs]] inside an `emma.onLabyrinth` block, with the
   * environment handed in by `withDefaultFlinkStreamEnv` marked as implicit.
   *
   * @param baseInName forwarded unchanged to `ClickCountDiffs`
   * @param numDays    forwarded unchanged to `ClickCountDiffs`
   */
  override def clickCountDiffs(baseInName: String, numDays: Int): Unit =
    withDefaultFlinkStreamEnv { implicit flink =>
      emma.onLabyrinth {
        ClickCountDiffs(baseInName, numDays)
      }
    }
}
Expand Up @@ -22,12 +22,12 @@ import api._
object ClickCountDiffs {

def apply(baseName: String, numDays: Int): Unit = {

val baseInName = baseName

// (no join with pageAttributes yet)
var yesterdayCounts: DataBag[(Int, Int)] = DataBag.empty // should be null, but the compilation doesn't handle it
for(day <- 1 to numDays) {
var day = 1
while (day <= numDays) {
// Read all page-visits for this day
val visits: DataBag[Int] = DataBag.readText(baseInName + day).map(Integer.parseInt) // integer pageIDs
// Count how many times each page was visited:
Expand All @@ -44,11 +44,14 @@ object ClickCountDiffs {
if c._1 == y._1
} yield Math.abs(c._2 - y._2)
val sum = diffs.reduce(0)((x: Int, y: Int) => x + y)
//println(sum)
scala.tools.nsc.io.File(baseName + day + ".out").writeAll(sum.toString)
DataBag(Seq(sum)).writeCSV(baseName + day + ".out", csvConfig)
}
yesterdayCounts = counts
day += 1
}

}

val csvConfig = CSV()

}
5 changes: 5 additions & 0 deletions emma-labyrinth/pom.xml
Expand Up @@ -92,6 +92,11 @@
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
Expand Down
Expand Up @@ -192,6 +192,8 @@ public void open() throws Exception {

cb = new MyCFLCallback();
cflMan.subscribe(cb);

System.out.println("===Name: " + this.name);
}

private int getMul() {
Expand Down Expand Up @@ -305,7 +307,7 @@ private class MyCollector implements BagOperatorOutputCollector<OUT> {
@Override
public void collectElement(OUT e) {
numElements++;
assert outs.size() != 0; // If the operator wants to emit an element but there are no outs, then we just probably forgot to call .out()
//assert outs.size() != 0; // If the operator wants to emit an element but there are no outs, then we just probably forgot to call .out()
for(Out o: outs) {
o.collectElement(e);
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ public void setNumToSubscribe(int totalPara) {



public static final boolean vlog = false;
public static final boolean vlog = true;

public static final boolean logStartEnd = false;
}
Expand Up @@ -233,7 +233,8 @@ public ElementOrEvent<T> deserialize(ElementOrEvent<T> r, DataInputView s) throw

private void deserializeEvent(ElementOrEvent<T> r, DataInputView s) throws IOException {
r.event = new Event();
r.event.type = Event.enumConsts[s.readInt()];
int type = s.readInt();
r.event.type = Event.enumConsts[type];
r.event.assumedTargetPara = s.readShort();
r.event.bagID = new BagID();
r.event.bagID.cflSize = s.readInt();
Expand Down
@@ -0,0 +1,93 @@
/*
* Copyright © 2014 TU Berlin (emma@dima.tu-berlin.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.emmalanguage.labyrinth;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.Objects;

/**
 * Flink {@link TypeInformation} for {@link ElementOrEvent} values whose
 * element payload is described by a wrapped element type information.
 *
 * @param <T> type of the element carried by the {@code ElementOrEvent}
 */
public class ElementOrEventTypeInfo<T> extends TypeInformation<ElementOrEvent<T>> {

    /** Type information of the wrapped element type. */
    private final TypeInformation<T> elemTypeInfo;

    /**
     * @param elemTypeInfo type information of the element type; it is used to
     *                     create the element serializer and to report the field count
     */
    public ElementOrEventTypeInfo(TypeInformation<T> elemTypeInfo) {
        this.elemTypeInfo = elemTypeInfo;
    }

    /** @return always {@code false} */
    @Override
    public boolean isBasicType() {
        return false;
    }

    /** @return always {@code false} */
    @Override
    public boolean isTupleType() {
        return false;
    }

    /** @return always {@code 1} */
    @Override
    public int getArity() {
        return 1;
    }

    /** @return the total field count reported by the element type information */
    @Override
    public int getTotalFields() {
        return elemTypeInfo.getTotalFields();
    }

    /** @return the {@code ElementOrEvent} class object, cast to the parameterized type */
    @Override
    @SuppressWarnings("unchecked")
    public Class<ElementOrEvent<T>> getTypeClass() {
        // Generic class literals do not exist, so go through a wildcard-typed reference.
        final Class<?> rawClass = ElementOrEvent.class;
        return (Class<ElementOrEvent<T>>) rawClass;
    }

    /** @return always {@code false} */
    @Override
    public boolean isKeyType() {
        return false;
    }

    /**
     * @param config execution config handed on to the element type information
     * @return an {@code ElementOrEventSerializer} wrapping the element serializer
     */
    @Override
    public TypeSerializer<ElementOrEvent<T>> createSerializer(ExecutionConfig config) {
        final TypeSerializer<T> elemSerializer = elemTypeInfo.createSerializer(config);
        return new ElementOrEvent.ElementOrEventSerializer<>(elemSerializer);
    }

    @Override
    public String toString() {
        return "ElementOrEventTypeInfo{elemTypeInfo=" + elemTypeInfo + '}';
    }

    /** Two instances are equal when they have the same runtime class and equal element type information. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (getClass() != o.getClass()) {
            return false;
        }
        final ElementOrEventTypeInfo<?> other = (ElementOrEventTypeInfo<?>) o;
        return Objects.equals(elemTypeInfo, other.elemTypeInfo);
    }

    @Override
    public int hashCode() {
        return 2 * Objects.hash(elemTypeInfo);
    }

    /** @return {@code true} iff {@code obj} is an {@code ElementOrEventTypeInfo} */
    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof ElementOrEventTypeInfo;
    }
}
Expand Up @@ -22,6 +22,7 @@
import org.emmalanguage.labyrinth.partitioners.FlinkPartitioner;
import org.emmalanguage.labyrinth.partitioners.Partitioner;
import org.emmalanguage.labyrinth.util.LogicalInputIdFiller;
import org.emmalanguage.labyrinth.util.Nothing;
import org.emmalanguage.labyrinth.util.Util;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down
Expand Up @@ -90,7 +90,7 @@ public void closeInBag(int inputId) {
FileSystem fs = FileSystem.get(new URI(currentFileName));
//BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(currentFileName))));

//PrintWriter writer = new PrintWriter(currentFileName, "UTF-8");
//PrintWriter csvWriter = new PrintWriter(currentFileName, "UTF-8");
PrintWriter writer = new PrintWriter(fs.create(new Path(currentFileName), FileSystem.WriteMode.OVERWRITE));
for (Integer e : buffer) {
writer.println(Integer.toString(e));
Expand Down
Expand Up @@ -92,7 +92,7 @@ public void closeInBag(int inputId) {
FileSystem fs = FileSystem.get(new URI(currentFileName));
//BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(currentFileName))));

//PrintWriter writer = new PrintWriter(currentFileName, "UTF-8");
//PrintWriter csvWriter = new PrintWriter(currentFileName, "UTF-8");
PrintWriter writer = new PrintWriter(fs.create(new Path(currentFileName), FileSystem.WriteMode.OVERWRITE));
for (T e : buffer) {
writer.println(e.toString());
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright © 2014 TU Berlin (emma@dima.tu-berlin.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.emmalanguage.labyrinth.operators;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

/**
 * {@code CFAwareFileSourcePara} specialization that produces {@code String}
 * records by reading files through Flink's {@link TextInputFormat}.
 */
public class CFAwareTextSource extends CFAwareFileSourcePara<String, FileInputSplit> {

    /**
     * Creates the input format used to read the given file.
     *
     * @param filename file name, wrapped as-is into a Flink {@link Path}
     * @return a new {@link TextInputFormat} over that path
     */
    @Override
    protected InputFormat<String, FileInputSplit> getInputFormatFromFilename(String filename) {
        final Path inputPath = new Path(filename);
        return new TextInputFormat(inputPath);
    }
}
@@ -0,0 +1,46 @@
/*
* Copyright © 2014 TU Berlin (emma@dima.tu-berlin.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.emmalanguage.labyrinth.operators;

import org.emmalanguage.labyrinth.util.Unit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Condition operator that receives a Scala boolean and, depending on its
 * value, appends one of two configured lists of ids to the CFL via
 * {@code out.appendToCfl}.
 *
 * NOTE(review): the "Bb" ids are presumably basic-block ids of the branch
 * targets; confirm against the callers that construct this node.
 */
public class ConditionNodeScala extends SingletonBagOperator<scala.Boolean, Unit> {

    protected static final Logger LOG = LoggerFactory.getLogger(ConditionNodeScala.class);

    /** Ids appended when the incoming condition is true. */
    private final int[] trueBranchBbIds;
    /** Ids appended when the incoming condition is false. */
    private final int[] falseBranchBbIds;

    /**
     * Convenience constructor for a single id per branch.
     *
     * @param trueBranchBbId  id appended when the condition is true
     * @param falseBranchBbId id appended when the condition is false
     */
    public ConditionNodeScala(int trueBranchBbId, int falseBranchBbId) {
        this(new int[]{trueBranchBbId}, new int[]{falseBranchBbId});
    }

    /**
     * @param trueBranchBbIds  ids appended, in array order, when the condition is true
     * @param falseBranchBbIds ids appended, in array order, when the condition is false
     */
    public ConditionNodeScala(int[] trueBranchBbIds, int[] falseBranchBbIds) {
        this.trueBranchBbIds = trueBranchBbIds;
        this.falseBranchBbIds = falseBranchBbIds;
    }

    /**
     * Forwards the element to the superclass, then appends every id of the
     * selected branch to the CFL in array order.
     *
     * @param e              the condition value, unboxed with {@code scala.Boolean.unbox}
     * @param logicalInputId passed through to the superclass
     */
    @Override
    public void pushInElement(scala.Boolean e, int logicalInputId) {
        super.pushInElement(e, logicalInputId);

        final int[] selectedIds;
        if (scala.Boolean.unbox(e)) {
            selectedIds = trueBranchBbIds;
        } else {
            selectedIds = falseBranchBbIds;
        }

        for (int i = 0; i < selectedIds.length; i++) {
            out.appendToCfl(selectedIds[i]);
        }
    }
}

0 comments on commit 5836fdb

Please sign in to comment.