# RelaTree - Social Graph Analytics

This notebook builds a social graph from the given data about users and their groups, and uses it (together with a channel graph) to recommend channels for a group.

In [1]:
# Import libraries 
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars lib/graphframes-release-0-5-0-assembly-0.5.0-spark2.1.jar pyspark-shell')
import sys
sys.path.append('./lib')
import graphframes as GF
import operator
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, size, lit, collect_list

In [2]:
def createContext(app_name="RelaTree", executor_memory="2g", no_of_executors="4", master="local"):
    """Create (or reuse) a Spark context and wrap it in an SQLContext.

    Parameters:
        app_name: Spark application name.
        executor_memory: value for the ``spark.executor.memory`` setting (e.g. "2g").
        no_of_executors: value for the ``spark.executor.instances`` setting.
        master: Spark master URL; defaults to "local" as before.
    Returns:
        pyspark.sql.SQLContext bound to the active SparkContext.
    """
    spark_conf = (SparkConf()
                  .setMaster(master)
                  .setAppName(app_name)
                  .set("spark.executor.memory", executor_memory)
                  .set("spark.executor.instances", no_of_executors))
    # NOTE(review): with a "local" master the executor settings above are
    # probably not honoured (driver memory governs) -- confirm for this setup.
    # getOrCreate reuses an already-running context instead of raising when the
    # cell is re-executed in the same kernel; a reused context keeps its own conf.
    spark_context = SparkContext.getOrCreate(conf=spark_conf)
    return SQLContext(spark_context)

In [3]:
def loadData(file_path, sql_context):
    """Read a CSV file (first line is the header) into a PySpark DataFrame.

    Parameters:
        file_path: path of the CSV file.
        sql_context: SQLContext used to read the file.
    Returns:
        PySpark DataFrame with one column per header field.
    """
    csv_reader = sql_context.read.format('com.databricks.spark.csv')
    csv_reader = csv_reader.options(header='true')
    return csv_reader.load(file_path)

In [4]:
def getStats(df):
    """Print row count, column count and headers of a DataFrame, then show 5 sample rows.

    Parameters:
        df: PySpark DataFrame.
    Returns:
        None
    """
    template = ("\nRow count: %(rows)d\n\nColumn Count: %(cols)d"
                "\n\nColumn headers: %(headers)s\n\nSample Data:\n")
    print(template % {'rows': df.count(),
                      'cols': len(df.columns),
                      'headers': df.columns})
    df.show(5)

In [5]:
def createUserGraphVertices(df):
    """Build the user-graph vertices DataFrame.

    Parameters:
        df: PySpark DataFrame with a ``user_id`` column.
    Returns:
        DataFrame with a single ``id`` column holding each distinct user_id.
    """
    print("Creating User Vertices DataFrame..")
    # Rename to "id": motif filters elsewhere in this notebook refer to "v1.id".
    user_ids = df.select(['user_id']).selectExpr("user_id as id")
    unique_users = user_ids.drop_duplicates()
    print("User Vertices DataFrame creation complete.")
    return unique_users

In [6]:
def createUserGraphEdges(df):
    """Build user-to-user edges: one edge per ordered pair of users sharing a group.

    Parameters:
        df: PySpark DataFrame with ``user_id`` and ``group_id`` columns.
    Returns:
        DataFrame with columns ``group_id``, ``src``, ``dst`` (duplicate rows removed).
        Self-loops (src == dst) are kept; they are the only edges a
        single-member group produces, and getUsersOfAGroup reads ``src`` per group.
    """
    print("Creating User Edges DataFrame..")
    members_as_src = df.selectExpr("user_id AS src", "group_id")
    members_as_dst = df.selectExpr("user_id AS dst", "group_id")
    # Inner self-join on group_id. A full outer join gives the same rows for
    # non-null keys, but also emits a row with a null src or dst for each null
    # group_id (null never equals null), i.e. edges pointing at no vertex.
    edges = members_as_src.join(members_as_dst, on=['group_id'], how='inner')
    edges = edges.drop_duplicates()
    print("User Edges DataFrame creation complete.")
    return edges

In [7]:
def createGraph(vertices, edges):
    """Assemble a GraphFrame from vertex and edge DataFrames.

    Parameters:
        vertices: DataFrame of vertices (``id`` column).
        edges: DataFrame of edges (``src`` / ``dst`` columns).
    Returns:
        GraphFrame over the given vertices and edges.
    """
    print("Creating graph..")
    assembled = GF.GraphFrame(vertices, edges)
    print("Graph creation complete.")
    return assembled

