### Graph representation class with NetworkX package
*Expects as input either a list of edges with an optional rating in the third column, or a file in Matrix Market format.*

In [10]:
import csv, enum, inspect
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from dataclasses import dataclass
from scipy.io import mmread
from networkx.algorithms import community as cmnt


class Graph:
    """
    Graph representation using NetworkX accepting two types of data files as input
    [csv, mtx (coordinate format!)]
    """

    G = nx.Graph()  # networkx graph representation
    __communities = {"k_clique": None, "louvain": None, "kernighan": None}

    class InputType:
        """Enum which represent extension of data file"""

        class Extension(enum.Enum):
            CSV = "CSV"
            MTX = "MATRIX_MARKET"

        class ColumnTypes(enum.Enum):
            SOURCE = "SOURCE"
            TARGET = "TARGET"
            WEIGHT = "WEIGHT"

        __extension = None
        __file_structure = ()

        def __init__(self, extension: Extension, data_type: tuple | None = None):
            if data_type:
                if not isinstance(data_type, tuple):
                    data_type = (data_type,)
                assert (
                    len(data_type) >= 2
                ), f"at least two types of columns are required"
                assert all(isinstance(val, self.ColumnTypes) for val in data_type), (
                    f"{list(inspect.signature(self.__init__).parameters.keys())[1]} "
                    f"must be instance of {self.ColumnTypes.__class__}"
                )
                assert len(set(data_type)) == len(data_type), (
                    f"column type values {[val.name for val in data_type]} must be mutually "
                    f"disjunctive"
                )
            self.__extension = extension
            self.__file_structure = data_type

        @property
        def ext(self) -> Extension:
            return self.__extension

        @property
        def struct(self) -> dict:
            return self.__file_structure

    def __init__(
        self,
        file_path: str,
        type: InputType.Extension,
        struct: tuple | None = (),
        delim: str | None = ",",
        has_header: bool | None = False,
    ):
        match type:
            case self.InputType.Extension.CSV:
                with open(file_path) as f:
                    data = csv.reader(f, delimiter=delim)
                    for i, row in enumerate(data):
                        if has_header and i == 0:
                            continue
                        data = {"weight": 1}
                        try:
                            for n in range(len(struct)):
                                self.__process_column_type(data, row, struct, n)
                        except IndexError as ex:
                            raise Exception(
                                f"you cannot define [{len(struct)}] types of columns, "
                                f"but your file has only [{len(row)}] columns"
                            )
                        self.G.add_edge(
                            u_of_edge=data["source"],
                            v_of_edge=data["target"],
                            weight=data["weight"],
                        )
            case self.InputType.Extension.MTX:
                try:
                    self.G = nx.from_scipy_sparse_array(mmread(file_path))
                except:
                    raise Exception(
                        f"inputed file {file_path} is not in standardized MTX format, "
                        f"which starts with `%%MatrixMarket` sequence - check your file"
                    )

    @property
    def communities(self) -> dict:
        pass
        # if self.__communities.get("k_clique")
        # return

    def __process_column_type(
        self, data: dict, row: list, struct: tuple, i: int
    ) -> None:
        if struct[i].name == self.InputType.ColumnTypes.SOURCE.name:
            data["source"] = row[i]
        if struct[i].name == self.InputType.ColumnTypes.TARGET.name:
            data["target"] = row[i]
        if struct[i].name == self.InputType.ColumnTypes.WEIGHT.name:
            data["weight"] = row[i]

### Instantiate graph class and show its content

In [None]:
# Describe the input as a Matrix Market file. The descriptor is bound to
# `input_type` rather than `type`, so the builtin `type` is not shadowed.
input_type = Graph.InputType(extension=Graph.InputType.Extension.MTX)
g = Graph(file_path="Emails.mtx", type=input_type.ext)
# Node and edge views including their attribute dicts.
g.G.nodes(data=True)
g.G.edges(data=True)

