In [1]:
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *
import datetime

In [2]:
url = "jdbc:postgresql://cpdb-databricks.cgod7egsd6vr.us-east-2.rds.amazonaws.com/cpdb"
user = "data_sci"
pwd = "dataSci4lyf"
driver = "org.postgresql.Driver"

reader = spark.read.format("jdbc")\
  .option("driver", driver)\
  .option("url", url)\
  .option("user", user)\
  .option("password", pwd)\

def cpdp_read(query):
  return reader.option("dbtable", query).load()

# Graph Construction

## Query for Officer Subset

In [5]:
officer_subset_query = """
(SELECT o.id, o.first_name, o.last_name, o.birth_year, o.appointed_date,
        date_part('year', '2018-01-01'::DATE) - o.birth_year as estimated_age,
        ('2018-01-01'::DATE - o.appointed_date) / 365 as years_on_force,
        COUNT(a.id) as allegation_count,
         (SELECT da.incident_date
             FROM data_officerallegation oa
             JOIN data_allegation da on oa.allegation_id = da.id
              JOIN data_allegationcategory category on oa.allegation_category_id = category.id
             WHERE o.id = oa.officer_id
              AND category.category NOT IN ('Operation/Personnel Violations',
                     'Lockup Procedures',
                     'Traffic',
                     'Supervisory Responsibilities',
                     'Unknown',
                     'Medical')
             LIMIT 1
             OFFSET 10
          ) as repeater_start_date
  FROM data_officer o
  LEFT JOIN data_officerallegation a on o.id = a.officer_id
  WHERE active = 'Yes'
      AND appointed_date BETWEEN '2000-01-01' AND '2007-12-31'
  GROUP BY o.id
  ORDER BY years_on_force DESC) officer_subset
"""
officer_subset_df = cpdp_read(officer_subset_query)
officer_subset_df.createOrReplaceTempView("officer_subset")

## Query for repeaters
- not in the officer subset

In [7]:
repeaters_query = """
(WITH officer_subset AS (
SELECT o.id, o.first_name, o.last_name, o.birth_year, o.appointed_date,
        date_part('year', '2018-01-01'::DATE) - o.birth_year as estimated_age,
        ('2018-01-01'::DATE - o.appointed_date) / 365 as years_on_force,
        COUNT(a.id) as allegation_count,
         (SELECT da.incident_date
             FROM data_officerallegation oa
             JOIN data_allegation da on oa.allegation_id = da.id
              JOIN data_allegationcategory category on oa.allegation_category_id = category.id
             WHERE o.id = oa.officer_id
              AND category.category NOT IN ('Operation/Personnel Violations',
                     'Lockup Procedures',
                     'Traffic',
                     'Supervisory Responsibilities',
                     'Unknown',
                     'Medical')
             LIMIT 1
             OFFSET 10
          ) as repeater_start_date
  FROM data_officer o
  LEFT JOIN data_officerallegation a on o.id = a.officer_id
  WHERE active = 'Yes'
      AND appointed_date BETWEEN '2000-01-01' AND '2007-12-31'
  GROUP BY o.id
  ORDER BY years_on_force DESC) 
  SELECT
           officer.id as officer_id,
           officer.appointed_date as appointed_date,
           (SELECT da.incident_date
               FROM data_officerallegation oa
               JOIN data_allegation da on oa.allegation_id = da.id
                JOIN data_allegationcategory category on oa.allegation_category_id = category.id
               WHERE officer.id = oa.officer_id
                AND category.category NOT IN ('Operation/Personnel Violations',
                       'Lockup Procedures',
                       'Traffic',
                       'Supervisory Responsibilities',
                       'Unknown',
                       'Medical')
               LIMIT 1
               OFFSET 10
            ) as repeater_start_date
    FROM data_officer officer
    WHERE officer.id NOT IN (SELECT id FROM officer_subset)
  ) repeaters
"""
repeaters_df = cpdp_read(repeaters_query) 
repeaters_df.createOrReplaceTempView("repeaters")

## create vertices
- vertices include officer in the subset and all repeaters

In [9]:
# create vertices
verts = []
for row in officer_subset_df.rdd.collect():
  verts.append((row.id, row.appointed_date, row.repeater_start_date))

for row in repeaters_df.where(col("repeater_start_date").isNotNull()).rdd.collect():
  verts.append((row.officer_id, row.appointed_date, row.repeater_start_date))
  
vertices = sqlContext.createDataFrame(verts, ['id', 'appointed_date', 'repeater_start_date'])
vertices.write.mode('overwrite').save("/cp4_q1/vertices")