In [8]:
def saveGraph(graph, name, store_dir='store', mode=None):
    """Persist a GraphFrame's vertices and edges as two Parquet datasets.

    Parameters:
        graph: GraphFrame to save.
        name: path prefix; data is written to ``<store_dir>/<name>Vertices.parquet``
            and ``<store_dir>/<name>Edges.parquet``.
        store_dir: output directory (default 'store', the previously hard-coded value).
        mode: optional Spark save mode such as 'overwrite'. When None the writer's
            default mode is used (Spark's default errors if the path already exists).
    Returns:
        None
    """
    print("Saving graph to file..")
    prefix = store_dir + '/' + name
    for frame, suffix in ((graph.vertices, 'Vertices.parquet'), (graph.edges, 'Edges.parquet')):
        writer = frame.write
        if mode is not None:
            writer = writer.mode(mode)
        writer.parquet(prefix + suffix)
    print("Graph has been saved successfully.")

In [9]:
def loadGraph(context, name, store_dir='store'):
    """Load a GraphFrame whose vertices and edges were saved as Parquet.

    Parameters:
        context: SQLContext (anything exposing ``.read.parquet``).
        name: path prefix; reads ``<store_dir>/<name>Vertices.parquet`` and
            ``<store_dir>/<name>Edges.parquet``.
        store_dir: directory holding the data (default 'store', the previously
            hard-coded value).
    Returns:
        GraphFrame built from the loaded vertices and edges.
    """
    print("\nLoading graph data..")
    prefix = store_dir + '/' + name
    vertices = context.read.parquet(prefix + 'Vertices.parquet')
    edges = context.read.parquet(prefix + 'Edges.parquet')
    print("\nGenerating graph..")
    graph = GF.GraphFrame(vertices, edges)
    print("\nGraph load complete.")
    return graph

In [10]:
def firstConnects(graph, vertex):
    """Return the direct (one-hop) neighbours of ``vertex``.

    Parameters:
        graph: GraphFrame whose edges carry a ``group_id`` column.
        vertex: id of the source vertex.
    Returns:
        DataFrame (not a GraphFrame) with columns ``id`` (neighbour vertex id) and
        ``group_id`` (group on the connecting edge), one row per outgoing edge.
    """
    motifs = graph.find("(v1)-[e]->(v2)")
    # Column predicate instead of splicing the id into a SQL string: ids that
    # contain quotes (or are not str) can no longer break or alter the filter.
    return motifs.filter(col("v1.id") == vertex).select("v2.id", "e.group_id")

In [11]:
def cleanUp(df_list):
    """Release Spark cached storage held by the given DataFrames.

    A function cannot delete the caller's variables (``del`` on a loop variable
    only unbinds that local name), so callers must ``del`` their own references
    if they want the Python objects gone. This releases any persisted data.

    Parameters:
        df_list: iterable of DataFrames (or other objects with ``unpersist()``);
            objects without a callable ``unpersist`` are skipped; None is
            treated as an empty list.
    Returns:
        None
    """
    for df in df_list or ():
        release = getattr(df, 'unpersist', None)
        if callable(release):
            release()
    print("\nDataFrame clean up complete.")

In [12]:
def getUsersOfAGroup(graph, group):
    """Return the users that appear as an edge source for ``group``.

    Parameters:
        graph: user GraphFrame whose edges carry ``src`` and ``group_id``.
        group: group_id to look up.
    Returns:
        Set of user ids.
    """
    print("\nGetting Users..")
    # Column predicate (no string-built SQL); project and de-duplicate on the
    # cluster so only distinct user ids are collected to the driver.
    rows = (graph.edges
            .filter(col("group_id") == group)
            .select("src")
            .distinct()
            .collect())
    return {row.src for row in rows}

In [None]:
def addFirstConnects(graph, users):
    """Return ``users`` together with all of their direct neighbours.

    Parameters:
        graph: user GraphFrame.
        users: iterable (normally a set) of vertex ids.
    Returns:
        New set with the input users plus every vertex reached from one of them
        by a single outgoing edge (same motif semantics as firstConnects).
    """
    print("\nAdding first connects..")
    seeds = set(users)
    if not seeds:
        return seeds
    # One motif query for all seed users, instead of one find/collect Spark job
    # per user.
    neighbours = (graph.find("(v1)-[e]->(v2)")
                  .filter(col("v1.id").isin(list(seeds)))
                  .select("v2.id")
                  .distinct()
                  .collect())
    return seeds.union(row.id for row in neighbours)

