Skip to content

Commit 7ba2d64

Browse files
author
muencseb
committedJun 29, 2018
Initial commit
0 parents  commit 7ba2d64

File tree

6 files changed

+198
-0
lines changed

6 files changed

+198
-0
lines changed
 

‎pom.xml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>ch.mue.seb</groupId>
8+
<artifactId>javarank</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
<build>
11+
<plugins>
12+
<plugin>
13+
<groupId>org.apache.maven.plugins</groupId>
14+
<artifactId>maven-compiler-plugin</artifactId>
15+
<configuration>
16+
<source>8</source>
17+
<target>8</target>
18+
</configuration>
19+
</plugin>
20+
</plugins>
21+
</build>
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.apache.spark</groupId>
25+
<artifactId>spark-core_2.10</artifactId>
26+
<version>1.6.1</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.spark</groupId>
30+
<artifactId>spark-sql_2.10</artifactId>
31+
<version>1.6.1</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.apache.spark</groupId>
35+
<artifactId>spark-mllib_2.10</artifactId>
36+
<version>1.6.1</version>
37+
</dependency>
38+
</dependencies>
39+
40+
41+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package recommendation.data;
2+
3+
public class InputRating {
4+
private Integer userId;
5+
private Integer productId;
6+
private Integer rating;
7+
8+
public InputRating(Integer userId, Integer productId, Integer rating) {
9+
this.userId = userId;
10+
this.productId = productId;
11+
this.rating = rating;
12+
}
13+
14+
public Integer getUserId() {
15+
return userId;
16+
}
17+
18+
public void setUserId(Integer userId) {
19+
this.userId = userId;
20+
}
21+
22+
public Integer getProductId() {
23+
return productId;
24+
}
25+
26+
public void setProductId(Integer productId) {
27+
this.productId = productId;
28+
}
29+
30+
public Integer getRating() {
31+
return rating;
32+
}
33+
34+
public void setRating(Integer rating) {
35+
this.rating = rating;
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package recommendation.data;
2+
3+
import org.apache.spark.api.java.JavaRDD;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
6+
import java.util.Collection;
7+
import java.util.function.Function;
8+
import java.util.stream.Collectors;
9+
10+
public class RDDHelper {
11+
12+
private static JavaSparkContext jsc;
13+
14+
public RDDHelper(JavaSparkContext javaSparkContext) {
15+
jsc = javaSparkContext;
16+
}
17+
18+
public <T> JavaRDD<T> getRddFromCollection(Collection<T> collection) {
19+
return jsc.parallelize(collection.stream().parallel().collect(Collectors.toList())).cache();
20+
}
21+
22+
public <T> JavaRDD<T> getRddFromCollection(Collection<?> collection, Function<? super Object, ? extends T> mapper) {
23+
return getRddFromCollection(collection.stream().parallel().map(mapper).collect(Collectors.toList()));
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package recommendation.exceptions;
2+
3+
public class ModelNotReadyException extends Exception {
4+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package recommendation.model;
2+
3+
import org.apache.spark.api.java.JavaRDD;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
import org.apache.spark.mllib.recommendation.ALS;
6+
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
7+
import org.apache.spark.mllib.recommendation.Rating;
8+
import recommendation.data.InputRating;
9+
import recommendation.data.RDDHelper;
10+
import recommendation.exceptions.ModelNotReadyException;
11+
12+
import java.util.Collection;
13+
import java.util.List;
14+
import java.util.concurrent.locks.ReentrantReadWriteLock;
15+
import java.util.stream.Collectors;
16+
17+
public class RecommendationMlModel {
18+
19+
public static final String SPARK_APP_NAME = "Recommendation Engine";
20+
public static final String SPARK_MASTER = "local";
21+
22+
private ALS als = new ALS();
23+
private MatrixFactorizationModel model;
24+
25+
private RDDHelper rddHelper = new RDDHelper(new JavaSparkContext(SPARK_MASTER, SPARK_APP_NAME));
26+
27+
private ReentrantReadWriteLock mutex = new ReentrantReadWriteLock();
28+
29+
private boolean modelIsReady = false;
30+
31+
public boolean isModelReady() {
32+
return modelIsReady;
33+
}
34+
35+
public Double getInterestPrediction(Integer userId, Integer eventId) throws ModelNotReadyException {
36+
if (!modelIsReady)
37+
throw new ModelNotReadyException();
38+
mutex.readLock().lock();
39+
Double prediction = model.predict(userId, eventId);
40+
mutex.readLock().unlock();
41+
return prediction;
42+
}
43+
44+
public void createModel(Collection<InputRating> ratings) {
45+
trainModel(ratings);
46+
}
47+
48+
private void trainModel(Collection<InputRating> ratings) {
49+
JavaRDD<Rating> ratingRDD = rddHelper.getRddFromCollection(createSparkRating(ratings)).cache();
50+
if (ratingRDD.isEmpty())
51+
return;
52+
mutex.writeLock().lock();
53+
model = als.setRank(10).setIterations(10).run(ratingRDD);
54+
mutex.writeLock().unlock();
55+
modelIsReady = true;
56+
}
57+
58+
59+
private List<Rating> createSparkRating(Collection<InputRating> inputRatings) {
60+
return inputRatings
61+
.stream()
62+
.map(ir -> new Rating(ir.getUserId(), ir.getProductId(), ir.getProductId()))
63+
.collect(Collectors.toList());
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package recommendation.service;
2+
3+
import recommendation.data.InputRating;
4+
import recommendation.exceptions.ModelNotReadyException;
5+
import recommendation.model.RecommendationMlModel;
6+
7+
import java.util.Collection;
8+
9+
public class RecommendationService {
10+
11+
RecommendationMlModel recommendationMlModel = new RecommendationMlModel();
12+
13+
public RecommendationService(Collection<InputRating> ratings) {
14+
recommendationMlModel.createModel(ratings);
15+
}
16+
17+
public boolean isModelReady() {
18+
return recommendationMlModel.isModelReady();
19+
}
20+
21+
public Double getPrediction(Integer userId, Integer productId) throws ModelNotReadyException {
22+
return recommendationMlModel.getInterestPrediction(userId, productId);
23+
}
24+
25+
26+
}

0 commit comments

Comments
 (0)
Failed to load comments.