In [None]:
import os
import numpy as np
import pandas as pd
import ast
import matplotlib.pyplot as plt

# Utility Functions

In [164]:
def Subset(E,t):
    """Return the rows of ``E`` whose ``type`` column matches ``t``.

    Parameters
    ----------
    E : pandas.DataFrame
        Element table containing a ``type`` column.
    t : str or list
        A single type label, or a list of labels (a row matches if its
        type is any of them).

    Returns
    -------
    pandas.DataFrame
        The matching rows, selected with a boolean mask.
    """
    kinds = E['type']
    mask = kinds.isin(t) if isinstance(t, list) else kinds == t
    return E[mask]

def euclidean_distance(a, b):
    """Return the Euclidean (L2) distance between two points.

    ``a`` and ``b`` are coordinate sequences of equal length (for example
    ``(x, y)`` tuples); both are converted to NumPy arrays before the norm
    of their difference is taken.
    """
    offset = np.asarray(a) - np.asarray(b)
    return np.linalg.norm(offset)

def Dist(ei, ej):
    """Return the smallest vertex-to-vertex distance between two elements.

    Each argument is either a single point or a ``list`` of points. Anything
    that is not a ``list`` is treated as one point. The result is the minimum
    of ``euclidean_distance`` over every pair of vertices, one taken from
    each element.
    """
    # Wrap bare points so both operands can be iterated the same way.
    vertices_i = ei if isinstance(ei, list) else [ei]
    vertices_j = ej if isinstance(ej, list) else [ej]
    pairwise = [euclidean_distance(p, q) for p in vertices_i for q in vertices_j]
    return min(pairwise)

def Closest(e, S, R=np.inf):
    """Find the element(s) of ``S`` nearest to ``e`` within radius ``R``.

    Parameters
    ----------
    e : pandas.Series
        One element row; ``e.name`` is its index label and ``e.coor`` its
        coordinates (a tuple for a point, otherwise an indexable sequence
        of vertices).
    S : pandas.DataFrame
        Candidate elements, each with a ``coor`` value.
    R : float, optional
        Only candidates strictly closer than ``R`` are accepted.

    Returns
    -------
    list
        Index labels of the nearest candidates. A point element yields at
        most one label. Any other element is probed from its first and its
        last vertex independently, yielding up to two labels (which may be
        the same label twice). Candidates sharing ``e``'s label are skipped.
    """
    # Slot 0: point element / first vertex. Slot 1: last vertex.
    best_dist = [float('inf'), float('inf')]
    best_label = [-1, -1]  # -1 marks "nothing found yet" for that slot

    for label, candidate in S.iterrows():
        if e.name == candidate.name:
            continue  # never pair an element with itself

        if isinstance(e.coor, tuple):
            probes = [(0, e.coor)]
        else:
            probes = [(0, e.coor[0]), (1, e.coor[-1])]

        for slot, probe in probes:
            gap = Dist(probe, candidate.coor)
            # "<=" means a later candidate at the same distance wins the tie.
            if gap < R and gap <= best_dist[slot]:
                best_dist[slot] = gap
                best_label[slot] = label

    return [label for label in best_label if label != -1]

def ReadFiles(folder_path, symbol='e'):
    """Load every ``.csv`` file in ``folder_path`` into one element table.

    Files are read in sorted name order with ``;`` as the delimiter. Each
    file must provide a ``coor`` column holding Python literals (parsed with
    ``ast.literal_eval``). The path of every file read is printed.

    Parameters
    ----------
    folder_path : str
        Directory containing the CSV files.
    symbol : str, optional
        Prefix of the generated IDs (``f"{symbol}1"``, ``f"{symbol}2"``, ...).

    Returns
    -------
    pandas.DataFrame
        All rows with a fresh 0..n-1 index, an ``ID`` column in first
        position, a ``type`` column derived from the file name and a
        ``connections`` column initialised to ``None``. The frame has no
        rows when the folder has no CSV files.
    """
    frames = []

    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.csv'):
            continue
        file_path = os.path.join(folder_path, filename)
        print(file_path)

        df = pd.read_csv(file_path, delimiter=';')

        df['coor'] = df['coor'].apply(ast.literal_eval)
        # The type label is the file name minus its first two and last five
        # characters (e.g. "1_customers.csv" -> "customer").
        df['type'] = filename[2:-5]

        frames.append(df)

    # Concatenate once instead of growing a frame inside the loop: this is
    # linear instead of quadratic and avoids concatenating with an empty frame.
    elements = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    id_column = 'ID'
    elements[id_column] = [f"{symbol}{i+1}" for i in range(len(elements))]
    elements['connections'] = None
    # Move the ID column to the front.
    elements.insert(0, id_column, elements.pop(id_column))

    return elements

