In [17]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

# install pyspark
!pip3 install pyspark==3.2.0

# install graphframes
!pip3 install graphframes



In [18]:
!cp -v /content/graphframes-0.8.2-spark3.2-s_2.12.jar $SPARK_HOME/jars/

'/content/graphframes-0.8.2-spark3.2-s_2.12.jar' -> '/content/spark-3.2.0-bin-hadoop3.2/jars/graphframes-0.8.2-spark3.2-s_2.12.jar'


In [19]:
#import the packages
from pyspark import *
from pyspark.sql import *
from graphframes import *
import findspark
import pandas as pd

findspark.init()

# Start a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [20]:
import psycopg2

In [21]:
# access the postgresql server
conn = psycopg2.connect(
    host="codd04.research.northwestern.edu",
    port = "5433",
    database="postgres",
    user="cpdbstudent",
    password="DataSci4AI")

In [22]:
cursor = conn.cursor()

In [23]:
edges_query = '''--Co-accusals for officers with DAM allegations earning above average salary
DROP TABLE IF EXISTS da_category_ids;
CREATE TEMP TABLE da_category_ids AS (
    SELECT id
    FROM data_allegationcategory
    WHERE data_allegationcategory.category = 'Drug / Alcohol Abuse' OR data_allegationcategory.category = 'Medical' or allegation_name LIKE 'Medical Roll%'
    OR data_allegationcategory.category_code IN ('08J', '024', '003', '003A', '003B', '003C', '003D', '003E'));

DROP TABLE IF EXISTS da_cohort;
CREATE TEMP TABLE da_cohort AS (
    SELECT DISTINCT officer_id
    FROM data_officerallegation
    WHERE allegation_category_id IN (SELECT * from da_category_ids));

DELETE FROM da_cohort c
USING data_salary s WHERE s.officer_id = c.officer_id
AND s.salary < (SELECT AVG(salary) FROM data_salary);

DROP TABLE IF EXISTS class_example;
CREATE TEMP TABLE class_example AS (
SELECT da1.officer_id src, da2.officer_id dst, COUNT(DISTINCT da1.allegation_id) relationship
FROM data_officerallegation da1
JOIN data_officerallegation da2 ON da1.allegation_id = da2.allegation_id AND da1.officer_id < da2.officer_id
GROUP BY da1.officer_id, da2.officer_id ORDER BY count(*) DESC);

DROP TABLE IF EXISTS edgesq1;
CREATE TEMP TABLE edgesq1 AS (
SELECT src,dst,relationship FROM class_example
JOIN da_cohort ON da_cohort.officer_id = class_example.src);

select * from edgesq1
'''

In [24]:
nodes_query = '''
--Co-accusals for officers with DAM allegations earning above average salary
DROP TABLE IF EXISTS da_category_ids;
CREATE TEMP TABLE da_category_ids AS (
    SELECT id
    FROM data_allegationcategory
    WHERE data_allegationcategory.category = 'Drug / Alcohol Abuse' OR data_allegationcategory.category = 'Medical' or allegation_name LIKE 'Medical Roll%'
    OR data_allegationcategory.category_code IN ('08J', '024', '003', '003A', '003B', '003C', '003D', '003E'));

DROP TABLE IF EXISTS da_cohort;
CREATE TEMP TABLE da_cohort AS (
    SELECT DISTINCT officer_id
    FROM data_officerallegation
    WHERE allegation_category_id IN (SELECT * from da_category_ids));

DELETE FROM da_cohort c
USING data_salary s WHERE s.officer_id = c.officer_id
AND s.salary < (SELECT AVG(salary) FROM data_salary);

DROP TABLE IF EXISTS class_example;
CREATE TEMP TABLE class_example AS (
SELECT da1.officer_id src, da2.officer_id dst, COUNT(DISTINCT da1.allegation_id) relationship
FROM data_officerallegation da1
JOIN data_officerallegation da2 ON da1.allegation_id = da2.allegation_id AND da1.officer_id < da2.officer_id
GROUP BY da1.officer_id, da2.officer_id ORDER BY count(*) DESC);

DROP TABLE IF EXISTS edgesq1;
CREATE TEMP TABLE edgesq1 AS (
SELECT src,dst,relationship FROM class_example
JOIN da_cohort ON da_cohort.officer_id = class_example.src);

DROP TABLE IF EXISTS withoutRelation;
CREATE TEMP TABLE withoutRelation AS (
select id,first_name || ' ' || last_name officer_name from data_officer where id in
(select src from edgesq1) and id in (select dst from edgesq1));

select withoutRelation.id,withoutRelation.officer_name,data_salary.salary from withoutRelation
join data_salary on withoutRelation.id=data_salary.id;





'''

In [25]:
import pandas as pd

In [26]:
cursor.execute(edges_query)
edges = cursor.fetchall()
print("shape is: " + str(len(edges))) # 17465

df_edges = pd.DataFrame(edges)
colnames = [desc[0] for desc in cursor.description]
df_edges.columns = colnames

print(df_edges)

shape is: 6288
        src    dst  relationship
0         9  12641             1
1       200   3133             1
2       200    241             1
3       200    368             1
4       200    381             1
...     ...    ...           ...
6283  30952  31341             1
6284  30952  32074             1
6285  31438  32405             1
6286  31464  32032             1
6287  32527  33722             2

[6288 rows x 3 columns]


In [27]:
cursor.execute(nodes_query)
nodes = cursor.fetchall()
print("shape is: " + str(len(nodes))) # 17465

df_nodes = pd.DataFrame(nodes)
colnames = [desc[0] for desc in cursor.description]
df_nodes.columns = colnames

print(df_nodes)

