# Preprocessing

## Setup for a Local or Standalone Spark Session

In [None]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
# !tar xf spark-3.4.1-bin-hadoop3.tgz
# !pip install -q findspark
# 
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
# 
# import findspark
# findspark.init()
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()

## Setup for Spark in Client Mode on a Kubernetes Cluster

In [None]:
import socket
from pyspark import SparkConf
from pyspark.sql import SparkSession  # SparkSession is exported by pyspark.sql, not the top-level pyspark package
from pyspark.sql.functions import struct
from neo4j import GraphDatabase

# Client-mode Spark session whose executors are launched on Kubernetes.
sparkConf = SparkConf()

# NOTE: `host` holds this machine's hostname and `hostname` the IP address it
# resolves to; the IP is what gets passed to spark.driver.host below.
host = socket.gethostname()
hostname = socket.gethostbyname(host)

print(f"using host {host} and hostname {hostname}")

sparkConf.setMaster("k8s://https://kubernetes.default.svc:443")
sparkConf.setAppName("Preprocessing")
sparkConf.set("spark.kubernetes.namespace", "tgnnapp")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "app-tgnnapp-spark")
sparkConf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "app-tgnnapp-spark")
# Authenticate the submission with the pod's mounted service-account credentials.
sparkConf.set("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
sparkConf.set("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")
sparkConf.set("spark.kubernetes.driver.pod.name", host)
sparkConf.set("spark.executor.instances", "2")
sparkConf.set("spark.kubernetes.container.image", "fotisofo/tgnn-spark:3.4.1-debian-11-r4--rev1")
sparkConf.set("spark.submit.deployMode", "client")
sparkConf.set("spark.driver.host", hostname)

# NOTE: despite the name, `sc` is a SparkSession (not a SparkContext); later cells rely on this name.
sc = SparkSession.builder.config(conf=sparkConf).getOrCreate()

## Fetch Data

In [None]:
# ==========Fetch data==========
import pandas as pd

# Select where the interactions come from: "neo4j" queries the graph database,
# "csv" reads a file. Only the selected source is read; previously both ran and
# the Neo4j result was silently overwritten by the CSV read.
DATA_SOURCE = "csv"

if DATA_SOURCE == "neo4j":
    # Define the Neo4j server connection details
    uri = "bolt://app-tgnnapp-neo4j:7687"
    user = "<username>"
    password = "<password>"

    # One row per INTERACTED relationship: (user_u -> user_v, timestamp).
    query = "MATCH (u:User)-[r:INTERACTED]->(v:User) RETURN u.id AS user_u, v.id AS user_v, r.timestamp AS timestamp"

    # The context managers close both the session and the driver.
    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        with driver.session() as session:
            result = session.run(query)
            data = [{'user_u': record['user_u'], 'user_v': record['user_v'], 'timestamp': record['timestamp']} for record in result]

    # Keep the columns flat: the cleaning cells reference user_u / user_v /
    # timestamp directly, so they must not be nested inside a struct column.
    df = sc.createDataFrame(data)
elif DATA_SOURCE == "csv":
    url = '<csv_file>'
    df = sc.read.options(header='true', inferSchema='true', delimiter=',').csv(url)
else:
    raise ValueError(f"Unknown DATA_SOURCE: {DATA_SOURCE!r} (expected 'neo4j' or 'csv')")

df.printSchema()

## Data Cleaning

In [None]:
# Discard interactions missing either endpoint or the timestamp.
required_columns = ["user_u", "user_v", "timestamp"]
df = df.na.drop(subset=required_columns)
remaining_rows = df.count()
print(f"Number of rows after removing missing values: {remaining_rows}")

In [None]:
# Remove rows that are identical across every column.
df = df.distinct()
remaining_rows = df.count()
print(f"Number of rows after removing duplicate values: {remaining_rows}")

In [None]:
# Negative timestamps are treated as erroneous and dropped.
valid_timestamp = df["timestamp"] >= 0
df = df.where(valid_timestamp)
remaining_rows = df.count()
print(f"Number of rows after removing erroneous timestamps: {remaining_rows}")

In [None]:
# Order interactions by ascending timestamp.
df = df.orderBy("timestamp")

## Group timestamps into N-hour timesteps

In [None]:
from pyspark.sql import functions as F

# Width N of one timestep, in hours. The original hard-coded 21600 (= 6 * 3600)
# for "6-hour" timesteps, which implies timestamps are in seconds.
TIMESTEP_HOURS = 6
timestep_seconds = TIMESTEP_HOURS * 3600

min_timestamp = df.agg(F.min("timestamp")).collect()[0][0]

# Assign each interaction to an N-hour timestep counted from the earliest timestamp
# (the integer cast truncates; all offsets are >= 0 here).
df_with_timesteps = df.withColumn("timestep", ((F.col("timestamp") - min_timestamp) / timestep_seconds).cast("integer"))

# Get the number of timesteps generated
max_timestep = df_with_timesteps.agg(F.max("timestep")).collect()[0][0]
print("Maximum value in timestep column:", max_timestep)

### Additional grouping

In [None]:
# For the college dataset we further group sparse timesteps (e.g. ones with a single
# interaction) into larger buckets, each holding roughly `mean_interactions` interactions.
from pyspark.sql import Window

# One row per timestep with the number of interactions it contains.
interactions_per_timestep = df_with_timesteps.groupBy("timestep").agg(F.count("*").alias("num_interactions"))

# Mean interactions per (non-empty) timestep: the target bucket size.
mean_interactions = interactions_per_timestep.agg(F.avg("num_interactions")).collect()[0][0]

# Running total of interactions up to and including each timestep.
# This is computed on the per-timestep table (one row per timestep). Summing
# num_interactions after joining it back onto every interaction row would add each
# timestep's count once per interaction, i.e. accumulate squared counts. Working on
# the small aggregated table also keeps the unpartitioned window cheap.
windowSpec = Window.orderBy("timestep")
timestep_buckets = (
    interactions_per_timestep
    .withColumn("cumulative_sum", F.sum("num_interactions").over(windowSpec))
    .withColumn("bucket", (F.col("cumulative_sum") / mean_interactions).cast("int"))
)

# `bucket` can skip values; dense_rank maps it to consecutive ids 1, 2, 3, ...
bucketWindowSpec = Window.orderBy("bucket")
timestep_buckets = timestep_buckets.withColumn("bucket_id", F.dense_rank().over(bucketWindowSpec))
print("Number of buckets:", timestep_buckets.select("bucket").distinct().count())

# Attach the per-timestep count and bucket assignment to every interaction.
df_with_timesteps = df_with_timesteps.join(interactions_per_timestep, on="timestep", how="left")
df_with_buckets = df_with_timesteps.join(
    timestep_buckets.select("timestep", "cumulative_sum", "bucket", "bucket_id"),
    on="timestep",
    how="left",
)

# Filter down unwanted columns
df_with_buckets = df_with_buckets[['user_u', 'user_v', 'timestamp', 'timestep', 'bucket_id']]

## Create Affinity Matrix Visualisation

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col  # `sum` is deliberately not imported: it would shadow the builtin
import matplotlib.pyplot as plt
import seaborn as sns

# Bucket ids to visualise: four consecutive buckets starting at bucket 2.
vis_bucket_ids = list(range(2, 6))

# One pivoted (user_u x user_v) interaction-count DataFrame per id in vis_bucket_ids.
matrices = []

for bucket_id in vis_bucket_ids:
    subset = df_with_buckets.filter(col("bucket_id") == bucket_id)

    # df_with_buckets carries no num_interactions column (the bucketing cell keeps
    # only user_u, user_v, timestamp, timestep, bucket_id). Count rows per
    # (user_u, user_v) instead; pairs that never interacted become 0.
    pivot_df = subset.groupBy("user_u").pivot("user_v").agg(F.count("*")).fillna(0)

    matrices.append(pivot_df)

for bucket_id, matrix in zip(vis_bucket_ids, matrices):
    # Convert to Pandas DataFrame
    pd_matrix = matrix.toPandas().set_index('user_u')

    # A bucket with no interactions gives an empty frame, which seaborn cannot plot.
    if pd_matrix.empty:
        print(f"Bucket {bucket_id} has no interactions; skipping heatmap")
        continue

    # Plotting
    plt.figure(figsize=(10, 8))
    sns.heatmap(pd_matrix, cmap='rocket', linewidths=0.5)
    plt.title(f"Affinity Matrix for Bucket {bucket_id}")
    plt.show()

## Affinity Matrix Function

In [None]:
from pyspark.sql import DataFrame

# Create affinity matrix function
def create_affinity_matrix(bucket_df: DataFrame) -> DataFrame:
    """Count rows per directed (user_u, user_v) pair and scale them by the overall total.

    Args:
        bucket_df: DataFrame with at least ``user_u`` and ``user_v`` columns.

    Returns:
        DataFrame with columns ``user_u``, ``user_v``, ``interactions_count`` (rows for
        that pair) and ``scaled_interactions`` (that count divided by the total count
        over all pairs).
    """
    pair_counts = (
        bucket_df
        .groupBy('user_u', 'user_v')
        .agg(F.count("*").alias('interactions_count'))
    )

    # Grand total used for proportional scaling (runs a Spark job).
    grand_total = pair_counts.agg(F.sum("interactions_count")).first()[0]

    return pair_counts.withColumn(
        "scaled_interactions",
        F.col("interactions_count") / grand_total,
    )

# Every distinct bucket id present in the data (collection order is unspecified).
bucket_ids = [row["bucket_id"] for row in df_with_buckets.select("bucket_id").distinct().collect()]

## Modularity Function

In [None]:
def modularity(graph_df, cluster_df):
    """
    Compute modularity of a weighted directed graph after clustering.

    Directed modularity is

        Q = 1/m * sum_ij [A_ij - k_i^out * k_j^in / m] * delta(c_i, c_j)

    with m the total edge weight. The null-model term ranges over *all* node pairs
    in the same cluster, not only over existing edges. Grouping it by cluster gives

        Q = 1/m * (W_in - 1/m * sum_c K_c^out * K_c^in)

    where W_in is the total weight of intra-cluster edges and K_c^out / K_c^in are the
    summed out-/in-strengths of the nodes in cluster c.

    Edges whose endpoints have no row in cluster_df are left out of W_in and of the
    cluster strengths, but still count towards m.

    graph_df: DataFrame with columns ['src', 'dst', 'weight']
    cluster_df: DataFrame with columns ['id', 'cluster']

    Returns: Modularity score (0.0 when the graph has no total weight).
    """
    from pyspark.sql import functions as F

    m = graph_df.agg(F.sum("weight")).collect()[0][0]
    if not m:
        # Empty graph or zero total weight: modularity is undefined; report 0.
        return 0.0

    graph_df = graph_df.withColumn("src", F.col("src").cast("long"))
    graph_df = graph_df.withColumn("dst", F.col("dst").cast("long"))

    # Cluster assignment keyed by each edge endpoint.
    src_clusters = cluster_df.select(F.col("id").alias("src"), F.col("cluster").alias("src_cluster"))
    dst_clusters = cluster_df.select(F.col("id").alias("dst"), F.col("cluster").alias("dst_cluster"))

    # Observed term: total weight of edges inside a single cluster.
    intra_weight = (
        graph_df
        .join(src_clusters, "src")
        .join(dst_clusters, "dst")
        .where(F.col("src_cluster") == F.col("dst_cluster"))
        .agg(F.sum("weight"))
        .collect()[0][0]
    ) or 0.0

    # K_c^out: weight of all edges leaving nodes of cluster c.
    out_per_cluster = (
        graph_df
        .join(src_clusters, "src")
        .groupBy("src_cluster")
        .agg(F.sum("weight").alias("out_weight"))
        .withColumnRenamed("src_cluster", "cluster")
    )
    # K_c^in: weight of all edges entering nodes of cluster c.
    in_per_cluster = (
        graph_df
        .join(dst_clusters, "dst")
        .groupBy("dst_cluster")
        .agg(F.sum("weight").alias("in_weight"))
        .withColumnRenamed("dst_cluster", "cluster")
    )

    # Expected term: sum_c K_c^out * K_c^in. A cluster missing on either side
    # contributes 0, so an inner join is sufficient.
    expected = (
        out_per_cluster
        .join(in_per_cluster, "cluster")
        .agg(F.sum(F.col("out_weight") * F.col("in_weight")))
        .collect()[0][0]
    ) or 0.0

    return (intra_weight - expected / m) / m

## Build Affinity Graphs, Cluster Them, and Compute Modularity

In [None]:
from pyspark.ml.clustering import PowerIterationClustering
from pyspark.sql import Row

# Sliding-window clustering: for each bucket, cluster the affinity graph built from
# that bucket together with the (max_cumulative_buckets - 1) preceding bucket ids.
max_cumulative_buckets = 3
# Power Iteration Clustering settings.
pic_num_clusters = 3
pic_max_iter = 20
# Modularity is only computed for bucket ids up to this value
# (keep in sync with the modularity plot below).
modularity_bucket_limit = 100

interactions_list = []
modularities = []

# The estimator's parameters never change between buckets, so build it once.
# PIC expects integer-typed src/dst ids and reads edge weights from weightCol.
pic = PowerIterationClustering(k=pic_num_clusters, maxIter=pic_max_iter, initMode="degree", weightCol="weight")

bucket_ids_iter = sorted(bucket_ids)

for bucket in bucket_ids_iter:
    # Interactions whose bucket_id lies in [lower_bound, bucket].
    lower_bound = bucket - max_cumulative_buckets + 1
    cumulative_data = df_with_buckets.filter((F.col("bucket_id") >= lower_bound) & (F.col("bucket_id") <= bucket))

    # Cheap progress message (the old version ran an extra Spark job per iteration).
    print(f"Processing bucket {bucket} (bucket_id range {lower_bound}..{bucket})")

    # Weighted edge list in the column layout PIC and modularity() expect.
    # Cached because it feeds PIC, the modularity computation and the join below.
    affinity_df = create_affinity_matrix(cumulative_data)
    adjusted_df = (
        affinity_df
        .withColumnRenamed("user_u", "src")
        .withColumnRenamed("user_v", "dst")
        .withColumnRenamed("scaled_interactions", "weight")
        .cache()
    )

    assignments = pic.assignClusters(adjusted_df).cache()

    # Get modularity of the clustered data for the first buckets only
    if bucket <= modularity_bucket_limit:
        modularities.append(modularity(adjusted_df, assignments))

    src_assignments = assignments.withColumnRenamed("id", "src").withColumnRenamed("cluster", "src_cluster")
    dst_assignments = assignments.withColumnRenamed("id", "dst").withColumnRenamed("cluster", "dst_cluster")

    # Label every edge with both endpoints' clusters and the current bucket id.
    # The result has the same columns, in the same order, as the Row objects built before.
    interactions_with_clusters = (
        adjusted_df
        .join(src_assignments, on="src")
        .join(dst_assignments, on="dst")
        .select(
            F.col("src").alias("src_id"),
            "src_cluster",
            F.col("dst").alias("dst_id"),
            "dst_cluster",
            F.lit(bucket).alias("bucket_id"),
        )
    )

    # Append this bucket's rows to the master list
    interactions_list.extend(interactions_with_clusters.collect())

    assignments.unpersist()
    adjusted_df.unpersist()

if not interactions_list:
    raise ValueError("No clustered interactions were produced; nothing to save")

# Convert the list to DataFrame
interactions_df = sc.createDataFrame(interactions_list)
interactions_df.show()
# Save DataFrame to CSV (Spark's default save mode fails if the path already exists)
# interactions_df.write.csv("/data/CollegeMsgClusters", header=True) # multiple nodes
interactions_df.coalesce(1).write.csv("/data/CollegeMsgClustersFull", header=True) # single node

## Visualise Modularity

In [None]:
# Plot modularity against the buckets it was computed for. The clustering loop
# only records modularity when bucket <= 100, so select exactly those ids rather
# than assuming they are the first 100 sorted ids.
mod_list = [b for b in sorted(bucket_ids) if b <= 100]
plt.plot(mod_list, modularities, marker='o')
plt.xlabel('Bucket ID')
plt.ylabel('Modularity')
plt.title('Modularity over each bucket')
plt.grid(True)
plt.show()