In [2]:
import os
import pickle
import numpy as np
import scipy.io as sio
from tqdm.auto import tqdm

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.patches import Rectangle
import importlib
import RPLAN_Toolbox.rplan.floorplan as floorplan
import RPLAN_Toolbox.rplan.align as align
import RPLAN_Toolbox.rplan.measure as measure



# Re-execute the toolbox modules so that edits made to their source files are picked up
# without restarting the kernel. These calls run before the `from ... import` lines below,
# so the names imported there come from the reloaded modules.
importlib.reload(floorplan)   # forces reload from disk
importlib.reload(align)   # forces reload from disk
importlib.reload(measure)   # forces reload from disk


from config import data_path
from RPLAN_Toolbox.rplan.floorplan import Floorplan
from RPLAN_Toolbox.rplan.align import align_fp_gt
from RPLAN_Toolbox.rplan.decorate import get_dw
from RPLAN_Toolbox.rplan.measure import compute_tf, sample_tf, compute_tf_dist
from RPLAN_Toolbox.rplan.plot import get_figure,get_axes,plot_category,plot_boundary,plot_graph,plot_fp,plot_tf

# Split data for (testing) and (retrieval + training)

In [None]:
# Collect the base name (extension stripped) of every PNG in the raw dataset folder.
# - os.path.join builds the path with the platform's separator instead of hard-coded
#   backslashes.
# - os.path.splitext removes only the final extension, so a name that contains extra
#   dots is kept intact (f.split('.')[0] would truncate it at the first dot).
# - sorted() makes the list independent of the arbitrary order os.listdir returns.
folder = os.path.join(".", "raw_data", "floorplan_dataset")
png_file_names = sorted(
    os.path.splitext(f)[0] for f in os.listdir(folder) if f.endswith('.png')
)

In [6]:
# Sanity check: number of PNG base names collected from the dataset folder.
len(png_file_names)

80788

In [7]:
import random

# Fixed seed so the train/test partition is identical on every run; without it each
# execution writes a different split to disk.
SPLIT_SEED = 0
TEST_FRACTION = 0.1

# Shuffle a sorted copy rather than the original list: the result then depends only on
# the seed, not on the listing order or on how many times this cell has been executed.
shuffled_names = sorted(png_file_names)
random.Random(SPLIT_SEED).shuffle(shuffled_names)

n_test_files = int(len(shuffled_names) * TEST_FRACTION)
test_files = shuffled_names[:n_test_files]
train_files = shuffled_names[n_test_files:]

processed_dir = os.path.join(".", "processed_data")
os.makedirs(processed_dir, exist_ok=True)

# One name per line; every line, including the last one, is terminated by "\n".
with open(os.path.join(processed_dir, "test.txt"), "w") as f:
    f.writelines(name + "\n" for name in test_files)

with open(os.path.join(processed_dir, "train.txt"), "w") as f:
    f.writelines(name + "\n" for name in train_files)

print(f"Saved {len(test_files)} file names to test.txt")
print(f"Saved {len(train_files)} file names to train.txt")

Saved 8078 file names to test.txt
Saved 72710 file names to train.txt


# Extract .png data and save to .mat

In [10]:
# Read the training names, closing the file as soon as it has been read.
# Splitting on '\n' keeps an empty string as the last element whenever the file ends
# with a newline; that trailing entry is left in place here on purpose.
with open('./processed_data/train.txt') as f:
    names_train = f.read().split('\n')
names = names_train

In [19]:
from tqdm.auto import tqdm

# Parse every labelled PNG into a Floorplan, align its room boxes with align_fp_gt and
# collect the results as a list of dicts that is saved as one MATLAB struct array.
# point in xyxy format

mat_structs = []
failed_names = []  # (name, error) for every image Floorplan() could not load

