### Imports and Dgraph Client Initialization

In [None]:
import os
import json
import pandas as pd
import multiprocessing

import pydgraph
from python_graphql_client import GraphqlClient

from dotenv import load_dotenv

In [None]:
# Read secrets (e.g. GRAPHISTRY_* credentials and GOOGLE_MAPS_KEY) from a local .env file.
load_dotenv()

# Hostname or IP address of the Dgraph Alpha; the gRPC (9080) and HTTP (8080) ports are appended where the clients are built.
dgraph_addr = "localhost"

In [None]:
# Dgraph DQL Client
# gRPC stub on the Alpha's port 9080. The maximum receive message length is raised
# to 1 GiB (1024*1024*1024 bytes) so large query responses can be received.
client_stub = pydgraph.DgraphClientStub(addr='{}:9080'.format(dgraph_addr), options=[('grpc.max_receive_message_length', 1024*1024*1024)])
client = pydgraph.DgraphClient(client_stub)
print("pydgraph client, check version:", client.check_version())

# GraphQL client and admin client (both over HTTP on port 8080)
gql_client = GraphqlClient(endpoint="http://{}:8080/graphql".format(dgraph_addr))
gql_admin_client = GraphqlClient(endpoint="http://{}:8080/admin".format(dgraph_addr))
# Connectivity check: ask the admin endpoint for cluster health and print the first entry.
data = gql_admin_client.execute(query="{health {status}}")
print("generic graphql client, check cluster health:", data['data']['health'][0])

In [None]:
%%time

# Get Record Counts
query = """
query {
    paradisePapers: aggregateRecord(filter: { sourceID: { eq: ParadisePapers } }) {count}
    panamaPapers: aggregateRecord(filter: { sourceID: { eq: PanamaPapers } }) {count}
    bahamasLeaks: aggregateRecord(filter: { sourceID: { eq: BahamasLeaks } }) {count}
    offshoreLeaks: aggregateRecord(filter: { sourceID: { eq: OffshoreLeaks} }) {count}
    pandoraPapers: aggregateRecord(filter: { sourceID: { eq: PandoraPapers} }) {count}
    total: aggregateRecord() {count}
}
"""
data = gql_client.execute(query=query)
print(json.dumps(data, indent=2))

### Dgraph Query Definitions

In [None]:
def update_node(nodes: dict, key: str, value: dict):
    """Merge the non-list fields of ``value`` into ``nodes[key]``.

    An empty entry is created for ``key`` if it does not exist yet. Fields that
    already exist are overwritten. List-valued fields are skipped here; they are
    either child objects or scalar lists, and are handled by the caller.
    """
    target = nodes.setdefault(key, {})
    scalar_fields = {field: val for field, val in value.items() if not isinstance(val, list)}
    target.update(scalar_fields)

def extract_dict(nodes: dict, edges: list, data: dict, parent: dict = None, name: str = None):
    """Recursively extract nodes and edges from a dict created from the result of a Dgraph query.

    Nodes (vertices) from the query must have an ``id`` field in order to be recognized
    as a node. Optionally, if a ``type`` field is present (either as a list or a string),
    the type will be applied to the node; for a list only its first element is used.
    Nodes encountered in more than one place in the query result will be merged.

    Edges are automatically extracted from the query result. If a node has an id and a parent
    that also has an id, ``{"src": parent id, "dst": node id, "type": name}`` is appended to
    ``edges``, i.e. the predicate (key) name becomes the edge type.

    Scalar lists (including a ``type`` list) found in a dict that has no ``id`` cannot be
    attached to any node and are ignored.

    Args:
        nodes: accumulator mapping node id -> node fields; mutated in place.
        edges: accumulator list of edge dicts; mutated in place.
        data: the (sub)result to walk. Non-dict values are ignored.
        parent: the dict that contains ``data``, if any.
        name: the key under which ``data`` was found in ``parent``.
    """
    if not isinstance(data, dict):
        return
    # ignore the Dgraph 'extensions' field
    if name == "extensions":
        return
    has_id = "id" in data
    # id is a special field, we use it to identify nodes
    if has_id:
        update_node(nodes, data['id'], data)
        # if we have a parent, add an edge
        if parent and "id" in parent:
            edges.append(
                {"src": parent["id"], "dst": data["id"], "type": name})
    # recurse into the dict
    for key, value in data.items():
        if isinstance(value, dict):
            extract_dict(nodes, edges, value, data, key)
        elif isinstance(value, list) and value:
            if key == "type":
                # a list named 'type' (e.g. DQL <dgraph.type>) is applied to the node;
                # without an id there is no node to apply it to
                if has_id:
                    update_node(nodes, data["id"], {"type": value[0]})
            elif isinstance(value[0], dict):
                # a list of child objects: recurse so each child can become a linked node
                for v in value:
                    extract_dict(nodes, edges, v, data, key)
            elif has_id:
                # a list of scalars is stored on the node as-is
                nodes[data['id']][key] = value



