<a href="https://colab.research.google.com/github/BarGinger/DIS-Assignment/blob/main/Src/dis_notebook_08_11_24.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 1st cell - Install requirements
# NOTE(review): apart from sparkmeasure, these installs are unpinned, so a fresh runtime may
# pull a pyspark release that does not match the graphframes jar requested in the Spark
# session cell ("graphframes:graphframes:0.8.2-spark3.1-s_2.12") — TODO pin compatible versions.
!pip install pyspark
!pip install -U -q PyDrive
# Java 8 runtime for the Spark JVM.
!apt install openjdk-8-jdk-headless -qq
!pip install graphframes
# Pinned to the same version as the spark-measure jar (0.24) requested in spark.jars.packages.
!pip install sparkmeasure==0.24
!pip install matplotlib seaborn
import os
# Point Spark at the JDK; presumably the install location of the package above on this
# Ubuntu image — confirm if the runtime changes. Needs to be set before the session is created.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u422-b05-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [2]:
# 2nd cell - Import libraries
import pyspark
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql.functions import (
    col,
    udf,
    row_number,
    countDistinct,
    collect_list,
    struct,
    count,
    sum,
    avg,
    expr,
    percentile_approx,
    max as spark_max,
    explode,
    round,
    rand,
    monotonically_increasing_id,
    array,
    lit,
    broadcast,
    lag,
    pandas_udf,
    PandasUDFType,
    least,
    greatest
)
import pyspark.sql.functions as F
from sparkmeasure import StageMetrics
from pyspark.sql.types import (
    StringType, IntegerType, BinaryType, DoubleType,
    ArrayType, StructType, StructField, LongType, TimestampType
)
from pyspark.sql import Window
from datetime import datetime, timedelta
from graphframes import GraphFrame
from scipy.sparse import csr_matrix, vstack, hstack
import numpy as np
import pandas as pd
import pickle
import base64
from sparkmeasure import StageMetrics # for resources monitoring
from functools import wraps
import time
import matplotlib.pyplot as plt
import seaborn as sns
import re
import random
from operator import truediv
from google.colab import files
from itertools import combinations
from scipy.sparse.linalg import inv
from scipy.sparse import identity
from sklearn.metrics.pairwise import cosine_similarity
import shutil

In [3]:
# 3rd cell - Initialize Spark session
# Local-mode session on all available cores. The extra packages provide sparkMeasure (stage
# metrics collected by the track_stage decorator) and GraphFrames (connected components).
# NOTE(review): in local mode driver and executor share one JVM, so the spark.executor.*
# settings are presumably ignored, and spark.driver.memory only applies if no JVM is running
# yet (getOrCreate reuses an existing session) — confirm these settings have any effect.
# NOTE(review): shuffle partitions (10) is much lower than default parallelism (100); the
# former governs DataFrame joins/aggregations, the latter RDD operations — confirm intended.
spark = SparkSession.builder \
  .appName("PhoneCallsCommunityDetection") \
  .master("local[*]") \
  .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.12:0.24,graphframes:graphframes:0.8.2-spark3.1-s_2.12") \
  .config("spark.executor.memory", "20G") \
  .config("spark.driver.memory", "50G") \
  .config("spark.executor.memoryOverhead", "1G") \
  .config("spark.default.parallelism", "100") \
  .config("spark.sql.shuffle.partitions", "10") \
  .config("spark.driver.maxResultSize", "2G") \
  .getOrCreate()

# Initialize StageMetrics (begin()/end() are called around each stage by track_stage)
stagemetrics = StageMetrics(spark)

# Optional: Set logging level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

# Set a checkpoint directory for Spark; GraphFrames' connectedComponents requires one.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

In [None]:
# 4th cell - Generate datasets - PLEASE only run this if datasets folder is empty / does not exists
def generate_communities(spark, num_communities, community_size_range, density=0.3, extra_factor=1.5):
    """
    Generate isolated communities with controlled sizes and random intra-community connections.

    Community ``c`` draws its size uniformly from ``community_size_range`` (inclusive) and owns
    client ids ``c * stride + k`` for ``k in range(size)``, so no two communities share a client.
    Each unordered pair of clients in a community becomes an edge with probability
    ``density * extra_factor`` (values >= 1 keep every pair). The extra factor over-generates
    connections so that enough call samples can be drawn later. With a low probability a
    community may itself end up disconnected, and clients without any edge do not appear.

    Parameters:
        spark: Active SparkSession used to build the result.
        num_communities (int): Number of communities to generate.
        community_size_range (tuple[int, int]): Inclusive (min, max) community size.
        density (float): Base probability of connecting a pair of clients.
        extra_factor (float): Multiplier applied to ``density``.

    Returns:
        DataFrame with columns ``community_id``, ``client1``, ``client2`` (client1 < client2).
        The schema is explicit, so an empty result (no edge survived sampling) is a valid
        empty DataFrame instead of a schema-inference error.
    """
    # Ids are spaced 1000 apart as before; the stride only grows when a community could have
    # more than 1000 members, which would otherwise overlap the next community's ids and
    # silently merge the two graphs.
    id_stride = max(1000, community_size_range[1])
    edge_probability = density * extra_factor

    communities = []
    for community_id in range(num_communities):
        size = random.randint(community_size_range[0], community_size_range[1])
        base_id = community_id * id_stride
        communities.extend(
            (community_id, base_id + i, base_id + j)
            for i in range(size)
            for j in range(i + 1, size)
            if random.random() < edge_probability
        )

    return spark.createDataFrame(
        communities, "community_id long, client1 long, client2 long"
    )

def generate_call_times(communities_df, calls_per_connection_range, duration_range, base_time, num_samples):
    """
    Generate call start/end times for each client connection, returning ``num_samples`` rows.

    Every connection in ``communities_df`` is expanded into a random number of calls drawn
    uniformly from ``calls_per_connection_range`` (inclusive). Each call starts at a random
    minute within the 1440 minutes after ``base_time``. It lasts a random whole number of
    minutes from ``duration_range`` (inclusive). Times are ``'%y%m%d%H%M'`` strings.

    If fewer calls than ``num_samples`` are generated, rows are re-drawn from the generated
    calls (so some connections repeat) until the target count is reached.

    Parameters:
        communities_df (DataFrame): Connections with ``client1`` and ``client2`` columns.
        calls_per_connection_range (tuple[int, int]): Inclusive (min, max) calls per connection.
        duration_range (tuple[int, int]): Inclusive (min, max) call duration in minutes.
        base_time (datetime): Start of the window in which calls begin.
        num_samples (int): Number of rows in the result.

    Returns:
        DataFrame with columns ``client1``, ``client2``, ``Start_Time``, ``End_Time``.

    Raises:
        ValueError: If no call can be generated (e.g. ``communities_df`` is empty) while
            ``num_samples`` > 0; the padding loop could otherwise never terminate.
    """
    low_calls, high_calls = calls_per_connection_range
    # Expand each connection into num_calls rows, num_calls uniform in [low_calls, high_calls].
    calls_df = communities_df.withColumn(
        "num_calls",
        F.expr(f"floor(rand() * ({high_calls} - {low_calls} + 1)) + {low_calls}")
    ).withColumn(
        "call_id", F.monotonically_increasing_id()
    ).withColumn(
        "calls", F.expr("sequence(1, num_calls)")
    ).select("client1", "client2", "call_id", F.explode("calls").alias("call_num"))

    def generate_times():
        start_time = base_time + timedelta(minutes=random.randint(0, 1440))
        duration = random.randint(duration_range[0], duration_range[1])
        end_time = start_time + timedelta(minutes=duration)
        return start_time.strftime('%y%m%d%H%M'), end_time.strftime('%y%m%d%H%M')

    # The UDF is random, so it must be marked non-deterministic. Spark treats UDFs as
    # deterministic by default and may then evaluate it once per extracted struct field.
    # That would pair a Start_Time with an End_Time from a different draw.
    time_udf = F.udf(generate_times, "struct<Start_Time:string, End_Time:string>").asNondeterministic()
    calls_df = calls_df.withColumn("call_times", time_udf())

    # Ensure consistent schema for the final DataFrame
    calls_df = calls_df.select(
        "client1", "client2", calls_df["call_times.Start_Time"].alias("Start_Time"), calls_df["call_times.End_Time"].alias("End_Time")
    )

    final_calls_df = calls_df.limit(num_samples)
    current_count = final_calls_df.count()
    if current_count == 0 and num_samples > 0:
        raise ValueError(
            f"No calls could be generated from communities_df; cannot produce {num_samples} samples."
        )

    # Pad with re-drawn rows until the requested sample count is reached. Each pass adds
    # at least one row because calls_df is non-empty here.
    while current_count < num_samples:
        additional_df = calls_df.limit(num_samples - current_count)
        final_calls_df = final_calls_df.union(additional_df).limit(num_samples)
        current_count = final_calls_df.count()

    return final_calls_df

