Following code loads the raw GDELT - GKG files available at "http://data.gdeltproject.org/gkg/index.html" and contructs a knowledge graph. Further, it performs link prediction to infer missing/potential links in the knowledge graph based on the CNGF link prediction algorithm described in this paper "The Algorithm of Link Prediction on Social Network" available at "https://www.hindawi.com/journals/mpe/2013/125123/"

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EMT678-LinkPrediction").getOrCreate()

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql import Row
from graphframes import *
import re
from math import log
import operator

In [0]:
gkg_1207 = spark.read.csv("/Volumes/temp_gkg_yolo/default/gkg_files/20231110.gkg.csv",sep="\t",header=True)

In [0]:
gkg_1207.show(5)

+--------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    DATE|NUMARTS|COUNTS|              THEMES|           LOCATIONS|             PERSONS|       ORGANIZATIONS|                TONE|       CAMEOEVENTIDS|             SOURCES|          SOURCEURLS|
+--------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|20231110|      1|  NULL|TOURISM;WB_825_TO...|1#South Korea#KS#...|      aurojyoti bose|                NULL|-3.79746835443038...|1139481783,113948...|hotelnewsresource...|https://www.hotel...|
|20231110|      2|  NULL|TAX_FNCACT;TAX_FN...|5#Suffolk, Suffol...|                NULL| stores in lowestoft|-2.83018867924528...|1139282290,113928...|edp24.co.uk;eadt....|https://www.edp24...|
|20231110|      1|  NULL|TAX_F

Select the PERSONS, ORGANIZATIONS and THEMES columns from the master dataframe

In [0]:
gkg_reduced = gkg_1207.select("THEMES","PERSONS","ORGANIZATIONS")

Filtering the events to be based in India and contain ARMEDCONFLICT in the list of themes assigned to an event, an event can contain more than 1 theme

In [0]:
df_armed_conflict = gkg_reduced.filter(F.col("THEMES").like("%ARMEDCONFLICT%") & F.col("LOCATIONS").like("%India%"))

The get_top_results function defined below returns a list of the entities in a column with their corresponding count of occurences sorted in descending order of the count.

In [0]:
import pyspark.sql.functions as F
def get_top_results(df,column_name):
    split_df = df.select(F.split(F.col(column_name),";").alias("split_list")).select(F.explode(F.col("split_list")).alias("final_list"))
    top_list = split_df.groupBy("final_list").count().sort(F.desc("COUNT"))
    return top_list.select("final_list")

The filter_dataframe function defined below filters the dataframe to return only rows containing the entities passed through the 'filter_list' argument. 

In [0]:
from functools import reduce
from operator import or_
def filter_dataframe(df,filter_list):
    df = df.filter(reduce(or_, [df.PERSONS.rlike(s) for s in filter_list]))
    return df

The above mentioned functions are used to only get rows containing the top n "PERSONS". It can be noted here that this does not mean our graph only contains these n vertices, rather it contains all the vertices that these top n persons are connected to as well.

In [0]:
top_persons = get_top_results(df_armed_conflict,"PERSONS")

In [0]:
list = top_persons.select('final_list').rdd.flatMap(lambda x: x).collect()

In [0]:
top_20_persons_df = filter_dataframe(df_armed_conflict, list[:20])

The get_vertices_edges() function, returns the list of vertices and edges that we will use to construct our knowledge graph.
Steps:
1. Merge PERSONS AND ORGANIZATIONS columns to form a string of all vertices to be extracted from a given row(event).
2. Split the above formed string to get a list of all the vertices. This dataframe now contains a list of vertices and a list of THEMES that will connect each of these vertices.
3. Form a df_vertices dataframe by exploding the list of vertices and assigning an id to each row(now containing 1 vertex per row).
4. To generate each of our edges, I form a list of vertex-pairs from a given list of vertices by using a Python UDF
5. I then explode this list of vertex-pairs and the list of THEMES for each vertex-pair, to get the final df_edges dataframe that contains (vertex1,vertex2,theme) columns as the (source_node,destination_node,relationship) that forms an edge in the graph