def PlotElements(df):
    """Plot every element of ``df`` on a single 10x8 figure and show it.

    Rows whose ``coor`` is a ``list`` are drawn as a connected series of
    vertices; every other row is drawn as a single marker. Each row's
    ``name`` is used as its legend label.
    """
    fig, ax = plt.subplots(figsize=(10, 8))

    for _, element in df.iterrows():
        coor = element['coor']
        if isinstance(coor, list):
            # Transpose [(x0, y0), (x1, y1), ...] into per-axis sequences.
            per_axis = list(zip(*coor))
            xs, ys = per_axis[0], per_axis[1]
        else:
            xs, ys = coor[0], coor[1]
        ax.plot(xs, ys, marker='o', label=element['name'])

    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.legend()

    plt.show()

def StoreHinFile(file_path,hypothetical_paths,elements):
    """Write the hypothetical paths to ``file_path``, one path per line.

    Each path (a list of row positions into ``elements``) is written as the
    string form of the array of the corresponding ``name`` values. The number
    of paths is printed first. Lines are separated by a newline with no
    trailing newline.
    """
    print("Number of hypothetical paths:", len(hypothetical_paths))
    # Selecting several rows with iloc gives a DataFrame, so ``.name`` below
    # resolves to the ``name`` column rather than a row label.
    rendered = (str(elements.iloc[path].name.values) for path in hypothetical_paths)
    with open(file_path, 'w') as fp:
        fp.write("\n".join(rendered))

# Raw Information

- I = {
    - i_1: A document containing the DSO's list of elements, their coordinates, and their types. Some elements may be missing, and the coordinates may be inaccurate.

    - i_2: The DSO's lack of information about the customers' connections to the network, and its goal of minimising the cost of connecting customers to the grid.
    - i_3: A technical meeting stating that, for economic and efficiency reasons, the connection between two elements must not be too long.
    - i_6: Information regarding the radial configuration of the distribution network.
    - i_4: The DSO's goal of reducing energy losses by minimising the path length of each customer.
    - i_5: The knowledge that electricity distributed from one transformer cannot be transmitted to another transformer.  
}

# Well-Defined Information

- $I'$ = {
    - $i'_1 = f_1(i_1)$: Sets of elements, their properties like coordinates and types.
    - $i'_2 = f_2(i_2)$: Elements of type $customer$ are connected to the closest element of type $line$.
    - $i'_3 = f_3(i_3)$: Elements of type $transformer$, $line$ and $switch$ cannot be connected to an element whose distance, calculated from the elements' GIS coordinates, is greater than $R$.
    - $i'_4  = f_4(i_4)$: The total length of a path, defined as the sum of the lengths of the single lines, is at most $L$.  
}

# Read Elements

In [None]:
# Set of all real elements: every element that can be found in the academic example.
real_elements = ReadFiles("RealElements")
real_types = set(real_elements.type)
# Set of all DSO elements: only the elements the DSO knows about (IDs are prefixed with "hat{e}").
elements = ReadFiles("DSOElements", symbol="hat{e}")
types = set(elements.type)

In [None]:
# Display the table of real elements.
real_elements

In [None]:
# Plot the coordinates of every real element (lines as connected vertices, the rest as markers).
PlotElements(real_elements)

In [None]:
# Display the table of elements known to the DSO.
elements

# Hypothetical Paths

In [None]:
from itertools import permutations

def calculate_hypothetical_paths(E, C, T):
    """Enumerate every hypothetical path from a customer to a transformer.

    Parameters
    ----------
    E : iterable
        All element identifiers.
    C : iterable
        Customer identifiers (path start points).
    T : iterable
        Transformer identifiers (path end points).

    Returns
    -------
    list of list
        One ``[c, *middle, t]`` list for every ordered selection ``middle``
        (of every size, including empty) of the remaining elements
        ``E - C - T``, every ``c`` in ``C`` and every ``t`` in ``T``.
        Shorter selections come first. The four sets are printed beforehand.
    """
    remaining = sorted(set(E) - set(C) - set(T))
    print(f"E set: {E}. Len: {len(E)}")
    print(f"C set: {C}. Len: {len(C)}")
    print(f"T set: {T}. Len: {len(T)}")
    print(f"R set: {remaining}. Len: {len(remaining)}")

    return [
        [customer, *middle, transformer]
        for size in range(len(remaining) + 1)
        for middle in permutations(remaining, size)
        for customer in C
        for transformer in T
    ]