# Function to delete all generated datasets
def delete_generated_datasets(folder_path="/content/datasets/"):
    """
    Delete every file and sub-directory inside ``folder_path``; the folder itself is kept.

    Parameters:
        folder_path (str): Folder whose contents are removed. Defaults to the Colab folder
            that ``save_dataset`` writes generated datasets into.

    Returns:
        list[str]: Names of the removed entries (empty if the folder was empty or missing).
    """
    deleted_files = []

    if not os.path.exists(folder_path):
        print("The folder does not exist.")
        return deleted_files

    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        # Symlinks to directories are unlinked: shutil.rmtree refuses to operate on a symlink.
        if os.path.isdir(item_path) and not os.path.islink(item_path):
            shutil.rmtree(item_path)
        else:
            os.remove(item_path)
        deleted_files.append(item)

    if deleted_files:
        print("Deleted the following items:")
        for item in deleted_files:
            print(item)
    else:
        print("No files found in the folder to delete.")
    return deleted_files

def save_dataset(dataset, filename, output_dir="/content/datasets"):
    """
    Write ``dataset`` as a CSV directory (with header row) to ``output_dir/filename``.

    Spark writes into ``<filename>_temp`` first. Any previous ``output_dir/filename`` is
    replaced only after that write succeeds, so a failed write leaves the last good copy in place.

    Parameters:
        dataset (DataFrame): The DataFrame to save, containing generated call data.
        filename (str): Base name for the dataset directory.
        output_dir (str): Parent directory for the dataset (created if missing).

    Returns:
        str: Path of the saved dataset directory.
    """
    final_path = os.path.join(output_dir, filename)
    temp_dir = os.path.join(output_dir, f"{filename}_temp")

    os.makedirs(output_dir, exist_ok=True)

    dataset.write.mode("overwrite").option("header", "true").csv(temp_dir)

    if os.path.exists(final_path):
        shutil.rmtree(final_path)
    # With final_path gone this is a rename, so no temporary directory remains afterwards.
    shutil.move(temp_dir, final_path)

    print(f"Dataset saved as {final_path}")
    return final_path


def export_datasets_to_computer(folder_path="/content/datasets"):
    """
    Download every dataset found in ``folder_path`` to the local machine through Colab.

    Sub-directories (Spark writes CSV output as a directory of part files) are zipped to
    ``<folder_path>/<name>.zip`` and the archive is downloaded. Plain files are downloaded
    unchanged.

    Parameters:
        folder_path (str): The path to the folder containing datasets.
    """
    if not os.path.exists(folder_path):
        print(f"The folder '{folder_path}' does not exist.")
        return

    for entry in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry)

        if not os.path.isdir(entry_path):
            files.download(entry_path)
            continue

        shutil.make_archive(entry_path, 'zip', entry_path)
        print(f"Compressed '{entry}' as '{entry}.zip'.")
        files.download(f"{entry_path}.zip")

    print("All datasets have been exported to your computer.")


# In a Jupyter kernel __name__ is "__main__", so this driver runs whenever the cell executes.
if __name__ == "__main__":
    # Start from an empty /content/datasets folder so stale datasets are not exported.
    delete_generated_datasets()
    # spark = create_spark_session()

    # Define parameter configurations with num_samples
    # (one generated dataset per entry; num_samples is its row count)
    # NOTE(review): neither Python's `random` nor Spark's rand() is seeded, so every run
    # produces different datasets — consider seeding for reproducibility.
    parameter_sets = [
        {"num_communities": 5, "community_size_range": (3, 5), "calls_per_connection_range": (1, 2), "duration_range": (30, 120), "density": 0.5, "num_samples": 50},
        {"num_communities": 10, "community_size_range": (5, 7), "calls_per_connection_range": (1, 3), "duration_range": (30, 180), "density": 0.4, "num_samples": 100},
        {"num_communities": 100, "community_size_range": (5, 100), "calls_per_connection_range": (1, 5), "duration_range": (30, 380), "density": 0.4, "num_samples": 5000},
        {"num_communities": 5000, "community_size_range": (5, 100), "calls_per_connection_range": (1, 5), "duration_range": (30, 380), "density": 0.4, "num_samples": 50000}
    ]

    # Calls start within the day following this timestamp.
    base_time = datetime(2024, 1, 1)

    for i, params in enumerate(parameter_sets):
        print(f"\nGenerating dataset for configuration {i + 1}: {params}")

        # Generate communities and call times
        communities_df = generate_communities(
            spark,
            num_communities=params["num_communities"],
            community_size_range=params["community_size_range"],
            density=params["density"],
            extra_factor=2  # Generate more potential connections initially
        )
        calls_df = generate_call_times(
            communities_df,
            calls_per_connection_range=params["calls_per_connection_range"],
            duration_range=params["duration_range"],
            base_time=base_time,
            num_samples=params["num_samples"]
        )

        # Save dataset and print information
        filename = f"dataset_config_{i + 1}"
        final_path = save_dataset(calls_df, filename)
        # Record where each configuration was written, for the metadata CSV below.
        parameter_sets[i]['dataset_name'] = filename
        parameter_sets[i]['csv_filename'] = final_path

    # One metadata row per configuration, written to the current working directory
    # (not into /content/datasets) and downloaded separately.
    df_datasets = pd.DataFrame(parameter_sets)
    dataset_metadata_file_path = "dataset_metadata.csv"
    df_datasets.to_csv(dataset_metadata_file_path, index=False)
    files.download(dataset_metadata_file_path)
    # Run the function to export all datasets
    export_datasets_to_computer("/content/datasets")

In [4]:
# 5th cell - Initialize resource monitoring