# Skip empty entries (such as the one a trailing newline in the name file produces)
# instead of relying on "len(names) - 1".
for name in tqdm([n for n in names if n]):
    file_path = f"./raw_labeled_data/floorplan_dataset/{name}.png"
    try:
        fp = Floorplan(file_path)
    except Exception as exc:
        # Best effort: an unreadable floorplan is skipped, but it is recorded so the
        # loss is visible. Catching Exception (not a bare except) lets Ctrl-C through.
        failed_names.append((name, repr(exc)))
        continue

    data = fp.to_dict()
    boxes_aligned, order, room_boundaries = align_fp_gt(
        data['boundary'], data['boxes'], data['types'], data['edges']
    )
    data['boxes_aligned'] = boxes_aligned
    data['order'] = order
    data['room_boundaries'] = room_boundaries

    # NOTE(review): these field names (rType, rEdge, gtBox, ...) differ from the
    # attribute names read by later cells (types, edges, boxes_aligned, ...) - confirm
    # which file `data_path` refers to.
    mat_structs.append({
        'boundary': np.array(data['boundary']),
        'name': data['name'],
        'rType': np.array(data['types']),
        'rEdge': np.array(data['edges']),
        'gtBox': np.array(data['boxes']),
        'gtBoxNew': np.array(data['boxes_aligned']),
        'order': np.array(data['order']),
        'rBoundary': np.array(data['room_boundaries']),
    })

sio.savemat('./processed_data/data.mat', {'data': mat_structs})

if failed_names:
    print(f"Skipped {len(failed_names)} floorplans that could not be loaded")

100%|██████████| 72710/72710 [1:46:14<00:00, 11.41it/s]  


# From labeled images to .jpg boundary-only images

In [None]:
from skimage import io
import cv2

import numpy as np

def labeled_png_to_boundary_jpg(png_path, save_path):
    """
    Save the first channel of a multi-channel labeled PNG as a single-channel image.

    Channel 0 is written unchanged; no thresholding or inversion is applied.
    NOTE(review): channel 0 is presumably the boundary layer of the labeled image -
    confirm against the dataset description.

    Args:
        png_path: Path of the labeled PNG to read.
        save_path: Destination path; the extension selects the output format.

    Raises:
        ValueError: If the image is not a 3-D array with at least 3 channels.
        OSError: If the output image could not be written.
    """

    img = io.imread(png_path)
    # A grayscale image is 2-D, so check the rank before touching shape[2];
    # otherwise the lookup itself fails with an IndexError.
    if img is None or img.ndim != 3 or img.shape[2] < 3:
        raise ValueError(f"Image {png_path} is not a valid 4-channel labeled PNG")

    boundary_img = img[:, :, 0]

    # cv2.imwrite reports failure (e.g. missing output directory) through its return
    # value rather than an exception, so check it explicitly.
    if not cv2.imwrite(save_path, boundary_img):
        raise OSError(f"Could not write boundary image to {save_path}")


In [67]:
# Convert every test floorplan's labeled PNG into a boundary-only JPG.
with open('./data_processed/test.txt') as f:
    names_test = f.read().split('\n')

# Skip empty entries (e.g. from a trailing newline) rather than assuming that exactly
# the last element is empty.
for name in tqdm([n for n in names_test if n]):
    labeled_png_to_boundary_jpg(
        f"./data_raw_labeled/floorplan_dataset/{name}.png",
        f"./data_test/{name}.jpg",
    )

100%|██████████| 8078/8078 [01:45<00:00, 76.58it/s] 


# 1.tf_train.py

In [62]:
# load data
data = sio.loadmat(data_path, squeeze_me=True, struct_as_record=False)['data']
data_dict = {d.name: d for d in data}

# NOTE(review): only the first 10 names of each split are used - this looks like a
# debugging limit; remove the [:10] slices for a full run.
with open('./data/train.txt') as f:
    names_train = f.read().split('\n')[:10]
with open('./data/test.txt') as f:
    names_test = f.read().split('\n')[:10]
n_train = len(names_train)
n_test = len(names_test)