shape is: 132
        id       officer_name  salary
0    12967        Sonia Irwin   54672
1    27872   John Summerville   72810
2    19484          M L Moore   89130
3     4416     Jerome Chapman   83220
4     3993    Rodney Carriger   78012
..     ...                ...     ...
127   3087        Henry Brown   61518
128   3267        Gary Bulava   74178
129   8821  Gwendolyn Flowers   75372
130  22087    Charles Pearson   96648
131  19167         Ryan Milot   75372

[132 rows x 3 columns]


In [28]:
edges_ = spark.createDataFrame(df_edges)

In [29]:
nodes = spark.createDataFrame(df_nodes)

In [30]:
cpdb = GraphFrame(nodes, edges_)

In [31]:
cpdb.vertices.show()

+-----+-----------------+------+
|   id|     officer_name|salary|
+-----+-----------------+------+
|12967|      Sonia Irwin| 54672|
|27872| John Summerville| 72810|
|19484|        M L Moore| 89130|
| 4416|   Jerome Chapman| 83220|
| 3993|  Rodney Carriger| 78012|
| 9723|       Teri Gates| 77238|
|18670|   Gerald Meachum| 77514|
|29807|   Richard Walker| 77514|
|29499|        Leo Velez| 66906|
|15847|  Frederick Layne| 79980|
|23395|   Samuel Ramirez| 61932|
|15516|      John Labiak| 62742|
|21871|     Albert Parks| 73632|
|19154|    Raymond Mills| 75816|
| 9489|   Willie Ganison| 57426|
|27672|   Baxter Streets| 80724|
|20310|      Edward Nega| 78012|
|24929|     Rick Runnels| 68262|
|26184|Edward Shenberger| 77238|
|22770|   Thomas Policky| 66924|
+-----+-----------------+------+
only showing top 20 rows



In [32]:
cpdb.edges.show()

+---+-----+------------+
|src|  dst|relationship|
+---+-----+------------+
|  9|12641|           1|
|200| 3133|           1|
|200|  241|           1|
|200|  368|           1|
|200|  381|           1|
|200|  441|           1|
|200|  607|           1|
|200|  645|           1|
|200|  828|           1|
|200|  843|           1|
|200| 1101|           1|
|200| 1210|           1|
|200| 1230|           1|
|200| 1331|           1|
|200| 3850|           1|
|200| 3927|           1|
|200| 4122|           1|
|200| 4844|           1|
|200| 4910|           1|
|200| 4993|           1|
+---+-----+------------+
only showing top 20 rows



In [33]:
cpdb.vertices.sort(['id'],ascending=True).show()

+----+---------------+------+
|  id|   officer_name|salary|
+----+---------------+------+
| 927|    John Atkins| 70656|
|1649|  Donald Becton|103590|
|3087|    Henry Brown| 61518|
|3267|    Gary Bulava| 74178|
|3993|Rodney Carriger| 78012|
|4034| Robert Carroll| 92430|
|4416| Jerome Chapman| 83220|
|5256|    John Conway| 66906|
|5513|   James Cotton| 82878|
|5683|   Victor Creed|117894|
|5820|     Jimmy Cruz| 69684|
|5966|  Joseph Curtin| 61566|
|7000|   Milton Dixon| 78012|
|7005|Vincent Dobbins| 59412|
|7007|  Darrell Dobbs| 60600|
|7099|   Gus Domenech| 80400|
|7455| Thaddeus Dudek| 47604|
|7572|    Kevin Dunne| 70260|
|8015|   Robert Ervin| 82008|
|8045|Victor Escobedo| 78360|
+----+---------------+------+
only showing top 20 rows



In [34]:
tc_cpdb = cpdb.triangleCount()
tc_cpdb.select("id", "count").show()

+-----+-----+
|   id|count|
+-----+-----+
|12967|    0|
|19484|    5|
|15847|    0|
|21871|    4|
|27872|    0|
| 9723|    1|
|24929|    0|
|19154|   55|
| 4416|   55|
|18670|    0|
|15516|    0|
| 9489|   91|
|26184|    0|
|29807|   21|
|27672|    0|
| 3993|    0|
|23395|    0|
|22770|   36|
|17375|    0|
|20310|    0|
+-----+-----+
only showing top 20 rows



In [35]:
pr_cpdb = cpdb.pageRank(resetProbability=0.15, tol=0.01)
#look at the pagerank score for every vertex
pr_cpdb.vertices.orderBy('pagerank', ascending=False).show()
pr_cpdb.edges.show()

+-----+-------------------+------+------------------+
|   id|       officer_name|salary|          pagerank|
+-----+-------------------+------+------------------+
|23005|         Paul Price| 72480| 4.641676118133547|
|22480|     Edwin Phillips| 80724|3.2732836013158644|
|19154|      Raymond Mills| 75816| 2.553781304879127|
|18758|     Erskin Melchor| 73116|2.1062113854673212|
|31438|        James Young| 59412| 1.982905201028162|
|17964|  Michael Mc Carthy| 64992| 1.911034774119742|
|31464|      Michael Ytsen|102978|1.8209790813684303|
|18359|     John Mc Knight| 78450|1.8046105552641556|
|27342|   Michael Stannish| 78006|1.6802670614445063|
|12641|     Charles Howard| 75726| 1.637883922913204|
|19750|       Edward Moses| 74628| 1.633363054803198|
|30313|        George Weir| 70656| 1.633363054803198|
|27518|   Tyrone Stevenson| 73296| 1.633363054803198|
|22751|Sterling Poindexter| 59412| 1.633363054803198|
|29056|     Charles Turner| 70656| 1.633363054803198|
|13066|    Gregory Jackson| 