# Bitcoin graph analysis with pyspark

## Objective

In this notebook, I explored ways to analyze a large graph using PySpark's functionalities

This code runs on Databricks' free-tier DataFrame API. <br>
The dbc file located in the directory contains the source code of this notebook. <br>
Details on how to set up the API can be found at : https://docs.google.com/document/d/e/2PACX-1vR9XytsxrXNpUVm1VBX3v1JwerfAMK97v_9Wy6GFNCucxd5Izgu7lmj9CFLIDDWBjN7NgRVm4pZJNj9/pub

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark import SparkContext
from pyspark.sql.types import *


### Data Preparation

In [5]:
# Obtain the active Spark session, or start one if none exists yet
spark = SparkSession.builder.getOrCreate()

# Load the edge list. The file has no header row, so the columns are referred to
# by position as _c0, _c1, _c2 until they are renamed below.
# (A small test graph can be loaded from "/FileStore/tables/examplegraph.csv" instead.)
df = spark.read.csv(
    "/FileStore/tables/bitcoinotc.csv",
    header=False,
    inferSchema=True,
)

# Row count of the raw data
rows_before = df.select("_c0").count()
print("number of rows before drop_duplicates()", rows_before)

# Discard rows that are exact duplicates across all columns
df = df.dropDuplicates()

# Row count after de-duplication
rows_after = df.select("_c0").count()
print("number of rows after drop_duplicates()", rows_after)

# Keep the first three columns under descriptive names
df = df.selectExpr("_c0 AS source", "_c1 AS target", "_c2 AS weight")

# Preview the edge table
df.show(10)


### Task 1: Filter the graph such that only edges with a weight >= 5 are preserved.

In [7]:
# Keep only the edges whose weight is at least 5
df = df.where(col("weight") >= 5)


## Task 2: Analyze the graph to find the nodes with the highest weighted-in-degree, weighted-out-degree, and weighted-total-degree using DataFrame operations.

### Sub-Task 1: find node with highest weighted-in-degree, if two or more nodes have the same weighted-in-degree, report the one with the lowest node id

In [10]:
# Weighted in-degree: sum the weights of all edges arriving at each target node.
# Rank by 1. summed weight (descending) and 2. node id (ascending), so that ties
# on the weight are resolved in favour of the lowest node id.
df_in = (
    df.groupBy("target")
    .agg(F.sum("weight").alias("weighted_in_degree"))
    .orderBy(desc("weighted_in_degree"), asc("target"))
)

# show the top 10 rows of the weighted-in-degree
df_in.show(10)

# show the top 1 row of the weighted-in-degree
df_in.show(1)

# Store the values of the top-ranked row.
# first() fetches only that single row; collect()[0] would ship every
# aggregated row to the driver just to read the first one.
top_in_node, top_in_weight = df_in.first()


### Sub-Task 2: find node with highest weighted-out-degree, if two or more nodes have the same weighted-out-degree, report the one with the lowest node id

In [12]:
# Weighted out-degree: sum the weights of all edges leaving each source node.
# Rank by 1. summed weight (descending) and 2. node id (ascending), so that ties
# on the weight are resolved in favour of the lowest node id.
df_out = (
    df.groupBy("source")
    .agg(F.sum("weight").alias("weighted_out_degree"))
    .orderBy(desc("weighted_out_degree"), asc("source"))
)

# show the top 10 rows of the weighted-out-degree
df_out.show(10)

# show the top row of the weighted-out-degree
df_out.show(1)

# Store the values of the top-ranked row.
# first() fetches only that single row; collect()[0] would ship every
# aggregated row to the driver just to read the first one.
top_out_node, top_out_weight = df_out.first()

### Sub-Task 3: find node with highest weighted-total degree, if two or more nodes have the same weighted-total-degree, report the one with the lowest node id

In [14]:
# Combine the in-degree and out-degree tables on the node id.
# A full outer join is required: an inner join would silently drop every node
# that has only incoming or only outgoing edges, and such a node can still hold
# the highest total degree.
df_join = df_in.join(df_out, df_in.target == df_out.source, "full_outer")

# Show the joined dataframe
df_join.show(5)

# Total degree = in-degree + out-degree. After the outer join either side may be
# null, so a missing degree counts as 0 and the node id is taken from whichever
# side is present. Ties are resolved in favour of the lowest node id.
df_total = (
    df_join
    .withColumn("target", F.coalesce(F.col("target"), F.col("source")))
    .withColumn(
        "weight_sum",
        F.coalesce(F.col("weighted_in_degree"), F.lit(0))
        + F.coalesce(F.col("weighted_out_degree"), F.lit(0)),
    )
    .orderBy(desc("weight_sum"), asc("target"))
)

# Show the highest weighted-total degree
df_total.show(1)

# Store the values of the top-ranked row (first() fetches only that row)
top_tot_node, top_tot_weight = df_total.select("target", "weight_sum").first()

### Sub-Task 4: Store the values into a dataframe

In [16]:
"""
  Create a dataframe to store your results
  Schema: 3 columns, named: 'v', 'd', 'c' where:
  'v' : vertex id
  'd' : degree calculation (an integer value.  one row with highest weighted-in-degree, a row w/ highest weighted-out-degree, a row w/ highest weighted-total-degree )
  'c' : category of degree, containing one of three string values:
                                                  'i' : weighted-in-degree
                                                  'o' : weighted-out-degree                                                
                                                  't' : weighted-total-degree
  - Your output should contain exactly three rows.  
  - Your output should contain exactly the column order specified.
  - The order of rows does not matter.

  A correct output would be:

  v,d,c
  4,15,i
  2,20,o
  2,30,t

  whereas:
  - Node 2 has highest weighted-out-degree with a value of 20
  - Node 4 has highest weighted-in-degree with a value of 15
  - Node 2 has highest weighted-total-degree with a value of 30

"""

In [17]:
# Schema of the answer table: vertex id, degree value, degree category
field = [
    StructField("v", IntegerType(), True),
    StructField("d", IntegerType(), True),
    StructField("c", StringType(), True),
]
schema = StructType(field)

# One row per degree category:
#   'i' = weighted-in-degree, 'o' = weighted-out-degree, 't' = weighted-total-degree
# The category codes must match exactly (no surrounding whitespace).
answers = [
    (top_in_node, top_in_weight, 'i'),
    (top_out_node, top_out_weight, 'o'),
    (top_tot_node, top_tot_weight, 't'),
]

# Build the result in a single step from the existing Spark session; no separate
# SparkContext / sqlContext handle or row-by-row union is needed.
answer_df = spark.createDataFrame(answers, schema)
  


In [18]:
# Display the assembled answer table
answer_df.show()

In [19]:
# Destination of the result CSV
path = "/FileStore/tables/answer_bitcoin.csv"

# Overwrite any output left by a previous run; with the default save mode the
# write raises an error when the path already exists, which breaks re-running
# the notebook from the top.
answer_df.write.save(path, format="csv", header=True, mode="overwrite")

In [20]:
# Read the saved CSV back, treating its first line as the header, and print it
spark.read.option("header", True).csv(path).show()