Skip to content

Commit

Permalink
[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles …
Browse files Browse the repository at this point in the history
…example

This closes #968
  • Loading branch information
vasia committed Aug 7, 2015
1 parent 441ebf1 commit f1dd914
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
Expand Up @@ -46,15 +46,17 @@
public class MusicProfiles implements ProgramDescription { public class MusicProfiles implements ProgramDescription {


/** /**
* This example demonstrates how to mix the "record" Flink API with the * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
* graph API. The input is a set <userId - songId - playCount> triplets and * The input is a set <userId - songId - playCount> triplets and
* a set of bad records,i.e. song ids that should not be trusted. Initially, * a set of bad records, i.e. song ids that should not be trusted.
* we use the record API to filter out the bad records. Then, we use the * Initially, we use the DataSet API to filter out the bad records.
* graph API to create a user -> song weighted bipartite graph and compute * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
* the top song (most listened) per user. Then, we use the record API again, * the top song (most listened) per user.
* to create a user-user similarity graph, based on common songs, where two * Then, we use the DataSet API again, to create a user-user similarity graph,
* users that listen to the same song are connected. Finally, we use the * based on common songs, where users that are listeners of the same song
* graph API to run the label propagation community detection algorithm on * are connected. A user-defined threshold on the playcount value
* defines when a user is considered to be a listener of a song.
* Finally, we use the graph API to run the label propagation community detection algorithm on
* the similarity graph. * the similarity graph.
* *
* The triplets input is expected to be given as one triplet per line, * The triplets input is expected to be given as one triplet per line,
Expand Down Expand Up @@ -116,7 +118,13 @@ public static void main(String[] args) throws Exception {
* create an edge between each pair of its in-neighbors. * create an edge between each pair of its in-neighbors.
*/ */
DataSet<Edge<String, NullValue>> similarUsers = userSongGraph DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
.getEdges().groupBy(1) .getEdges()
// filter out user-song edges that are below the playcount threshold
.filter(new FilterFunction<Edge<String, Integer>>() {
public boolean filter(Edge<String, Integer> edge) {
return (edge.getValue() > playcountThreshold);
}
}).groupBy(1)
.reduceGroup(new CreateSimilarUserEdges()).distinct(); .reduceGroup(new CreateSimilarUserEdges()).distinct();


Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
Expand Down Expand Up @@ -241,33 +249,36 @@ public String getDescription() {


private static String topTracksOutputPath = null; private static String topTracksOutputPath = null;


private static int playcountThreshold = 0;

private static String communitiesOutputPath = null; private static String communitiesOutputPath = null;


private static int maxIterations = 10; private static int maxIterations = 10;


private static boolean parseParameters(String[] args) { private static boolean parseParameters(String[] args) {


if(args.length > 0) { if(args.length > 0) {
if(args.length != 5) { if(args.length != 6) {
System.err.println("Usage: MusicProfiles <input user song triplets path>" + System.err.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> " " <input song mismatches path> <output top tracks path> "
+ "<output communities path> <num iterations>"); + "<playcount threshold> <output communities path> <num iterations>");
return false; return false;
} }


fileOutput = true; fileOutput = true;
userSongTripletsInputPath = args[0]; userSongTripletsInputPath = args[0];
mismatchesInputPath = args[1]; mismatchesInputPath = args[1];
topTracksOutputPath = args[2]; topTracksOutputPath = args[2];
communitiesOutputPath = args[3]; playcountThreshold = Integer.parseInt(args[3]);
maxIterations = Integer.parseInt(args[4]); communitiesOutputPath = args[4];
maxIterations = Integer.parseInt(args[5]);
} else { } else {
System.out.println("Executing Music Profiles example with default parameters and built-in default data."); System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files."); System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files."); System.out.println(" See the documentation for the correct format of input files.");
System.out.println("Usage: MusicProfiles <input user song triplets path>" + System.out.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> " " <input song mismatches path> <output top tracks path> "
+ "<output communities path> <num iterations>"); + "<playcount threshold> <output communities path> <num iterations>");
} }
return true; return true;
} }
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void before() throws Exception {


@Test @Test
public void testMusicProfilesExample() throws Exception { public void testMusicProfilesExample() throws Exception {
MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath, MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
MusicProfilesData.MAX_ITERATIONS + ""}); MusicProfilesData.MAX_ITERATIONS + ""});
expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
} }
Expand Down

0 comments on commit f1dd914

Please sign in to comment.