In [1]:
#references:
  #https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html
  #https://en.wikipedia.org/wiki/Graph_theory
  #https://developer.ibm.com/clouddataservices/2016/07/15/intro-to-apache-spark-graphframes/
  #https://docs.databricks.com/spark/latest/graph-analysis/graphframes/graph-analysis-tutorial.html
  #https://docs.databricks.com/spark/latest/graph-analysis/graphframes/index.html
  #https://towardsdatascience.com/an-introduction-to-spark-graphframe-with-examples-analyzing-the-wikipedia-link-graph-67e58c20a107

In [2]:
from pyspark.sql import *
from graphframes import *
from pyspark.sql.functions import col
from pyspark.sql.functions import *

In [3]:
# All illicit transactions (class '1') as their own subgraph, used below for motif finding.
# Vertices: transactions present in both the features and classes tables whose class is
# '1' (trim() tolerates stray whitespace around the class value).
df_vertices_elicit = spark.sql(
    "SELECT T1.txId, T2.class from elliptic_txs_features_2_csv T1 "
    "inner join elliptic_txs_classes_csv T2 on T1.txId = T2.txId "
    "WHERE trim(T2.class) = '1' "
).withColumnRenamed("txId", "id")

# Edges: only those whose source AND destination are both illicit vertices. Using the
# full edge list here left many edges pointing at vertices that are not part of this
# subgraph. Motif results should be unchanged (a motif can only bind A/B/C to existing
# vertices), but the GraphFrame is now self-consistent for any other graph operation.
df_edges_elicit_raw = spark.sql("SELECT T3.txId1 as src, T3.txId2 as dst from elliptic_txs_edgelist_csv T3")
illicit_ids = df_vertices_elicit.select("id")
df_edges_elicit = (
    df_edges_elicit_raw
    .join(illicit_ids.withColumnRenamed("id", "src"), on="src", how="left_semi")
    .join(illicit_ids.withColumnRenamed("id", "dst"), on="dst", how="left_semi")
    .select("src", "dst")  # restore src, dst column order after the USING-joins
)
g_elicit = GraphFrame(df_vertices_elicit, df_edges_elicit)


In [4]:
# Full transaction graph (every class: '1', '2' and 'unknown'), keeping only id and class.
# Vertices come from the features/classes inner join; edges are the complete directed
# edge list (src -> dst).
df_vertices_all = spark.sql(
    "SELECT T1.txId, T2.class "
    "from elliptic_txs_features_2_csv T1 "
    "inner join elliptic_txs_classes_csv T2 on T1.txId = T2.txId"
).withColumnRenamed("txId", "id")
df_edges_all = spark.sql(
    "SELECT T3.txId1 as src, T3.txID2 as dst "
    "from elliptic_txs_edgelist_csv T3"
)
g_all = GraphFrame(df_vertices_all, df_edges_all)


In [5]:
# Preview the vertices (id, class) of the full transaction graph.
display(g_all.vertices)


id,class
230425980,unknown
5530458,unknown
232022460,unknown
232438397,2
230460314,unknown
230459870,unknown
230333930,unknown
230595899,unknown
232013274,unknown
232029206,2


In [6]:
# Preview the directed edges (src -> dst) of the full transaction graph.
display(g_all.edges)

src,dst
230425980,5530458
232022460,232438397
230460314,230459870
230333930,230595899
232013274,232029206
232344069,27553029
36411953,230405052
34194980,5529846
3881097,232457116
230409257,32877982


In [7]:
# count the number of vertices, i.e. transactions that have both a feature row and a
# class row (the vertex set comes from an inner join of those two tables)
g_all.vertices.count()

In [8]:
# Label Propagation Algorithm (LPA): detect communities in the full graph. The result has
# the vertex columns plus a `label` column; vertices sharing a label form one community.
# LPA_MAX_ITER bounds the number of propagation rounds, so labels may stop before they
# fully converge.
LPA_MAX_ITER = 5
# Cached because result_all_temp is queried by many later cells; this avoids re-deriving
# the LPA output for every query.
result_all = g_all.labelPropagation(maxIter=LPA_MAX_ITER).cache()
result_all.show()
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
result_all.createOrReplaceTempView("result_all_temp")

In [9]:
# count the unique labels, i.e. the number of communities produced by LPA
# (label_count is a DataFrame of the distinct labels; .count() returns how many there are)
label_count = spark.sql("select distinct label from result_all_temp")
label_count.count()

