In [8]:
%matplotlib inline
import os
import csv
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from osgeo import gdal
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans

In [30]:
def np_divide(a, b):
    return np.divide(a, b, out=np.zeros_like(a), where=b != 0)

class Landsat_Dataset(object):

    # 使用data_analysis/data_analysis.py的calculate_mean_std()计算得到
    mean = np.array([961.8782, 1053.7188, 1302.2211, 1573.1169, 2185.5655, 2097.9481, 1643.7379])
    std = np.array([362.9614, 395.8249, 512.3208, 599.7623, 769.1146, 734.0038, 645.0721])

    class_info = {    # 标注信息
        "shaqiu": 1,
        "xintan": 2,
        "gengdi": 3,
        "building": 4,
        "water": 5,
        "veg": 6
    }
    n_class = len(class_info.items())

    def __init__(self, dataset_dir, alpha=0.3, random_seed=2024):
        self.dataset_dir = dataset_dir
        self.val_rate = alpha
        self.rand_seed = random_seed
        self.train_set, self.val_set = self._load_dataset()
        self.train_data_len = len(self.train_set)
        self.valid_data_len = len(self.val_set)

    def _load_dataset(self):
        fids = [f for f in os.listdir(os.path.join(self.dataset_dir, 'images')) if f[-4:] == ".tif"]
        class_sets = defaultdict(list)
        # 读取数据并按类别收集样本
        for f in fids:
            img, _, _ = read_gdal(os.path.join(self.dataset_dir, 'images', f))
            label, _, _ = read_gdal(os.path.join(self.dataset_dir, 'labels', f))
            assert img.shape[0:2] == label.shape[0:2], "样本数据中影像和标签栅格行列数必须一致"

            h, w, c = img.shape
            img = img.reshape((h*w, c))
            label = label.reshape((-1))
            for cls_name in self.class_info.keys():
                _indexes = (label == self.class_info[cls_name])
                _data = img[_indexes]
                class_sets[cls_name].extend(list(_data))
        
        if self.rand_seed is not None:
            random.seed(self.rand_seed)
            
        if True:
            negative_num = len(class_sets['target']) * 10
            _list = class_sets['others'].copy()
            class_sets['others'] = random.sample(_list, negative_num)

        # 每个类别按统一比例分为训练/验证样本数据
        train_set = defaultdict(list)
        val_set = defaultdict(list)
        print("{:<15} {:<10} {:<10}".format("class", "train", "val"))
        for cls_name, cls_id in self.class_info.items():
            _data = class_sets[cls_name]
            _total_num = len(_data)
            random.shuffle(_data)
            _train_num = int(_total_num * (1 - self.val_rate))
            _train_data = _data[:_train_num]
            _val_data = _data[_train_num:]
            train_set[cls_name].extend(_train_data)
            val_set[cls_name].extend(_val_data)
            print("{:<15} {:<10d} {:<10d}".format(cls_name, len(_train_data), len(_val_data)))
        return train_set, val_set

    @classmethod
    def Normalize(cls, img_data):
        return (img_data - cls.mean) / cls.std

    @classmethod
    def NDWI(cls, img_data):
        # img_data (N, 7)
        # NDWI: (G-NIR)/(G+NIR)
        g = img_data[..., 2].astype(np.float32)
        nir = img_data[..., 4].astype(np.float32)
        ndwi = np_divide((g - nir), (g + nir))
        return ndwi

    @classmethod
    def NDVI(cls, img_data):
        # img_data (N, 7)
        # NDVI: (NIR-R)/(NIR+R)
        r = img_data[..., 3].astype(np.float32)
        nir = img_data[..., 4].astype(np.float32)
        ndvi = np_divide((nir - r), (nir + r))
        return ndvi

    @classmethod
    def transform(cls, img_data, pca=None):
        ndwi = cls.NDWI(img_data)
        # ndvi = cls.NDVI(img_data)
        data = cls.Normalize(img_data)
        if pca is not None:
            data = pca.transform(data)
        data = np.concatenate([data, ndwi], axis=-1)
        return data
    
    def get_class_feature(self, class_name=''):
        if class_name in self.class_info:
            train_data = self.train_set[class_name]
            val_data = self.val_set[class_name]
        return np.array(train_data + val_data)

    def _get_samples(self, dataset, num_per_cls):
        image_data = []
        label_data = []
        if self.rand_seed is not None:
            random.seed(self.rand_seed)

        tmp = []
        for cls_name, cls_id in self.class_info.items():
            cls_data = dataset[cls_name]
            cls_label = list(np.ones(len(cls_data), np.uint8) * (cls_id - 1))
            cls_samples = list(zip(cls_label, cls_data))
            if num_per_cls is not None:
                _num = min(num_per_cls, len(cls_samples))
                cls_samples = random.sample(cls_samples, _num)
            tmp.extend(cls_samples)
        random.shuffle(tmp)

        for label_item, data_item in tmp:
            image_data.append(data_item)
            label_data.append(label_item)

        image_data, label_data = np.array(image_data), np.array(label_data)
        return image_data, label_data

    def get_train_set(self, num_per_cls=None):
        ''' data for trainning '''
        x_train, y_train = self._get_samples(self.train_set, num_per_cls)
        return x_train, y_train

    def get_val_set(self, num_per_cls=None):
        ''' data for valuation '''
        x_val, y_val = self._get_samples(self.val_set, num_per_cls)
        return x_val, y_val
    
    