# turning function: training data
trainTF = []
for name in tqdm(names_train):
    x, y = compute_tf(data_dict[name].boundary)
    trainTF.append({'x': x, 'y': y})
# Context managers guarantee the pickle is flushed and closed before it is read back.
with open('./data/trainTF.pkl', 'wb') as f:
    pickle.dump(trainTF, f)

# Resample each training turning function with sample_tf and stack them into one array.
tf_train = np.stack(
    [sample_tf(entry['x'], entry['y']) for entry in tqdm(trainTF)],
    axis=0,
)
np.save('./data/tf_train.npy', tf_train)

# turning function: testing data
testTF = []
for name in tqdm(names_test):
    x, y = compute_tf(data_dict[name].boundary)
    testTF.append({'x': x, 'y': y})
with open('./data/testTF.pkl', 'wb') as f:
    pickle.dump(testTF, f)

100%|██████████| 10/10 [00:00<00:00, 2204.40it/s]
100%|██████████| 10/10 [00:00<00:00, 6580.33it/s]
100%|██████████| 10/10 [00:00<00:00, 2649.09it/s]


In [63]:
# Turning-function distance between every test boundary (rows) and every training
# boundary (columns), stored as float32 and saved to disk.
print('Computing turning function distance ... it will take a long time.')
D_test_train = np.zeros((n_test, n_train), dtype='float32')
for i in tqdm(range(n_test)):
    query_tf = testTF[i]
    # Fill one complete row per test sample.
    D_test_train[i, :] = [
        compute_tf_dist(query_tf, trainTF[j]) for j in range(n_train)
    ]
np.save('./data/D_test_train.npy', D_test_train)


Computing turning function distance ... it will take a long time.


100%|██████████| 10/10 [00:00<00:00, 1221.90it/s]


# 2.data_train_converted.py

In [64]:
# load data
data = sio.loadmat(data_path, squeeze_me=True, struct_as_record=False)['data']
data_dict = {d.name: d for d in data}

# NOTE(review): [:10] limits the run to the first 10 names - looks like a debugging limit.
with open('./data/train.txt') as f:
    names_train = f.read().split('\n')[:10]
n_train = len(names_train)

# NOTE(review): pickle.load can execute arbitrary code; only load files produced by
# this pipeline.
with open('./data/trainTF.pkl', 'rb') as f:
    trainTF = pickle.load(f)

# Re-pack each training floorplan; the room type is appended to every aligned box as
# an extra last column.
data_converted = []
for name in tqdm(names_train):
    d = data_dict[name]
    data_converted.append({
        'name': d.name,
        'boundary': d.boundary,
        'boxes_aligned': np.concatenate([d.boxes_aligned, d.types[:, None]], axis=-1),
        'order': d.order,
        'edges': d.edges,
        'room_boundaries': d.room_boundaries,
    })

# Round-trip through a .mat file so the pickled object has the same struct layout that
# loadmat produces, then store that loaded object as a pickle.
sio.savemat(
    './data/data_train_converted.mat',
    {'data': data_converted, 'nameList': names_train, 'trainTF': trainTF},
)
data = sio.loadmat('./data/data_train_converted.mat', squeeze_me=True, struct_as_record=False)
with open('./data/data_train_converted.pkl', 'wb') as f:
    pickle.dump(data, f)

100%|██████████| 10/10 [00:00<00:00, 85598.04it/s]


# 3.rNum_train.py

In [65]:
data = sio.loadmat(data_path, squeeze_me=True, struct_as_record=False)['data']
data_dict = {d.name: d for d in data}

# NOTE(review): [:10] limits the run to the first 10 names - looks like a debugging limit.
with open('./data/train.txt') as f:
    names_train = f.read().split('\n')[:10]
n_train = len(names_train)

N_ROOM_TYPES = 13                       # room type ids 0..12 are counted
IMPORTANT_ROOM_TYPES = [1, 5, 6, 7, 8]  # type ids summed into the last column

