# 全州县地表覆盖分类变化分析

环境初始化和方法定义

In [None]:
import requests
import aie
import pprint
import json

config = None

try:
    with open('config.json', 'r') as f:
        config = json.load(f)
except FileNotFoundError:
    print("没有找到config.json文件，请手动输入token")

if config:
    token = config['token']
    aie.Authenticate(token=token)
else:
    print("请输入token")
    aie.Authenticate(token="")


aie.Initialize()

# 利用云层可信度判断并覆盖同名点的去云算法
# https://engine-aiearth.aliyun.com/docs/page/case?d=6cf4e8
def removeLandsatCloud(image):
    # cloudShadowBitMask = (1 << 8)
    # cloudsBitMask = (1 << 9)
    qa = image.select('QA_PIXEL')
    # 有关QA_PIXEL的定义
    # https://engine-aiearth.aliyun.com/#/dataset/LANDSAT_LC09_C02_T1_L2
    mask = qa.bitwiseAnd(aie.Image(1 << 3)).eq(aie.Image(0)) \
                .And(qa.bitwiseAnd(aie.Image(1 << 4)).eq(aie.Image(0)))
    # 去云但是保留云下阴影部分
    # mask = qa.bitwiseAnd(aie.Image(cloudShadowBitMask)).eq(aie.Image(0))
    return image.updateMask(mask)

# 还原影像
def applyScaleFactors(image):
    opticalBands = image.select('SR_B.').multiply(aie.Image(0.0000275)).add(aie.Image(-0.2))
    thermalBands = image.select('ST_B.*').multiply(aie.Image(0.00341802)).add(aie.Image(149.0))
    return image.addBands(opticalBands, None, True).addBands(thermalBands, None, True)

# 获取影像
def getSpringImageCollection(dataset:aie.ImageCollection, year:int) -> aie.ImageCollection:
    return dataset.filterDate(year + '-02-01', year + '-10-01') \
                    .filter(aie.Filter.lte('eo:cloud_cover', 40.0)) \
                    .sort(property='eo:cloud_cover', ascending=True) 

## 数据集检索

Landsat 9 是美国陆地卫星计划（Landsat）的第九颗卫星，于 2021年9月27日从加利福尼亚州范登堡空军基地发射成功，Landsat 9将每隔16天对地球进行一次成像，与Landsat 8存在8天的偏移。

In [None]:
geojson_url = r"https://geo.datav.aliyun.com/areas_v3/bound/450324.json"

# 全州县的GeoJSON文件，可从阿里云DataV平台下载
req = requests.get(geojson_url,headers={'Content-Type': 'application/json'})
geojson = req.json()["features"][0]["geometry"]


roi = aie.Geometry(geojson)

map = aie.Map(
    center=roi.getCenter(),
    height=300,
    zoom=8
)

vis_params = {
    'bands': ['SR_B3', 'SR_B2', 'SR_B1'],
    'min': 0.0,
    'max': 0.3,
}


# 指定检索数据集
# Lansat8以后的卫星使用的传感器不是同一款，需要重写云函数
datasets = {
    '2000': aie.ImageCollection('LANDSAT_LT05_T02_T1_L2'),
    '2010': aie.ImageCollection('LANDSAT_LT05_T02_T1_L2'),
    '2020': aie.ImageCollection('LANDSAT_LE07_E02_T1_L2'),
}

filtered_ds = {}

origin_images = {}

for year, dataset in datasets.items():
    origin_ds = datasets[year]
    
    # 获取近两年影像
    spring_ds = []
    for i in range(int(year)-5, int(year)+5 + 1):
        spring_ds.append(getSpringImageCollection(origin_ds, year))

    for i in range(1, len(spring_ds)):
        spring_ds[0].merge(spring_ds[i])

    filtered_ds[year] = spring_ds[0].filterBounds(roi)

    # 调用去云算法
    images_no_cloud = filtered_ds[year].map(removeLandsatCloud).map(applyScaleFactors)
    origin_images[year] = images_no_cloud.mosaic().clip(roi)

    map.addLayer(
        origin_images[year],
        vis_params,
        str(year) + '年影像',
        bounds=roi.getBounds()
    )

