***Instructions to run the Colab file:*** If you hit a Java error, first restart the runtime from the 'Runtime' dropdown menu and rerun the cells, because the Spark session may have been disrupted.

In [None]:
# Environment setup for Google Colab: Java 8, Spark 3.2.0 (Hadoop 3.2 build),
# findspark, pyspark and the graphframes Python package.
# NOTE(review): findspark and graphframes are installed without a version pin; the
# recorded output shows pip fetching graphframes 0.6 while the jar used later in this
# notebook is 0.8.2 -- confirm the two are compatible and consider pinning.

# install java (quiet; output discarded)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download spark (change the version number if needed; keep it in sync with
# SPARK_HOME and the pyspark version below)
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

# extract the spark archive into the current folder
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installations above
# (SPARK_HOME assumes the archive was extracted in /content -- the Colab default working dir)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

# install pyspark (same version as the Spark distribution downloaded above)
!pip3 install pyspark==3.2.0

# install the graphframes Python package
!pip3 install graphframes

Collecting pyspark==3.2.0
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 69.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=8e9e0e3621c9be0af70efcafbdf8837bd6d297169e47ab11d549796e5e910b7f
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[K     |████████████████████████████████| 154 kB 6.8 MB/s 
In

Download the GraphFrames jar file from: [GraphFrames jar file](https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar)

Upload it to the Google Colab Files folder, which can be found in the left pane of this window.

In [None]:
# Copy the uploaded GraphFrames jar into the jars/ directory of the Spark installation
# (-v prints the source and destination so a missing upload is easy to spot).
!cp -v /content/graphframes-0.8.2-spark3.2-s_2.12.jar $SPARK_HOME/jars/

'/content/graphframes-0.8.2-spark3.2-s_2.12.jar' -> '/content/spark-3.2.0-bin-hadoop3.2/jars/graphframes-0.8.2-spark3.2-s_2.12.jar'


In [None]:
#import the packages
from pyspark import *
from pyspark.sql import *
from graphframes import *
import findspark
import pandas as pd
import psycopg2
import networkx as nx
import operator
from pyspark.sql.functions import monotonically_increasing_id 

# ----- Connect to the PostgreSQL Database -------
# Connection settings can be overridden with the CPDB_HOST / CPDB_DATABASE /
# CPDB_USER / CPDB_PASSWORD environment variables so that credentials do not
# have to be edited into the notebook. The fallbacks are the values this
# notebook has always used, so existing runs behave exactly as before.
# NOTE(review): the fallback password is still stored in the notebook; remove
# it once every user sets CPDB_PASSWORD.
import os

conn = psycopg2.connect(
    host=os.environ.get("CPDB_HOST", "codd01.research.northwestern.edu"),
    database=os.environ.get("CPDB_DATABASE", "postgres"),
    user=os.environ.get("CPDB_USER", "cpdbstudent"),
    password=os.environ.get("CPDB_PASSWORD", "DataSci4AI"))

# Locate the Spark installation (reads SPARK_HOME set in the setup cell)
findspark.init()

# Start a Spark session (local mode, using all available cores)
spark = SparkSession.builder.master("local[*]").getOrCreate()

  """)


## **Q1. Network Analysis of officers and investigators to study the influence of their relationship on disciplinary action for unsustained allegations**

In [None]:
# --------- Q1: Community Detection ---------
# Extract tables from CPDB Database
# Vertices: Investigators i and Officers o
# Edges: Investigator i investigating in an allegation accusing an officer o
#
# Ids are suffixed with '_i' (investigator) or '_o' (officer) so the two id spaces
# cannot collide inside a single graph.
#
# The finding filter is written as IN (...) AND ... on purpose: in SQL, AND binds
# tighter than OR, so "f = 'UN' OR f = 'EX' OR f = 'NS' AND is_officer_complaint = FALSE"
# only applied the is_officer_complaint condition to the 'NS' rows.
edge_query = """
    SELECT CAST(di.investigator_id AS varchar) || '_i' AS src,
           CAST(d.officer_id AS varchar) || '_o' AS dst,
           di.allegation_id AS relationship
    FROM data_investigatorallegation di
    JOIN (
        SELECT allegation_id, officer_id
        FROM data_allegation a
        JOIN data_officerallegation d ON a.crid = d.allegation_id
        WHERE final_finding IN ('UN', 'EX', 'NS')
          AND a.is_officer_complaint = FALSE
    ) d ON di.allegation_id = d.allegation_id
"""
edges_df = pd.read_sql_query(edge_query, con=conn)

# All investigators and officers that appear in any allegation, with display names.
# UNION (not UNION ALL) removes duplicate (id, name) rows.
vertex_query = """
    SELECT CAST(di.investigator_id AS varchar) || '_i' AS id,
           d.first_name || ' ' || d.last_name AS name
    FROM data_investigatorallegation di
    JOIN data_investigator d ON di.investigator_id = d.id
    UNION
    SELECT CAST(doa.officer_id AS varchar) || '_o' AS id,
           first_name || ' ' || last_name AS name
    FROM data_officerallegation doa
    JOIN data_officer o ON doa.officer_id = o.id
"""
vertices_df = pd.read_sql_query(vertex_query, con=conn)

# Keep only vertices that are an endpoint of at least one edge
# (officers first, then investigators).
df_1 = vertices_df[vertices_df.id.isin(edges_df.dst)]
df_2 = vertices_df[vertices_df.id.isin(edges_df.src)]
frames = [df_1, df_2]

vertices_df = pd.concat(frames)

# Create Spark dataframes
edges = spark.createDataFrame(edges_df)
vertices = spark.createDataFrame(vertices_df)

print(' Unique vertices in our graph: ', vertices_df.id.nunique())
# NOTE: this counts distinct allegation ids carried on the edges, not distinct
# (src, dst) pairs or edge rows.
print(' Unique edges in our graph: ', edges_df.relationship.nunique())
print(' Total number of investigators in our graph: ', edges_df.src.nunique())
print(' Total number of officers in our graph: ', edges_df.dst.nunique())

 Unique vertices in our graph:  17353
 Unique edges in our graph:  41210
 Total number of investigators in our graph:  3011
 Total number of officers in our graph:  14342


In [None]:
# Create the Q1 graph from the Spark DataFrames built above:
# `vertices` has columns (id, name); `edges` has columns (src, dst, relationship).
graph1 = GraphFrame(vertices, edges)

In [None]:
# Display the first rows of the vertex table (id, name) as a sanity check
graph1.vertices.show()

+-------+-----------------+
|     id|             name|
+-------+-----------------+
|10000_o|Timothy Gilliland|
|10001_o|    David Gillott|
|10002_o|      Debra Gills|
|10005_o|   Anthony Gillum|
|10007_o|   Gregory Gilmer|
|10012_o|   Horace Gilmore|
|10014_o|  Patrick Gilmore|
|10017_o|    Steve Gilmour|
| 1001_o|    Michael Ayala|
|10020_o|    John Giltmier|
|10022_o|    Beth Giltmier|
|10024_o|   Steven Gimenez|
|10029_o|  Joseph Giorango|
| 1002_o|     Fausto Ayala|
|10030_o|   Caryl Giordano|
|10031_o|  Nicola Giordano|
|10033_o|Vito Giovannielli|
|10036_o|    Lamont Gipson|
|10038_o|    Darius Gipson|
|10043_o|   Robert Girardi|
+-------+-----------------+
only showing top 20 rows



In [None]:
# Display the first rows of the edge table (src, dst, relationship) as a sanity check;
# `relationship` holds the allegation id that links the investigator to the officer
graph1.edges.show()

+------+-------+------------+
|   src|    dst|relationship|
+------+-------+------------+
|4487_i|31001_o|      260473|
|2985_i|16043_o|     1022748|
|3224_i|16043_o|     1022748|
|3208_i|19438_o|     1025931|
|3104_i|30366_o|     1042143|
|2163_i|30366_o|     1042143|
|1725_i|30366_o|     1042143|
|3166_i|30366_o|     1042143|
|4134_i|20053_o|      276924|
|3026_i|29568_o|      264936|
|4146_i|19921_o|      268878|
|1630_i| 8690_o|     1081748|
|1736_i|14058_o|     1081476|
|1471_i|14058_o|     1081476|
|1440_i|14058_o|     1081476|
|1498_i|14058_o|     1081476|
|1736_i|24844_o|     1081476|
|1471_i|24844_o|     1081476|
|1440_i|24844_o|     1081476|
|1498_i|24844_o|     1081476|
+------+-------+------------+
only showing top 20 rows



### **Count the number of co-occurrences of every distinct pair of investigators and officers**

In [None]:
# How many edge rows each (investigator, officer) pair has, most frequent pairs first.
series = edges_df.value_counts(subset=['src', 'dst'])
series.head(15)

src     dst    
3886_i  23841_o    24
2538_i  32166_o    24
3886_i  8562_o     23
2538_i  12478_o    21
3886_i  3454_o     20
3278_i  2725_o     20
3886_i  18076_o    17
3836_i  8386_o     16
3278_i  32164_o    16
3886_i  27778_o    15
2815_i  25039_o    15
3836_i  8620_o     15
2512_i  31837_o    14
3278_i  21615_o    14
3836_i  18205_o    13
dtype: int64

In [None]:
# Drill into investigator 3886_i (several of the most frequent pairs above involve it):
# number of distinct officers (dst) and distinct allegations (relationship) on its edges.
edges_df[edges_df['src']=='3886_i'].nunique()

src              1
dst             51
relationship    54
dtype: int64

### **Connected Components**

In [None]:
# Get the SparkContext and give it a checkpoint directory before running the
# connected-components cell below.
# NOTE(review): presumably required because GraphFrames' connectedComponents checkpoints
# intermediate results -- confirm against the GraphFrames docs. The directory is Colab's
# bundled sample_data folder; a dedicated folder would be cleaner.
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf())
sc.setCheckpointDir('/content/sample_data')


In [None]:
# This cell takes approx 3 min to finish running
# Assigns every vertex of graph1 a `component` id; `result` keeps the vertex columns
# (id, name) and adds the `component` column.
result = graph1.connectedComponents()
result.show()

+-------+-----------------+---------+
|     id|             name|component|
+-------+-----------------+---------+
|10000_o|Timothy Gilliland|        0|
|10001_o|    David Gillott|        0|
|10002_o|      Debra Gills|        0|
|10005_o|   Anthony Gillum|        0|
|10007_o|   Gregory Gilmer|        0|
|10012_o|   Horace Gilmore|        0|
|10014_o|  Patrick Gilmore|        0|
|10017_o|    Steve Gilmour|        0|
| 1001_o|    Michael Ayala|        0|
|10020_o|    John Giltmier|        0|
|10022_o|    Beth Giltmier|        0|
|10024_o|   Steven Gimenez|        0|
|10029_o|  Joseph Giorango|        0|
| 1002_o|     Fausto Ayala|        0|
|10030_o|   Caryl Giordano|        0|
|10031_o|  Nicola Giordano|        0|
|10033_o|Vito Giovannielli|        0|
|10036_o|    Lamont Gipson|        0|
|10038_o|    Darius Gipson|        0|
|10043_o|   Robert Girardi|        0|
+-------+-----------------+---------+
only showing top 20 rows



In [None]:
# Check the number of unique components present in the graph.
# The result is collected to pandas so it can be summarised locally.
result_df = result.toPandas()
print('There are {} connected components in our graph.'.format(result_df.component.nunique()))

There are 51 connected components in our graph.


In [None]:
# Check the number of nodes within each component (the 10 largest components).
result_df.component.value_counts().head(10)

0                17240
438086664195         4
824633720857         3
721554505808         3
274877907024         3
292057776162         3
206158430212         3
1125281431556        3
180388626439         3
180388626441         3
Name: component, dtype: int64

### **Outdegrees of graph1**

In [None]:
# Out-degree of every vertex that is the source of at least one edge, largest first.
# In graph1 only investigators appear as `src`, so this ranks investigators by the
# number of edge rows (investigator, officer, allegation) they have.
x = graph1.outDegrees
x.orderBy('outDegree', ascending=False).show()

+------+---------+
|    id|outDegree|
+------+---------+
|1438_i|     2497|
|1657_i|     2093|
|2376_i|     1888|
|2472_i|     1593|
|1715_i|     1500|
|2970_i|     1485|
|2994_i|     1475|
|1595_i|     1448|
|2985_i|     1440|
|3183_i|     1358|
|3270_i|     1286|
|3062_i|     1200|
|2724_i|     1182|
|3028_i|     1163|
|1419_i|     1018|
|2375_i|     1018|
|1667_i|      955|
|1688_i|      949|
|3204_i|      914|
|1676_i|      877|
+------+---------+
only showing top 20 rows



## **Q2. Centrality Analysis of police officers co-accused in unsustained allegations**

In [None]:
# --------- Q2: Central Analysis ---------
# Extract tables from CPDB Database
# NOTE(review): pd_trr_trr is not referenced by any of the cells visible in this
# section -- confirm it is needed before paying for the query.
pd_trr_trr = pd.read_sql_query("select officer_id, officer_unit_id from trr_trr", con=conn)
# One row per officer: id, full name and the officer's unsustained_count.
pd_data_officer = pd.read_sql_query("select id, first_name || ' ' || last_name officer_name, unsustained_count from data_officer", con=conn)
# Co-accusal edges: self-join of data_officerallegation on allegation_id. The
# da1.officer_id < da2.officer_id condition emits each officer pair once (lower id as
# src), and `relationship` is the number of distinct allegations the pair shares.
# NOTE(review): the query has no final_finding filter, so it counts all shared
# allegations, not only unsustained ones -- confirm this matches the section title.
edges_query = "SELECT da1.officer_id src, da2.officer_id dst, COUNT(DISTINCT da1.allegation_id) relationship FROM data_officerallegation da1 JOIN data_officerallegation da2 ON da1.allegation_id = da2.allegation_id AND da1.officer_id < da2.officer_id GROUP BY da1.officer_id, da2.officer_id ORDER BY count(*) DESC;"

In [None]:
# Vertices of the graph: police officers (id, officer_name) with their count of
# unsustained allegations
sp_officers_vertices = spark.createDataFrame(pd_data_officer)

In [None]:
# Edges of the graph: pairs of officers named in the same allegation(s), with the
# number of shared allegations as `relationship`
df_edges = pd.read_sql_query(edges_query, con=conn)

In [None]:
# Convert the co-accusal edges to a Spark DataFrame and preview the first rows
sp_edges = spark.createDataFrame(df_edges)
sp_edges.show()

+-----+-----+------------+
|  src|  dst|relationship|
+-----+-----+------------+
|12478|32166|          53|
| 8562|27778|          47|
| 1553|10724|          43|
| 2725|21703|          41|
| 3605|14442|          41|
|12479|20713|          40|
| 8658|13788|          38|
|14045|15502|          37|
| 2369| 7015|          36|
| 1553|16699|          35|
|12074|12825|          35|
| 8852|31119|          35|
|15360|23239|          34|
|13361|20150|          34|
| 8562|23841|          32|
| 8562|18206|          32|
|32265|32347|          32|
|31882|32401|          32|
|31119|32336|          32|
|27415|32172|          31|
+-----+-----+------------+
only showing top 20 rows



In [None]:
# Q2 graph: officers as vertices, co-accusal pairs as edges
g = GraphFrame(sp_officers_vertices, sp_edges)

### **Triangle Count Algorithm: to check connectivity of officers within the network**

In [None]:
# Count the triangles each officer takes part in, then rank officers from most to
# fewest triangles and derive a percentile from that rank.
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

triangle = g.triangleCount()
lenght = triangle.count()  # number of vertices (name kept as-is for any later cells)

# row_number() over a global ordering yields consecutive ranks 1..N.
# monotonically_increasing_id() is only guaranteed to be unique and increasing: it
# encodes the partition id in the upper bits, so with more than one partition the
# values jump by billions and the derived percentile becomes meaningless.
# A window without partitionBy pulls all rows into one partition, which is fine for a
# table with one row per officer.
rank_window = Window.orderBy(col('count').desc())
triangle = triangle.withColumn("index", row_number().over(rank_window) - 1)
# percentile is 1.0 for the top-ranked officer and decreases towards 1/N
triangle = triangle.withColumn("percentile", (lenght - col("index")) / lenght)
triangle = triangle.orderBy('index')
triangle.show()

+-----+-----+------------------+-----------------+-----+------------------+
|count|   id|      officer_name|unsustained_count|index|        percentile|
+-----+-----+------------------+-----------------+-----+------------------+
|32118| 6315|     Terence Davis|             19.0|    0|               1.0|
|32117| 3033|    Raimondo Brown|              8.0|    1| 0.999971866647911|
|32073| 3744|    Derek Campbell|              5.0|    2|0.9999437332958222|
|27855|18042|     Donald Mc Coy|              9.0|    3|0.9999155999437332|
|27823|  441|   Fernando Alonzo|             12.0|    4|0.9998874665916444|
|23900|21530|Michael Overstreet|             36.0|    5|0.9998593332395554|
|23518|27349|   Charles Stanton|              2.0|    6|0.9998311998874666|
|23499| 5180|    Stephen Conner|              4.0|    7|0.9998030665353776|
|23487| 5667|     Jerry Crawley|             14.0|    8|0.9997749331832888|
|23477|16747|     Evetta Lundin|              4.0|    9|0.9997467998311999|
|23475| 8844

In [None]:
# Drop rows that have a null in any column, then list officers by their number of
# unsustained allegations, highest first.
unsustained_count = triangle.dropna().sort('unsustained_count', ascending=False)
unsustained_count.show()

+-----+-----+-----------------+-----------------+-----+------------------+
|count|   id|     officer_name|unsustained_count|index|        percentile|
+-----+-----+-----------------+-----------------+-----+------------------+
| 1251| 8562|  Jerome Finnigan|            112.0|  588| 0.983457588971726|
|  244|28805|  Charles Toussas|             87.0| 2106|0.9407511605007737|
| 1402|17816|       Edward May|             87.0|  567|0.9840483893655929|
|  244|10890|     James Grubbs|             79.0| 2103|0.9408355605570403|
|   94|21837|       Joe Parker|             78.0| 4689|0.8680827120551414|
|  329|29033|Jerome Turbyville|             76.0| 1529|0.9569841046560698|
|  129| 4807|  Maurice Clayton|             75.0| 3703|0.8958221972147982|
|19322| 8138|      Glenn Evans|             74.0|   14|0.9996061330707554|
|  118|13788|  Broderick Jones|             73.0| 4001|0.8874384582923055|
| 1369| 2356|      Harold Bone|             67.0|  572|0.9839077226051484|
|  125|13391|   Tyrone Je

In [None]:
# Summary table of triangle count
# Average percentile rank and average triangle count of officers, grouped by their
# number of unsustained allegations.
dataframe_un = triangle.toPandas()

# Buckets are half-open [low, high). The first bucket catches everything below 10 and
# the last one is open-ended (>= 50), hence the "50+" label rather than "50 - 60".
# Rows with a missing unsustained_count fall into no bucket and are ignored.
bucket_labels = ["0 - 10", "10 - 20", "20 - 30", "30 - 40", "40 - 50", "50+"]
bucket_edges = [-float("inf"), 10, 20, 30, 40, 50, float("inf")]
unsustained_bucket = pd.cut(
    dataframe_un['unsustained_count'],
    bins=bucket_edges,
    labels=bucket_labels,
    right=False,
)

# observed=False keeps empty buckets in the table (as NaN) instead of failing with a
# division by zero when a bucket has no officers.
summary = (
    dataframe_un
    .groupby(unsustained_bucket, observed=False)[['percentile', 'count']]
    .mean()
    .rename(columns={'percentile': 'percentile avg', 'count': 'triangle count avg'})
    .rename_axis('range')
    .reset_index()
)
summary['range'] = summary['range'].astype(str)

print(summary)


     range  percentile avg  triangle count avg
0   0 - 10        0.468222          205.646450
1  10 - 20        0.825785          629.895242
2  20 - 30        0.876647          759.583587
3  30 - 40        0.904187          992.107477
4  40 - 50        0.934042         1660.311475
5  50 - 60        0.938401         1956.477273


### **Motif Finding technique: to analyse the extent of influence of bad behaviour in the network**

In [None]:
# Restrict the graph to edges whose relationship count is positive.
# The single-edge motif returns one row per edge together with both endpoint
# vertices; only the edge columns are needed to rebuild a graph.
edge_cols = ["e.src", "e.dst", "e.relationship"]
paths = g.find("(a)-[e]->(b)").filter("e.relationship > 0")

# Edge tables for the two subgraphs (both use the same selection)
e2 = paths.select(*edge_cols)
e3 = paths.select(*edge_cols)

# Subgraphs over the full vertex set of g
g2 = GraphFrame(g.vertices, e2)
g3 = GraphFrame(g.vertices, e3)

In [None]:
# Search for police officers that spread misconduct:
# find every chain of three directed edges a -> b -> c -> d in g, preview the matches,
# and count how many distinct officers appear as the start (a) of such a chain.
# NOTE(review): the motif is searched on g, not on the g2/g3 subgraphs built above --
# confirm that is intended.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c); (c)-[e3]->(d)")
motifs.show()
motifs.select("a").distinct().count()

+--------------------+-------------+--------------------+-------------+--------------------+---------------+--------------------+
|                   a|            e|                   b|           e2|                   c|             e3|                   d|
+--------------------+-------------+--------------------+-------------+--------------------+---------------+--------------------+
|{113, Donna Adams...|{113, 200, 1}|{200, Richard Agu...|{200, 241, 1}|{241, Willie Aker...|{241, 28412, 5}|{28412, Timothy T...|
|{113, Donna Adams...|{113, 200, 1}|{200, Richard Agu...|{200, 241, 1}|{241, Willie Aker...|{241, 18069, 4}|{18069, James Mc ...|
|{113, Donna Adams...|{113, 200, 1}|{200, Richard Agu...|{200, 241, 1}|{241, Willie Aker...| {241, 5782, 3}|{5782, Willie Cro...|
|{113, Donna Adams...|{113, 200, 1}|{200, Richard Agu...|{200, 241, 1}|{241, Willie Aker...|{241, 26437, 2}|{26437, Theresa S...|
|{113, Donna Adams...|{113, 200, 1}|{200, Richard Agu...|{200, 241, 1}|{241, Willie Aker..

17215

### **OutDegrees of officer nodes**

In [None]:
# Out-degree = number of edges leaving a vertex in a directed graph; list officers
# from highest to lowest out-degree.
# NOTE(review): the edge query stores each officer pair once with src < dst, so the
# out-degree only counts co-accused officers with a higher id, not all co-accused.
outDegrees = g.outDegrees.orderBy('outDegree', ascending=False)
outDegrees.show()

# Attach officer name and unsustained_count to each out-degree row (inner join on id)
df_outDegrees = outDegrees.toPandas()
new_df = pd.merge(df_outDegrees, pd_data_officer, on='id')
new_df


+----+---------+
|  id|outDegree|
+----+---------+
|3033|      332|
| 441|      330|
|3744|      313|
|6315|      300|
|8138|      290|
|2375|      266|
|5667|      260|
|5180|      258|
| 200|      243|
| 127|      241|
|9821|      226|
|8844|      225|
|2369|      224|
|1932|      222|
|3220|      214|
| 373|      208|
| 843|      203|
|1907|      203|
|4844|      202|
|1509|      201|
+----+---------+
only showing top 20 rows



Unnamed: 0,id,outDegree,officer_name,unsustained_count
0,3033,332,Raimondo Brown,8.0
1,441,330,Fernando Alonzo,12.0
2,3744,313,Derek Campbell,5.0
3,6315,300,Terence Davis,19.0
4,8138,290,Glenn Evans,74.0
...,...,...,...,...
18838,2800,1,John Bribiesca,1.0
18839,13812,1,Curtis Jones,0.0
18840,3376,1,Marcellus Burke,0.0
18841,17961,1,John Mc Carthy,0.0


In [None]:
# Summary table of outDegrees
# Average out-degree of officers, grouped by their number of unsustained allegations.

# Buckets are half-open [low, high). The first bucket catches everything below 10 and
# the last one is open-ended (>= 50), hence the "50+" label rather than "50 - 60".
# Rows with a missing unsustained_count fall into no bucket and are ignored.
bucket_labels = ["0 - 10", "10 - 20", "20 - 30", "30 - 40", "40 - 50", "50+"]
bucket_edges = [-float("inf"), 10, 20, 30, 40, 50, float("inf")]
unsustained_bucket = pd.cut(
    new_df['unsustained_count'],
    bins=bucket_edges,
    labels=bucket_labels,
    right=False,
)

# observed=False keeps empty buckets in the table (as NaN) instead of failing with a
# division by zero when a bucket has no officers.
summary_outdegree = (
    new_df
    .groupby(unsustained_bucket, observed=False)['outDegree']
    .mean()
    .rename('Outdegree avg')
    .rename_axis('Unsustained range')
    .reset_index()
)
summary_outdegree['Unsustained range'] = summary_outdegree['Unsustained range'].astype(str)

print(summary_outdegree)

  Unsustained range  Outdegree avg
0            0 - 10       9.543477
1           10 - 20      20.999209
2           20 - 30      27.201550
3           30 - 40      32.773585
4           40 - 50      43.866667
5           50 - 60      55.500000