# One row per floorplan: columns 0..12 hold the number of rooms of each type and
# column 13 holds the total number of "important" rooms.
rNum = np.zeros((n_train, N_ROOM_TYPES + 1), dtype='uint8')
for i, name in enumerate(tqdm(names_train)):
    rType = data_dict[name].types
    for j in range(N_ROOM_TYPES):
        rNum[i, j] = (rType == j).sum()

    # count the number of important rooms
    rNum[i, N_ROOM_TYPES] = rNum[i, IMPORTANT_ROOM_TYPES].sum()

np.save('./data/rNum_train.npy', rNum)

100%|██████████| 10/10 [00:00<00:00, 11060.93it/s]


# 4.data_train_eNum.py

In [66]:
import pickle
import numpy as np
from tqdm import tqdm

# Load converted training data (one record per floorplan, fields accessed as attributes).
# NOTE(review): pickle.load can execute arbitrary code; only load files produced by
# this pipeline.
with open('./data/data_train_converted.pkl', 'rb') as f:
    data = pickle.load(f)['data']

# Read list of training names (only its length is used here).
# NOTE(review): [:10] limits the run to the first 10 names - looks like a debugging limit.
with open('./data/train.txt') as f:
    names_train = f.read().split('\n')[:10]
n_train = len(names_train)

N_CATEGORIES = 5  # each floorplan yields an N_CATEGORIES x N_CATEGORIES count matrix

# Lookup tables are loop-invariant, so build them once.
# rMap sends each of the 18 raw room types to a coarser category; the "-1" converts the
# table from 1-based (original MATLAB indexing) to 0-based values.
rMap = np.array([1,2,3,4,1,2,2,2,2,5,1,6,1,10,7,8,9,10]) - 1
# reorder permutes the first six categories (it swaps 2 and 3).
reorder = np.array([0,1,3,2,4,5])

# Shape (n_train, 25): the 5x5 matrix of every floorplan flattened row by row.
eNum = np.zeros((n_train, N_CATEGORIES * N_CATEGORIES), dtype='uint8')

for i in tqdm(range(n_train)):
    d = data[i]

    # The data went through loadmat(squeeze_me=True), which collapses a single-row
    # array to 1-D; restore the 2-D layout so column indexing keeps working.
    boxes = np.atleast_2d(d.boxes_aligned)
    edges = np.atleast_2d(d.edges)
    if edges.size == 0:
        # No edges: the count row for this floorplan stays all zeros.
        continue

    # rType: room type IDs (last column of box array)
    rType = boxes[:, -1]

    # eType: the room types at both ends of each edge
    eType = rType[edges[:, :2]]

    # Map raw types to coarse categories, then apply the permutation.
    edge = reorder[rMap[eType]]

    # Only keep edges where both endpoints are in categories 1..5, then shift to 0..4.
    I = (edge[:,0]<=5)&(edge[:,0]>=1)&(edge[:,1]<=5)&(edge[:,1]>=1)
    edge = edge[I,:] - 1

    # Count edges between categories; an edge joining two different categories is
    # counted in both (a, b) and (b, a) so the matrix is symmetric.
    e = np.zeros((N_CATEGORIES, N_CATEGORIES), dtype='uint8')
    for a, b in edge:
        e[a, b] += 1
        if a != b:
            e[b, a] += 1

    # Flatten 5x5 matrix into a length-25 vector
    eNum[i] = e.reshape(-1)

# Save results: edge connection statistics for all training floorplans
with open('./data/data_train_eNum.pkl', 'wb') as f:
    pickle.dump({'eNum': eNum}, f)


100%|██████████| 10/10 [00:00<00:00, 17063.89it/s]


# 5.data_test_converted.py

In [67]:
data = sio.loadmat(data_path, squeeze_me=True, struct_as_record=False)['data']
data_dict = {d.name: d for d in data}
# NOTE(review): pickle.load can execute arbitrary code; only load files produced by
# this pipeline.
with open('./data/testTF.pkl', 'rb') as f:
    testTF = pickle.load(f)
