1. Read the "candidate_points.shp" or "sampling_points.shp" shapefile to generate the buffer and the range of each image is determined based on the buffer.

In [None]:
import geopandas as gpd
import shapely
from shapely.geometry.polygon import Polygon
from shapely.geometry.point import Point
import matplotlib.pyplot as plt
from numpy import float64
def generating_buffer(points,out_path):
    point = gpd.read_file(points)

    point['oid'] = list(range(len(point)))
    point["x"] = point.centroid.map(lambda p: p.x)
    point['y'] = point.centroid.map(lambda p:p.y)
    x_mid = point['x'].tolist()
    y_mid = point['y'].tolist()

    # The size of the image is 200 x 200, so add 10.0 to x and y to create an array
    buffer_size = 10.0
    top_left_x = list(map(lambda x: x - buffer_size,x_mid))
    top_left_y = list(map(lambda y: y + buffer_size,y_mid))
    top_left_xy = list(zip(top_left_x,top_left_y))
    top_right_x = list(map(lambda x: x + buffer_size,x_mid))
    top_right_y = list(map(lambda y: y + buffer_size,y_mid))
    top_right_xy = list(zip(top_right_x,top_right_y))
    bottom_right_x = list(map(lambda x: x + buffer_size,x_mid))
    bottom_right_y = list(map(lambda y: y - buffer_size,y_mid))
    bottom_right_xy = list(zip(bottom_right_x,bottom_right_y))

    bottom_left_x = list(map(lambda x: x - buffer_size,x_mid))
    bottom_left_y = list(map(lambda y: y - buffer_size,y_mid))
    bottom_left_xy = list(zip(bottom_left_x,bottom_left_y))
    my_data_copy = point
    for i in range(len(my_data_copy)):
        point_list = [top_left_xy[i],top_right_xy[i],bottom_right_xy[i],bottom_left_xy[i]]
        my_data_copy['geometry'][i] = Polygon(point_list)

    my_data_copy.to_file(out_path)
    
if __name__ == '__main__':
    points = '/path/XXX'
    out_path = '/path/XXX'
    generating_buffer(points,out_path)

2. Training data preparation. 70% of the random 10000 sampled data was used as the training set, including 3390 paved and 3610 unpaved. (It can be operated in ArcGIS. The result has been included in the file)

3. Model training. Based on our sample points, Qgis was used for online training using Google satellite mapping service. 

3. In order to run this code in Qgis, you need to install the relevant packages in Qgis's python environment.

3. This is an example using the vgg16 net, similar as other models.

In [None]:
import processing
from shapely.geometry.polygon import Polygon
from shapely.geometry.point import Point
import geopandas as gpd
import torch
import os
import torch.nn as nn
import numpy as np
import torch.optim as optim
from PIL import Image
import torchvision.models as models
import torchvision.datasets
import torchvision.transforms as transforms 
import time
from tempfile import TemporaryDirectory