# Index labels of all DSO elements (E), of the customers (C) and of the transformers (T).
E = elements.index.values
C = Subset(elements,'customer').index.values
T = Subset(elements,'transformer').index.values

# Enumerate every customer -> ... -> transformer ordering of the remaining
# elements; the number of paths grows factorially with their count.
hypothetical_paths = calculate_hypothetical_paths(E, C, T)

In [None]:
# Write every hypothetical path (as element names, one path per line) to a text file.
StoreHinFile('hypotheticalpaths.txt', hypothetical_paths, elements)

# Hypothetical Paths Compatible with the Well-Defined Information

## $H^{i'_2}$

In [None]:
# Reload the DSO elements so that the 'connections' column starts out empty (None) again.
elements = ReadFiles("DSOElements", symbol="hat{e}")

In [None]:
# i'_2: connect every customer to its nearest non-customer element and record
# the link on both sides.
# NOTE(review): the Well-Defined Information text says customers attach to the
# closest element of type *line*, while the candidates here are all
# non-customer types - confirm which is intended.

# The candidate set does not change inside the loop, so build it once.
possible_elements = Subset(elements, list(types.difference(["customer"])))

for i,c in Subset(elements, "customer").iterrows():
    closest_element = Closest(c, possible_elements)
    elements.at[i, 'connections'] = closest_element
    if not closest_element:
        continue  # no non-customer element to attach to

    # Add the customer to the target's list instead of replacing the list:
    # several customers may share the same nearest element.
    target = closest_element[0]
    linked = elements.at[target, 'connections']
    if linked:
        if i not in linked:
            linked.append(i)
    else:
        elements.at[target, 'connections'] = [i]

In [None]:
# H^{i'_2}: keep only the paths whose second element is the first recorded
# connection of the customer the path starts from.
hp = hypothetical_paths
length_H_before = len(hp)
print(f'Length H before: {length_H_before}')

# Read each customer's first connection once, rather than rebuilding the
# customer subset for every path.
first_connection = {}
for id_customer,c in Subset(elements, "customer").iterrows():
    if c['connections']:
        first_connection[id_customer] = c['connections'][0]

# Build the surviving list directly; paths starting at an element without a
# recorded connection cannot be judged and are kept.
compatible_hp = [
    h for h in hp
    if h[0] not in first_connection or not (h[1] != first_connection[h[0]])
]
paths_removed = length_H_before - len(compatible_hp)

length_H_after = len(compatible_hp)
# Guard the percentage against an empty input set.
reduction = (length_H_before-length_H_after)/length_H_before*100 if length_H_before else 0.0
print(f'Length H after: {length_H_after}. Reduction: {reduction:.2f}%')
compatible_hypothetical_paths_i2 = compatible_hp

## $H^{i'_2, i'_3}$

In [None]:
# Display the DSO element table, including the current 'connections' column.
elements

In [None]:
# i'_3: connect every non-customer element to its nearest non-customer
# neighbour(s) that lie closer than R, recording each link on both sides.
R = 40  # maximum connection distance, in coordinate units

non_customer_types = list(types.difference(["customer"]))
# The candidate set is the same for every element, so build it once.
possible_elements = Subset(elements, non_customer_types)


def _add_connection(frame, source, target):
    """Record ``target`` in ``source``'s connection list, without duplicates.

    The current list is read from ``frame`` itself (by index label) so that
    links written earlier in the same loop are never overwritten.
    """
    current = frame.at[source, 'connections']
    if current:
        if target not in current:
            current.append(target)
    else:
        frame.at[source, 'connections'] = [target]


for t in non_customer_types:
    for index,elem in Subset(elements, t).iterrows():
        closest_elements = Closest(elem, possible_elements, R=R)

        for c in closest_elements:
            _add_connection(elements, index, c)  # elem -> neighbour
            _add_connection(elements, c, index)  # neighbour -> elem

        # An element with no neighbour within R and no earlier link gets an
        # empty list rather than staying None.
        if elements.at[index, 'connections'] is None:
            elements.at[index, 'connections'] = []