# Monitor CPU, Memory and running time
def track_stage(stage_name, settle_seconds=15):
    """
    Decorator factory that records sparkMeasure stage metrics for one pipeline stage.

    The wrapped function runs between ``stagemetrics.begin()`` and ``stagemetrics.end()``.
    Afterwards the aggregated metrics are tagged with ``stage_name`` and the current dataset,
    shown, and written as CSV to ``<dataset_name>_stage_metrics``. The CSV is overwritten for
    a stage whose name contains "Stage 1" when the global ``clear_csv`` is truthy; otherwise
    it is appended to.

    Relies on notebook globals defined elsewhere: ``stagemetrics``, ``dataset_file_path``,
    ``dataset_name`` and ``clear_csv``.

    Parameters:
        stage_name (str): Label stored in the ``stage_name`` column of the metrics.
        settle_seconds (float): Pause between ending collection and reading the metrics.
            NOTE(review): presumably gives the metrics listener time to receive the final
            stage events — confirm the value actually needed.

    Returns:
        A decorator; the wrapped function's return value is passed through unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print(f"Starting {stage_name}")
            stagemetrics.begin()  # Begin collecting metrics for this stage
            try:
                result = func(*args, **kwargs)
            finally:
                # Stop collecting even if the stage raises, so collection is not left open.
                stagemetrics.end()

            time.sleep(settle_seconds)

            print(f"Completed {stage_name}\n")
            # Called for its side effect only. NOTE(review): aggregate_stagemetrics_DF
            # presumably reads the metrics this call materializes — keep the call order.
            stagemetrics.create_stagemetrics_DF()
            df_metrics_agg = (
                stagemetrics.aggregate_stagemetrics_DF()
                .withColumn("stage_name", F.lit(stage_name))
                .withColumn("dataset", F.lit(dataset_file_path))
            )
            df_metrics_agg.show(truncate=False)

            # Stage 1 starts a fresh CSV when clear_csv is set; all other writes append.
            write_mode = "overwrite" if "Stage 1" in stage_name and clear_csv else "append"

            # Each appended part file carries its own header row.
            df_metrics_agg.coalesce(1).write \
                .mode(write_mode) \
                .option("header", "true") \
                .csv(f"{dataset_name}_stage_metrics")

            return result
        return wrapper
    return decorator

In [14]:
# 6th cell - All the utilities functions for the project

# Call-duration helpers. Timestamps are 'YYMMDDHHMM' values (e.g. 2401011230); when read with
# inferSchema=True they may arrive as integers rather than strings.
def calculate_duration_minutes(start_time, end_time):
  """
  Calculate the duration between two call timestamps in minutes.

  Parameters:
  -----------
  start_time : str or int
      The start time in '%y%m%d%H%M' format (e.g. '2401011230').
  end_time : str or int
      The end time in '%y%m%d%H%M' format.

  Returns:
  --------
  duration_minutes : float or None
      end_time - start_time in minutes (negative if end_time precedes start_time).
      None when either timestamp is missing, so a null CSV cell yields a null duration
      instead of failing the whole Spark UDF.
  """
  if start_time is None or end_time is None:
    return None
  start_datetime = convert_to_datetime(start_time)
  end_datetime = convert_to_datetime(end_time)
  return (end_datetime - start_datetime).total_seconds() / 60

def convert_to_datetime(time_str):
  """
  Convert a '%y%m%d%H%M' timestamp (e.g. '2401011230' -> 2024-01-01 12:30) to a datetime.

  Parameters:
  -----------
  time_str : str or int
      The timestamp in '%y%m%d%H%M' format. Integers are accepted because Spark's CSV reader
      with inferSchema=True reads these columns as numbers. Numeric parsing drops the leading
      zero of years 2000-2009 (e.g. '0501011230' becomes 501011230).

  Returns:
  --------
  datetime_obj : datetime.datetime
      The datetime object representing the given timestamp.
  """
  # Restore leading zeros lost to numeric parsing. Without the padding, strptime's
  # variable-width fields would silently misread '501011230' as 2050-10-11 23:00.
  return datetime.strptime(str(time_str).zfill(10), '%y%m%d%H%M')

# Formats the elapsed time between two call timestamps as a 'DDHHMM' string.
def calculate_duration_string(start_time, end_time):
    """
    Return the span between two '%y%m%d%H%M' timestamps as a zero-padded 'DDHHMM' string.

    Example: from '2401011200' to '2401021330' is 1 day, 1 hour, 30 minutes -> '010130'.
    The day count is not capped at two digits, and a negative span yields a negative day count.
    """
    opened_at = convert_to_datetime(start_time)
    closed_at = convert_to_datetime(end_time)
    elapsed = closed_at - opened_at

    leftover_seconds = elapsed.seconds
    hours = leftover_seconds // 3600
    minutes = (leftover_seconds % 3600) // 60
    return '{:02d}{:02d}{:02d}'.format(elapsed.days, hours, minutes)


'''Decorator and Function Definition:
The @pandas_udf decorator marks this function as a Pandas UDF (User Defined Function) that will be applied on grouped data.
GROUPED_MAP tells Spark that the function will receive a DataFrame for each group (grouped by community_id).
The schema defines the expected output structure of the function, which is a DataFrame with community_id
and a binary field containing the serialized matrix.
The function converts the connections (edges) between clients into a CSR matrix and serializes it for storage.'''

# Define the schema for the Pandas UDF output:
# one row per community with its id and the pickled scipy CSR adjacency matrix.
schema = StructType([
    StructField("community_id", IntegerType(), True),
    StructField("csr_matrix", BinaryType(), True)
])

# NOTE(review): PandasUDFType.GROUPED_MAP is deprecated in Spark 3.x in favour of
# GroupedData.applyInPandas — consider migrating.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def create_csr_matrix_from_edges(members_df, weight):
    """
    Build a pickled CSR adjacency matrix for one community (GROUPED_MAP pandas UDF).

    Args:
        members_df: pandas DataFrame for a single group, with a 'community_id' column and a
            'members' column whose list elements are records indexable by 'Client1',
            'Client2' and 'total_duration_minutes'.
        weight: boolean meant to choose weighted vs. unweighted edges.
            NOTE(review): this parameter is never read — the body uses the notebook global
            `use_weights` instead. Also, for a two-argument GROUPED_MAP function Spark passes
            (key, pdf). At runtime `members_df` would therefore be the group key and `weight`
            the group's pandas DataFrame — TODO confirm how this UDF is invoked and fix the
            signature (e.g. a single pdf argument).

    Returns:
        Single-row pandas DataFrame with 'community_id' and 'csr_matrix' (pickle bytes).
        Matrix rows/columns index the community's sorted unique client ids. Edges run from
        Client1 (row) to Client2 (column) only, so the matrix is not symmetrised.
        Repeated (Client1, Client2) pairs are summed by csr_matrix.
    """

    # Extract the community ID (assuming it's consistent within the group)
    community_id = members_df['community_id'].iloc[0]
    '''Since each members_df contains data for a single community (due to groupBy operation),
    the function retrieves the community_id from the first row.
    This ID will be included in the output so that each serialized CSR matrix can be linked back
    to its respective community.'''
    # Explode the members array to get each connection in separate rows
    exploded_df = members_df.explode("members").dropna().reset_index(drop=True)
    exploded_df = pd.DataFrame({
        'Client1': exploded_df['members'].apply(lambda x: x['Client1']),
        'Client2': exploded_df['members'].apply(lambda x: x['Client2']),
        'total_duration_minutes': exploded_df['members'].apply(lambda x: x['total_duration_minutes'])
    })
    '''Flattening and Extracting Connection Data:
    The members_df contains a column with a list of connections (pairs of clients and call durations).
    The function uses explode to convert this list into individual rows, making it easier to work with each connection.
    It then creates a new DataFrame, exploded_df, with separate columns for Client1, Client2, and duration_minutes
    extracted from the connection data.
    This simplifies further processing by ensuring each row represents a single call between two clients.'''
    # Get unique clients and create a mapping to indices (sorted, so indices are stable)
    unique_clients = sorted(pd.concat([exploded_df['Client1'], exploded_df['Client2']]).unique())
    client_to_index = {client: i for i, client in enumerate(unique_clients)}
    num_clients = len(unique_clients)

    # Extract data for CSR matrix
    rows = exploded_df['Client1'].map(client_to_index).values
    cols = exploded_df['Client2'].map(client_to_index).values
    # NOTE(review): reads the notebook global `use_weights`, not the `weight` parameter.
    if use_weights:
      data = exploded_df['total_duration_minutes'].values #if weight else [1] * len(rows)
    else:
      # Unweighted: each listed connection contributes 1 (duplicates still add up).
      data = [1] * len(rows)

    # Create CSR matrix
    csr = csr_matrix((data, (rows, cols)), shape=(num_clients, num_clients))
    '''Serializing the CSR Matrix: The function uses Python’s pickle module to serialize the CSR matrix.
    This converts the matrix into a binary format, allowing it to be stored or transferred efficiently.
    Serialization is necessary because Spark DataFrames cannot directly store complex Python objects like CSR matrices.'''
    # Serialize CSR matrix to binary format
    serialized_csr = pickle.dumps(csr)

    # Return as DataFrame
    return pd.DataFrame({"community_id": [community_id], "csr_matrix": [serialized_csr]})

# Debug helper: show the entries of a sparse matrix as a small table.
def pretty_print_csr_matrix(csr_matrix_result):
  """
  Print the non-zero entries of a sparse matrix as a (Row, Col, Value) table.

  Parameters:
    csr_matrix_result: scipy sparse matrix (any format supporting ``tocoo``).

  Explicitly stored zeros are skipped; ``normalize_matrix``, for example, stores the
  minimum entry as 0. Coordinates and values are both taken from the COO view so they
  stay aligned. Pairing ``nonzero()`` with ``.data`` breaks when stored zeros exist, because
  ``nonzero()`` drops them while ``.data`` keeps them.
  """
  coo = csr_matrix_result.tocoo()
  keep = coo.data != 0

  df = pd.DataFrame({
      'Row': coo.row[keep],
      'Col': coo.col[keep],
      'Value': coo.data[keep]
  })

  print(df)

# Shape alignment: matrices must share a shape before they can be compared element-wise.
def pad_csr_matrix(csr, max_shape):
    """
    Zero-pad a sparse matrix at the bottom/right up to ``max_shape``.

    Dimensions already at or above ``max_shape`` are left unchanged (never truncated).

    Parameters:
        csr: scipy sparse matrix to pad.
        max_shape (tuple[int, int]): Target (rows, cols).

    Returns:
        csr_matrix: The padded matrix, always in CSR format. Depending on the SciPy version,
        ``vstack``/``hstack`` may return COO, which lacks the ``indices``/``indptr``
        attributes that ``normalize_matrix`` relies on.
    """
    current_rows, current_cols = csr.shape
    max_rows, max_cols = max_shape
    if current_rows < max_rows:
        additional_rows = csr_matrix((max_rows - current_rows, current_cols))
        csr = vstack([csr, additional_rows])
    if current_cols < max_cols:
        additional_cols = csr_matrix((csr.shape[0], max_cols - current_cols))
        csr = hstack([csr, additional_cols])
    return csr.tocsr()

# Pad every community's pickled CSR matrix to a common shape using Spark.
def process_csr_matrices(df, max_size):
    """
    Pad each pickled CSR matrix in ``df`` to ``max_size`` (see ``pad_csr_matrix``).

    Parameters:
        df (DataFrame): Spark DataFrame with ``community_id`` and ``csr_matrix`` (pickled bytes).
        max_size (tuple[int, int]): Target (rows, cols) shape for every matrix.

    Returns:
        DataFrame with the same two columns, ``csr_matrix`` holding the re-pickled padded matrix.
        The output schema is copied from the input rather than inferred. An empty input
        therefore no longer fails schema inference, and ``community_id`` keeps its type.
    """
    def pad_row(row):
        # NOTE(review): pickle.loads must only ever see bytes produced by this pipeline.
        padded = pad_csr_matrix(pickle.loads(row['csr_matrix']), max_size)
        return (row['community_id'], pickle.dumps(padded))

    output_schema = StructType([
        StructField("community_id", df.schema["community_id"].dataType, True),
        StructField("csr_matrix", BinaryType(), True),
    ])
    return df.rdd.map(pad_row).toDF(output_schema)

def normalize_matrix(matrix):
    """
    Min-max scale the stored values of a sparse matrix into [0, 1].

    The smallest stored value maps to 0 (kept as an explicitly stored zero) and the largest
    to 1; the sparsity pattern and shape are unchanged. If every stored value is identical
    there is no range to scale by. In that case each non-zero value becomes 1 and zeros
    stay 0, so a graph whose edges all weigh e.g. 30 still lands in [0, 1] like any other.

    Parameters:
    matrix : scipy sparse matrix (CSR, CSC or COO)
        Sparse matrix to normalize.

    Returns:
    scipy sparse matrix
        A new matrix of the same format; the input is never modified. A matrix with no
        stored values is returned as-is.
    """
    data = matrix.data
    if len(data) == 0:  # Handle empty matrices
        return matrix
    min_val = np.min(data)
    max_val = np.max(data)
    if max_val > min_val:
        normalized_data = (data - min_val) / (max_val - min_val)
    else:
        normalized_data = (data != 0).astype(float)
    normalized = matrix.copy()
    normalized.data = normalized_data
    return normalized

def frobenius_norm(csr_1, csr_2):
    """
    Frobenius distance between two sparse matrices after min-max normalizing each one.

    Each input goes through ``normalize_matrix`` first; the normalized shapes must match.

    Parameters:
    csr_1, csr_2 : csr_matrix
        Sparse adjacency matrices of the graphs.

    Returns:
    float
        Square root of the sum of squared element-wise differences of the normalized matrices.
    """
    scaled_a, scaled_b = normalize_matrix(csr_1), normalize_matrix(csr_2)
    assert scaled_a.shape == scaled_b.shape, "Adjacency matrices must have the same dimensions."
    squared_gap = (scaled_a - scaled_b).power(2)
    return np.sqrt(squared_gap.sum())

def frobenius_sim(csr_1, csr_2):
    """
    Similarity in (0, 1] derived from the Frobenius distance of two adjacency matrices.

    Parameters:
    csr_1, csr_2 : csr_matrix
        Same-shape sparse adjacency matrices; ``frobenius_norm`` normalizes them internally.

    Returns:
    float
        ``1 / (1 + d)`` with ``d = frobenius_norm(csr_1, csr_2)``: 1.0 when the normalized
        matrices are identical, approaching 0 as they diverge.
    """
    distance = frobenius_norm(csr_1, csr_2)
    return 1 / (1 + distance)


def deltacon_similarity(csr_1, csr_2, epsilon=0.5):
    """
    DeltaCon-style similarity of two same-shape adjacency matrices.

    For each graph the affinity matrix ``S = (I + epsilon**2 * D - epsilon * A)^-1`` is
    computed, where ``D`` is the diagonal matrix of row sums of ``A``. The result is
    ``1 / (1 + ||S1 - S2||_F)``: 1.0 for identical inputs, approaching 0 as they diverge.

    NOTE(review): the DeltaCon paper compares sqrt(S1) with sqrt(S2) (root Euclidean
    distance); this uses the plain Frobenius distance of S1 - S2.

    Parameters:
    csr_1, csr_2 : scipy sparse matrix
        Adjacency matrices of identical shape.
    epsilon : float
        Neighbour-influence strength used in the affinity matrix.

    Returns:
    float
        Similarity in (0, 1]. Two empty (0 x 0) matrices are identical, giving 1.0.

    Raises:
    AssertionError
        If the shapes differ.
    """
    from scipy.sparse import diags, issparse
    from scipy.sparse.linalg import norm as sparse_norm

    assert csr_1.shape == csr_2.shape, "Adjacency matrices must be of the same size for comparison."
    n = csr_1.shape[0]
    if n == 0:
        return 1.0

    def affinity(adjacency):
        # Degree matrix with an explicit shape, and everything in CSC, the format sparse
        # inv() works on without a conversion warning.
        degrees = np.asarray(adjacency.sum(axis=1)).ravel()
        system = (identity(n, format="csc")
                  + epsilon**2 * diags(degrees, 0, shape=(n, n), format="csc")
                  - epsilon * adjacency.tocsc())
        return inv(system)

    diff = affinity(csr_1) - affinity(csr_2)
    # inv() returns a dense array for 1 x 1 systems, so handle both representations.
    distance = sparse_norm(diff, "fro") if issparse(diff) else np.linalg.norm(diff)
    return 1 / (1 + distance)

# applyInPandas worker computing the Frobenius similarity of one community pair.
def calculate_frobenius_similarity(grouped_df):
    """
    Compute the Frobenius similarity for a single (community_id, community_id_2) group.

    Only the first row of ``grouped_df`` is used. It must provide community_id,
    community_id_2, and the pickled matrices csr_matrix and csr_matrix_2. Returns a one-row
    pandas DataFrame with community_id_1, community_id_2 and frobenius_similarity.
    """
    first = grouped_df.iloc[0]
    left_matrix = pickle.loads(first['csr_matrix'])
    right_matrix = pickle.loads(first['csr_matrix_2'])
    record = {
        "community_id_1": first['community_id'],
        "community_id_2": first['community_id_2'],
        "frobenius_similarity": frobenius_sim(left_matrix, right_matrix),
    }
    return pd.DataFrame([record])

# applyInPandas worker computing the DeltaCon similarity of one community pair.
def calculate_deltacon_similarity(grouped_df):
    """
    Compute the DeltaCon similarity for a single (community_id, community_id_2) group.

    Only the first row of ``grouped_df`` is used. It must provide community_id,
    community_id_2, and the pickled matrices csr_matrix and csr_matrix_2. Returns a one-row
    pandas DataFrame with community_id_1, community_id_2 and deltacon.
    """
    first = grouped_df.iloc[0]
    left_matrix = pickle.loads(first['csr_matrix'])
    right_matrix = pickle.loads(first['csr_matrix_2'])
    record = {
        "community_id_1": first['community_id'],
        "community_id_2": first['community_id_2'],
        "deltacon": deltacon_similarity(left_matrix, right_matrix),
    }
    return pd.DataFrame([record])

# Row-wise cosine similarity between two (sparse or dense) matrices.
def cosine_sim(csr_1, csr_2):
    """
    Pairwise cosine similarity between the rows of two matrices.

    Thin wrapper around ``sklearn.metrics.pairwise.cosine_similarity``: entry (i, j) of the
    result compares row i of ``csr_1`` with row j of ``csr_2``.
    """
    return cosine_similarity(csr_1, csr_2)


def calculate_similarities(subgroup_community_members):
  """
  Compare every pair of communities by Frobenius and DeltaCon similarity of their CSR matrices.

  Parameters:
    subgroup_community_members (DataFrame): Spark DataFrame with one row per community and
      columns ``community_id`` and ``csr_matrix`` (pickled scipy sparse matrix).

  Returns:
    DataFrame with the following columns, also printed with ``show``:
      - community_id_1, community_id_2: each unordered pair once, with
        community_id_1 < community_id_2.
      - frobenius_similarity and deltacon.
      - final_similarity = 0.5 * frobenius_similarity + 0.5 * deltacon.
  """
  # Common shape every matrix is padded to. It is computed from the argument itself (not a
  # notebook global), using fold() so that an empty input gives (0, 0) instead of reduce()'s
  # empty-RDD error.
  max_size = subgroup_community_members.rdd \
      .map(lambda row: pickle.loads(row['csr_matrix']).shape) \
      .fold((0, 0), lambda x, y: (max(x[0], y[0]), max(x[1], y[1])))

  # Both measures compare the same padded matrices, so pad only once.
  # NOTE(review): earlier comments hinted at separate weighted/unweighted inputs per measure,
  # but both were padded from this same DataFrame — confirm whether distinct inputs were intended.
  padded_result = process_csr_matrices(subgroup_community_members, max_size)

  # Right-hand copy with renamed columns so the self cross join is unambiguous.
  padded_result_renamed = padded_result.select(
      col("community_id").alias("community_id_2"),
      col("csr_matrix").alias("csr_matrix_2")
  )

  # Every unordered pair of distinct communities; shared by both similarity measures.
  community_pairs_df = padded_result.alias("df1").crossJoin(padded_result_renamed.alias("df2")) \
      .filter(col("df1.community_id") < col("df2.community_id_2")) \
      .select("df1.community_id", "df2.community_id_2", "df1.csr_matrix", "df2.csr_matrix_2")

  # Step 1: Frobenius similarity, one pandas call per pair
  frobenius_similarity_schema = StructType([
      StructField("community_id_1", IntegerType(), True),
      StructField("community_id_2", IntegerType(), True),
      StructField("frobenius_similarity", DoubleType(), True)
  ])
  frobenius_similarity_df = community_pairs_df \
      .groupBy("community_id", "community_id_2") \
      .applyInPandas(calculate_frobenius_similarity, schema=frobenius_similarity_schema)

  # Step 2: DeltaCon similarity, one pandas call per pair
  deltacon_similarity_schema = StructType([
      StructField("community_id_1", IntegerType(), True),
      StructField("community_id_2", IntegerType(), True),
      StructField("deltacon", DoubleType(), True)
  ])
  deltacon_similarity_df = community_pairs_df \
      .groupBy("community_id", "community_id_2") \
      .applyInPandas(calculate_deltacon_similarity, schema=deltacon_similarity_schema)

  # Step 3: join both scores per pair and average them
  combined_similarity_df = frobenius_similarity_df.join(
      deltacon_similarity_df,
      on=["community_id_1", "community_id_2"],
      how="inner"
  )

  final_similarity_df = combined_similarity_df.withColumn(
      "final_similarity",
      expr("0.5 * frobenius_similarity + 0.5 * deltacon")
  )

  final_similarity_df.show(truncate=False)
  return final_similarity_df


def create_adaptive_buckets(df, columns, min_size=2):
    """
    Add a quantile-based ``<column>_bucket`` integer column for each column in ``columns``.

    Parameters:
        df (DataFrame): The input DataFrame with community statistics.
        columns (list): List of column names to bucketize.
        min_size (int): Number of quantile buckets per column (bucket ids 0 .. min_size - 1).
            NOTE(review): despite its name this is the bucket count, not a minimum number of
            communities per bucket.

    Returns:
        DataFrame: ``df`` with one bucket column per input column. Null values get a null
        bucket, as do all rows when a column has no non-null values at all. Values above the
        highest approximate quantile fall into the last bucket instead of becoming null.

    Raises:
        ValueError: If ``min_size`` < 1.
    """
    if min_size < 1:
        raise ValueError(f"min_size must be >= 1, got {min_size}")

    for column in columns:
        bucket_col = f"{column}_bucket"

        # Cut points at quantiles 0, 1/min_size, ..., 1 (relative error 0.05)
        quantiles = [i / min_size for i in range(min_size + 1)]
        boundaries = df.approxQuantile(column, quantiles, 0.05)

        if len(boundaries) < 2:
            # approxQuantile returns [] for a column with no non-null values.
            bucket_expr = F.lit(None).cast("int")
        else:
            value = F.col(column)
            bucket_expr = F.when(value <= boundaries[1], 0)
            for i in range(1, len(boundaries) - 1):
                bucket_expr = bucket_expr.when((value > boundaries[i]) & (value <= boundaries[i + 1]), i)
            # Approximate quantiles may fall below the true maximum; keep such values in the
            # last bucket. Nulls match no branch and stay null.
            bucket_expr = bucket_expr.when(value.isNotNull(), len(boundaries) - 2)

        df = df.withColumn(bucket_col, bucket_expr)

        # Debug: Verify if the bucket column was created successfully
        print(f"Verifying creation of {bucket_col} column")
        df.select(column, bucket_col).show(truncate=False)

    return df

# Step 2: Apply cross join within each bucket and calculate similarities
def calculate_similarity_within_buckets(df, columns):
    """
    Calculate similarities between communities within the same buckets for specified columns.

    For every distinct combination of ``<column>_bucket`` values that contains at least two
    communities, all pairs (community_id a < b) are cross-joined and scored. The per-bucket
    results are unioned together.

    Parameters:
        df (DataFrame): The input DataFrame with bucket columns for each specified attribute,
            plus ``community_id`` and ``csr_matrix`` columns.
        columns (list): List of column names for which buckets have been created.

    Returns:
        DataFrame with columns community_id_1, community_id_2, unweighted_similarity_score
        and weighted_similarity_score. Returns None (not an empty DataFrame) when no bucket
        combination holds at least two communities.

    NOTE(review): communities whose bucket value is null are never matched, because
    ``F.col(...) == None`` evaluates to null in Spark SQL and the filter drops the row.
    """
    df_groups = None

    # Register UDFs for similarity calculations.
    # NOTE(review): both UDFs wrap the same compare_weighted_structural_similarity (not
    # defined in this section), so the "unweighted" and "weighted" scores are identical —
    # TODO confirm which function the unweighted score should use.
    compare_structural_similarity_udf = F.udf(lambda csr_1, csr_2: compare_weighted_structural_similarity(csr_1, csr_2), DoubleType())
    compare_weighted_similarity_udf = F.udf(lambda csr_1, csr_2: compare_weighted_structural_similarity(csr_1, csr_2), DoubleType())

    # Create cross join within each bucket combination
    bucket_columns = [f"{col}_bucket" for col in columns]

    # Debug: Check bucket columns in DataFrame
    print("Columns in DataFrame before cross join:")
    print(df.columns)

    # One filter + count (+ show) Spark job per distinct bucket combination.
    for bucket_combination in df.select(bucket_columns).distinct().collect():
        # Filter the DataFrame based on the current bucket combination
        filter_condition = F.lit(True)
        for i, bucket_col in enumerate(bucket_columns):
            filter_condition &= (F.col(bucket_col) == getattr(bucket_combination, bucket_col))
        bucket_df = df.filter(filter_condition)

        # Only proceed if there are at least two communities in the bucket
        if bucket_df.count() >= 2:
            print("This is the bucket df:")
            bucket_df.show(truncate=False)

            # Perform a cross join within this bucket; each unordered pair is kept once
            cross_joined = bucket_df.alias("a").crossJoin(bucket_df.alias("b")) \
                .filter(F.col("a.community_id") < F.col("b.community_id")) \
                .select(
                    F.col("a.community_id").alias("community_id_1"),
                    F.col("b.community_id").alias("community_id_2"),
                    compare_structural_similarity_udf(F.col("a.csr_matrix"), F.col("b.csr_matrix")).alias("unweighted_similarity_score"),
                    compare_weighted_similarity_udf(F.col("a.csr_matrix"), F.col("b.csr_matrix")).alias("weighted_similarity_score")
                )

            # Append to df_groups
            if df_groups is None:
                df_groups = cross_joined
            else:
                df_groups = df_groups.union(cross_joined)

    return df_groups

In [17]:
# 7th cell - driver of the application run all the functions for a given dataset

@track_stage("Stage 1: Reading the calls dataset")
def read_csv_to_dataframe(file_path= 'toy_dataset.csv'):
  """
  Read a calls CSV into a Spark DataFrame annotated with the total call time per client pair.

  Parameters:
  -----------
  file_path : str
      Path of a CSV file with a header row and Client1, Client2, Start_Time and End_Time
      columns; times use the '%y%m%d%H%M' format.

  Returns:
  --------
  df_dataset : DataFrame
      One row per call, with every original column kept. Client1/Client2 are reordered
      so that Client1 <= Client2. Each row also gets total_duration_minutes, the summed
      duration of all calls between that pair. A pair with several calls therefore appears
      on several rows that carry the same total.
  """
  df_dataset = spark.read.csv(file_path, header=True, inferSchema=True)

  # convert start - end times to duration
  # 1st - Register the UDFs in Spark
  calculate_duration_minutes_udf = udf(calculate_duration_minutes, DoubleType())

  # 2nd - use udfs to add columns for duration in minutes
  df_dataset = df_dataset.withColumn('duration_minutes', calculate_duration_minutes_udf(col('Start_Time'), col('End_Time')))

  #3rd - Adjust Client1 and Client2 to ensure Client1 is the smaller value and Client2 the larger
  # (so calls A->B and B->A aggregate into the same pair)
  df_dataset = df_dataset.withColumn("Client1_min", least(col("Client1"), col("Client2"))) \
       .withColumn("Client2_max", greatest(col("Client1"), col("Client2"))) \
       .drop("Client1", "Client2") \
       .withColumnRenamed("Client1_min", "Client1") \
       .withColumnRenamed("Client2_max", "Client2")


  # 4th - Aggregate total duration for each unique pair (Client1, Client2)
  df_aggregated = df_dataset.groupBy("Client1", "Client2") \
    .agg(F.sum('duration_minutes').alias("total_duration_minutes"))

  # Join the aggregated total duration back to the original DataFrame
  df_dataset = df_dataset.drop('duration_minutes') \
        .join(df_aggregated, on=["Client1", "Client2"], how="left")


  print("The following dataframe has been read from the CSV file:")
  df_dataset.show()
  return df_dataset

@track_stage("Stage 2: Preprocessing and creating the graph")
def create_graph_from_dataframe(df_dataset):
  """
  Detect call communities with GraphFrames and persist per-community results.

  Every client is a vertex and every call row is an edge, weighted by the
  pair's total_duration_minutes. Connected components are treated as
  communities and renumbered sequentially (1, 2, ...) in component-id order.

  Side effects: writes three CSV directories prefixed with the module-level
  ``dataset_name`` global (``*_community_analysis_results``,
  ``*_community_members_results`` and ``*_community_statistics_results``).

  Parameters:
    -----------
    df_dataset : DataFrame
        Calls with at least Client1, Client2 and total_duration_minutes columns.

    Returns:
    --------
    df_final : DataFrame
        Input rows enriched with community_id and community_size.
    community_members : DataFrame
        One row per community_id with a ``members`` list of
        (Client1, Client2, total_duration_minutes) structs.
    community_stats : DataFrame
        Per-community aggregates: unique_members, total_calls and duration
        statistics.
  """
  # Vertices: every client seen on either side of a call. Edges: the call rows.
  vertices = df_dataset.selectExpr("Client1 as id") \
      .union(df_dataset.selectExpr("Client2 as id")) \
      .distinct()
  edges = df_dataset.selectExpr("Client1 as src", "Client2 as dst",
                                "total_duration_minutes as weight")
  vertices.cache()
  edges.cache()

  # Find connected components (communities) using GraphFrames
  g = GraphFrame(vertices, edges)
  connected_components_result = g.connectedComponents()

  # Map the component ids to sequential community ids 1..N. The window's
  # orderBy alone defines the numbering, so no separate orderBy is needed.
  community_mapping = connected_components_result.select("component").distinct() \
      .withColumn("new_id", row_number().over(Window.orderBy("component"))) \
      .cache()

  # Attach community ids. Joining on Client1 is enough because both clients of
  # a call are joined by an edge and so share a connected component.
  df_with_communities = df_dataset.join(connected_components_result, df_dataset['Client1'] == connected_components_result['id'], 'inner') \
      .join(community_mapping, connected_components_result['component'] == community_mapping['component'], 'inner') \
      .drop(connected_components_result['id']) \
      .drop(community_mapping['component']) \
      .withColumnRenamed('new_id', 'community_id')

  # Number of distinct clients per community. union() is positional, so the
  # Client2 values land in the 'Client1' column of the combined frame.
  community_sizes = df_with_communities.select('community_id', 'Client1') \
      .union(df_with_communities.select('community_id', 'Client2')) \
      .distinct() \
      .groupBy('community_id').agg(countDistinct('Client1').alias('community_size'))

  df_final = df_with_communities.join(community_sizes, 'community_id')

  # List of (Client1, Client2, total_duration_minutes) structs per community
  community_members = df_final.select("community_id", "Client1", "Client2", "total_duration_minutes") \
    .distinct() \
    .orderBy("Client1") \
    .groupBy("community_id") \
    .agg(F.collect_list(F.struct(
        F.col("Client1"),
        F.col("Client2"),
        F.col("total_duration_minutes")
    )).alias("members")) \
    .orderBy("community_id")

  # Same projection is both displayed and saved
  df_summary = df_final.select('Client1',
                               'Client2',
                               'total_duration_minutes',
                               'community_id',
                               'community_size') \
      .orderBy("community_id")

  print("\nFinal DataFrame with Sequential Community IDs:")
  df_summary.show()

  print("\nCommunity Members with Sequential IDs:")
  community_members.show(truncate=False)

  # Save the main analysis results
  df_summary.write.mode("overwrite").option("header", "true") \
      .csv(f"{dataset_name}_community_analysis_results")

  # Save community members in a flattened format
  df_final.select('community_id',
                  'Client1',
                  'Client2',
                  'total_duration_minutes') \
      .distinct() \
      .orderBy("community_id") \
      .write.mode("overwrite").option("header", "true") \
      .csv(f"{dataset_name}_community_members_results")

  # NOTE(review): if df_dataset comes from read_csv_to_dataframe,
  # total_duration_minutes is the per-pair total repeated on every call row of
  # that pair. The duration aggregates below therefore weight each pair by its
  # number of calls (a pair with 2 calls is summed twice). Confirm this is
  # intended.
  community_stats = df_final.groupBy('community_id') \
      .agg(
          # community_size already counts the distinct clients of a community
          # and is constant within it. countDistinct('Client1', 'Client2')
          # would count distinct client *pairs* instead of members.
          F.max('community_size').alias('unique_members'),
          F.count('*').alias('total_calls'),
          F.sum('total_duration_minutes').alias('sum_duration_minutes'),
          F.avg('total_duration_minutes').alias('avg_call_duration'),
          F.percentile_approx('total_duration_minutes', 0.25).alias('duration_25th_percentile'),
          F.percentile_approx('total_duration_minutes', 0.5).alias('median_call_duration'),
          F.percentile_approx('total_duration_minutes', 0.75).alias('duration_75th_percentile')
      ) \
      .orderBy('community_id')

  community_stats.write.mode("overwrite") \
      .option("header", "true") \
      .csv(f"{dataset_name}_community_statistics_results")

  print("This is the community stats:")
  community_stats.show(truncate=False)
  return df_final, community_members, community_stats

# Create CSR adjacency matrices for each community and serialize them
@track_stage("Stage 3: Creating CSR matrices")
def format_members_to_csr_matrix(community_members, community_stats):
  """
  Create CSR adjacency matrices per community and join them with the stats.

  The grouped-map pandas UDF ``create_csr_matrix_from_edges`` (defined in an
  earlier cell) is run per community_id twice and both results are shown:
  once with the module-level ``use_weights`` flag set to True, once with it
  set to False. The DataFrame that is joined with ``community_stats`` and
  returned is built with the caller's original ``use_weights`` value. That
  value is restored before returning.

  NOTE(review): assumes create_csr_matrix_from_edges reads the module-level
  ``use_weights`` global. Confirm against its definition.

  Parameters:
    community_members: DataFrame
      Rows keyed by community_id (e.g. the members produced by
      create_graph_from_dataframe).
    community_stats: DataFrame
      Per-community statistics keyed by community_id.

  Returns:
    DataFrame: community_stats inner-joined with the CSR results on community_id.
  """
  global use_weights
  configured_use_weights = globals().get("use_weights", False)

  def _csr_results(weighted):
    # Assigning a *local* use_weights would never reach the UDF, so set the
    # global. PySpark caches the pickled function of a pandas_udf object after
    # its first use, so re-applying the same object would keep the first flag
    # value. Building a fresh grouped-map UDF from its .func (applyInPandas)
    # captures the flag value current at this call.
    global use_weights
    use_weights = weighted
    return community_members.groupBy("community_id").applyInPandas(
        create_csr_matrix_from_edges.func,
        schema=create_csr_matrix_from_edges.returnType)

  try:
    result_true = _csr_results(True)
    print(f"This is the csr formating results, weight = {use_weights}:")
    result_true.show(truncate=False)

    result_false = _csr_results(False)
    print(f"This is the csr formating results, weight = {use_weights}:")
    result_false.show(truncate=False)

    # The result used downstream follows the caller's configured flag
    df_csr_matrix_result = _csr_results(configured_use_weights)
  finally:
    use_weights = configured_use_weights

  # Join the community statistics dataframe and the csr_matrix dataframe
  # for final analysis
  df_community_stats_csr = community_stats.join(df_csr_matrix_result,
                                                on='community_id', how='inner')
  print("This is the statsitcs and csr dataframe joined:")
  df_community_stats_csr.show(truncate=False)

  return df_community_stats_csr


# Function to create similarity-based subgroups by comparing multiple columns
@track_stage("Stage 4: Calculate similarities between communities")
def create_similarity_subgroups(df, columns, tolerances):
    """
    Group communities whose statistics are within tolerance and score each group.

    Two communities are similar when, for every column in ``columns``,
    ``abs(a[column] - b[column]) <= tolerances[column]``. Similarity is merged
    transitively: communities linked by a chain of similar pairs end up in the
    same group, even when a pair bridges two previously separate groups.
    Each group is filtered out of ``df`` and passed to
    ``calculate_similarities``. The per-group results are unioned and written
    to ``f"{dataset_name}_df_groups.csv"`` (module-level ``dataset_name``).

    NOTE(review): collects ``df`` to the driver and compares all pairs
    (O(n^2)), so it presumably expects one row per community.

    Parameters:
        df (DataFrame): Community data with a community_id column and every
            column listed in ``columns``.
        columns (list): Column names to consider for similarity.
        tolerances (dict): Maps each column in ``columns`` to its allowed
            absolute difference.

    Returns:
        DataFrame or None: The union of the ``calculate_similarities`` results,
        or None when no pair of communities is similar.
    """
    communities = df.collect()

    # Union-find over community ids
    parent = {}

    def _find(node):
        while parent[node] != node:
            parent[node] = parent[parent[node]]  # path halving
            node = parent[node]
        return node

    def _union(a, b):
        parent.setdefault(a, a)
        parent.setdefault(b, b)
        root_a, root_b = _find(a), _find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    for community_i, community_j in combinations(communities, 2):
        is_similar = all(
            abs(community_i[column] - community_j[column]) <= tolerances[column]
            for column in columns
        )
        if is_similar:
            _union(community_i.community_id, community_j.community_id)

    # Materialise the groups; dict order follows the first time each id was seen
    groups_by_root = {}
    for community_id in parent:
        groups_by_root.setdefault(_find(community_id), []).append(community_id)
    similarity_groups = list(groups_by_root.values())

    # Score each subgroup and union the results
    df_groups = None
    for group in similarity_groups:
        subgroup_df = df.filter(F.col("community_id").isin(group))
        subgroup_cross_joined = calculate_similarities(subgroup_df)
        if df_groups is None:
            df_groups = subgroup_cross_joined
        else:
            df_groups = df_groups.union(subgroup_cross_joined)

    # Export all found groups (header written like the other CSV outputs)
    if df_groups is not None:
        df_groups.write \
                  .mode("overwrite") \
                  .option("header", "true") \
                  .csv(f"{dataset_name}_df_groups.csv")
    else:
        print("No groups found!")
    return df_groups

def export_similarity_groups(final_similarity_df, df_clients_info, similarity_threshold=0.55):
    """
    Cluster similar communities and export their calls as a grouped text report.

    Pairs with ``final_similarity >= similarity_threshold`` become graph edges.
    The graph's connected components form groups numbered 1..N in
    component-id order. The report contains, per group, a ``Group <n>:`` line,
    then for each community its id followed by its call rows formatted as
    ``Client1, Client2, Start_Time, End_Time``.

    Parameters:
        final_similarity_df (DataFrame): Pairwise scores with community_id_1,
            community_id_2 and final_similarity columns.
        df_clients_info (DataFrame): Call rows with community_id, Client1,
            Client2, Start_Time and End_Time columns.
        similarity_threshold (float): Minimum final_similarity for two
            communities to be linked (default 0.55).

    Side effects:
        Writes (overwriting) the text output ``f"{dataset_name}_similarity_groups.txt"``
        using the module-level ``dataset_name``.
    """
    # Keep only sufficiently similar pairs; they become the edges of a graph
    similar_pairs = final_similarity_df.filter(F.col("final_similarity") >= similarity_threshold)
    # union() is positional and keeps the first frame's column name
    vertices = similar_pairs.select("community_id_1") \
        .union(similar_pairs.select("community_id_2")) \
        .distinct() \
        .withColumnRenamed("community_id_1", "id")
    edges = similar_pairs.select(
        F.col("community_id_1").alias("src"),
        F.col("community_id_2").alias("dst")
    )

    # Connected components = clusters of transitively similar communities
    g = GraphFrame(vertices, edges)
    connected_components = g.connectedComponents()

    # Number the clusters 1..N in component-id order
    grouped_communities = connected_components.groupBy("component") \
        .agg(F.collect_list("id").alias("community_group")) \
        .withColumn("group_number", F.row_number().over(Window.orderBy("component")))

    # One row per (group, community, call)
    exploded_communities = grouped_communities \
        .select("group_number", F.explode("community_group").alias("community_id")) \
        .join(df_clients_info, on="community_id", how="left") \
        .select(
            "group_number",
            "community_id",
            "Client1",
            "Client2",
            "Start_Time",
            "End_Time"
        )

    # Build the text bottom-up. Each aggregation level only references grouped
    # or aggregated columns:
    #   call line       -> "Client1, Client2, Start_Time, End_Time"
    #   community block -> "<community_id>\n<call lines>"
    #   group block     -> "Group <n>:\n<community blocks>"
    community_blocks = exploded_communities \
        .withColumn("line", F.concat_ws(", ", "Client1", "Client2", "Start_Time", "End_Time")) \
        .groupBy("group_number", "community_id") \
        .agg(F.collect_list("line").alias("community_lines")) \
        .withColumn("community_block",
                    F.concat_ws("\n",
                                F.col("community_id").cast("string"),
                                F.concat_ws("\n", F.col("community_lines"))))

    # F.concat (not Python "+") is needed to build the string header in Spark
    output_df = community_blocks \
        .groupBy("group_number") \
        .agg(F.collect_list("community_block").alias("community_blocks")) \
        .withColumn("formatted_output",
                    F.concat_ws("\n",
                                F.concat(F.lit("Group "),
                                         F.col("group_number").cast("string"),
                                         F.lit(":")),
                                F.concat_ws("\n", F.col("community_blocks")))) \
        .orderBy("group_number")

    # Write to text file in distributed manner
    output_path = f"{dataset_name}_similarity_groups.txt"
    output_df.select("formatted_output").write.mode("overwrite").text(output_path)
    print(f"Exported similarity groups to: {output_path}")

In [18]:
# 8th cell - Read datasets dataframe, and iterate over each one to create communities and form similarity groups
# Read the csv
df_datasets = pd.read_csv("dataset_metadata.csv")
# Show a peek of the dataset list. display() is needed because this is not the
# cell's last expression, so a bare .head() would produce no output.
print("These are the found datasets")
display(df_datasets.head(10))

# Set global params. The stage functions in the previous cell read
# dataset_name (and presumably clear_csv / use_weights) as module-level globals.
clear_csv = False
dataset_file_path = "toy_dataset.csv"
dataset_name = "toy_dataset"
use_weights = False

for i, dataset in df_datasets.iterrows():
  print(f"Starting to process {i+1} dataset with the following params: \n{dataset}")
  # step 1 - read the dataset
  dataset_file_path = dataset["csv_filename"]
  # Dataset name = file name without directory or extension
  # (e.g. '/path/to/file.csv' -> 'file')
  basename = os.path.basename(dataset_file_path)
  dataset_name = os.path.splitext(basename)[0]
  # Only clear for the 1st dataset (presumably the stage-metrics CSV written by
  # track_stage; confirm)
  clear_csv = i == 0
  df_dataset = read_csv_to_dataframe(dataset_file_path)

  # step 2 - preprocess (convert to duration in minutes, create graph, and find communities)
  df_final, community_members, community_stats = create_graph_from_dataframe(df_dataset)

  # step 3 - create a CSR matrix for each community
  df_community_stats_csr = format_members_to_csr_matrix(community_members, community_stats)

  # step 4 - calculate similarities between communities to find groups.
  # Column(s) used for adaptive bucketization
  columns = ['unique_members']
  df_with_buckets = create_adaptive_buckets(df_community_stats_csr, columns, min_size=2)
  df_groups = calculate_similarity_within_buckets(df_with_buckets, columns)

These are the found datasets
Starting to process 1 dataset with the following params: 
num_communities                                                5
community_size_range                                      (3, 5)
calls_per_connection_range                                (1, 2)
duration_range                                         (30, 120)
density                                                      0.5
num_samples                                                   50
dataset_name                                    dataset_config_1
csv_filename                  /content/datasets/dataset_config_1
Name: 0, dtype: object
Starting Stage 1: Reading the calls dataset
The following dataframe has been read from the CSV file:
+-------+-------+----------+----------+----------------------+
|Client1|Client2|Start_Time|  End_Time|total_duration_minutes|
+-------+-------+----------+----------+----------------------+
|      0|      1|2401010154|2401010350|                 147.0|
|      0|      1|




Final DataFrame with Sequential Community IDs:
+-------+-------+----------------------+------------+--------------+
|Client1|Client2|total_duration_minutes|community_id|community_size|
+-------+-------+----------------------+------------+--------------+
|      0|      4|                  87.0|           1|             5|
|      0|      3|                 220.0|           1|             5|
|      0|      3|                 220.0|           1|             5|
|      0|      2|                 166.0|           1|             5|
|      0|      2|                 166.0|           1|             5|
|      0|      1|                 147.0|           1|             5|
|      0|      1|                 147.0|           1|             5|
|      3|      4|                 111.0|           1|             5|
|      3|      4|                 111.0|           1|             5|
|      1|      4|                 161.0|           1|             5|
|      1|      4|                 161.0|           1|  



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-14-593420c939d1>", line 83, in create_csr_matrix_from_edges
TypeError: tuple indices must be integers or slices, not str


In [None]:
# 9th cell - Plot the resource usage of the processed datasets
df_monitor = spark.read.csv("stage_metrics", header=True)
df_monitor = df_monitor.orderBy('stage_name')
print(f"count: {df_monitor.count()}")
df_monitor.show(truncate=False)

# Convert the Spark DataFrame to a Pandas DataFrame for plotting
pdf_monitor = df_monitor.toPandas()

# Select the columns of interest for plotting (copy so pdf_monitor stays untouched)
columns_of_interest = ['stage_name', 'numTasks', 'stageDuration', 'peakExecutionMemory', 'executorCpuTime']
pdf_plot = pdf_monitor[columns_of_interest].copy()

# The CSV is read without inferSchema, so every metric arrives as a string.
# Replace whole columns (instead of .loc[:, col] = ...) so they get numeric dtypes.
pdf_plot['peakExecutionMemory'] = pdf_plot['peakExecutionMemory'].astype(float) / (1024**3)  # Bytes to GB
pdf_plot['stageDuration'] = pdf_plot['stageDuration'].astype(float) / (1000 * 60)  # ms to minutes
pdf_plot['executorCpuTime'] = pdf_plot['executorCpuTime'].astype(float) / (1000 * 60)  # ms to minutes
pdf_plot['numTasks'] = pdf_plot['numTasks'].astype(int)

# Extract the first number in stage_name (e.g. "Stage 2: ..." -> 2)
pdf_plot['stage_number'] = pdf_plot['stage_name'].apply(lambda x: int(re.search(r'\d+', x).group()))

# Melt the DataFrame for easier plotting with Seaborn
pdf_plot_melted = pd.melt(pdf_plot, id_vars=['stage_name', 'stage_number'], var_name='Metric', value_name='Value')

# One facet per metric, stage numbers on the x-axis, bars coloured by full stage
# name. FacetGrid creates its own figure, and map_dataframe hands each facet
# only its own rows.
g = sns.FacetGrid(pdf_plot_melted, col='Metric',
                  height=6, aspect=1.5,
                  col_wrap=2, sharey=False, sharex=False)
g.map_dataframe(sns.barplot, x='stage_number', y='Value', hue='stage_name',
                palette='hls', dodge=False)
g.set_titles("{col_name}", size=18)
g.fig.suptitle('Spark Stage Metrics', y=1.02, size=18)
g.add_legend(loc='upper right', bbox_to_anchor=(1.2, 0.92))
g.legend.set_title('Stage Names', prop={'weight': 'bold', 'size': 22})
for text in g.legend.get_texts():
    text.set_fontsize(18)  # Legend label font size

# Per facet: y-axis label with units, x-axis label and tick label sizes
for ax in g.axes.flat:
    metric = ax.get_title()
    if metric == 'peakExecutionMemory':
        ax.set_ylabel('Peak Execution Memory (GB)', size=18)
    elif metric in ('stageDuration', 'executorCpuTime'):
        ax.set_ylabel('Time (minutes)', size=18)
    else:
        ax.set_ylabel(metric, size=18)

    ax.set_xlabel('Stage Number', size=18)
    # Resize the existing tick labels instead of overwriting them, so the
    # categorical tick order stays correct
    ax.tick_params(axis='x', labelsize=16)
    ax.tick_params(axis='y', labelsize=18)

plt.tight_layout()
plt.show()

In [None]:
# 10th cell - stop the Spark session
# Run this last: the cells above all use `spark`, which this call shuts down.
spark.stop()