map


## 引入地表覆盖分类数据集进行分类

In [None]:
CLCD = aie.ImageCollection('DAMO_AIE_CHINA_LC') \
    .filterBounds(roi) \
    .filterDate("2020-01-01", "2020-12-31") \
    .mosaic()\

# 用Map波段作为标签
label = "Map"

# 用正则表达式选择样本数据
sample_img = origin_images['2020'].select('SR_B.*')

# 获取样本点
samples = sample_img.addBands(CLCD.select(label)) \
    .sample(region=aie.Geometry(geojson),
            scale=30,
            numPixels=10000)

# 在训练样本中增加一列随机数, 选取80%的样本为训练样本, 选取20%的样本为验证样本
sample = samples.randomColumn()
training_sample = sample.filter(aie.Filter.lte('random', 0.8))
validation_sample = sample.filter(aie.Filter.gt('random', 0.8))


# 训练分类器
trained_classifier = aie.Classifier.adaBoost(10) \
                        .train(features=samples,
                               classProperty=label,
                               inputProperties=['SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B7'])


train_accuracy = trained_classifier.confusionMatrix()
print('Training error matrix:')
pprint.pprint(train_accuracy.getInfo())
print('Training overall accuracy:', train_accuracy.accuracy().getInfo())


# 使用验证集对分类器进行评估
validation = validation_sample.classify(trained_classifier)
# 验证集混淆矩阵包含的类别，对应class_values中的类别为[10, 40, 50, 60, 70]
validation_accuracy = validation.errorMatrix(label, 'classification')
print('Validation error matrix:')
pprint.pprint(validation_accuracy.getInfo())
print('Validation accuracy:', validation_accuracy.accuracy().getInfo())

classified = {}

# 使用训练好的分类器对影像进行分类
for year, image in origin_images.items():
    classified[year] = image.classify(trained_classifier).clip(roi)

# 地图渲染

In [None]:


color = ['#006400' ,'#ffbb22', '#ffff4c', '#f096ff', '#fa0000', '#b4b4b4',
         '#f0f0f0', '#0064c8', '#0096a0', '#00cf75', '#fae6a0']

map = aie.Map(
    center=roi.getCenter(),
    height=400,
    zoom=9
)

# 地表覆盖分类图层
map.addLayer(
    CLCD,
    {
        'bands': 'Map',
        'min': 1,
        'max': 9,
        'palette': color
    },
    'AI Earth中国10米地物分类数据集',
    bounds=roi.getBounds()
)

for year, image in origin_images.items():

    # 待分类影像真彩色图层
    map.addLayer(
        image,
        {
            'bands': ['SR_B3', 'SR_B2', 'SR_B1'],
            'min': 0.0,
            'max': 0.3,
        },
        str(year) + '年影像',
        bounds=roi.getBounds()
    )

    # 自适应集成分类图层
    map.addLayer(
        classified[year],
        {
            'bands': 'classification',
            'min': 0,
            'max': 10,
            'palette': color
        },
        str(year) + '年影像分类结果',
        bounds=roi.getBounds()
    )

map

# 导出数据

In [None]:
import time

tasks = []

for year, image in classified.items():
    tasks.append(aie.Export.image.toAsset(classified[year], 'classified_' + str(year), 10))
    tasks.append(aie.Export.image.toAsset(origin_images[year], 'origin_' + str(year), 10))

for task in tasks:
    task.start()

while len(tasks) > 0:
    time.sleep(5)
    finished = []
    for idx, task in enumerate(tasks):
        if not task.active():
            finished.append(idx)
    for idx in finished:
        tasks.pop(idx)

print('All tasks finished.')