In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import re
import numpy as np
import matplotlib as plt
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation as R

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Subset
from PIL import Image

from sklearn.model_selection import GroupShuffleSplit

In [None]:
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=False):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = (DoubleConv(n_channels, 64))
        self.down1 = (Down(64, 128))
        self.down2 = (Down(128, 256))
        self.down3 = (Down(256, 512))
        factor = 2 if bilinear else 1
        self.down4 = (Down(512, 1024 // factor))
        self.up1 = (Up(1024, 512 // factor, bilinear))
        self.up2 = (Up(512, 256 // factor, bilinear))
        self.up3 = (Up(256, 128 // factor, bilinear))
        self.up4 = (Up(128, 64, bilinear))
        self.outc = (OutConv(64, n_classes))

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        logits = torch.sigmoid(logits)  #Sigmoid
        return logits


In [None]:
def read_map_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    resolution = float(lines[0].split(': ')[1])
    width = int(lines[1].split(': ')[1])
    height = int(lines[2].split(': ')[1])

    origin_x = float(lines[4].split(': ')[1])
    origin_y = float(lines[5].split(': ')[1])

    data_start_index = lines.index('Data:\n') + 1
    data = []
    for line in lines[data_start_index:]:
        data.extend([float(value) for value in line.split()])

    data = np.array(data, dtype=np.float32).reshape((height, width))
    return resolution, width, height, data, {'x': origin_x, 'y': origin_y}

def convert_coordinates(x, y, resolution, origin):
    x_map = round((x - origin['x']) / resolution)
    y_map = round((y - origin['y']) / resolution)
    return x_map, y_map

def visualize_map(data):#same as gsl_train detailed notes refer to training code

    plt.figure(figsize=(10, 10))


    data_viz = np.copy(data)
    data_viz[data == -1] = np.nan
    im = plt.imshow(data_viz, cmap='gray_r', origin='lower')


    plt.imshow(np.where(np.isnan(data_viz), 0, np.nan), cmap=mcolors.ListedColormap(['black']), origin='lower', alpha=1.0)

    y_indices, x_indices = np.where(data == 233)
    plt.scatter(x_indices, y_indices, marker='*', color='red', s=200, label='Plume Encounter')


    cbar = plt.colorbar(im)
    cbar.set_ticks([0, 116, 233])
    cbar.set_ticklabels(['Free', 'Partially Occupied', 'Occupied'], fontsize=14)

    plt.legend(loc='upper right', fontsize=18)
    plt.show()

def visualize_label_map(data,save=False):

    plt.figure(figsize=(8, 8))
    plt.imshow(data, cmap='Reds', origin='lower')
    plt.colorbar(label='Labeled Value')
    #plt.title(f'Visualization of {file_path}')
    if save:
        output_path = file_path.replace('.txt', '_visualization.png')
        plt.savefig(output_path, dpi=300)
    plt.show()
    #plt.close()

def read_baselink_to_map_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    for line in lines:
        if 'translation:' in line:
            x = float(lines[lines.index(line) + 1].split(': ')[1])
            y = float(lines[lines.index(line) + 2].split(': ')[1])

        if "rotation:" in line:
            X=float(lines[lines.index(line) + 1].split(': ')[1])
            Y=float(lines[lines.index(line) + 2].split(': ')[1])
            Z=float(lines[lines.index(line) + 3].split(': ')[1])
            W=float(lines[lines.index(line) + 4].split(': ')[1])


    return x, y, X,Y,Z,W


def modify_map(map_data, width, height, baselink_file, resolution, origin):

    x, y, X,Y,Z,W = read_baselink_to_map_file(baselink_file)
    x_map, y_map = convert_coordinates(x, y, resolution, origin)

    if 0 <= x_map < width and 0 <= y_map < height:
        map_data[y_map, x_map] = 1
    else:
        print(f"Warning: Coordinate ({x_map}, {y_map}) is out of bounds for map size ({width}, {height}).")


def mark_wind_direction(array, radians):#same as gsl_train detailed notes refer to training code

    position = np.argwhere(array == 1)
    if len(position) == 0:
        raise ValueError("Array must contain exactly one '1'.")

    pos_x, pos_y = position[0]


    wind_vector = np.array([np.sin(radians), np.cos(radians)])


    rows, cols = array.shape

    for i in range(rows):
        for j in range(cols):
            if (i, j) == (pos_x, pos_y):
                continue


            grid_vector = np.array([i - pos_x, j - pos_y])


            dot_product = np.dot(grid_vector, wind_vector)


            if dot_product < 0:
                array[i, j] = -1
            else:
                array[i, j] = 1
    return array


def sensor_direction_in_world(x, y, z, w, a):#same as gsl_train detailed notes refer to training code



    r = R.from_quat([x, y, z, w])


    sensor_direction_robot = np.array([np.cos(a), np.sin(a), 0])


    sensor_direction_world = r.apply(sensor_direction_robot)


    yaw_world = np.arctan2(sensor_direction_world[1], sensor_direction_world[0])

    return yaw_world




def write_map_file(file_path, resolution, width, height, data, origin):
    with open(file_path, 'w') as file:
        file.write(f"Resolution: {resolution}\n")
        file.write(f"Width: {width}\n")
        file.write(f"Height: {height}\n")
        file.write("Origin: position: \n")
        file.write(f"  x: {origin['x']}\n")
        file.write(f"  y: {origin['y']}\n")
        file.write("  z: 0.0\n")
        file.write("orientation: \n")
        file.write("  x: 0.0\n")
        file.write("  y: 0.0\n")
        file.write("  z: 0.0\n")
        file.write("  w: 1.0\n")
        file.write("Data:\n")

        for row in data:
            file.write(" ".join(f"{value:.2f}" for value in row) + "\n")

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(3, 1).to(device)
model.load_state_dict(torch.load("/content/drive/MyDrive/unet_model_3d_final_1.pth"))


model.eval()

base_folder_path = '/content/drive/MyDrive/OSLAM_result/windchange/'





for i in [61,62,63,64,70,71,72,73,74,75,76,77]:


    folder_name = f'epoch{i}'
    folder_path = os.path.join(base_folder_path, folder_name)
    map_files = sorted([f for f in os.listdir(folder_path) if re.match(r'^map_proceeded_\d+\.txt$', f)])
    map_files_origin=sorted([f for f in os.listdir(folder_path) if re.match(r'^map\d+\.txt$', f)])
    #label_files = sorted([f for f in os.listdir(folder_path) if re.match(r'^label\d+\.txt$', f)])
    wind_files = sorted([f for f in os.listdir(folder_path) if re.match(r'^wind_\d+\.txt$', f)])
    tf_files = sorted([f for f in os.listdir(folder_path) if re.match(r'^baselink_to_map_\d+\.txt$', f)])
    for j in range(len(map_files)):
        print(i,j)
        resolution, width, height, data, origin = read_map_file(os.path.join(folder_path, map_files[j]))
        target_shape = (279, 279)
        padding = ((0, target_shape[0] - data.shape[0]), (0, target_shape[1] - data.shape[1]))
        padded_map = np.pad(data, padding, mode='constant', constant_values=-1.0)
        #map_list.append(padded_map)
        #w.append(width)
        #h.append(height)
        #resolution, width, height, data, origin = read_map_file(os.path.join(folder_path, label_files[j]))
        #padding = ((0, target_shape[0] - data.shape[0]), (0, target_shape[1] - data.shape[1]))
        #padded_label = np.pad(data, padding, mode='constant', constant_values=0.0)
        #gt_list.append(padded_label)

        windpath = os.path.join(folder_path, wind_files[j])
        mappath = os.path.join(folder_path, map_files[j])
        tfpath = os.path.join(folder_path, tf_files[j])
        mappath_origin=os.path.join(folder_path, map_files_origin[j])
        resolution, width, height, data, origin = read_map_file(mappath)
        target_shape = (279, 279)
        padding = ((0, target_shape[0] - data.shape[0]), (0, target_shape[1] - data.shape[1]))
        padded_map = np.pad(data, padding, mode='constant', constant_values=0)#补正
        encounters=np.zeros(target_shape, dtype=int)
        for p in range(padded_map.shape[0]):
          for q in range(padded_map.shape[1]):
            if padded_map[p, q] == 233:
              encounters[p, q] = 1
        resolution, width, height, data, origin = read_map_file(mappath_origin)
        padded_map_origin=np.pad(data, padding, mode='constant', constant_values=0)#补正


        windmap=np.zeros((279,279))
        save=False
        for h in range(j+1):
          windpath = os.path.join(folder_path, wind_files[h])
          tfpath = os.path.join(folder_path, tf_files[h])

          with open(windpath, 'r') as file:
              lines = file.readlines()
          wind=lines[0]
          if wind=='0,0':
              windmap1=np.zeros((279,279))

          else:
              resolution, width, height, data, origin = read_map_file(mappath)
              windmap1=np.zeros_like(data)#生成
              modify_map(windmap1,width,height,tfpath,resolution,origin)#标注
              target_shape = (279, 279)
              padding = ((0, target_shape[0] - windmap1.shape[0]), (0, target_shape[1] - windmap1.shape[1]))
              padded_windmap = np.pad(windmap1, padding, mode='constant', constant_values=0)#补正
              x, y, X, Y, Z, W = read_baselink_to_map_file(tfpath)

              ang=-np.radians(int(wind.split(",")[1]))

              ang_wd=sensor_direction_in_world(X,Y,Z,W,ang)
              marked_map=mark_wind_direction(padded_windmap,ang_wd)
              windmap1=marked_map
          windmap+=windmap1

        if (windmap != np.zeros((279,279))).all():
          windmap=np.zeros((279,279))
          save=True


        #print(padded_map.shape,windmap.shape)
        #current_map = np.stack([padded_map, windmap], axis=0)
        current_map_3d = np.stack([padded_map_origin, windmap, encounters], axis=0)
        map_tensor=torch.tensor(current_map_3d, dtype=torch.float32).to(device).unsqueeze(0)

        if save:

          with torch.no_grad():
            output = model(map_tensor)  # 进行推理

          #print(output)
        #print(output.size())
          output_np = output.squeeze().cpu().numpy()
          top_pad, bottom_pad = padding[0]
          left_pad, right_pad = padding[1]

        # crop

          cropped_output = output_np[top_pad:output_np.shape[0]-bottom_pad, left_pad:output_np.shape[1]-right_pad]



          #processed_output = np.floor(cropped_output * 100).astype(int)#转为./map格式
          #print(processed_output)
          output_file_path = os.path.join(folder_path, f'result_nowind{j+1}.txt')
          #output_file_path = os.path.join(folder_path, f'result{j+1}.txt')

          write_map_file(output_file_path, resolution, width, height, cropped_output, origin)



        #wind_list.append(windmap)


  model.load_state_dict(torch.load("/content/drive/MyDrive/unet_model_3d_final_1.pth"))


61 0
61 1
61 2
61 3
62 0
62 1
62 2
62 3
63 0
63 1
63 2
63 3
64 0
64 1
64 2
64 3
70 0
70 1
70 2
70 3
71 0
71 1
71 2
71 3
72 0
72 1
72 2
72 3
73 0
73 1
73 2
73 3
74 0
74 1
74 2
74 3
75 0
75 1
75 2
75 3
76 0
76 1
76 2
77 0
77 1
77 2
77 3