In [10]:
# pull in data_officerallegations
cpdp_read('data_officerallegation').createOrReplaceTempView('data_officerallegation')


## Query for officers first allegations
- For officers within the subset

In [12]:
# load df with first allegations
first_allegations_query = """
(WITH officer_subset AS (
SELECT o.id, o.first_name, o.last_name, o.birth_year, o.appointed_date,
        date_part('year', '2018-01-01'::DATE) - o.birth_year as estimated_age,
        ('2018-01-01'::DATE - o.appointed_date) / 365 as years_on_force,
        COUNT(a.id) as allegation_count,
         (SELECT da.incident_date
             FROM data_officerallegation oa
             JOIN data_allegation da on oa.allegation_id = da.id
              JOIN data_allegationcategory category on oa.allegation_category_id = category.id
             WHERE o.id = oa.officer_id
              AND category.category NOT IN ('Operation/Personnel Violations',
                     'Lockup Procedures',
                     'Traffic',
                     'Supervisory Responsibilities',
                     'Unknown',
                     'Medical')
             LIMIT 1
             OFFSET 10
          ) as repeater_start_date
  FROM data_officer o
  LEFT JOIN data_officerallegation a on o.id = a.officer_id
  WHERE active = 'Yes'
      AND appointed_date BETWEEN '2000-01-01' AND '2007-12-31'
  GROUP BY o.id
  ORDER BY years_on_force DESC)
  SELECT da.id as allegation_id,
           o.id as officer_id,
           dc.category as category,
           da.incident_date as incident_date
    FROM officer_subset o
    JOIN data_officerallegation oa ON oa.officer_id = o.id
    JOIN data_allegation da on oa.allegation_id = da.id
    JOIN data_allegationcategory dc on oa.allegation_category_id = dc.id
    WHERE da.incident_date = (
        SELECT d.incident_date
        FROM data_officerallegation
        JOIN data_allegation d on data_officerallegation.allegation_id = d.id
        WHERE officer_id = o.id
        ORDER BY d.incident_date LIMIT 1)
        AND dc.category NOT IN ('Operation/Personnel Violations',
                       'Lockup Procedures',
                       'Traffic',
                       'Supervisory Responsibilities',
                       'Unknown',
                       'Medical')) first_allegations
  """
first_allegations_df = cpdp_read(first_allegations_query)
first_allegations_df.createOrReplaceTempView('first_allegations')

## Create Edges
- edges are officers first allegation co-accused with a repeater
- **NOTE**: This takes a very long time (~3.5) hours. Do not run, we have provode code below to load the data

In [14]:
edges = []
for row in first_allegations_df.rdd.collect():
  co_df = spark.sql("SELECT id, allegation_id, officer_id FROM data_officerallegation WHERE allegation_id = " + str(row.allegation_id) + " AND NOT officer_id = " + str(row.officer_id))
  if co_df.count() > 0:
    for co in co_df.rdd.collect():
      repeater = spark.sql("SELECT * FROM repeaters WHERE officer_id = " + str(co.officer_id) + " AND repeater_start_date < '" + str(row.incident_date) + "'").rdd
      if not repeater.isEmpty():
        edges.append((row.officer_id, repeater.first().officer_id, row.allegation_id, row.category))
        
edges_df = sqlContext.createDataFrame(edges, ['src', 'dst', 'allegation_id', 'allegation_category'])
edges_df.write.mode('overwrite').save("/cp4_q1/edges")

## Load the vertices and edges from disk
- run this instead of the one above

In [16]:
from pyspark import SparkFiles

edges_url = "https://nw-databricks.s3.amazonaws.com/wise-lobsters/cp4/q1_edges.csv"
spark.sparkContext.addFile(edges_url)
s3_edges_df = spark.read.csv("file://"+SparkFiles.get("q1_edges.csv"), header=True, inferSchema= True)

vertices_url = "https://nw-databricks.s3.amazonaws.com/wise-lobsters/cp4/q1_vertices.csv"
spark.sparkContext.addFile(vertices_url)
s3_vertices_df = spark.read.csv("file://"+SparkFiles.get("q1_vertices.csv"), header=True, inferSchema= True)

# vertices_df = spark.read.load("/cp4_q1/vertices")
# edges_df = spark.read.load("/cp4_q1/edges")

g = GraphFrame(s3_vertices_df, s3_edges_df)

# Analysis and Graph Exploration

## in and out-degrees

In [19]:
display(g.outDegrees)

id,outDegree
3015,1
9691,1
21121,3
4509,1
4191,2
3034,1
11582,1
2157,1
12253,2
14442,1


In [20]:
display(edges_df)

