Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@
public class MusicProfiles implements ProgramDescription {

/**
* This example demonstrates how to mix the "record" Flink API with the
* graph API. The input is a set <userId - songId - playCount> triplets and
* a set of bad records,i.e. song ids that should not be trusted. Initially,
* we use the record API to filter out the bad records. Then, we use the
* graph API to create a user -> song weighted bipartite graph and compute
* the top song (most listened) per user. Then, we use the record API again,
* to create a user-user similarity graph, based on common songs, where two
* users that listen to the same song are connected. Finally, we use the
* graph API to run the label propagation community detection algorithm on
* This example demonstrates how to mix the DataSet Flink API with the Gelly API.
* The input is a set <userId - songId - playCount> triplets and
* a set of bad records, i.e. song ids that should not be trusted.
* Initially, we use the DataSet API to filter out the bad records.
* Then, we use Gelly to create a user -> song weighted bipartite graph and compute
* the top song (most listened) per user.
* Then, we use the DataSet API again, to create a user-user similarity graph,
* based on common songs, where users that are listeners of the same song
* are connected. A user-defined threshold on the playcount value
* defines when a user is considered to be a listener of a song.
* Finally, we use the graph API to run the label propagation community detection algorithm on
* the similarity graph.
*
* The triplets input is expected to be given as one triplet per line,
Expand Down Expand Up @@ -116,7 +118,13 @@ public static void main(String[] args) throws Exception {
* create an edge between each pair of its in-neighbors.
*/
DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
.getEdges().groupBy(1)
.getEdges()
// filter out user-song edges that are below the playcount threshold
.filter(new FilterFunction<Edge<String, Integer>>() {
public boolean filter(Edge<String, Integer> edge) {
return (edge.getValue() > playcountThreshold);
}
}).groupBy(1)
.reduceGroup(new CreateSimilarUserEdges()).distinct();

Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
Expand Down Expand Up @@ -241,33 +249,36 @@ public String getDescription() {

private static String topTracksOutputPath = null;

private static int playcountThreshold = 0;

private static String communitiesOutputPath = null;

private static int maxIterations = 10;

private static boolean parseParameters(String[] args) {

if(args.length > 0) {
if(args.length != 5) {
if(args.length != 6) {
System.err.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> "
+ "<output communities path> <num iterations>");
+ "<playcount threshold> <output communities path> <num iterations>");
return false;
}

fileOutput = true;
userSongTripletsInputPath = args[0];
mismatchesInputPath = args[1];
topTracksOutputPath = args[2];
communitiesOutputPath = args[3];
maxIterations = Integer.parseInt(args[4]);
playcountThreshold = Integer.parseInt(args[3]);
communitiesOutputPath = args[4];
maxIterations = Integer.parseInt(args[5]);
} else {
System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
System.out.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> "
+ "<output communities path> <num iterations>");
+ "<playcount threshold> <output communities path> <num iterations>");
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void before() throws Exception {

@Test
public void testMusicProfilesExample() throws Exception {
MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath,
MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
MusicProfilesData.MAX_ITERATIONS + ""});
expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
}
Expand Down