In [0]:
def get_vertices_edges(df):
    df = df.withColumn("merged", F.concat_ws(";", df["ORGANIZATIONS"], df["PERSONS"]))
    df_merged = df.select("merged","THEMES")
    df_merged = df_merged.select(F.split(F.col("merged"),";").alias("merged"),"THEMES")
    df_vertices = df_merged.select(F.explode(F.col("merged"))).distinct()
    df_vertices = df_vertices.withColumn("id",F.monotonically_increasing_id())
    df_combinations = df_merged.withColumn('merged',generate_pairs(df_merged['merged']))
    df_combinations = df_combinations.select(F.explode(F.col("merged")).alias("edges"),"THEMES")
    df_edges = df_combinations.select("edges",F.explode(F.split(F.col("THEMES"),";")))
    return df_vertices,df_edges

I used the UDF generate_pairs defined below to generate a list of pairs given a list of strings. 
(eg) - ['Roger Federer','Lebron James','Michael Schumacher','Niki Lauda'] --> [['Roger Federer','Lebron James'],['Roger Federer','Lebron James'],['Roger Federer','Michael Schumacher'],['Roger Federer','Niki Lauda'],['Lebron James','Michael Schumacher'],['Lebron James','Niki Lauda'],['Michael Schumacher','Niki Lauda']]

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import itertools

@udf(returnType=ArrayType(ArrayType(StringType())))
def generate_pairs(x):
    return [list(pair) for pair in itertools.combinations(x, 2)]

In [0]:
df_vertices,df_edges = get_vertices_edges(top_20_persons_df)

Renaming the columns in df_edges to be compatible with the graphframes api

In [0]:
df_edges = df_edges.withColumn("src",F.col("edges")[0]).withColumn("dst",F.col("edges")[1]).withColumnRenamed("col","relationship")

In [0]:
df_vertices.show(10)

+--------------------+---+
|                 col| id|
+--------------------+---+
|association of so...|  0|
|     ayman al-safadi|  1|
|       amrit vatikas|  2|
|bharatiya janata ...|  3|
|    syed zafar islam|  4|
|      krishna madiga|  5|
|         white house|  6|
|     charles q brown|  7|
|azadi ka amrit ma...|  8|
|indian defense mi...|  9|
+--------------------+---+
only showing top 10 rows



In [0]:
df_edges = df_edges.drop("edges")

In [0]:
df_edges.show(10,truncate=False)

+-----------------------------+-------------------------+-----------+
|relationship                 |src                      |dst        |
+-----------------------------+-------------------------+-----------+
|TAX_FNCACT                   |palestinian prisoner club|arab league|
|TAX_FNCACT_SECRETARY         |palestinian prisoner club|arab league|
|TAX_FNCACT_SECRETARY_OF_STATE|palestinian prisoner club|arab league|
|TAX_ETHNICITY                |palestinian prisoner club|arab league|
|TAX_ETHNICITY_PALESTINIANS   |palestinian prisoner club|arab league|
|KILL                         |palestinian prisoner club|arab league|
|CRISISLEX_T03_DEAD           |palestinian prisoner club|arab league|
|CRISISLEX_CRISISLEXREC       |palestinian prisoner club|arab league|
|CRISISLEX_T02_INJURED        |palestinian prisoner club|arab league|
|TAX_WORLDMAMMALS             |palestinian prisoner club|arab league|
+-----------------------------+-------------------------+-----------+
only showing top 10 

In [0]:
df_vertices.count()

1311

In [0]:
print(df_edges.count())

13274146


Importing the graphframes package and creating the graph

In [0]:
from graphframes import *

In [0]:
graph = GraphFrame(df_vertices, df_edges)

The following is the implementation of the CNGF algorithm. It computes a similarity score between each vertex in the graph. These similarity scores can later be used to predict links between vertices based on a given threshold of similarty

In [0]:
def get_neighbours(vertex):
    """
    Given a node of the graph, this function finds all it's neighbours. Since
    it is designed for undirected graphs, the neighbours of the node are found
    in both directions.

    :param vertex: Any node of the global scope graph.
    :return: The list of neighbours of the graph.
    """
    neighbours1 = graph.edges.filter("src = '{}'".format(vertex)).select(
        "dst").distinct()
    neighbours2 = graph.edges.filter("dst = '{}'".format(vertex)).select(
        "src").distinct()
    neighbours = neighbours1.union(neighbours2)
    return neighbours.rdd.map(lambda row: row.dst).collect()