def read_gdal(path):
    image = gdal.Open(path)  # 打开该图像
    if image == None:
        print(path + "文件无法打开")
        return
    img_w = image.RasterXSize  # 栅格矩阵的列数
    img_h = image.RasterYSize  # 栅格矩阵的行数
    # im_bands = image.RasterCount  # 波段数
    im_proj = image.GetProjection()  # 获取投影信息
    im_geotrans = image.GetGeoTransform()  # 仿射矩阵
    im_data = image.ReadAsArray(0, 0, img_w, img_h)
    if len(im_data.shape) == 2:
        im_data = im_data[np.newaxis, :, :]
    im_data = im_data.transpose((1, 2, 0))
    return im_data, im_proj, im_geotrans    

In [25]:
def calculate_mean_std(image, nodata=None):
    channel_num = image.shape[-1]
    if nodata is not None:
        nodata_mask = np.all(image==nodata, axis=-1)
        image = image[~nodata_mask]
    means, stdevs = [], []
    for i in range(channel_num):
        pixels = image[..., i].ravel()
        means.append(np.mean(pixels))
        stdevs.append(np.std(pixels))

    return means, stdevs

In [None]:
x_train, y_train = dataset.get_train_set()
x_val, y_val = dataset.get_val_set()
data = np.concatenate([x_train, x_val], axis=0)
calculate_mean_std(data)

In [12]:
img, _, _ = read_gdal(r"C:\Users\Admin\Desktop\HJN\data\image\cut.tif")
print(img.shape)

(1134, 1678, 7)


In [26]:
means, stdevs = calculate_mean_std(img)
print('mean:', np.round(means, 4))
print('std:', np.round(stdevs, 4))

mean: [ 961.8782 1053.7188 1302.2211 1573.1169 2185.5655 2097.9481 1643.7379]
std: [362.9614 395.8249 512.3208 599.7623 769.1146 734.0038 645.0721]


In [28]:
np.round(means, 4)

array([ 961.8782, 1053.7188, 1302.2211, 1573.1169, 2185.5655, 2097.9481,
       1643.7379])

In [29]:
np.round(stdevs, 4)

array([362.9614, 395.8249, 512.3208, 599.7623, 769.1146, 734.0038,
       645.0721])

In [31]:
ld = Landsat_Dataset("C:\\Users\\Admin\\Desktop\\HJN\\datasets\\v1")

class           train      val       
shaqiu          667        286       
xintan          7422       3181      
gengdi          1071       459       
building        625        268       
water           2480       1063      
veg             1026       441       
