Skip to content

Commit

Permalink
Merge pull request #16 from Flux-Coordinator/feature/measurements-str…
Browse files Browse the repository at this point in the history
…eaming

Feature/measurements streaming
  • Loading branch information
eluchsinger authored May 1, 2018
2 parents 4e71e14 + 4cea5af commit 8fa5130
Show file tree
Hide file tree
Showing 18 changed files with 261 additions and 90 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ addons:
sonarqube:
organization: flux-coordinator

services:
- mongodb

before_script:
- sleep 15

script:
- sbt ++$TRAVIS_SCALA_VERSION test
- sonar-scanner
Expand Down
34 changes: 34 additions & 0 deletions app/actors/measurements/MeasurementActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package actors.measurements;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import models.Reading;

public class MeasurementActor extends AbstractActor {
// HACK: ELU Had to make the Actor Ref public static, until I find a way to find the actor inside the actorsystem (and tell him something).
public static ActorRef out;

public static Props props(ActorRef out) {
return Props.create(MeasurementActor.class, out);
}

public MeasurementActor(final ActorRef out) {
MeasurementActor.out = out;
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Reading.class, putReading -> {
out.tell(putReading, self());
})
.build();
}

@Override
public void postStop() throws Exception {
super.postStop();
out = null;
}
}
35 changes: 20 additions & 15 deletions app/controllers/AdminController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.inject.Inject;
import models.MeasurementMetadata;
import models.MeasurementReadings;
import models.Project;
import org.bson.types.ObjectId;
import play.mvc.Controller;
Expand All @@ -10,13 +11,14 @@
import repositories.measurements.MeasurementsRepository;
import repositories.projects.ProjectsRepository;

import java.util.ArrayList;
import java.util.List;