In [14]:
def getGroupsAssociatedToUsers(graph, users):
    """Return the relative frequency of each group among the users' outgoing edges.

    Parameters:
        graph: user GraphFrame whose edges carry ``src`` and ``group_id``.
        users: iterable of user ids.
    Returns:
        Dict ``{group_id: count / total}``; count is the number of edges whose
        ``src`` is one of ``users`` and that carry that group_id. The values sum
        to 1; the dict is empty when no such edges exist.
    """
    print("\nGetting groups..")
    user_list = list(users)
    if not user_list:
        return {}
    # One aggregated Spark job instead of one filter/collect per user; the
    # per-group edge tally is the same.
    rows = (graph.edges
            .filter(col("src").isin(user_list))
            .groupBy("group_id")
            .count()
            .collect())
    # row["count"], not row.count: Row is a tuple, so .count is tuple.count.
    counts = {row["group_id"]: row["count"] for row in rows}
    total = sum(counts.values())
    return {group_id: n / total for group_id, n in counts.items()}

In [15]:
def getChannelsAssociatedToGroups(df, groups):
    """Weight each channel by the weights of the groups it appears with.

    Parameters:
        df: PySpark DataFrame with ``group_id`` and ``channel_id`` columns.
        groups: dict ``{group_id: weight}`` (e.g. from getGroupsAssociatedToUsers).
    Returns:
        Dict ``{channel_id: score}``. A channel accumulates ``groups[g]`` for every
        matching (g, channel) row. Scores are normalised to sum to 1 when the
        total is non-zero.
    """
    print("\nGetting channels..")
    if not groups:
        return {}
    # One Spark job for all groups, instead of one filter/collect per group,
    # using a Column predicate rather than string-built SQL.
    rows = (df.filter(col("group_id").isin(list(groups)))
              .select("group_id", "channel_id")
              .collect())
    channels = {}
    for row in rows:
        channels[row.channel_id] = channels.get(row.channel_id, 0) + groups[row.group_id]
    total = sum(channels.values())
    if not total:
        # All weights zero (or no rows): nothing to normalise; avoid ZeroDivisionError.
        return channels
    return {channel_id: score / total for channel_id, score in channels.items()}

In [16]:
def getChannelsForGroup(df, graph, group):
    """Score channels for ``group`` through its members' social neighbourhood.

    Pipeline: members of ``group`` -> plus their first connects -> relative
    frequency of the groups on those users' edges -> channels of those groups,
    weighted by that frequency.

    Parameters:
        df: PySpark DataFrame with ``group_id`` and ``channel_id`` columns.
        graph: user GraphFrame.
        group: group_id to score channels for.
    Returns:
        Dict ``{channel_id: weight}``.
    """
    neighbourhood = addFirstConnects(graph, getUsersOfAGroup(graph, group))
    group_weights = getGroupsAssociatedToUsers(graph, neighbourhood)
    return getChannelsAssociatedToGroups(df, group_weights)

In [17]:
def createChannelGraphVetrices(df):
    """Build the channel-graph vertices DataFrame.

    (The misspelt name is kept so existing callers keep working.)

    Parameters:
        df: PySpark DataFrame with a ``channel_id`` column.
    Returns:
        DataFrame with a single ``id`` column holding each distinct channel_id.
    """
    print("Creating Channel Vertices DataFrame..")
    channel_ids = df.select(['channel_id']).selectExpr('channel_id as id')
    # Drop repeated channel ids so that each channel becomes exactly one vertex.
    distinct_channels = channel_ids.drop_duplicates()
    print("Channel Vertices DataFrame creation complete.")
    return distinct_channels

In [18]:
def createChannelGraphEdges(df):
    """Build weighted edges between every ordered pair of distinct channels.

    weight = 1 + number of joined rows in which the two channels share a
    group_id (self-join of ``df`` on ``group_id``); pairs that never share a
    group get weight 1.

    Parameters:
        df: PySpark DataFrame with ``channel_id`` and ``group_id`` columns.
    Returns:
        DataFrame with columns ``src``, ``dst``, ``weight``.
    """
    print("Creating Channel Edges DataFrame..")
    # Co-occurrence counts per channel pair. Inner join: an outer self-join on
    # the same key only adds null-src/dst rows (from null group_id), which the
    # src != dst filter discarded anyway, and costs more.
    channel_group = (df.select(col('channel_id').alias('src'), col('group_id'))
                     .join(df.select(col('channel_id').alias('dst'), col('group_id')),
                           on=['group_id'], how='inner'))
    pair_counts = (channel_group
                   .filter(channel_group.src != channel_group.dst)
                   .groupby(['src', 'dst'])
                   .count())
    # Distinct channel ids are derived from df itself. This used to read a
    # global `channel_vertices`, which raised NameError unless an earlier cell
    # had happened to define it.
    channel_ids = df.select(col('channel_id').alias('id')).drop_duplicates()
    all_pairs = (channel_ids.select(col('id').alias('src'))
                 .crossJoin(channel_ids.select(col('id').alias('dst'))))
    all_pairs = all_pairs.filter(all_pairs.src != all_pairs.dst)
    weighted = (all_pairs.withColumn("init_weight", lit(1))
                .join(pair_counts, on=['src', 'dst'], how='left_outer')
                .na.fill(0, subset=['count']))  # pairs with no shared group -> count 0
    weighted = weighted.withColumn('weight', weighted['init_weight'] + weighted['count'])
    print("Channel Edges DataFrame creation complete.")
    return weighted.select('src', 'dst', 'weight')