src,dst,allegation_id,allegation_category
5360,5193,24634,Verbal Abuse
26466,9998,21295,Illegal Search
26466,2307,21295,Illegal Search
26466,15599,21295,Illegal Search
19875,8063,18973,Illegal Search
19875,9620,18973,Illegal Search
10321,17611,37416,Domestic
29600,18180,30508,Verbal Abuse
29600,3216,30508,Verbal Abuse
8358,9634,69971,Illegal Search


In [21]:
display(s3_edges_df)

src,dst,allegation_id,allegation_category
5360,5193,24634,Verbal Abuse
26466,9998,21295,Illegal Search
26466,2307,21295,Illegal Search
26466,15599,21295,Illegal Search
19875,8063,18973,Illegal Search
19875,9620,18973,Illegal Search
10321,17611,37416,Domestic
29600,18180,30508,Verbal Abuse
29600,3216,30508,Verbal Abuse
8358,9634,69971,Illegal Search


## PageRank

In [23]:
result= g.pageRank(resetProbability=0.15,tol=0.01)
display(result.vertices)

id,appointed_date,repeater_start_date,pagerank
19984,2004-04-26,,0.9558666095706946
18624,2006-07-31,2010-04-20T00:00:00.000+0000,0.9558666095706946
22384,2004-04-26,,0.9558666095706946
30168,2005-08-29,,0.9558666095706946
31760,1999-10-25,2010-10-20T00:00:00.000+0000,0.9558666095706946
24904,2004-11-29,2009-08-12T00:00:00.000+0000,0.9558666095706946
6400,1986-10-13,1992-09-28T00:00:00.000+0000,0.9558666095706946
19144,1977-01-03,2007-04-09T00:00:00.000+0000,0.9558666095706946
24384,2004-11-29,,0.9558666095706946
752,2006-11-27,,0.9558666095706946


## Highest In-degree network visualization

#### officers with highest in-degrees (most co-accusals with first time allegations)

In [26]:
result_Degree = g.inDegrees.orderBy('inDegree', ascending=False).filter(col("inDegree") > 3)
display(result_Degree)

id,inDegree
31631,7
20125,6
16812,6
22235,5
14696,5
13303,5
27933,5
4530,5
11190,5
4638,5


In [27]:
# display(result_Degree.select('id'))
ids = [row.id for row in result_Degree.select('id').collect()]
# print(ids)
sub_result = g.edges.filter(col("dst").isin(ids))
node_ids = [row.src for row in sub_result.select('src').collect()]
nodes = g.filterVertices(col('id').isin(node_ids))
display(nodes.ver)
# print(nodes.count)

src,dst,allegation_id,allegation_category


#### Plot the whole network

In [29]:
#Step 3 : Plot using networkx 
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
print(type(first_allegations_df))
# first_allegations_df = first_allegations_df.toPandas()

edges_pdf = g.edges.toPandas()

G = nx.from_pandas_edgelist(edges_pdf,'src','dst', edge_attr='allegation_id')
fig, ax = plt.subplots(figsize=(50,50))
nx.draw(G,node_color='#A0CBE2',edge_color='#BB0000',width=2,edge_cmap=plt.cm.Blues,with_labels=True)
display(fig)

#### Plot a subgrah of highest in-degree officers
- 3 levels deep
- formations of networks can clearly be seen

In [31]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

result_Degree = g.inDegrees.orderBy('inDegree', ascending=False).filter(col("inDegree") > 1)
                                                                       
ids = [row.id for row in result_Degree.select('id').collect()]
# print(ids)
sub_result = g.edges.filter(col("dst").isin(ids))
node_ids = [row.src for row in sub_result.select('src').collect()]
level2_nodes = g.filterVertices(col('id').isin(node_ids)).vertices

ids_level2 = [row.id for row in level2_nodes.select('id').collect()]
level2_edges = g.edges.filter(col("dst").isin(node_ids))

level2_nodes_ids = [row.src for row in level2_edges.select('src').collect()]
level3_nodes = g.filterVertices(col('id').isin(level2_nodes_ids)).vertices
ids_level3 = [row.id for row in level3_nodes.select('id').collect()]
level3_edges = g.edges.filter(col("dst").isin(level2_nodes_ids))


subgraph_edges = sub_result.union(level2_edges).union(level3_edges)

nodes_edges_pdf = subgraph_edges.toPandas()

G = nx.from_pandas_edgelist(nodes_edges_pdf,'src','dst', edge_attr='allegation_id')
fig, ax = plt.subplots(figsize=(40,40))
nx.draw(G,node_color='#A0CBE2',edge_color='#BB0000',width=2,edge_cmap=plt.cm.Blues,with_labels=True)
display(fig)