public class AdminController extends Controller {
private static final int AMOUNT_OF_PROJECTS = 10;
private static final int AMOUNT_OF_ROOMS_PER_PROJECT = 5;
private static final int AMOUNT_OF_MEASUREMENTS_PER_ROOM = 10;
private static final int AMOUNT_OF_READINGS_PER_MEASUREMENT = 100;
private static final int AMOUNT_OF_READINGS_PER_MEASUREMENT = 1;

private final ProjectsRepository projectsRepository;
private final MeasurementsRepository measurementsRepository;
Expand All @@ -29,21 +31,24 @@ public AdminController(final ProjectsRepository projectsRepository, final Measur

public Result resetData() {
this.projectsRepository.resetRepository();
this.measurementsRepository.resetRepository();

final List<Project> projects = DataGenerator.generateProjects(AMOUNT_OF_PROJECTS, AMOUNT_OF_ROOMS_PER_PROJECT);


projects.forEach(this.projectsRepository::addProject);
this.projectsRepository.getProjects().forEachRemaining(project -> {
project.getRooms().forEach(room -> {
final List<MeasurementMetadata> roomMeasurements = DataGenerator.generateMeasurements(AMOUNT_OF_MEASUREMENTS_PER_ROOM);
roomMeasurements.forEach(measurementMetadata -> {
final ObjectId measurementId = this.projectsRepository.addMeasurement(project.getProjectId(), room.getName(), measurementMetadata);
this.measurementsRepository.addReadings(measurementId, DataGenerator.generateReadings(AMOUNT_OF_READINGS_PER_MEASUREMENT));
});
});
});
final List<MeasurementReadings> measurementReadings = new ArrayList<>(AMOUNT_OF_PROJECTS * AMOUNT_OF_ROOMS_PER_PROJECT * AMOUNT_OF_MEASUREMENTS_PER_ROOM);

projects.parallelStream()
.forEach(project -> project.getRooms().forEach(room -> {
final List<MeasurementMetadata> roomMeasurements = DataGenerator.generateMeasurements(AMOUNT_OF_MEASUREMENTS_PER_ROOM);
roomMeasurements.forEach(measurementMetadata -> {
room.getMeasurements().add(measurementMetadata);
final MeasurementReadings readings = new MeasurementReadings();
readings.setMeasurementId(measurementMetadata.getMeasurementId());
readings.getReadings().addAll(DataGenerator.generateReadings(AMOUNT_OF_READINGS_PER_MEASUREMENT));
measurementReadings.add(readings);
});
})
);

projectsRepository.addProjects(projects);
measurementsRepository.addMeasurements(measurementReadings);

return ok("Created " + AMOUNT_OF_PROJECTS + " projects with " + AMOUNT_OF_ROOMS_PER_PROJECT + " rooms each," +
" " + AMOUNT_OF_MEASUREMENTS_PER_ROOM + " measurements per room" +
Expand Down
51 changes: 33 additions & 18 deletions app/controllers/MeasurementsController.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package controllers;

import com.fasterxml.jackson.core.type.TypeReference;
import actors.measurements.MeasurementActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import models.MeasurementReadings;
import models.Reading;
import org.bson.types.ObjectId;
import play.Logger;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.BodyParser;
import play.mvc.Controller;
import play.mvc.Result;
import play.mvc.Results;
import play.libs.streams.ActorFlow;
import play.mvc.*;
import repositories.measurements.MeasurementsRepository;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,13 +27,19 @@
public class MeasurementsController extends Controller {
private final HttpExecutionContext httpExecutionContext;
private final MeasurementsRepository measurementsRepository;
private final ActorSystem actorSystem;
private final Materializer materializer;

private MeasurementReadings activeMeasurement;
private Flow measurementStreamFlow;

@Inject
public MeasurementsController(final HttpExecutionContext httpExecutionContext, final MeasurementsRepository measurementsRepository) {
public MeasurementsController(final HttpExecutionContext httpExecutionContext, final MeasurementsRepository measurementsRepository,
final ActorSystem actorSystem, final Materializer materializer) {
this.httpExecutionContext = httpExecutionContext;
this.measurementsRepository = measurementsRepository;
this.actorSystem = actorSystem;
this.materializer = materializer;
}

public CompletionStage<Result> getMeasurementById(final String measurementId) {
Expand Down Expand Up @@ -98,9 +104,11 @@ public Result stopMeasurement() {
return ok();
}

public Result isMeasurementActive() {
public Result getActiveMeasurement() {
if(this.activeMeasurement != null){
return Results.ok();
final MeasurementReadings activeMeasurement = this.measurementsRepository
.getMeasurementReadingsById(this.activeMeasurement.getMeasurementId());
return Results.ok(Json.toJson(activeMeasurement));
}
return Results.noContent();
}
Expand All @@ -109,20 +117,27 @@ public Result isMeasurementActive() {
public CompletableFuture<Result> addReading() {
return CompletableFuture.supplyAsync(() -> {
if(this.activeMeasurement == null) {
return Results.badRequest();
return Results.noContent();
}
try {
final JsonNode json = request().body().asJson();
final ObjectMapper mapper = new ObjectMapper();
final ObjectReader reader = mapper.readerFor(new TypeReference<List<Reading>>() {});
final List<Reading> readings = reader.readValue(json);
final Reading[] array = Json.fromJson(json, Reading[].class);
final List<Reading> readings = new ArrayList<>(Arrays.asList(array));
measurementsRepository.addReadings(activeMeasurement.getMeasurementId(), readings);
} catch (IOException ex) {
if(measurementStreamFlow != null && MeasurementActor.out != null) {
MeasurementActor.out.tell(Json.toJson(readings), ActorRef.noSender());
}
return ok();
}
catch (final Exception ex) {
Logger.error("Error while adding new readings to measurement" + this.activeMeasurement.getMeasurementId(), ex);
return badRequest("Error while adding new readings.");
}

return Results.ok();
}, httpExecutionContext.current());
}

public WebSocket streamMeasurements() {
measurementStreamFlow = ActorFlow.actorRef(MeasurementActor::props, actorSystem, materializer);
return WebSocket.Json.accept(request -> measurementStreamFlow);
}
}
1 change: 0 additions & 1 deletion app/controllers/ProjectsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import play.mvc.BodyParser;
import play.mvc.Controller;
import play.mvc.Result;
import repositories.generator.DataGenerator;
import repositories.projects.ProjectsRepository;

import javax.inject.Inject;
Expand Down
1 change: 0 additions & 1 deletion app/models/MeasurementReadings.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package models;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import org.bson.codecs.pojo.annotations.BsonId;
Expand Down
1 change: 0 additions & 1 deletion app/models/Project.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package models;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import org.bson.codecs.pojo.annotations.BsonId;
Expand Down
12 changes: 8 additions & 4 deletions app/repositories/generator/DataGenerator.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package repositories.generator;

import models.*;
import org.bson.types.ObjectId;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand All @@ -13,7 +14,9 @@
* Helper class to generate data more easily.
*/
public class DataGenerator {
private final static Random random = new Random();
private static final Random random = new Random();

private DataGenerator() { }

public static List<Project> generateProjects(final int amountOfProjects, final int roomsPerProject) {
try {
Expand All @@ -33,7 +36,7 @@ public static Project generateProject(final int rooms) {
try {
final Project project = new Project();

project.setName("Project-" + Math.abs(random.nextInt()));
project.setName("Project-" + random.nextInt(Integer.MAX_VALUE));
project.setDescription("This is an example project and was automatically generated on " + getLocalDateTime() + ".");
project.setRooms(generateRooms(rooms));

Expand Down Expand Up @@ -61,7 +64,7 @@ public static Room generateRoom() {
try {
final Room room = new Room();

room.setName("Room-" + Math.abs(random.nextInt()));
room.setName("Room-" + random.nextInt(Integer.MAX_VALUE));
room.setDescription("This is an example room and was automatically generated on " + getLocalDateTime() + ".");
room.setLength(random.nextDouble() * 100);
room.setWidth(random.nextDouble() * 100);
Expand Down Expand Up @@ -122,7 +125,7 @@ public static AnchorPosition generateAnchorPosition() {
try {
final AnchorPosition position = new AnchorPosition();

position.setName("Anker-" + Math.abs(random.nextInt()));
position.setName("Anker-" + random.nextInt(Integer.MAX_VALUE));
position.setXPosition(random.nextDouble());
position.setYPosition(random.nextDouble());
position.setZPosition(random.nextDouble());
Expand All @@ -139,6 +142,7 @@ public static MeasurementMetadata generateMeasurementMetadata() {
try {
final MeasurementMetadata measurementMetadata = new MeasurementMetadata();

measurementMetadata.setMeasurementId(new ObjectId());
measurementMetadata.setCreator("Generated");
measurementMetadata.setDescription("Generated automatically");
measurementMetadata.setName("AutoGenenerated" + random.nextInt());
Expand Down
3 changes: 2 additions & 1 deletion app/repositories/measurements/MeasurementsRepository.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package repositories.measurements;

import models.MeasurementMetadata;
import models.MeasurementReadings;
import models.Reading;
import org.bson.types.ObjectId;
Expand All @@ -19,4 +18,6 @@ public interface MeasurementsRepository {
void addReadings(final ObjectId measurementId, final List<Reading> readings);

void resetRepository();

void addMeasurements(final List<MeasurementReadings> measurementReadings);
}
11 changes: 6 additions & 5 deletions app/repositories/measurements/MeasurementsRepositoryMock.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package repositories.measurements;

import com.google.inject.Singleton;
import models.MeasurementMetadata;
import models.MeasurementReadings;
import models.Reading;
import org.bson.types.ObjectId;
import repositories.generator.DataGenerator;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -47,12 +45,15 @@ public void addReadings(final ObjectId measurementId, final List<Reading> readin
final Optional<MeasurementReadings> measurementReadings = readingsList.parallelStream()
.filter(m -> m.getMeasurementId().equals(measurementId))
.findAny();

measurementReadings.get().getReadings().addAll(readings);
measurementReadings.orElseThrow(() -> new NullPointerException("Measurement to add the reading was not found.")).getReadings().addAll(readings);
}

@Override
public void resetRepository() {
this.readingsList.clear();
}

@Override
public void addMeasurements(final List<MeasurementReadings> measurementReadings) {
this.readingsList.addAll(measurementReadings);
}
}
11 changes: 8 additions & 3 deletions app/repositories/measurements/MeasurementsRepositoryMongo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

@Singleton
public class MeasurementsRepositoryMongo implements MeasurementsRepository {
private final static String DATABASE_NAME = "flux";
private final static String COLLECTION_NAME = "measurements";
private static final String DATABASE_NAME = "flux";
private static final String COLLECTION_NAME = "measurements";

private final MongoClient mongoClient;

Expand All @@ -45,7 +45,7 @@ public MeasurementReadings getMeasurementReadingsById(final ObjectId measurement

@Override
public ObjectId addMeasurement(final MeasurementReadings readings) {
if(readings.getMeasurementId() != null) {
if(readings.getMeasurementId() == null) {
readings.setMeasurementId(new ObjectId());
}
getCollection().insertOne(readings);
Expand All @@ -62,4 +62,9 @@ public void resetRepository() {
getCollection().drop();
this.mongoClient.getDatabase(DATABASE_NAME).createCollection(COLLECTION_NAME);
}

@Override
public void addMeasurements(final List<MeasurementReadings> measurementReadings) {
this.getCollection().insertMany(measurementReadings);
}
}
3 changes: 2 additions & 1 deletion app/repositories/projects/ProjectsRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import models.MeasurementMetadata;
import models.Project;
import models.Reading;
import org.bson.types.ObjectId;

import java.util.Iterator;
Expand All @@ -13,6 +12,8 @@ public interface ProjectsRepository {

ObjectId addProject(final Project project);

void addProjects(final List<Project> projects);

Project getProjectById(final ObjectId projectId);

ObjectId addMeasurement(final ObjectId projectId, final String roomName, final MeasurementMetadata measurementMetadata);
Expand Down
Loading

0 comments on commit 8fa5130

Please sign in to comment.