rNum = np.load('./data/rNum_train.npy')

# NOTE(review): [:10] limits the run to the first 10 names - looks like a debugging limit.
with open('./data/train.txt') as f:
    names_train = f.read().split('\n')[:10]
with open('./data/test.txt') as f:
    names_test = f.read().split('\n')[:10]
n_train = len(names_train)
n_test = len(names_test)

TOP_K = 1000  # number of nearest training samples kept per test sample

D = np.load('./data/D_test_train.npy')
data_converted = []
for i, name in enumerate(tqdm(names_test)):
    d = data_dict[name]
    # Indices of the training samples with the smallest distance to test sample i;
    # fewer than TOP_K are returned when the training set is smaller than that.
    topK = np.argsort(D[i])[:TOP_K]
    data_converted.append({
        'boundary': d.boundary,
        'tf': testTF[i],
        'topK': topK,
        'topK_rNum': rNum[topK],
    })

# Round-trip through a .mat file so the pickled object has the struct layout that
# loadmat produces.
sio.savemat(
    './data/data_test_converted.mat',
    {'data': data_converted, 'testNameList': names_test, 'trainNameList': names_train},
)
data = sio.loadmat('./data/data_test_converted.mat', squeeze_me=True, struct_as_record=False)
with open('./data/data_test_converted.pkl', 'wb') as f:
    pickle.dump(data, f)


100%|██████████| 10/10 [00:00<00:00, 19897.08it/s]


# 6.cluster.py

In [68]:
import faiss

# NOTE(review): pickle.load can execute arbitrary code; only load files produced by
# this pipeline.
with open('./data/trainTF.pkl', 'rb') as f:
    tf_train = pickle.load(f)

# Resample every training turning function with sample_tf; faiss requires float32 input.
tf = np.array(
    [sample_tf(entry['x'], entry['y']) for entry in tqdm(tf_train)]
).astype(np.float32)

# Take the vector dimension from the data instead of hard-coding it, so the index and
# the k-means always agree with what sample_tf returns.
d = tf.shape[1]

ncentroids = 5
niter = 200
verbose = True

kmeans = faiss.Kmeans(d, ncentroids, niter=niter, verbose=verbose, gpu=False)
kmeans.train(tf)
centroids = kmeans.centroids

# Exact L2 index over all training vectors, queried with the centroids.
index = faiss.IndexFlatL2(d)
index.add(tf)
# Never ask for more neighbours than there are vectors: faiss pads the missing results
# with index -1, which would silently address the last element if used for indexing.
nNN = min(1000, len(tf))
D, I = index.search(kmeans.centroids, nNN)

np.save('./data/centroids_train.npy', centroids)
np.save('./data/clusters_train.npy', I)

100%|██████████| 10/10 [00:00<00:00, 10894.30it/s]


Clustering 10 points in 1000D to 5 clusters, redo 1 times, 200 iterations
  Preprocessing in 0.00 s
  Iteration 199 (0.25 s, search 0.05 s): objective=3214.41 imbalance=1.300 nsplit=0       


In [69]:
# Display the k-means centroids computed above (NumPy abbreviates the printed array).
centroids

array([[0.       , 0.       , 0.       , ..., 6.2831855, 6.2831855,
        6.2831855],
       [0.       , 0.       , 0.       , ..., 6.2831855, 6.2831855,
        6.2831855],
       [0.       , 0.       , 0.       , ..., 6.2831855, 6.2831855,
        6.2831855],
       [0.       , 0.       , 0.       , ..., 6.2831855, 6.2831855,
        6.2831855],
       [0.       , 0.       , 0.       , ..., 6.2831855, 6.2831855,
        6.2831855]], dtype=float32)

In [70]:
# Shape is (number of centroids, length of each sampled turning function).
centroids.shape

(5, 1000)