## Packages

In [None]:
# Setup of the Spark
# Environment bootstrap cell: installs PySpark and PyDrive with pip and a
# headless Java 8 JDK with apt, then exports JAVA_HOME for this kernel process.
# NOTE(review): none of the packages are version-pinned, so a fresh run may pull
# different versions than the ones that produced the saved outputs.
!pip install pyspark
# NOTE(review): PyDrive is installed here but is not imported in the visible cells.
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Presumably the directory where the apt package above places the JDK on this
# image -- TODO confirm if the base image changes.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 49 not upgraded.
Need to get 39.6 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 123623 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u422-b05-1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u422-b05-1~22.04) ...
Sel

In [None]:
import time
import random
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, when
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, IntegerType

from warnings import filterwarnings
filterwarnings('ignore')

In [None]:
# Creating the session
# All settings are applied in a single fluent chain (every SparkConf setter
# returns the conf itself), then the session is created or reused.
conf = (
    SparkConf()
    .setAppName("DIS-project")
    .setMaster("local[*]")
    .set("spark.ui.port", "4050")
    .set("spark.driver.memory", "8G")
    .set("spark.driver.maxResultSize", "8g")
    .set("spark.executor.memory", "4G")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Data Generation

In [None]:
def generate_synthetic_call_data(num_clients,
                                 call_frequency_range,
                                 call_duration_range,
                                 time_range,
                                 save_to_csv=False,
                                 logs=False):
    """
    Generates synthetic call data for a specified number of clients, with calls distributed across
    multiple communities. The function returns the data as a PySpark DataFrame and optionally saves
    it to a CSV file.

    Clients are numbered 1..num_clients, shuffled, and cut into consecutive communities whose sizes
    are drawn at random. Every call connects two different clients of the same community, so
    communities with fewer than two members produce no calls.

    Args:
        num_clients (int): Total number of clients. Must be at least 1.
        call_frequency_range (tuple): Min and max range of call frequency per client in each community.
        call_duration_range (tuple): Min and max duration range for each call in minutes.
        time_range (tuple): Start and end time range for calls, formatted as 'YYMMDDHHMM'.
        save_to_csv (bool): If True, saves the resulting DataFrame to a CSV file. Default is False.
        logs (bool): If True, logs intermediate steps and timing information. Default is False.

    Returns:
        pyspark.sql.DataFrame: A DataFrame containing generated call data with columns:
            - 'c1': ID of the first client in the call.
            - 'c2': ID of the second client in the call.
            - 'start': Start time of the call (formatted as 'YYMMDDHHMM').
            - 'end': End time of the call (formatted as 'YYMMDDHHMM').

    Raises:
        ValueError: If num_clients is smaller than 1.
    """
    # NOTE: bare min()/max() are deliberately avoided in this function; the notebook's
    # `from pyspark.sql.functions import *` rebinds those names to Spark column functions.
    if num_clients < 1:
        raise ValueError("num_clients must be a positive integer")

    # Step 1: draw community sizes until at most 10% of the clients are left over
    community_sizes, remaining_clients = [], num_clients
    lower_size_proportion = 0.05
    upper_size_proportion = 0.2

    while remaining_clients / num_clients > 0.1:
        lower_bound = int(remaining_clients * lower_size_proportion)
        min_possible_size = lower_bound if lower_bound > 5 else 5
        upper_bound = int(remaining_clients * upper_size_proportion)
        max_possible_size = upper_bound if upper_bound > min_possible_size else min_possible_size + 5
        size = random.randint(min_possible_size, max_possible_size)
        # For small populations the floor of 5 can exceed what is left; never hand out
        # more clients than remain (this used to yield a negative final community size).
        if size > remaining_clients:
            size = remaining_clients
        community_sizes.append(size)
        remaining_clients -= size

    # The last community gets the remaining clients (if any are left)
    if remaining_clients > 0:
        community_sizes.append(remaining_clients)

    # Step 2: shuffle the client IDs for randomness
    shuffled_indices = list(range(1, num_clients + 1))
    random.shuffle(shuffled_indices)

    # Step 3: assign clients to communities based on the predefined sizes
    communities = []
    current_index = 0

    for size in community_sizes:
        communities.append(shuffled_indices[current_index:current_index + size])
        current_index += size
    print(f"Number of clients per community: {' '.join(map(str, [len(comm) for comm in communities]))}")
    print(f"Total of {len(communities)} communities created.")

    call_data = []

    # Step 5: generate call data for each community
    min_duration, max_duration = call_duration_range
    # Split point between the per-community minimum and maximum duration draws. It is
    # raised to min_duration when max_duration // 2 is smaller, so that
    # random.randint() never receives an empty range (e.g. a range of (20, 30)).
    duration_midpoint = max_duration // 2
    if duration_midpoint < min_duration:
        duration_midpoint = min_duration

    start_time_str, end_time_str = time_range
    start_time = datetime.strptime(start_time_str, '%y%m%d%H%M')
    end_time = datetime.strptime(end_time_str, '%y%m%d%H%M')
    time_diff_minutes = int((end_time - start_time).total_seconds() / 60)

    s5 = time.time()
    for community in communities:
        # A call needs two distinct clients; with fewer than two members the partner
        # search below could never terminate.
        if len(community) < 2:
            continue

        # Introduce a community-level time shift to vary start times across communities
        community_time_shift = random.randint(0, time_diff_minutes // 5)
        community_start_time = start_time + timedelta(minutes=community_time_shift)

        # Vary the duration range for each community to create different mean durations
        community_min_duration = random.randint(min_duration, duration_midpoint)
        community_max_duration = random.randint(duration_midpoint, max_duration)

        # Vary call frequency for communities: some are more active, others less
        community_call_frequency_range = (
            random.randint(call_frequency_range[0] // 2, call_frequency_range[0]),
            random.randint(call_frequency_range[1], call_frequency_range[1] * 2)
        )

        for client in community:
            call_frequency = random.randint(community_call_frequency_range[0], community_call_frequency_range[1])
            for _ in range(call_frequency):
                # Rejection-sample a partner different from the caller
                other_client = random.choice(community)
                while other_client == client:
                    other_client = random.choice(community)
                call_data.append(generate_call(client, other_client, community_start_time, time_diff_minutes, community_min_duration, community_max_duration))

    e5 = time.time()
    if logs:
        print(f"Step 3 generate call data finished in {np.round(e5 - s5, 3)} seconds")

    # Step 6: create the PySpark DataFrame. The schema is given explicitly so that an
    # empty call list (tiny populations) still produces a valid, empty DataFrame
    # instead of failing schema inference.
    call_schema = StructType([
        StructField('c1', LongType(), True),
        StructField('c2', LongType(), True),
        StructField('start', StringType(), True),
        StructField('end', StringType(), True),
    ])
    s7 = time.time()
    call_df = spark.createDataFrame(call_data, schema=call_schema)
    e7 = time.time()
    if logs:
        print(f"Step 6 create PySpark DataFrame finished in {np.round(e7 - s7, 3)} seconds")

    tot_num_calls = call_df.count()
    print(f"Total number of calls: {tot_num_calls}")

    # Step 7: optionally save to CSV (a directory named after the number of calls)
    if save_to_csv:
        s6 = time.time()
        call_df.coalesce(1).write.csv(f'{tot_num_calls}_calls.csv', header=True, mode='overwrite')
        e6 = time.time()
        if logs:
            print(f"Step 7 save to CSV finished in {np.round(e6 - s6, 3)} seconds")

    return call_df


def generate_call(client_a,
                  client_b,
                  start_time,
                  time_diff_minutes,
                  min_duration,
                  max_duration):
    """
    Generates a single call between two clients with a random start time and a random
    duration within specified limits.

    The start is drawn uniformly from [start_time, start_time + time_diff_minutes] and the
    end is start + duration. Both endpoints are then jittered independently (start by
    -5..+10 minutes, end by -2..+5 minutes), so the final start can fall slightly outside
    the nominal window. The end is never allowed to precede the start: if the jitter would
    invert a short call, the end is clamped to the start (a zero-length call).

    Args:
        client_a (int): ID of the initiating client.
        client_b (int): ID of the receiving client.
        start_time (datetime.datetime): Start of the allowed time range for calls.
        time_diff_minutes (int): Total allowed time range in minutes.
        min_duration (int): Minimum duration for each call in minutes.
        max_duration (int): Maximum duration for each call in minutes.

    Returns:
        tuple: A tuple representing the call details, with:
            - client_a (int): ID of the initiating client.
            - client_b (int): ID of the receiving client.
            - call_start_str (str): Start time of the call (formatted as 'YYMMDDHHMM').
            - call_end_str (str): End time of the call (formatted as 'YYMMDDHHMM').
    """
    # Random call start time within the time range
    random_minutes = random.randint(0, time_diff_minutes)
    call_start_time = start_time + timedelta(minutes=random_minutes)

    # Random duration within the requested limits
    call_duration = random.randint(min_duration, max_duration)
    call_end_time = call_start_time + timedelta(minutes=call_duration)

    # Randomly add variability to start and end times (like delays or extensions).
    # The two draws are kept in this order so seeded runs reproduce earlier data.
    start_jitter = random.randint(-5, 10)
    end_jitter = random.randint(-2, 5)
    call_start_time += timedelta(minutes=start_jitter)
    call_end_time += timedelta(minutes=end_jitter)

    # The start can move up to 10 minutes later while the end moves up to 2 minutes
    # earlier, so calls shorter than 12 minutes could end before they begin.
    if call_end_time < call_start_time:
        call_end_time = call_start_time

    call_start_str = call_start_time.strftime('%y%m%d%H%M')
    call_end_str = call_end_time.strftime('%y%m%d%H%M')

    return (client_a, client_b, call_start_str, call_end_str)


## Community Creation

In [None]:
def dfs(client, community, visited, graph):
    """
    Iterative depth-first traversal that gathers every node reachable from `client`.

    Nodes are marked in `visited` at the moment they are pushed, so each node is pushed
    at most once; they are appended to `community` in the order they are popped.

    Args:
        client (int): The starting client node for DFS.
        community (list): Output list; reached clients are appended to it in place.
        visited (set): Clients already discovered; updated in place.
        graph (dict): Adjacency mapping from a client to an iterable of its neighbours.
    """
    visited.add(client)
    pending = [client]

    while len(pending) > 0:
        current = pending.pop()
        community.append(current)
        for adjacent in graph[current]:
            if adjacent in visited:
                continue
            visited.add(adjacent)
            pending.append(adjacent)

def find_communities(call_df, logs=False, min_community_size=5):
    """
    Identifies communities of clients from call data using DFS to find connected components.

    Two clients are connected when at least one call exists between them (in either
    direction). Connected components with fewer than `min_community_size` clients are
    discarded: their clients do not appear in the returned RDD.

    Args:
        call_df (pyspark.sql.DataFrame): DataFrame containing call data with columns:
            - 'c1': ID of the first client in a call.
            - 'c2': ID of the second client in a call.
        logs (bool): If True, logs the execution time of each step. Default is False.
        min_community_size (int): Smallest component that is kept as a community.
            Default is 5.

    Returns:
        pyspark.RDD: An RDD of (client, community_number) pairs, where each client is assigned a community.
            Community numbers start at 1 and are consecutive.

    Steps:
        1. Build a graph of unique client connections from the call data.
        2. Aggregate data to create a list of connections for each client.
        3. Collect the graph as a dictionary to enable DFS traversal.
        4. Perform DFS to identify connected components, each representing a community.
        5. Convert the client-community mapping to an RDD for efficient processing.
        6. Validate the results by counting unique communities.

    Note:
        Steps 1 and 2 only define DataFrame transformations; the `collect()` in step 3 is
        the first action on them, so the step 3 timing includes their execution.
        The whole adjacency list is collected to the driver.
    """
    # Step 1: Build the graph from call data in a distributed manner
    s1 = time.time()
    graph_df = call_df.select("c1", "c2").distinct()
    e1 = time.time()
    if logs:
        print(f"Step 1 build graph finished in {np.round(e1 - s1, 3)} seconds")

    # Step 2: Aggregate the data to create connections between clients.
    # Each (c1, c2) edge is exploded into one row per endpoint; for every endpoint the
    # opposite end of the edge is gathered into a de-duplicated neighbour set.
    s2 = time.time()
    client_graph = (
        graph_df
        .withColumn("client_pair", F.array("c1", "c2"))
        .select(F.explode("client_pair").alias("client"), "c1", "c2")
        .groupBy("client")
        .agg(F.collect_set(F.when(F.col("client") == F.col("c1"), F.col("c2")).otherwise(F.col("c1"))).alias("connections"))
    )
    e2 = time.time()
    if logs:
        print(f"Step 2 aggregate data finished in {np.round(e2 - s2, 3)} seconds")

    # Step 3: Collect the graph as a dictionary for DFS traversal
    s3 = time.time()
    graph = defaultdict(list)
    for row in client_graph.collect():
        graph[row["client"]].extend(row["connections"])
    e3 = time.time()
    if logs:
        print(f"Step 3 collect graph finished in {np.round(e3 - s3, 3)} seconds")

    # Step 4: DFS over every not-yet-visited client; each traversal yields one component
    s4 = time.time()
    visited = set()
    client_to_community = {}
    community_counter = 1

    # Iterate over a snapshot of the keys: `graph` is a defaultdict, so a lookup of an
    # unknown neighbour inside dfs() would insert a key and break live iteration.
    for client in list(graph):
        if client in visited:
            continue
        community = []
        dfs(client, community, visited, graph)

        # Only keep the community if it has enough clients
        if len(community) >= min_community_size:
            for member in community:
                client_to_community[member] = community_counter
            community_counter += 1
    e4 = time.time()
    if logs:
        print(f"Step 4 DFS traversal finished in {np.round(e4 - s4, 3)} seconds")

    # Step 5: Return RDD of (client, community_number)
    s5 = time.time()
    client_to_community_rdd = spark.sparkContext.parallelize(list(client_to_community.items()))
    e5 = time.time()
    if logs:
        print(f"Step 5 convert to RDD finished in {np.round(e5 - s5, 3)} seconds")

    # Step 6: Validate result by counting unique communities
    s6 = time.time()
    unique_comms_count = client_to_community_rdd.map(lambda x: x[1]).distinct().count()
    e6 = time.time()
    if logs:
        print(f"Step 6 count unique communities finished in {np.round(e6 - s6, 3)} seconds")
    print(f"Number of unique communities found: {unique_comms_count}")

    return client_to_community_rdd


## Similarity Check - Ready

In [None]:
def normalized_difference(val1, val2):
    """
    Calculate the normalized difference between two values, accounting for None values.

    The absolute difference is divided by the larger of the two absolute magnitudes.
    For inputs of the same sign the result lies in [0, 1]; for non-negative inputs it is
    the same value as dividing by the larger input.

    Parameters:
        val1 (float or None): First value to compare.
        val2 (float or None): Second value to compare.

    Returns:
        float: Normalized difference between val1 and val2, or 0 if either value is None
               or both values are zero.
    """
    if val1 is None or val2 is None:
        return 0  # Or another appropriate default value

    # np.abs / np.max are used on purpose: the notebook's
    # `from pyspark.sql.functions import *` rebinds the bare names abs/max.
    # Scaling by the largest magnitude (not the largest signed value) keeps the
    # result non-negative and meaningful when a value is negative or zero.
    scale = np.max([np.abs(val1), np.abs(val2)])
    if scale == 0:
        return 0
    return np.abs(val1 - val2) / scale

def compute_similarity(comm1_number: int, comm2_number: int,
                       num_clients1: int, mean_duration1: float, stddev_duration1: float, total_duration1: float,
                       num_clients2: int, mean_duration2: float, stddev_duration2: float, total_duration2: float,
                       logs=False) -> float:
    """
    Score how alike two communities are from four of their summary metrics.

    For each metric (client count, mean / standard deviation / total of call duration) the
    normalized difference between the two communities is computed; the score is one minus
    the average of those four differences.

    Parameters:
        comm1_number (int): Community number for the first community.
        comm2_number (int): Community number for the second community.
        num_clients1 (int): Number of clients in the first community.
        mean_duration1 (float): Mean call duration in the first community.
        stddev_duration1 (float): Standard deviation of call duration in the first community.
        total_duration1 (float): Total call duration in the first community.
        num_clients2 (int): Number of clients in the second community.
        mean_duration2 (float): Mean call duration in the second community.
        stddev_duration2 (float): Standard deviation of call duration in the second community.
        total_duration2 (float): Total call duration in the second community.
        logs (bool): If True, print the pair being compared and its final score.

    Returns:
        float: Similarity score between the two communities (1 indicates identical communities).
    """
    # Developer switch for per-metric output; it is off and not exposed to callers.
    secondary_prints = 0

    if logs:
        print(f"{comm1_number} <=> {comm2_number}.", end=" ")

    # (message template, value for community 1, value for community 2), in summation order
    metric_pairs = (
        ("Normalized number of clients difference: {:.4f}", num_clients1, num_clients2),
        ("Normalized mean duration difference: {:.4f}", mean_duration1, mean_duration2),
        ("Normalized stddev duration difference: {:.4f}", stddev_duration1, stddev_duration2),
        ("Normalized total duration difference: {:.4f}", total_duration1, total_duration2),
    )

    difference_total = 0
    for template, first_value, second_value in metric_pairs:
        metric_difference = normalized_difference(first_value, second_value)
        if secondary_prints:
            print(template.format(metric_difference))
        difference_total = difference_total + metric_difference

    # Average the four differences and turn the distance into a similarity
    similarity_score = 1 - (1 / 4) * difference_total

    if logs:
        print(f"= {similarity_score:.4f}")

    return similarity_score


def group_similar_communities(call_df: DataFrame,
                              client_to_community_rdd,
                              filename,
                              save_to_txt=False,
                              logs=True) -> DataFrame:
    """
    Group similar communities based on similarity metrics calculated from call data.

    Every call is labelled with the community of its caller (falling back to the callee's
    community); calls whose endpoints belong to no community are dropped. Per-community
    metrics are then compared pairwise with `compute_similarity`, and communities whose
    score reaches the threshold are merged transitively into groups. Communities that are
    similar to no other community form single-member groups.

    Parameters:
        call_df (DataFrame): DataFrame containing call data, with columns 'c1', 'c2' and
            call times 'start' / 'end' formatted as 'YYMMDDHHMM'.
        client_to_community_rdd (RDD): RDD of (client, community_number) pairs.
        filename (str): Base name (without extension) of the text file written when
            `save_to_txt` is True.
        save_to_txt (bool): If True, save grouped communities and their calls to `<filename>.txt`.
        logs (bool): If True, print detailed logs for each processing step.

    Returns:
        DataFrame: A DataFrame with one column, 'grouped_communities', holding the list of
        community numbers of each group.
    """
    THRESHOLD = 0.8  # Minimum similarity score for two communities to share a group
    TIMESTAMP_PATTERN = "yyMMddHHmm"  # Spark datetime pattern matching 'YYMMDDHHMM'

    # Step 1: Convert RDD to DataFrame and join with the original call data
    s1 = time.time()

    comm_df = client_to_community_rdd.toDF(["client", "community_number"])

    enriched_df = call_df.join(comm_df, call_df["c1"] == comm_df["client"], "left").drop("client")
    enriched_df = enriched_df.withColumnRenamed("community_number", "comm_number_c1")
    enriched_df = enriched_df.join(comm_df, call_df["c2"] == comm_df["client"], "left").drop("client")
    enriched_df = enriched_df.withColumnRenamed("community_number", "comm_number_c2")

    enriched_df = enriched_df.withColumn(
        "comm_number", F.coalesce(F.col("comm_number_c1"), F.col("comm_number_c2"))
    ).drop("comm_number_c1", "comm_number_c2").dropna()

    # Call duration in minutes. 'start' and 'end' are 'YYMMDDHHMM' strings, so they must
    # be parsed as timestamps first: subtracting them as plain numbers is wrong whenever
    # a call crosses an hour, day, month or year boundary (e.g. 0830 -> 0900 is 30
    # minutes, not 70). unix_timestamp() yields seconds since the epoch.
    # NOTE(review): parsing uses the Spark session time zone; a zone with daylight
    # saving can shift durations that span a transition by one hour.
    start_seconds = F.unix_timestamp(F.col("start").cast("string"), TIMESTAMP_PATTERN)
    end_seconds = F.unix_timestamp(F.col("end").cast("string"), TIMESTAMP_PATTERN)
    enriched_df = enriched_df.withColumn("duration", (end_seconds - start_seconds) / 60.0)
    e1 = time.time()
    if logs:
        print(f"Step 1 convert RDD to DataFrame and join with the original call data finished {np.round(e1 - s1, 3)} seconds")

    # Step 2: Calculate community metrics
    # NOTE(review): num_clients counts distinct callers ('c1') only; clients that only
    # receive calls are not counted -- confirm this is the intended definition.
    s2 = time.time()
    community_metrics = (
        enriched_df.groupBy("comm_number")
        .agg(
            F.countDistinct("c1").alias("num_clients"),
            F.mean("duration").alias("mean_duration"),
            F.stddev("duration").alias("stddev_duration"),
            F.sum("duration").alias("total_duration")
        )
    )
    e2 = time.time()
    if logs:
        print(f"Step 2 calculate community metrics finished {np.round(e2 - s2, 3)} seconds")

    # Step 3: Compare every pair of communities and merge similar ones into groups.
    # assigned_groups maps a community number to the (shared) set object of its group.
    s3 = time.time()
    community_data = community_metrics.collect()
    assigned_groups = {}

    for i in range(len(community_data)):
        comm1 = community_data[i]
        for j in range(i + 1, len(community_data)):
            comm2 = community_data[j]

            similarity_value = compute_similarity(
                comm1.comm_number, comm2.comm_number,
                comm1.num_clients, comm1.mean_duration, comm1.stddev_duration, comm1.total_duration,
                comm2.num_clients, comm2.mean_duration, comm2.stddev_duration, comm2.total_duration
            )

            if similarity_value >= THRESHOLD:
                # Ensure communities only belong to one group
                group1 = assigned_groups.get(comm1.comm_number, set())
                group2 = assigned_groups.get(comm2.comm_number, set())

                if not group1 and not group2:
                    # Create a new group for both communities
                    new_group = {comm1.comm_number, comm2.comm_number}
                    assigned_groups[comm1.comm_number] = new_group
                    assigned_groups[comm2.comm_number] = new_group
                elif group1 and not group2:
                    # Add comm2 to comm1's group
                    group1.add(comm2.comm_number)
                    assigned_groups[comm2.comm_number] = group1
                elif group2 and not group1:
                    # Add comm1 to comm2's group
                    group2.add(comm1.comm_number)
                    assigned_groups[comm1.comm_number] = group2
                else:
                    # Merge the two groups and repoint every member at the merged set
                    merged_group = group1.union(group2)
                    for comm in merged_group:
                        assigned_groups[comm] = merged_group
    e3 = time.time()
    if logs:
        print(f"Step 3 compare communities using similarity formula finished {np.round(e3 - s3, 3)} seconds")

    # Step 4: Collapse the per-community mapping into unique groups.
    # Each group is represented by the sorted tuple of its community numbers.
    s4 = time.time()
    final_groups = {}
    for group in assigned_groups.values():
        group_key = tuple(sorted(group))
        if group_key not in final_groups:
            final_groups[group_key] = group_key

    # Ensure that ungrouped communities are also included, as single-member groups in a
    # deterministic (sorted) order
    ungrouped_communities = set(row.comm_number for row in community_data) - set(assigned_groups.keys())
    for comm in sorted(ungrouped_communities):
        final_groups[(comm,)] = (comm,)
    e4 = time.time()
    if logs:
        print(f"Step 4 group similar communities took {np.round(e4 - s4, 3)} seconds")

    # Step 5 printing and saving
    gr_num = 1
    s5 = time.time()
    if save_to_txt:
        # Step 5.1: Group the DataFrame by communities and collect all the necessary data at once
        s51 = time.time()
        community_calls = (
            enriched_df.groupBy("comm_number")
            .agg(F.collect_list(F.struct("c1", "c2", "start", "end")).alias("calls"))
            .collect()
        )
        # Index the collected rows once instead of scanning the list for every community
        calls_by_community = {row.comm_number: row.calls for row in community_calls}
        e51 = time.time()
        if logs:
            print(f"Step 5.1 collect all data at once finished {np.round(e51 - s51, 3)} seconds")

        # Step 5.2: Build the text output in memory and then write it all at once
        s52 = time.time()
        output_lines = []

        for group_key in final_groups:
            output_lines.append(f"Group {gr_num}:\n")
            print(f"Group {gr_num}: " + ", ".join(f"c{comm}" for comm in group_key))
            comm_num = 1

            for comm in group_key:
                output_lines.append(f"  Community {comm_num}:\n")

                # Format the call data for each community
                for call in calls_by_community.get(comm, []):
                    output_lines.append(f"    {call['c1']}, {call['c2']}, {call['start']}, {call['end']}\n")

                comm_num += 1

            output_lines.append("\n")
            gr_num += 1
        e52 = time.time()
        if logs:
            print(f"Step 5.2 build text output in memory finished {np.round(e52 - s52, 3)} seconds")

        # Step 5.3: Write all lines to the file at once
        s53 = time.time()
        with open(f'{filename}.txt', 'w') as f:
            f.writelines(output_lines)
        e53 = time.time()
        if logs:
            print(f"Step 5.3 write all lines to the file {np.round(e53 - s53, 3)} seconds")
    else:
        for group_key in final_groups:
            print(f"Group {gr_num}: " + ", ".join(f"c{comm}" for comm in group_key))
            gr_num += 1
    e5 = time.time()
    if logs:
        print(f"Step 5 write all lines to the file at once finished {np.round(e5 - s5, 3)} seconds")

    # Prepare DataFrame output (if needed for further processing)
    group_data = [(list(group),) for group in final_groups.values()]
    schema = StructType([StructField("grouped_communities", ArrayType(IntegerType()), True)])
    return spark.createDataFrame(group_data, schema=schema)

## Simple test run one call

In [None]:
# Stage 1 -- synthesize a call log for 10,000 clients and time the generation
start_time_data_creation = time.time()
call_df = generate_synthetic_call_data(
    num_clients=10000,
    call_frequency_range=(1, 3),
    call_duration_range=(20, 600000),
    time_range=('2201010000', '2412312359'),
    save_to_csv=False,
    logs=False
)
end_time_data_creation = time.time()

time_secs_data_creation = np.round(end_time_data_creation - start_time_data_creation, 3)
print(f"Execution time for data creation: {time_secs_data_creation} seconds")
print('_____'*10)

# ______________________________________________________________________________________________________
# Stage 2 -- detect communities; the result is an RDD of (client, community_number) pairs
start_time_community_finding = time.time()
comms_rdd = find_communities(call_df, logs=True)
end_time_community_finding = time.time()

time_secs_community_finding = np.round(end_time_community_finding - start_time_community_finding, 3)
print(f"Execution time for community finding: {time_secs_community_finding} seconds")

# Sanity check: tally the clients per community on the driver and report how many
# clients were assigned to a community in total.
comms_list = comms_rdd.collect()
community_counts = {}

for client, community in comms_list:
    community_counts.setdefault(community, 0)
    community_counts[community] += 1

all_count = 0
for community, count in community_counts.items():
    all_count += count

print('Number of clients here ', all_count)
print('_____'*10)


# ______________________________________________________________________________________________________
# Stage 3 -- group similar communities; 'a' is the output base name, unused while save_to_txt=False
start_time_similarity_check = time.time()
groups = group_similar_communities(call_df, comms_rdd, 'a', save_to_txt=False, logs=True)
end_time_similarity_check = time.time()

time_secs_similarity_check = np.round(end_time_similarity_check - start_time_similarity_check, 3)
print(f"Execution time for community grouping: {time_secs_similarity_check} seconds")

Number of clients per community: 1444 1257 838 449 415 1002 275 678 521 353 447 349 244 132 170 284 174 968
Total of 18 communities created.
Total number of calls: 24032
Execution time for data creation: 13.149 seconds
__________________________________________________
Step 1 build graph finished in 0.128 seconds
Step 2 aggregate data finished in 0.263 seconds
Step 3 collect graph finished in 4.349 seconds
Step 4 DFS traversal finished in 0.018 seconds
Step 5 convert to RDD finished in 0.015 seconds
Step 6 count unique communities finished in 1.3 seconds
Number of unique communities found: 18
Execution time for community finding: 6.079 seconds
Number of clients here  9835
__________________________________________________
Step 1 convert RDD to DataFrame and join with the original call data finished 0.556 seconds
Step 2 calculate community metrics finished 0.097 seconds
Step 3 compare communities using similarity formula finished 2.989 seconds
Step 4 group similar communities took 0.0 s

## Toy Example

In [None]:
# Two small hand-written call logs. Both are kept under their own names so that the
# dataset actually used is chosen explicitly instead of by "last assignment wins".

# test1.csv -- two components: {1, 4, 6} and {2, 3, 5, 7}
test1_data = [
    (1, 4, '2408010800', '2408010830'),
    (4, 6, '2408021000', '2408021030'),
    (1, 6, '2408021100', '2408021130'),
    (2, 3, '2408010900', '2408010930'),
    (3, 5, '2408010930', '2408011000'),
    (5, 7, '2408021400', '2408021430'),
    (2, 7, '2408030800', '2408030830'),
]

# test2.csv -- three components: {1, 2, 3}, {4, 5, 6, 7} and {8, 9}
test2_data = [
    (1, 2, '2408010800', '2408010830'),
    (2, 3, '2408010900', '2408010930'),
    (1, 3, '2408011000', '2408011030'),
    (4, 5, '2408011100', '2408011130'),
    (5, 6, '2408011200', '2408011230'),
    (6, 7, '2408011300', '2408011330'),
    (4, 7, '2408011400', '2408011430'),
    (8, 9, '2408011500', '2408011530')
]

# Active dataset (switch to test1_data to run the other example)
# NOTE(review): every component above has fewer than 5 clients, which is the minimum
# size find_communities keeps by default -- confirm the toy data is meant to be used
# with that threshold.
data = test2_data


schema = StructType([
    StructField("c1", LongType(), True),
    StructField("c2", LongType(), True),
    StructField("start", StringType(), True),
    StructField("end", StringType(), True)
])

call_df = spark.createDataFrame(data, schema=schema)
call_df.show()

+---+---+----------+----------+
| c1| c2|     start|       end|
+---+---+----------+----------+
|  1|  2|2408010800|2408010830|
|  2|  3|2408010900|2408010930|
|  1|  3|2408011000|2408011030|
|  4|  5|2408011100|2408011130|
|  5|  6|2408011200|2408011230|
|  6|  7|2408011300|2408011330|
|  4|  7|2408011400|2408011430|
|  8|  9|2408011500|2408011530|
+---+---+----------+----------+



## Test Runs All Together

In [None]:
# Benchmark configurations: [number of clients, (min, max) call frequency per client]
clients_call_freq_pairs = [
    [10_000, (1, 3)],
    [10_000, (4, 6)],
    [100_000, (1, 3)],
    [100_000, (4, 6)],
    [500_000, (1, 3)],
    [500_000, (4, 6)],
    [1_000_000, (1, 3)],
    [1_000_000, (4, 6)],
    [2_000_000, (1, 3)],
]

# One list per reported column; every run appends exactly one value to each list
results = {
    'num_clients': [],
    'num_comms': [],
    'num_calls': [],
    'call_freq': [],
    'time_secs_data_creation': [],
    'time_secs_community_finding': [],
    'time_secs_similarity_finding': [],
    'time_secs_total': [],
}

for num_client, call_frequency in clients_call_freq_pairs:
    print(f"For {num_client} clients...")
    # Step 1: Generate synthetic call data
    start_time_data_creation = time.time()

    call_df = generate_synthetic_call_data(
        num_clients=num_client,
        call_frequency_range=call_frequency,
        call_duration_range=(20, 600000),
        time_range=('2201010000', '2412312359'),
        save_to_csv=True,
        logs=True
    )

    # Print length of the created df (this count is part of the data-creation timing)
    ncalls = call_df.count()
    print(f"Total number of calls will be processed: {ncalls}")
    end_time_data_creation = time.time()
    time_secs_data_creation = np.round(end_time_data_creation - start_time_data_creation, 3)
    print(f"Execution time for data creation: {time_secs_data_creation} seconds")
    print("_________"*7)

    # ______________________________________________________________________________________________________
    # Step 2: Find communities; returns an RDD of (client, community_number) pairs
    start_time_community_finding = time.time()

    comms_rdd = find_communities(call_df, logs=False)

    end_time_community_finding = time.time()
    time_secs_community_finding = np.round(end_time_community_finding - start_time_community_finding, 3)
    print(f"Execution time for community finding: {time_secs_community_finding} seconds")

    num_comms = comms_rdd.map(lambda x: x[1]).distinct().count()

    # ______________________________________________________________________________________________________
    # Step 3: Group similar communities
    start_time_similarity_check = time.time()

    # The output name includes the call-frequency range: several configurations share
    # the same client count and would otherwise overwrite each other's text file.
    groups_filename = f"{num_client}_clients_freq_{call_frequency[0]}-{call_frequency[1]}"
    groups = group_similar_communities(call_df, comms_rdd, groups_filename, logs=False, save_to_txt=True)

    end_time_similarity_check = time.time()
    time_secs_similarity_check = np.round(end_time_similarity_check - start_time_similarity_check, 3)
    print(f"Execution time for community grouping: {time_secs_similarity_check} seconds")
    print("_________"*7)

    # ______________________________________________________________________________________________________
    # Calculate total time and store results
    total_time = np.round(
        time_secs_data_creation + time_secs_community_finding + time_secs_similarity_check, 3
    )
    print(f"Total execution time: {total_time} seconds")

    results['num_clients'].append(num_client)
    results['num_calls'].append(ncalls)
    results['call_freq'].append(call_frequency)  # Add call frequency as a fixed value
    results['time_secs_data_creation'].append(time_secs_data_creation)
    results['time_secs_community_finding'].append(time_secs_community_finding)
    results['time_secs_similarity_finding'].append(time_secs_similarity_check)
    results['time_secs_total'].append(total_time)
    results['num_comms'].append(num_comms)

    # Checkpoint after every configuration so finished measurements survive a failure
    # in one of the later, larger runs.
    pd.DataFrame(results).to_csv('performance_results.csv', index=False)
    print("========="*7)

# Convert results to DataFrame and save to CSV
result_df = pd.DataFrame(results)
result_df.to_csv('performance_results.csv', index=False)

## Tests with bigger sizes

In [None]:
# Larger benchmark grid: each entry is [number of clients, call-frequency range].
# The range is forwarded as `call_frequency_range` to generate_synthetic_call_data.
clients_call_freq_pairs = [
    [2_500_000, (1, 3)],
    [2_500_000, (4, 6)],
    [4_000_000, (1, 3)],
    [4_000_000, (4, 6)],
    [6_000_000, (1, 3)],
    [6_000_000, (4, 6)],
]

# Column-oriented accumulator: one list per metric, one value appended per run.
results = {
    'num_clients': [],
    'num_comms': [],
    'num_calls': [],
    'call_freq': [],
    'time_secs_data_creation': [],
    'time_secs_community_finding': [],
    'time_secs_similarity_finding': [],
    'time_secs_total': [],
}

# Output file for the measurements; rewritten after every run (see end of loop).
results_csv_path = 'performance_results_2.csv'

for params in clients_call_freq_pairs:
    num_client = params[0]
    call_frequency = params[1]

    print(f"For {num_client} clients...")
    # Step 1: Generate synthetic call data
    # perf_counter() is a monotonic clock, so the measured interval cannot be
    # distorted by wall-clock adjustments the way time.time() can.
    start_time_data_creation = time.perf_counter()

    call_df = generate_synthetic_call_data(
        num_clients=num_client,
        call_frequency_range=call_frequency,
        call_duration_range=(20, 600000),
        time_range=('2201010000', '2412312359'),
        save_to_csv=True,
        logs=True
    )

    # The row count is taken inside the timed region, so the reported
    # data-creation time includes the cost of count().
    ncalls = call_df.count()
    print(f"Total number of calls will be processed: {ncalls}")
    time_secs_data_creation = np.round(time.perf_counter() - start_time_data_creation, 3)
    print(f"Execution time for data creation: {time_secs_data_creation} seconds")
    print("_________"*7)

    # ______________________________________________________________________________________________________
    # Step 2: Find communities. The result is consumed through the RDD API below
    # (`.map(...).distinct().count()`).
    # NOTE(review): if find_communities only builds a lazy RDD without triggering an
    # action, this timer under-reports and the work is paid for in later steps — confirm.
    start_time_community_finding = time.perf_counter()

    comms_rdd = find_communities(call_df, logs=False)

    time_secs_community_finding = np.round(time.perf_counter() - start_time_community_finding, 3)
    print(f"Execution time for community finding: {time_secs_community_finding} seconds")

    # Number of distinct values in position 1 of each record (presumably the
    # community id — verify against find_communities). Not included in any timer.
    num_comms = comms_rdd.map(lambda x: x[1]).distinct().count()

    # ______________________________________________________________________________________________________
    # Step 3: Group similar communities
    start_time_similarity_check = time.perf_counter()

    groups = group_similar_communities(call_df, comms_rdd, num_client, logs=False, save_to_txt=True)

    time_secs_similarity_check = np.round(time.perf_counter() - start_time_similarity_check, 3)
    print(f"Execution time for community grouping: {time_secs_similarity_check} seconds")
    print("_________"*7)

    # ______________________________________________________________________________________________________
    # Calculate total time (sum of the three timed stages) and store results
    total_time = np.round(
        time_secs_data_creation + time_secs_community_finding + time_secs_similarity_check, 3
    )
    print(f"Total execution time: {total_time} seconds")

    results['num_clients'].append(num_client)
    results['num_calls'].append(ncalls)
    results['call_freq'].append(call_frequency)  # the call-frequency range used for this run
    results['time_secs_data_creation'].append(time_secs_data_creation)
    results['time_secs_community_finding'].append(time_secs_community_finding)
    results['time_secs_similarity_finding'].append(time_secs_similarity_check)
    results['time_secs_total'].append(total_time)
    results['num_comms'].append(num_comms)

    # Checkpoint after every run: these are the largest configurations, so a
    # failure partway through must not discard the measurements already collected.
    pd.DataFrame(results).to_csv(results_csv_path, index=False)
    print("========="*7)

# Convert results to DataFrame and save to CSV
result_df_2 = pd.DataFrame(results)
result_df_2.to_csv(results_csv_path, index=False)