In [15]:
class Utilities:
    '''Helpers for reading integer rows (e.g. edge lists) from text or CSV files.'''

    # Prefixes that mark a line as a comment/header instead of data.
    _COMMENT_PREFIXES = ('#', '*', '"')

    def __init__(self):
        pass

    def starts_with_comment(self, line):
        '''Tell whether a line should be kept as data.

        NOTE: despite its name, this returns False when `line` starts with one
        of the comment symbols and True otherwise. The inverted result is kept
        because get_data (and possibly other callers) use it as a keep-filter.

        Args:
            line (str): Text to inspect.

        Returns:
            bool: True for a data line, False for a comment line.
        '''
        return not line.startswith(self._COMMENT_PREFIXES)

    def get_data(self, path: str = None, filetype: str = '.txt', encoding: str = 'utf-8', delimiter: str = ','):
        '''Retrieves data from given path.

        Comment lines (see starts_with_comment) and blank lines are skipped; every
        remaining value is converted with int().

        Args:
            path (str, required): Path of the supplied file in string format. Defaults to None (leaving path as None raises a ValueError).
            filetype (str, optional): Filetype of the supplied file, '.txt' (whitespace-separated values) or '.csv'. Any other value raises a ValueError. Defaults to '.txt'.
            encoding (str, optional): Encoding of the file. Defaults to 'utf-8'.
            delimiter (str, optional): Separator for when supplied file is of .csv type. Defaults to ','.

        Raises:
            ValueError: When path (required argument) is not specified, when filetype is unsupported, or when a value cannot be converted to int.

        Returns:
            numpy.ndarray: Array built from the list of parsed integer rows.
        '''
        if path is None:
            raise ValueError('path is required')
        if filetype not in ('.txt', '.csv'):
            raise ValueError(
                f"unsupported filetype {filetype!r}; expected '.txt' or '.csv'")

        self.path = path
        self.filetype = filetype
        self.encoding = encoding

        # The csv module expects files opened with newline=''; plain text keeps
        # the default newline handling.
        newline = '' if filetype == '.csv' else None
        with open(path, 'r', encoding=encoding, newline=newline) as r:
            if filetype == '.txt':
                rows = (line.split() for line in r
                        if self.starts_with_comment(line))
            else:
                # `row and ...` guards the row[0] lookup against empty rows.
                rows = (row for row in csv.reader(r, delimiter=delimiter)
                        if row and self.starts_with_comment(row[0]))
            # Empty rows (blank lines) are dropped so the result is not ragged.
            return np.array([[int(value) for value in row] for row in rows if row])

def get_graph(data):
    """Build an undirected NetworkX graph from an iterable of edges.

    Args:
        data: Iterable of edges accepted by ``Graph.add_edges_from``.

    Returns:
        nx.Graph: Graph containing the given edges.

    Raises:
        SystemExit: With the message 'Wrong data format.' when the edges
            cannot be added.
    """
    graph = nx.Graph()
    try:
        graph.add_edges_from(data)
    # The exception classes must be given as a tuple; a list here raises a
    # TypeError of its own as soon as an exception has to be matched.
    # NetworkXError is what add_edges_from raises for malformed edge tuples.
    except (ValueError, TypeError, nx.NetworkXError) as err:
        # Raise SystemExit directly: `exit` is a helper the site module adds
        # for interactive use and is not meant to be called from programs.
        raise SystemExit('Wrong data format.') from err
    return graph

# Parse the rows of Email-Enron.txt (default '.txt' settings) and build the graph from them
data = Utilities().get_data(path="Email-Enron.txt")
g = get_graph(data)

### Computation blocks

In [46]:
# Label propagation (the Louvain method is run in a separate cell).
# The result is materialised as a list because it is later passed to len()
# and iterated several times.
community = list(cmnt.label_propagation_communities(g))
modularita = cmnt.modularity(g, community)

In [48]:
# Louvain method, called with a fixed seed
community = cmnt.louvain_communities(g, seed=100)
# Modularity of the partition found above, kept for the summary printout
modularita = cmnt.modularity(g, community)

### Printing information block

In [43]:
# SUMMARY
# One value per line: number of communities, mean / smallest / largest
# community size, and the modularity computed earlier.
print(
    len(community),
    sum(map(len, community)) / len(community),
    min(map(len, community)),
    max(map(len, community)),
    modularita,
    sep="\n",
)

1236
29.686084142394822
2
5131
0.6159506101187202


In [40]:
# BAR PLOT
%matplotlib qt
data = {}
for i, x in enumerate(community):
    data[f"{i}"] = len(x)
communities = list(data.keys())
length = list(data.values())

plt.figure(figsize=(10, 5))
plt.bar(communities, length, width=0.4)
plt.ylabel("Size of the community")
plt.xlabel("No. of community")
plt.title("Distribution of community size")
plt.xticks(rotation=30)
plt.tick_params(bottom=False, labelbottom=False)
plt.show()

In [50]:
# GRAPH VISUALIZATION
def assign_node_comm(G, communities) -> None:
    """Store a 1-based community number on every node listed in `communities`.

    Each node gets a "community" attribute equal to the position of its
    community in `communities`, counting from 1. Nodes that appear in no
    community are left untouched.
    """
    for label, members in enumerate(communities, start=1):
        for node in members:
            G.nodes[node]["community"] = label


def assign_edge_comm(G) -> None:
    """Store a "community" attribute on every edge of `G`.

    An edge whose two endpoints carry the same node "community" value gets
    that value; any other edge gets 0. Every endpoint must already have a
    "community" attribute.
    """
    for u, v in G.edges:
        u_comm = G.nodes[u]["community"]
        same_community = u_comm == G.nodes[v]["community"]
        G.edges[u, v]["community"] = u_comm if same_community else 0


# Annotate nodes first; the edge annotation reads the node "community" values
assign_node_comm(g, community)
assign_edge_comm(g)

# Ask for a base file name and write the annotated graph to <name>.gexf
name = input("Choose a name for community visualisation GEXF file")
nx.write_gexf(g, name + ".gexf")