def training_model(sampling_path,out_dir):
    
    train_transform = transforms.Compose([
        transforms.RandomAffine(degrees=15,scale=(0.8,1.5)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    learning_rate = 0.0001
    ex_batch_size = 64
    all_epoch = 50

    data = gpd.read_file(sampling_path)

    with TemporaryDirectory() as dirname:
        #print('dirname is:', dirname)
        os.makedirs(dirname+"/paved")
        os.makedirs(dirname+"/unpaved")
        
        for i in range(len(data)):
            pid = data['oid_1'][i]
            label = data['gt'][i]
            xx, yy = data['geometry'][i].exterior.coords.xy
            extent_0 = np.unique(np.array(xx))[0]+0.1 
            extent_1 = np.unique(np.array(xx))[1]-0.2  
            extent_2 = np.unique(np.array(yy))[0]+0.1  
            extent_3 = np.unique(np.array(yy))[1]-0.2  
            extent = str(extent_0) + ',' + str(extent_1) + ',' +  str(extent_2) + ',' + str(extent_3) + ' [EPSG:3857]'
            if label == 0:
                temp_path = dirname +'/paved/'+ str(pid) + '.tif'           
            if label == 1:
                temp_path = dirname +'/unpaved/'+ str(pid) + '.tif'

            if os.path.exists(temp_path):
                pass
            else:
                processing.run("native:rasterize", {'EXTENT':extent,'EXTENT_BUFFER':0,'TILE_SIZE':200,'MAP_UNITS_PER_PIXEL':0.1,'MAKE_BACKGROUND_TRANSPARENT':False,'MAP_THEME':None,'LAYERS':['type=xyz&zmin=0&zmax=20&url=https://mt1.google.com/vt/lyrs%3Ds%26x%3D{x}%26y%3D{y}%26z%3D{z}'],'OUTPUT':temp_path})

        trainset = torchvision.datasets.ImageFolder(root=dirname,transform = train_transform)
        train_loader = torch.utils.data.DataLoader(trainset,batch_size = ex_batch_size,shuffle = True,num_workers = 0)
    
        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        model = models.vgg16(pretrained = True)
        if not os.path.isdir(out_dir):
            os.mkdir(out_dir)

        model.classifier._modules['6'] = nn.Linear(4096,2)
        model.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
    
        since = time.time()
        for epoch in range(all_epoch):
            # initialize 
            train_loss = 0.0
            train_accuracy = 0.0
            run_accuracy = 0.0
            run_loss =0.0
            total = 0.0
            model.train()
            for i,data in enumerate(train_loader,0):
                images, labels = data
                images = images.to(device)
                labels = labels.to(device)  

                optimizer.zero_grad()
                outs = model(images)
                loss = criterion(outs, labels)
                loss.backward()
                optimizer.step()

                total += labels.size(0)
                run_loss += loss.item()
                _,prediction = torch.max(outs,1)
                run_accuracy += (prediction == labels).sum().item()
                if i % 20 == 19:
                    print('epoch {},iter {},train accuracy: {:.4f}%   loss:  {:.4f}'.format(epoch, i+1, 100*run_accuracy/(labels.size(0)*20), run_loss/20))
                    train_accuracy += run_accuracy
                    train_loss += run_loss
                    run_accuracy, run_loss = 0., 0.
                
            time_elapsed = time.time() - since
            print('Epoch Finished in： {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed%60))
            if(epoch == 49):
                epoch_name = out_dir + 'epoch_50.pth'
                torch.save(model.state_dict(), epoch_name)

if __name__ == '__main__':
    sampling_path = '.../path/XXX' #generated buffer file from sampling_points.shp  ---absolute path
    out_dir = '.../path/XXX' #result weight file path
    training_model(sampling_path,out_dir)

4. Model Predict

In [None]:
import processing
from shapely.geometry.polygon import Polygon
from shapely.geometry.point import Point
import geopandas as gpd
import numpy as np
from PIL import Image
from tempfile import TemporaryDirectory
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.models as models


def prediction(weight_path,input_dir,out_path):

    data = gpd.read_file(input_dir)

    transform= transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = models.vgg16(pretrained = False,num_classes=2)
    model.to(device)
    model.load_state_dict(torch.load(weight_path))
    
    pred_list = []

    for i in range(len(data)):
        with TemporaryDirectory() as dirname:
            #print('dirname is:', dirname)
            pid = data['oid_1'][i]
            xx, yy = data['geometry'][i].exterior.coords.xy
            extent_0 = np.unique(np.array(xx))[0]+0.1 
            extent_1 = np.unique(np.array(xx))[1]-0.2  
            extent_2 = np.unique(np.array(yy))[0]+0.1  
            extent_3 = np.unique(np.array(yy))[1]-0.2  
            extent = str(extent_0) + ',' + str(extent_1) + ',' +  str(extent_2) + ',' + str(extent_3) + ' [EPSG:3857]'
            temp_path = dirname +'/'+ str(pid) + '.tif'
            processing.run("native:rasterize", {'EXTENT':extent,'EXTENT_BUFFER':0,'TILE_SIZE':200,'MAP_UNITS_PER_PIXEL':0.1,'MAKE_BACKGROUND_TRANSPARENT':False,'MAP_THEME':None,'LAYERS':['type=xyz&zmin=0&zmax=20&url=https://mt1.google.com/vt/lyrs%3Ds%26x%3D{x}%26y%3D{y}%26z%3D{z}'],'OUTPUT':temp_path})
            #print(temp_path)
            img = Image.open(temp_path)
            #print(img)
            img = transform(img)
            img = img.unsqueeze(0)
            img = img.to(device)
            out = model(img)
            out =  F.softmax(out,dim=1).cpu()
            _, predicted = torch.max(out, 1)
            pred = predicted.item()
            #print(pred)
            pred_list.append(pred)
            temp_path = None

    data['predict'] = pred_list
    data.to_file(out_path)

if __name__ == '__main__':
    weight_path = '.../vgg16_7000.pth'  #Has been included in the file  ---absolute path
    input_dir = '.../path/XXX' #generated buffer file from candidate_points.shp  ---absolute path
    out_path = '.../path/XXX' #result ---absolute path
    
    prediction(weight_path,input_dir,out_path)

5. Model adjustment

In [None]:
import pandas as pd
from collections import Counter
def majority_vote(series):
    counts = Counter(series)
    most_common = counts.most_common(2)
    if len(most_common) == 1 or most_common[0][1] > most_common[1][1]:
        return most_common[0][0]
    return None
def modification(input_path,out_path):
# Groups by original_id and then vote on the predict for each group
    data = gpd.read_file(input_path)
    data['predict_copy'] = data.groupby('ORIG_FID')['predict'].transform(majority_vote)
    data.to_file(out_path)

if __name__ == '__main__':
    input_path = '/path/XXX' # the out_path of step 4
    out_path = '/path/XXX'
    modification(input_path,out_path)

6. According to rule 2, count the unclassified roads, re-generat the candidate points in ArcGIS following rule 2, and then predict and vote again.

In [None]:
data = gpd.read_file('/path/XXX') #the out_path of step 5
select_data = data[data['predict_copy'] == None]
select_data.to_file('/path/XXX')

Repeat the above steps for full data generating.

Re-match the result point file with OSM road data in arcGIS to obtain the final result.