In [10]:
# Prediction algorithm: predict one class per LPA community (label).
# - For each community, count how many member transactions carry each class
#   ('1', '2' or 'unknown') and rank the classes by that count, highest first.
# - Known classes take priority: when a community has at least one member with class
#   '1' or '2', its most frequent *known* class (result_classify_1_temp) is preferred
#   even if 'unknown' members outnumber it. The overall top class (result_classify_temp)
#   is the fallback for communities with no labelled member.
#
# Bug fix: the ranking window used to be
#   ROW_NUMBER() OVER (PARTITION BY label ORDER BY label)
# which orders by the partition key itself, so "Row = 1" picked an arbitrary class, not
# the most frequent one (the query's outer ORDER BY does not affect the row numbers).
# The windows below order by the class count, descending, with class as a deterministic
# tie-breaker ('1' < '2' < 'unknown').
#
# Intermediate ranked results get their own view names, so no view is redefined from a
# query that reads that same view.

# Step 1: per-community class counts over all classes (including 'unknown'), ranked.
result_classify = spark.sql("""
    SELECT label, class, COUNT(class) AS `count`,
           ROW_NUMBER() OVER (PARTITION BY label ORDER BY COUNT(class) DESC, class) AS `Row`
    FROM result_all_temp
    GROUP BY label, class
""")
result_classify.createOrReplaceTempView("result_classify_ranked_temp")
step1 = result_classify.count()

# Step 1.1: the same counts restricted to known classes, re-ranked without 'unknown'.
result_classify_1 = spark.sql("""
    SELECT label, class, `count`,
           ROW_NUMBER() OVER (PARTITION BY label ORDER BY `count` DESC, class) AS `Row`
    FROM result_classify_ranked_temp
    WHERE class <> 'unknown'
""")
result_classify_1.createOrReplaceTempView("result_classify_1_ranked_temp")
step1_1 = result_classify_1.count()

# Step 1.2: most frequent known class per community. Communities without any labelled
# member have no row here.
result_classify_1 = spark.sql("""
    SELECT label, class AS predicted_class
    FROM result_classify_1_ranked_temp
    WHERE `Row` = 1
""")
result_classify_1.createOrReplaceTempView("result_classify_1_temp")
step1_2 = result_classify_1.count()

# Step 2: most frequent class per community over all classes (fallback prediction).
result_classify = spark.sql("""
    SELECT label, class AS predicted_class
    FROM result_classify_ranked_temp
    WHERE `Row` = 1
""")
result_classify.createOrReplaceTempView("result_classify_temp")
step2 = result_classify.count()


In [11]:
# Join each transaction back to its community's prediction.
# predicted_class is the community's most frequent known class when it has one
# (result_classify_1_temp). Otherwise it is the community's most frequent class overall
# (result_classify_temp), which is 'unknown' because no member of that community is
# labelled.
predicted_results = spark.sql("""
    SELECT t1.id,
           t1.class AS true_class,
           COALESCE(t3.predicted_class, t2.predicted_class) AS predicted_class,
           t1.label AS community_label
    FROM result_all_temp t1
    LEFT JOIN result_classify_temp t2 ON t1.label = t2.label
    LEFT JOIN result_classify_1_temp t3 ON t1.label = t3.label
""")
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
predicted_results.createOrReplaceTempView("predicted_results_temp")
display(predicted_results)

id,true_class,predicted_class,community_label
106720346,2,2,26
288156180,unknown,2,8589935171
288063176,unknown,2,8589935171
288730455,2,2,8589935171
3319918,unknown,unknown,25769804400
4577737,unknown,unknown,25769804400
30149876,unknown,2,34359738893
29197197,unknown,2,34359738893
23692743,unknown,2,34359738893
29137428,unknown,2,34359738893


In [12]:
# Number of transactions that received a real prediction (anything other than 'unknown').
# Bound to its own name, so the per-transaction `predicted_results` DataFrame built in the
# earlier join cell is not silently replaced by this one-row summary.
predicted_results_count = spark.sql("select count(*) as predicted_results from predicted_results_temp where predicted_class <> 'unknown'")
predicted_results_count.show()