In [28]:
def graph_recommender(user_graph, channel_graph, df, group, k):
    """Recommend up to ``k`` channels for ``group``.

    Seed channels and their weights come from getChannelsForGroup. Every
    channel-graph neighbour of a seed that is not itself a seed accumulates
    ``edge weight * seed weight`` over all seeds pointing to it.

    Parameters:
        user_graph: user GraphFrame.
        channel_graph: channel GraphFrame whose edges carry a ``weight`` column.
        df: PySpark DataFrame with ``group_id`` and ``channel_id`` columns.
        group: group_id to recommend for.
        k: maximum number of recommendations (used as a slice bound).
    Returns:
        List of ``(channel_id, score)`` tuples, highest score first.
    """
    print("\nFinding Recommendations..")
    channels = getChannelsForGroup(df, user_graph, group)
    recommendations = {}
    if channels:
        # One motif query for all seed channels, instead of one Spark job per
        # channel; the seed id is carried along to look up its weight.
        neighbours = (channel_graph.find("(v1)-[e]->(v2)")
                      .filter(col("v1.id").isin(list(channels)))
                      .select(col("v1.id").alias("seed"),
                              col("v2.id").alias("id"),
                              col("e.weight").alias("weight"))
                      .collect())
        for row in neighbours:
            if row.id not in channels:
                recommendations[row.id] = (recommendations.get(row.id, 0)
                                           + row.weight * channels[row.seed])
    print("\nRecommendations found.")
    return sorted(recommendations.items(), key=operator.itemgetter(1), reverse=True)[:k]

In [20]:
if __name__ == "__main__":
    # Create Spark context
    sql_context = createContext(app_name="RelaTree", executor_memory="12g", no_of_executors="8")

    # Load group to channel data into a Pyspark Dataframe
    df_group_channels = loadData('data/group_channel.csv', sql_context)

    # Set to True to rebuild both graphs from the raw CSVs and persist them to
    # 'store/'. This is expensive; by default the previously saved graphs are loaded.
    REBUILD_GRAPHS = False
    if REBUILD_GRAPHS:
        # Load group to member data into a Pyspark Dataframe
        df_group_members = loadData('data/group_members.csv', sql_context)

        # Get stats on group members and group channels dataframes
        getStats(df_group_members)
        getStats(df_group_channels)

        # Create vertices and edges dataframes
        user_vertices = createUserGraphVertices(df_group_members)
        channel_vertices = createChannelGraphVetrices(df_group_channels)
        user_edges = createUserGraphEdges(df_group_members)
        channel_edges = createChannelGraphEdges(df_group_channels)

        # Get stats on vertices and edges dataframes
        getStats(user_vertices)
        getStats(user_edges)
        getStats(channel_vertices)
        getStats(channel_edges)

        # Create graphs and save them to file
        duta_user_graph = createGraph(user_vertices, user_edges)
        duta_channel_graph = createGraph(channel_vertices, channel_edges)
        saveGraph(duta_user_graph, 'userGraph')
        saveGraph(duta_channel_graph, 'channelGraph')

        # Clean up memory
        cleanUp([user_vertices, user_edges, channel_vertices, channel_edges, df_group_members])

    # Load graph from file
    duta_user_graph = loadGraph(sql_context, 'userGraph')
    duta_channel_graph = loadGraph(sql_context, 'channelGraph')
    


Loading graph data..

Generating graph..

Graph load complete.

Loading graph data..

Generating graph..

Graph load complete.


In [None]:
# Top-11 (k=11) channel recommendations for one sample group id.
result = graph_recommender(duta_user_graph, duta_channel_graph, df_group_channels,'0000105d4a97a48cbd1a0968695c8f97f54ffe23' , 11)


Finding Recommendations..

Getting Users..

Adding first connects..

Getting groups..


In [None]:
# Display the recommendations: a list of (channel_id, score) tuples, highest score first.
result

In [26]:
# Channel weights for a second sample group (the intermediate step graph_recommender builds on).
channels = getChannelsForGroup(df_group_channels, duta_user_graph, 'c141ef4d309b251fff0d46dc9e70a8f37f8d83dd')


Getting Users..

Adding first connects..

Getting groups..

Getting channels..


In [27]:
# Display the {channel_id: weight} dict; weights are normalised to sum to 1.
channels

{'promo': 0.5, 'cricket': 0.5}