In [None]:
# GraphQL query for one page of Record nodes, paged with the offset/first variables.
# Aliases rename nodeID -> id and __typename -> type, the field names extract_dict
# uses for node identity and node type. Each nested edge only requests the target's
# nodeID, which is enough for extract_dict to record the edge.
recordQuery = """
query ($queryRecordOffset: Int, $queryRecordFirst: Int) {
  queryRecord(offset: $queryRecordOffset, first: $queryRecordFirst) {
    id: nodeID
    type: __typename
    name
    sourceID
    hasAddress {
      id: nodeID
    }
    hasOfficer {
      id: nodeID
    }
    hasIntermediary {
      id: nodeID
    }
    connectedTo {
      id: nodeID
    }
  }
}
"""

# Edges left out of recordQuery (add them back inside queryRecord { ... } to include them):
#    sameAs {
#      id: nodeID
#    }
#    sameNameAs {
#      id: nodeID
#    }
#    similarTo {
#      id: nodeID
#    }


In [None]:
import threading
import concurrent.futures
import time

def query(offset, first):
    """Fetch one page of records with ``recordQuery``.

    Returns a ``(response, offset)`` tuple, so that a caller receiving results
    out of order (e.g. from ``as_completed``) knows which page it got.
    """
    page_vars = dict(queryRecordOffset=offset, queryRecordFirst=first)
    return gql_client.execute(query=recordQuery, variables=page_vars), offset

