From c0c8463521912d021c392c2c5edc254fee267eb8 Mon Sep 17 00:00:00 2001 From: vasia Date: Fri, 31 Jul 2015 22:12:18 +0200 Subject: [PATCH] [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example --- .../flink/graph/example/MusicProfiles.java | 41 ++++++++++++------- .../test/example/MusicProfilesITCase.java | 2 +- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index a5352162719d7..0fc45bdc9120b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -46,15 +46,17 @@ public class MusicProfiles implements ProgramDescription { /** - * This example demonstrates how to mix the "record" Flink API with the - * graph API. The input is a set <userId - songId - playCount> triplets and - * a set of bad records,i.e. song ids that should not be trusted. Initially, - * we use the record API to filter out the bad records. Then, we use the - * graph API to create a user -> song weighted bipartite graph and compute - * the top song (most listened) per user. Then, we use the record API again, - * to create a user-user similarity graph, based on common songs, where two - * users that listen to the same song are connected. Finally, we use the - * graph API to run the label propagation community detection algorithm on + * This example demonstrates how to mix the DataSet Flink API with the Gelly API. + * The input is a set <userId - songId - playCount> triplets and + * a set of bad records, i.e. song ids that should not be trusted. + * Initially, we use the DataSet API to filter out the bad records. + * Then, we use Gelly to create a user -> song weighted bipartite graph and compute + * the top song (most listened) per user. 
+ * Then, we use the DataSet API again, to create a user-user similarity graph, + * based on common songs, where users that are listeners of the same song + * are connected. A user-defined threshold on the playcount value + * defines when a user is considered to be a listener of a song. + * Finally, we use the graph API to run the label propagation community detection algorithm on * the similarity graph. * * The triplets input is expected to be given as one triplet per line, @@ -116,7 +118,13 @@ public static void main(String[] args) throws Exception { * create an edge between each pair of its in-neighbors. */ DataSet<Edge<String, NullValue>> similarUsers = userSongGraph - .getEdges().groupBy(1) + .getEdges() + // filter out user-song edges that are below the playcount threshold + .filter(new FilterFunction<Edge<String, Integer>>() { + public boolean filter(Edge<String, Integer> edge) { + return (edge.getValue() > playcountThreshold); + } + }).groupBy(1) .reduceGroup(new CreateSimilarUserEdges()).distinct(); Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, @@ -241,6 +249,8 @@ public String getDescription() { private static String topTracksOutputPath = null; + private static int playcountThreshold = 0; + private static String communitiesOutputPath = null; private static int maxIterations = 10; @@ -248,10 +258,10 @@ public String getDescription() { private static boolean parseParameters(String[] args) { if(args.length > 0) { - if(args.length != 5) { + if(args.length != 6) { System.err.println("Usage: MusicProfiles <input user song triplets path>" + " <input song mismatches path> <output top tracks path> " - + "<output communities path> <num iterations>"); + + "<playcount threshold> <output communities path> <num iterations>"); return false; } @@ -259,15 +269,16 @@ private static boolean parseParameters(String[] args) { userSongTripletsInputPath = args[0]; mismatchesInputPath = args[1]; topTracksOutputPath = args[2]; - communitiesOutputPath = args[3]; - maxIterations = Integer.parseInt(args[4]); + playcountThreshold = Integer.parseInt(args[3]); + communitiesOutputPath = args[4]; + maxIterations = Integer.parseInt(args[5]); } else { System.out.println("Executing Music Profiles example with default 
parameters and built-in default data."); System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); System.out.println("Usage: MusicProfiles <input user song triplets path>" + " <input song mismatches path> <output top tracks path> " - + "<output communities path> <num iterations>"); + + "<playcount threshold> <output communities path> <num iterations>"); } return true; } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java index 0410d417472e7..5aa9f26f3785a 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java @@ -73,7 +73,7 @@ public void before() throws Exception { @Test public void testMusicProfilesExample() throws Exception { - MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath, + MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath, MusicProfilesData.MAX_ITERATIONS + ""}); expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; }