In [13]:
# How many transactions sit in a community that got no real prediction ('unknown')?
unpredictable_results = spark.sql(
    "select count(*) as unpredictable_results "
    "from predicted_results_temp "
    "where predicted_class = 'unknown'"
)
unpredictable_results.show()

In [14]:
# List the individual rows whose predicted_class is 'unknown'.
unpredictable_results_details = spark.sql(
    "select distinct * "
    "from predicted_results_temp "
    "where predicted_class = 'unknown'"
)
unpredictable_results_details.show()

In [15]:
# Spot check: take one community whose prediction is 'unknown' and confirm that none of
# its members has a known class in the original data (every row should show 'unknown').
# The community id is taken from the current run, not hard-coded. LPA labels are
# GraphFrames-internal vertex ids (they look like partition-based ids, with the partition
# index in the high bits), so a literal such as 25769804400 copied from an earlier run is
# not guaranteed to exist after a re-run.
first_unpredictable = (
    spark.table("predicted_results_temp")
    .where(col("predicted_class") == "unknown")
    .select("community_label")
    .first()
)
if first_unpredictable is None:
    # Every community has a labelled member: nothing to spot check (empty, same schema).
    spot_check = spark.table("result_all_temp").limit(0)
else:
    spot_check = spark.table("result_all_temp").where(
        col("label") == first_unpredictable["community_label"]
    )
display(spot_check)


id,class,label
3319918,unknown,25769804400
4577737,unknown,25769804400


In [16]:
# Mark each labelled transaction's prediction as correct or incorrect. Transactions whose
# true class is 'unknown' cannot be scored and are excluded.
accuracy_check = spark.sql(
    "select true_class, predicted_class, "
    "case when true_class = predicted_class then 'correct' else 'incorrect' end as result "
    "from predicted_results_temp "
    "where true_class <> 'unknown' "
)
accuracy_check.show()

In [17]:
# Confusion-matrix style summary: one row per (true_class, predicted_class) pair, with one
# column per distinct `result` value holding the row count.
(
    accuracy_check
    .groupBy("true_class", "predicted_class")
    .pivot("result")
    .count()
    .show()
)

In [18]:
# Accuracy over the labelled transactions: correct predictions / all scored predictions.
# Computed from accuracy_check instead of the previously hard-coded counts
# (3832 + 41582 + 713 + 437) copied from an earlier run's pivot output, which go stale as
# soon as the notebook is re-run.
n_scored = accuracy_check.count()
n_correct = accuracy_check.where(col("result") == "correct").count()
accuracy = n_correct / n_scored if n_scored else float("nan")  # nan: nothing to score
accuracy

In [19]:
# Motif finding on the illicit subgraph: two-hop chains A -> B -> C with no direct edge
# A -> C, where A and C are different transactions. Only the chain endpoints are kept.
results_motif = (
    g_elicit
    .find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
    .filter("A.id != C.id")
    .select("A", "C")
)
display(results_motif)

A,C
"List(163586078, 1)","List(163586130, 1)"
"List(73154554, 1)","List(73435939, 1)"
"List(257368355, 1)","List(165066411, 1)"
"List(194287965, 1)","List(355617472, 1)"
"List(272429368, 1)","List(299475654, 1)"
"List(355110068, 1)","List(270265736, 1)"
"List(94184658, 1)","List(98734847, 1)"
"List(272088740, 1)","List(141128581, 1)"
"List(272145560, 1)","List(299475637, 1)"
"List(299475639, 1)","List(277797691, 1)"


In [20]:
# PageRank (5 iterations) on the full graph to surface important vertices, listed from
# highest to lowest score.
ranks_all = g_all.pageRank(maxIter=5)
display(
    ranks_all.vertices
    .select("id", "pagerank")
    .orderBy(col("pagerank").desc())
)

id,pagerank
99409352,138.03714775539876
340075634,108.61768412914914
43388675,103.71660470177844
2773281,101.66551993266972
225859042,98.00527121357078
91882349,94.83536432740785
30276715,92.9052736407056
191638565,92.07190979072938
121801433,87.03893597747518
73405590,84.39442956729077


In [21]:
# Vertex degrees of the full graph (GraphFrame.degrees), most connected vertices first.
degrees_all = g_all.degrees.orderBy(col("degree").desc())
display(degrees_all)

id,degree
2984918,473
89273,289
43388675,284
68705820,247
30699343,241
96576418,239
225859042,212
279187194,211
234890810,199
196107869,188