In [0]:
def get_subgraph(list_vertices):
    """
    Given a set of vertices, this function calculates a subgraph from the
    original graph.

    :param list_vertices: A set of vertices for which subgraph needs to be
    created.

    :return: The subgraph
    """

    # Find all the edges between all the nodes given in the list
    edge_motif = graph.find("(a)-[e]->(b)").filter(col("a.id").isin(
        list_vertices)).filter(col("b.id").isin(list_vertices))
    edge_select = edge_motif.select("e.src", "e.dst")

    # Create the subgraph from the new edges and return it
    return GraphFrame(graph.vertices, edge_select)

In [0]:
def get_guidance(subgraph_degree, original_degree):
    """
    Calculates the guidance of a node given it's original degree and subgraph
    degree.

    :param subgraph_degree: The degree of the node in the subgraph.
    :param original_degree: The degree of the node in the original graph.

    :return: The guidance of the node.
    """
    log_original_degree = log(original_degree)
    return subgraph_degree/log_original_degree

In [0]:
def calculate_similarity(subgraph_degrees):
    """
    Given a list of subgraph degrees, this function calls the guidance
    function and calculates the similarity of a particular node with all it's
    non-connected nodes.

    :param subgraph_degrees: A list of lists containing the non connected node
    and degrees of common neighbours from the subgraph.

    :return: A dictionary of similarity of each non-connected node
    """
    similarity_dict = []
    for nc_node in subgraph_degrees:
        similarity = 0
        for common_node in nc_node[1]:
            # Getting the degree of the common neighbour node from the original
            # graph
            original_degree = graph.degrees.filter("id = '{}'".format(
                common_node.id)).select("degree").collect()

            # Getting the degree of the common neighbour node from the subgraph
            sub_degree = common_node.degree

            # Calling the function to calculate guidance for the common
            # neighbour node
            guidance = get_guidance(sub_degree, original_degree[0].degree)

            # Adding the guidance to the similarity of the non-connected node
            similarity += guidance

        similarity_dict.append((nc_node[0], similarity))
    return similarity_dict

In [0]:
def node_processing():
    """
    Takes the graph object from global scope and processes each node to find
    all non-connected nodes and then find the similarity using the cngf
    algorithm.

    :return: The similarity of each node with all it's non connected nodes.
    """

    # Get the list of all nodes of the graph
    graph_similarity = []
    vertices_list = [i.id for i in graph.vertices.collect()]

    for root_node in vertices_list:
        print ("Vertex " + str(root_node))

        # Get the neighbours of the node
        root_neighbours = set(get_neighbours(root_node))

        # Get the set of non-connected nodes by removing the node and the
        # neighbours of the node from the list of nodes.
        not_connected_nodes = set(vertices_list).difference(
            set(root_neighbours)).difference({root_node})

        subgraph_degrees = []

        for nc_node in not_connected_nodes:

            # Get neighbour of the non-connected node
            node_neighbours = set(get_neighbours(nc_node))

            # Get the common neighbours by taking the intersion of neighbours
            # of both the nodes.
            common_neighbours = root_neighbours.intersection(node_neighbours)

            if common_neighbours:
                # Create a set of all the vertices for which the subgraph needs
                # to be created, i.e., the common neighbours, the root node and
                # the non-connected node.
                subgraph_vertices = common_neighbours.union({nc_node},
                                                            {root_node})

                # Call the function to create the subgraph
                subgraph = get_subgraph(subgraph_vertices)

                # Find the degrees of the common neighbours from the subgraph
                common_neighbours_degrees = subgraph.degrees.filter(
                    col("id").isin(common_neighbours)).collect()
                subgraph_degrees.append((nc_node, common_neighbours_degrees))

        # Call the function to calculate the similarity of each non-connected
        # node with the current node.
        similarity = sorted(calculate_similarity(subgraph_degrees),
                            key=operator.itemgetter(1), reverse=True)
        graph_similarity.append((root_node, similarity))
        print(similarity)
    return graph_similarity

The following code runs the CNGF algorithm and returns the list of similarity scores for each graph pair. The DBU's used after running the algorithm for vertex 0 was 11 and I was incurring charges on AWS to tune of $6.5  . At this point I interuppted the notebook to avoid being charged more than I could afford. Thus, my solution is still incomplete. 

The (interuppted) function spawned more than a 1500 jobs, each with only 1 stage per job(for 1 vertex). I think it can thus be inferred that my implemented solution is not parallelizable and does not effectively utilize sparks capabilites. Further analysis is needed to more effectively describe the problem. 

In [0]:
graph_similarity = node_processing()
print(graph_similarity)

Vertex 0