In [None]:
# H^{i'_2, i'_3}: drop every path in which two consecutive non-customer
# elements are further apart than R. The first hop (customer -> first
# element) is not checked, hence the range starting at 1.
hp = compatible_hypothetical_paths_i2
length_H_before = len(hp)
print(f'Length H before: {length_H_before}')
ds = []  # every distance evaluated, in evaluation order (plotted afterwards)

compatible_hp = []
for id_path,h in enumerate(hp):
    for i in range(1,len(h)-1):
        # Paths hold index labels, so look the coordinates up by label.
        h_ei = elements.at[h[i], 'coor']
        h_ej = elements.at[h[i+1], 'coor']
        d = Dist(h_ei,h_ej)
        ds.append(d)
        if(d>R):
            break  # one over-long hop is enough to reject the path
    else:
        # Reached only when no hop exceeded R.
        compatible_hp.append(h)
paths_removed = length_H_before - len(compatible_hp)

length_H_after = len(compatible_hp)
# Guard the percentage against an empty input set.
reduction = (length_H_before-length_H_after)/length_H_before*100 if length_H_before else 0.0
print(f'Length H after: {length_H_after}. Reduction: {reduction:.2f}%')
compatible_hypothetical_paths_i2i3 = compatible_hp

In [None]:
# Scatter plot of every distance collected in ``ds``, against its position in the list.
plt.scatter(x=list(range(len(ds))), y=ds)

## $H^{i'_2, i'_3, i'_4}$

In [None]:
# H^{i'_2, i'_3, i'_4}: drop every path whose total length exceeds L.
# The length of a path is the sum, over consecutive elements, of the gap
# between them plus, when the first of the two is a multi-vertex element
# (a list of points), the distance between its first and last vertex.
L = 300  # maximum total path length, in coordinate units

hp = compatible_hypothetical_paths_i2i3
length_H_before = len(hp)
print(f'Length H before: {length_H_before}')

compatible_hp = []
for id_path,h in enumerate(hp):
    l = 0
    for i in range(len(h)-1):
        # Paths hold index labels, so look the coordinates up by label.
        h_ei = elements.at[h[i], 'coor']
        h_ej = elements.at[h[i+1], 'coor']
        d = Dist(h_ei,h_ej)
        l += d
        if(isinstance(h_ei,list)):
            l+=Dist(h_ei[0],h_ei[-1])

    # Collect the survivors directly instead of popping from a copy.
    if not l>L:
        compatible_hp.append(h)
paths_removed = length_H_before - len(compatible_hp)

length_H_after = len(compatible_hp)
# Guard the percentage against an empty input set.
reduction = (length_H_before-length_H_after)/length_H_before*100 if length_H_before else 0.0
print(f'Length H after: {length_H_after}. Reduction: {reduction:.2f}%')
compatible_hypothetical_paths_i2i3i4 = compatible_hp

## $H^{I'}$

In [None]:
# H^{I'}: apply the length limit L (defined in the previous section) to the
# i'_2/i'_3-compatible paths, measuring a path as the sum of the lengths of
# its multi-vertex elements only (first-to-last vertex distance of each).
# NOTE(review): the result is stored under the same name as the previous
# section's result and therefore replaces it - confirm this is intended.
hp = compatible_hypothetical_paths_i2i3
length_H_before = len(hp)
print(f'Length H before: {length_H_before}')

compatible_hp = []
for id_path,h in enumerate(hp):
    l = 0
    for i in range(len(h)):
        # Fetch the coordinates of the element at this position. Without
        # this lookup the test below would silently reuse a value left
        # behind by an earlier cell.
        h_ei = elements.at[h[i], 'coor']
        if(isinstance(h_ei,list)):
            l+=Dist(h_ei[0],h_ei[-1])

    # Collect the survivors directly instead of popping from a copy.
    if not l>L:
        compatible_hp.append(h)
paths_removed = length_H_before - len(compatible_hp)

length_H_after = len(compatible_hp)
# Guard the percentage against an empty input set.
reduction = (length_H_before-length_H_after)/length_H_before*100 if length_H_before else 0.0
print(f'Length H after: {length_H_after}. Reduction: {reduction:.2f}%')
compatible_hypothetical_paths_i2i3i4 = compatible_hp

In [None]:
# Display the final list of compatible paths (each path is a list of element index labels).
compatible_hypothetical_paths_i2i3i4