def load_all_nodes_and_edges(nodes: dict, edges: list):
    """Page through every Record concurrently and accumulate nodes and edges.

    The total record count is fetched first. Then one ``query`` call per page of
    ``step`` records is submitted to a thread pool. Each page is merged into
    ``nodes``/``edges`` with ``extract_dict`` as soon as it completes, so pages
    arrive in completion order, not offset order.

    Args:
        nodes: node accumulator (id -> fields), mutated in place.
        edges: edge accumulator list, mutated in place.
    """
    count_query = """
    query {
      total: aggregateRecord {
        count
      }
    }
    """
    data = gql_client.execute(query=count_query)
    totalRecords = data['data']['total']['count']
    start = time.time()

    # Use half of the cores, but always at least one worker:
    # ThreadPoolExecutor raises ValueError for max_workers=0.
    workers = max(1, multiprocessing.cpu_count() // 2)
    print("Loading nodes and edges using", workers, "cores...")
    step = 25000
    # The context manager shuts the pool's threads down once all pages are processed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Stop at totalRecords (not totalRecords-1): otherwise a final page that
        # holds exactly one record, or a one-record dataset, would never be fetched.
        futures = [executor.submit(query, offset, step) for offset in range(0, totalRecords, step)]
        # as_completed yields in this (calling) thread, so merging needs no lock.
        for future in concurrent.futures.as_completed(futures):
            page, offset = future.result()
            print("retrieved", offset+1, "thru", offset+step, "record count:", len(page['data']['queryRecord']))
            extract_dict(nodes, edges, page)

    end = time.time()
    print('frames and edges loaded in', end - start, 'seconds')
    print('node count', len(nodes))
    print('edges count', len(edges))

### Perform Graph Analysis

In [None]:
%%time

# Load the whole Record graph into fresh accumulators:
# nodes maps id -> node fields, edges is a list of {"src", "dst", "type"} dicts.
nodes = {}
edges = []
load_all_nodes_and_edges(nodes, edges)

In [None]:
import networkx as nx

edges_df = pd.DataFrame(edges)
# Random peek at a few edges; capped so it does not raise on graphs with fewer than 3 edges.
print(edges_df.sample(min(3, len(edges_df))))
# Directed graph from the edge list. `edge_key` only applies when create_using is a
# multigraph (it is ignored for a DiGraph), so the edge type is kept as an edge attribute.
G = nx.from_pandas_edgelist(
    edges_df,
    source="src",
    target="dst",
    edge_attr="type",
    create_using=nx.DiGraph()
)
print(G)
print("Network density:", "%.8f" % nx.density(G))
try:
    # For a directed graph this raises NetworkXError unless it is strongly connected.
    print("Diameter:", nx.diameter(G))
except nx.NetworkXError as e:
    print("Error getting diameter", e)

In [None]:
# One row per node, indexed by node id; columns are the fields collected for each node.
nodes_df = pd.DataFrame.from_dict(nodes, orient = 'index')
nodes_df.sample(3)

In [None]:
#find top 10 nodes by degree (in + out edges, since G is a DiGraph)
sorted_deg = sorted(G.degree, key=lambda x: x[1], reverse=True)
# Slice rather than range(10) so graphs with fewer than 10 nodes don't raise IndexError.
# .get() because a node that never appeared as a top-level query result may lack name/type.
for rank, (nodeID, degree) in enumerate(sorted_deg[:10], start=1):
    node = nodes.get(nodeID, {})
    print(rank, nodeID, node.get('name'), ', type:', node.get('type'), ", degrees:", degree)

In [None]:
#find top 10 nodes by (degree) centrality
central = nx.degree_centrality(G)
sorted_central = sorted(central.items(), key=lambda x: x[1], reverse=True)
# Slice rather than range(10) so graphs with fewer than 10 nodes don't raise IndexError.
# .get() because a node that never appeared as a top-level query result may lack name/type.
for rank, (nodeID, centrality) in enumerate(sorted_central[:10], start=1):
    node = nodes.get(nodeID, {})
    print(rank, nodeID, node.get('name'), ', type:', node.get('type'), ", centrality:", '{:.8f}'.format(centrality))

In [None]:
%%time

# find top 10 entities by pagerank
pageranks = nx.pagerank(G)
sorted_pr = sorted(pageranks.items(), key=lambda x: x[1], reverse=True)
for n in range(10):
    nodeID = sorted_pr[n][0]
    print(n+1, nodeID, nodes[nodeID]['name'], ', type:', nodes[nodeID]['type'], ", pagerank:", '{:.8f}'.format(pageranks[nodeID]))

In [None]:
# find all shortest paths in graph G that have four or more edges (five or more nodes).
# The all-pairs generator is consumed one source at a time instead of being turned into
# a dict, which would hold a path for every reachable pair in memory at once.
lp_list = []
for src_id, paths_from_src in nx.all_pairs_shortest_path(G):
    for dst_id, node_path in paths_from_src.items():
        if len(node_path) >= 5:
            lp_list.append(node_path)

lp_list = sorted(lp_list)
lp_list

### Graph Visualization

In [None]:
import graphistry

# Graphistry credentials come from the environment (populated from .env by load_dotenv).
PASSWORD = os.getenv("GRAPHISTRY_PASSWORD")
USERNAME = os.getenv("GRAPHISTRY_USERNAME")

graphistry.register(api=3, username=USERNAME, password=PASSWORD)

# Bind the tables: nodes keyed by their 'id' column, edges from 'src' to 'dst',
# and each node's 'name' shown as its point title.
g = graphistry.nodes(nodes_df, 'id').edges(edges_df, 'src', 'dst').bind(point_title='name')
# Colour points by node type; types not in the mapping are drawn gray.
g2 = g.encode_point_color('type', categorical_mapping={'Entity': '#DB3B3B', 'Intermediary': '#E99233', 'Officer': '#6DB364', 'Address': '#F7D82F'}, default_mapping='gray')
g2.plot()

### Shortest Path Calculation

In [None]:
# DQL shortest-path query between two Records identified by their nodeID values.
# FROM / TO resolve the nodeIDs to uids. `shortest` searches over the listed Record
# predicates, and P collects the uids on the path. The `path` block returns each of
# those nodes' uid, nodeID, name and dgraph.type. The hop-by-hop structure is read
# from the response's `_path_` key further down.
# NOTE(review): `path` is a root query over uid(P); Dgraph may return those nodes in
# uid order rather than path order — confirm before relying on the order.
closest_dql_query = """
query closest($from: string, $to: string) {
  FROM as var(func: eq(Record.nodeID, $from))
  TO as var(func: eq(Record.nodeID, $to))
    
  P as shortest(from: uid(FROM), to: uid(TO)) {
    Record.hasAddress
    Record.addressFor
    Record.hasIntermediary
    Record.intermediaryFor
    Record.hasOfficer
    Record.officerFor
    Record.sameAs
    Record.sameNameAs
    Record.similarTo
    Record.connectedTo
  }
    
  path(func: uid(P)) {
   uid
   Record.nodeID
   Record.name
   <dgraph.type>
  }
}
"""

# Example start and end Records (nodeID values).
from_node = '82000955'
to_node = '81027090'

# Alternative: path between the first and third PageRank results.
#variables = {'$from': sorted_pr[0][0], '$to': sorted_pr[2][0]}
# Keys include the leading '$', matching the query's parameter names.
variables = {'$from': from_node, '$to': to_node}
# Read-only transaction, discarded even if the query raises.
txn = client.txn(read_only=True)
try:
    res = txn.query(query=closest_dql_query, variables=variables)
    paths = json.loads(res.json)
    print(json.dumps(paths, indent=2))
finally:
    txn.discard()
    
# Print each node returned by the `path` block.
for path in paths['path']:
    print(path)


In [None]:
import ipycytoscape

# ipycytoscape graph JSON for the shortest path found above.
graph_data = {"nodes": [], "edges": []}
# find the nodes; each node's CSS class and 'type' is its first dgraph.type
for idx, path in enumerate(paths['path']):
    entity_type = path['dgraph.type'][0]
    graph_data['nodes'].append({"data": {"id": path['Record.nodeID'], "label": path['Record.name'], "type": entity_type}, "classes": entity_type})
    # Link each node to the next one in paths['path']. The edge stores the source node's
    # uid so its predicate label can be filled in later from the `_path_` result.
    # NOTE(review): this assumes paths['path'] is in path order; verify against `_path_`.
    if idx < len(paths['path'])-1:
        graph_data['edges'].append({"data": {"uid": path['uid'], "source": path['Record.nodeID'], "target": paths['path'][idx+1]['Record.nodeID']}})

def find_edge_type(d: dict):
    """Label ``graph_data`` edges from a Dgraph ``_path_`` object, recursively.

    Every dict-valued entry of ``d`` under key ``pred`` is treated as one hop along
    predicate ``pred``. Each edge in ``graph_data['edges']`` whose ``uid`` equals
    ``d['uid']`` gets ``label`` set to ``pred`` without its first 7 characters
    (the ``Record.`` prefix). The function then descends into that hop.
    Raises KeyError if ``d`` has no ``uid``.
    """
    source_uid = d['uid']
    for predicate, child in d.items():
        if not isinstance(child, dict):
            continue
        matching = (e for e in graph_data['edges'] if e['data']['uid'] == source_uid)
        for edge in matching:
            edge['data']['label'] = predicate[7:]
        find_edge_type(child)
 
        
# recursively find the edge types, starting from the first path in the `_path_` result
find_edge_type(paths['_path_'][0])

print(graph_data)

In [None]:
from pprint import pformat

# ipycytoscape stylesheet:
# - one colour per node type, matched on the node's data(type);
# - a red border for nodes with the 'flagged' class;
# - translucent compound-parent nodes;
# - arrowed bezier edges labelled with data(label).
cyto_styles = [
    {'selector': 'node[type = "Address"]', 'style': {
        'font-family': 'helvetica',
        'font-size': '10px',
        'label': 'data(label)',
        'background-color': 'blue'}},
    {'selector': 'node[type = "Entity"]', 'style': {
        'font-family': 'helvetica',
        'font-size': '10px',
        'label': 'data(label)',
        'background-color': 'green'}},
    {'selector': 'node[type = "Intermediary"]', 'style': {
        'font-family': 'helvetica',
        'font-size': '10px',
        'label': 'data(label)',
        'background-color': 'yellow'}},
    {'selector': 'node[type = "Officer"]', 'style': {
        'font-family': 'helvetica',
        'font-size': '10px',
        'label': 'data(label)',
        'background-color': 'purple'}},
    {'selector': 'node[type = "Other"]', 'style': {
        'font-family': 'helvetica',
        'font-size': '10px',
        'label': 'data(label)',
        'background-color': '#999999'}},
    {'selector': 'node.flagged','style': {
        'border-color': 'red',
        'border-width': '4px'}},    
    {'selector': 'node:parent',
        'css': {
            'background-opacity': 0.333
        }
    },
    {'selector': 'edge', 'style': {
        'width': 3,
        'font-size': '9px',
        'line-color': '#9dbaea',
        'target-arrow-shape': 'triangle',
        'target-arrow-color': '#9dbaea',
        'curve-style': 'bezier',
        'label': 'data(label)'
    }
}]

# Render the shortest-path graph_data with the cola layout.
cytoscapeobj = ipycytoscape.CytoscapeWidget()
cytoscapeobj.graph.add_graph_from_json(graph_data)
cytoscapeobj.set_layout(name='cola', nodeSpacing=10, edgeLengthVal=10)
cytoscapeobj.set_style(cyto_styles)
#display (must stay the last expression of the cell)
cytoscapeobj

### Full-Text Query

In [None]:
%%time

# Full-text search over Entity names: `anyoftext` matches names containing any
# of the given words (per Dgraph's full-text search).
ft_query = """
query ($filter: EntityFilter) {
  queryEntity(filter: $filter) {
    id: nodeID
    type: __typename
    name
  }
}
"""
variables = {
    "filter": {
        "name": {
            "anyoftext": "live"
        }
    }
}
data = gql_client.execute(query=ft_query, variables=variables)
# Print the name of every matching entity.
for res in data['data']['queryEntity']:
    print(res['name'])
        

### Search via Geo-coordinates

In [None]:
# GraphQL query for addresses: coordinates plus the records that use each address
# (addressFor). Reused with a `near` filter in the next section.
geo_query = """
query ($filter: AddressFilter) {
  queryAddress(filter: $filter) {
    nodeID
    name
    location {
      latitude
      longitude
    }
    addressFor {
      nodeID
      __typename
      name
    }
  }
}"""
# First pass: every address that has a location value.
variables = {
  "filter": {
    "has": "location"
  }
}

data = gql_client.execute(query=geo_query, variables=variables)

# Flatten the nested JSON: `location` becomes location.latitude / location.longitude columns.
addresses_df = pd.json_normalize(data['data']['queryAddress'])

def extract_names(l):
    """Summarise linked records as ``"Type: name, Type: name"``.

    Args:
        l: list of dicts with ``__typename`` and ``name`` keys (an address's
           ``addressFor`` field). ``None`` or any non-list value, such as the
           NaN pandas uses for a missing cell, yields an empty string.

    Returns:
        The ``"<__typename>: <name>"`` entries joined by ``", "``; ``""`` for an empty list.
        A missing or null ``name`` is rendered as an empty string.
    """
    if not isinstance(l, (list, tuple)):
        return ''
    # str.join avoids repeated string concatenation and trimming a trailing separator.
    return ", ".join(
        entry['__typename'] + ": " + (entry.get('name') or '')
        for entry in l
    )

# Short coordinate column names for plotting, then collapse each row's linked records into one string.
addresses_df = addresses_df.rename(columns={"location.latitude": "lat", "location.longitude": "lon"})
addresses_df['addressFor'] = addresses_df['addressFor'].apply(extract_names)
addresses_df.sample(5)


In [None]:
import bokeh.io
from bokeh.plotting import gmap
from bokeh.models import ColumnDataSource, GMapOptions
from bokeh.io import output_file, show
from bokeh.models import HoverTool
from bokeh.resources import INLINE
# Render Bokeh output inline in the notebook using the bundled INLINE resources.
bokeh.io.output_notebook(INLINE)

# Map centre (roughly the middle of the continental US) and initial zoom.
lat = 39.116386
lng = -99.299591
google_map_options = GMapOptions(lat = lat, lng = lng, map_type = "hybrid", zoom = 4)

# Tooltip shows the hovered square's `name` and `addressFor` columns.
hover = HoverTool(
        tooltips = [
            ('address', '@name'),
            ('addressFor', '@addressFor'), 
        ]
    )

# Google Maps API key from the environment (.env).
google_maps_key = os.getenv("GOOGLE_MAPS_KEY")
google_map = gmap(google_maps_key, google_map_options, title="US Addresses", 
                  tools=[hover, 'reset', 'wheel_zoom', 'pan'], width=1200, height=640)
source = ColumnDataSource(addresses_df)
# One red square per address at (lon, lat).
google_map.square(x="lon", y="lat", size=8, fill_color="red", fill_alpha=0.7, source=source)
show(google_map)


In [None]:
%%time

# Query for addresses near point

# syracuse ny
lat = 43.088947
lng = -76.154480
# los angeles
lat = 34.098907
lng = -118.327759


miles = 50
meters = miles * 1609
variables = {
  "filter": {
    "location": {
      "near": {
        "coordinate": {
          "latitude": lat,
          "longitude": lng
        },
        "distance": meters
      }
    }
  }
}

data = gql_client.execute(query=geo_query, variables=variables)

addresses_df = pd.json_normalize(data['data']['queryAddress'])

addresses_df = addresses_df.rename(columns={"location.latitude": "lat", "location.longitude": "lon"})
addresses_df['addressFor'] = addresses_df['addressFor'].apply(lambda val: extract_names(val))

google_map_options = GMapOptions(lat = lat, lng = lng, map_type = "hybrid", zoom = 10)

hover = HoverTool(
        tooltips = [
            ('address', '@name'),
            ('addressFor', '@addressFor'), 
        ]
    )

google_map = gmap(google_maps_key, google_map_options, title="Addresses near Syracuse NY", 
                  tools=[hover, 'reset', 'wheel_zoom', 'pan'], width=1200, height=640)
source = ColumnDataSource(addresses_df)
google_map.square(x="lon", y="lat", size=12, fill_color="red", fill_alpha=0.7, source=source)
show(google_map)

### Mutating Dgraph

In [None]:
def is_flagged(node):
    """Return True when ``node`` has a non-empty ``flagged`` field, else False."""
    if 'flagged' not in node:
        return False
    return len(node['flagged']) > 0
                                 
def convert_to_cyto_objs(nodes, edges):
    """Convert ``extract_dict``-style nodes and edges into ipycytoscape graph JSON.

    Args:
        nodes: mapping of node id -> node fields (``id`` plus, usually, ``name``,
            ``type`` and an optional ``flagged`` list).
        edges: list of ``{"src", "dst", "type"}`` dicts.

    Returns:
        ``{"nodes": [...], "edges": [...]}`` for ``graph.add_graph_from_json``.
        Flagged nodes get the ``flagged`` CSS class; edge labels are the edge types.
    """
    graph_data = {"nodes": [], "edges": []}
    for node in nodes.values():
        flagged = is_flagged(node)
        # DQL omits predicates a node has no value for, so fall back instead of raising
        # KeyError: 'Other' has its own node style, and the id doubles as the label.
        graph_data['nodes'].append({
            "data": {
                "id": node['id'],
                "label": node.get('name', node['id']),
                "type": node.get('type', 'Other'),
                "flagged": flagged,
            },
            "classes": 'flagged' if flagged else '',
        })
    for edge in edges:
        graph_data['edges'].append({"data": {"source": edge['src'], "target": edge['dst'], "label": edge['type']}})
    return graph_data


In [None]:
%%time

recurse_query = """
{
	q(func: eq(Record.nodeID, "236724")) @recurse(depth: 5) {
        # predicates to return for each recurse
        id: Record.nodeID
        name: Record.name
        type: <dgraph.type>
        
        # predicates to loop through
        addressFor: Record.addressFor(first: 30)
        hasOfficer: Record.hasOfficer
        hasIntermediary: Record.hasIntermediary
        connectedTo: RecordRecord.connectedTo  
    }
}
"""

nodes = {}
edges = []

txn = client.txn(read_only=True)
try:
    res = txn.query(query=recurse_query)
    results = json.loads(res.json)
    extract_dict(nodes, edges, results)
finally:
    txn.discard()

recurse_viz = ipycytoscape.CytoscapeWidget()
recurse_viz.set_layout(name='cola', nodeSpacing=20, edgeLengthVal=10)
recurse_viz.set_style(cyto_styles)
cyto_obj = convert_to_cyto_objs(nodes, edges)
recurse_viz.graph.add_graph_from_json(cyto_obj)
#display
recurse_viz

In [None]:
# Update the GraphQL schema with 'flagged' predicate
!curl --data-binary '@./schema-flagged.graphql' http://localhost:8080/admin/schema

In [None]:
# Value written into a Record's 'flagged' list when its node is clicked.
# NOTE(review): hard-coded address; consider reading it from the .env file like the other settings.
email = "matthew.mcneely@gmail.com"

# GraphQL mutation: sets 'flagged' on the Record(s) matched by input.filter and
# returns numUids plus the resulting flagged values.
flagged_mutation = """
mutation ($input: UpdateRecordInput!) {
  updateRecord(input: $input) {
    numUids
    record {
      flagged
    }
  }
}
"""

def record_click(node):
    """Flag the clicked cytoscape node's Record in Dgraph and print the response.

    ``node['data']['id']`` is the Record's nodeID. The mutation sets its
    ``flagged`` field to a one-element list holding ``email``.
    """
    clicked_id = node['data']['id']
    id_filter = {"nodeID": {"eq": clicked_id}}
    patch = {"flagged": [email]}
    variables = {"input": {"filter": id_filter, "set": patch}}
    response = gql_client.execute(query=flagged_mutation, variables=variables)
    print(response)

# Call record_click whenever a node in the recurse_viz widget is clicked.
recurse_viz.on('node', 'click', record_click)


In [None]:
# Re-run the recursive neighbourhood query, now also fetching each node's 'flagged'
# list so that flagged nodes get the 'flagged' CSS class (red border).
# The connectedTo predicate is `Record.connectedTo` (as in the shortest-path query);
# the previous spelling `RecordRecord.connectedTo` matched no schema predicate,
# so connectedTo edges were never traversed.
recurse_query = """
{
	q(func: eq(Record.nodeID, "236724")) @recurse(depth: 5) {
        # predicates to return for each recurse
        id: Record.nodeID
        name: Record.name
        type: <dgraph.type>
        flagged: Record.flagged
        
        # predicates to loop through
        addressFor: Record.addressFor(first: 30)
        hasOfficer: Record.hasOfficer
        hasIntermediary: Record.hasIntermediary
        connectedTo: Record.connectedTo
    }
}
"""

nodes = {}
edges = []

# Read-only transaction, discarded even if the query or parsing fails.
txn = client.txn(read_only=True)
try:
    res = txn.query(query=recurse_query)
    results = json.loads(res.json)
    extract_dict(nodes, edges, results)
finally:
    txn.discard()

recurse_viz = ipycytoscape.CytoscapeWidget()
recurse_viz.set_layout(name='cola', nodeSpacing=20, edgeLengthVal=10)
recurse_viz.set_style(cyto_styles)
cyto_obj = convert_to_cyto_objs(nodes, edges)
recurse_viz.graph.add_graph_from_json(cyto_obj)
#display
recurse_viz