Pour simplifier la réutilisation j'ai identifié toutes les modifications a faire par des #to_modify avec les descriptions associés


In [1]:
import io
import os 
import sys
import time
import random
import requests
import rasterio

import numpy as np
import pandas as pd
import geopandas as gpd

from PIL import Image
from PIL.JpegImagePlugin import JpegImageFile

from shapely.geometry import box, Polygon, shape
from shapely.ops import transform

from ipywidgets import IntProgress
from IPython.display import display

from typing import Union, Tuple 

In [2]:
def get_filenames(directory: str) -> np.ndarray:
    """retrieves shapefile name and size 

    Args:
        directory (str): directory path 

    Returns:
        np.ndarray: shapefile name and size
    """
    filenames = []
    for filename in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, filename)) and filename.endswith('.shp'):
            file_size = round(os.path.getsize(os.path.join(directory, filename)) / 1048576, 0)
            filenames.append([filename, file_size])
         
    return np.array(filenames)

In [3]:
def image_region(filenames: np.ndarray, nb_images: int) -> np.ndarray:
    """calculates for each shapefile the number of images it must produce 

    Args:
        filenames (np.ndarray): shapefile name and size
        nb_images (int): number of images to be saved

    Returns:
        np.ndarray: number of images
    """
    prob = filenames[:, 1].astype(float) / np.sum(filenames[:, 1].astype(float))
    
    return (prob*nb_images).astype(int)

In [4]:
def transform_shapefile(shapefile: gpd.geodataframe.GeoDataFrame, crs: str) -> gpd.geodataframe.GeoDataFrame:
    """changing the geospatial reference system

    Args:
        shapefile (gpd.geodataframe.GeoDataFrame): parcels of a region
        crs (str): georeferenced name

    Returns:
        gpd.geodataframe.GeoDataFrame: parcels of a region in the new reference system 
    """
    shapefile = shapefile.to_crs(crs)
    ## peut être à modifier pour être sur d'avoir un point dans la parcelle
    shapefile['centroid'] = shapefile.geometry.centroid

    return shapefile

In [5]:
def random_coordinate(shapefile: gpd.geodataframe.GeoDataFrame, offset: list) -> list:
    """create a coordinate box around the center of a random parcel

    Args:
        shapefile (gpd.geodataframe.GeoDataFrame): parcels
        offset (list):box size

    Returns:
        list: coordinate box
    """
    x = random.randint(0, len(shapefile))

    while (shapefile['SURF_PARC'][x] > 1) == False:
        x = random.randint(0, len(shapefile))

    center = [shapefile['centroid'][x].x, shapefile['centroid'][x].y]

    min_lon = center[0] - offset[0]/2
    min_lat = center[1] - offset[1]/2
    max_lon = center[0] + offset[0]/2
    max_lat = center[1] + offset[1]/2
    
    return [min_lon, min_lat, max_lon, max_lat]

In [6]:
def _request(url: str) -> Union[requests.models.Response, None]:
    """api response

    Args:
        url (str): url request

    Returns:
        Union[requests.models.Response, None]: response
    """
    response = requests.get(url)
    if response.status_code != 200:
        sys.stderr.write(f"GET request failed. Status code: {response.status_code}\n")
        response = None
    return response

def _request_image_box(lon_min: np.float64, lat_min: np.float64, lon_max: np.float64, 
                       lat_max: np.float64, width: int, height: int, 
                       api_token: str) -> requests.models.Response:
    """api response for a satellite image

    Args:
        lon_min (np.float64): longitude min
        lat_min (np.float64): latitude min
        lon_max (np.float64): longitude max
        lat_max (np.float64): latitude max
        width (int): pixel width 
        height (int): pixel height
        api_token (str): API token

    Returns:
        requests.models.Response: response
    """
    url = f"https://api.mapbox.com/styles/v1/mapbox/satellite-v9/static/[{lon_min},{lat_min},{lon_max},{lat_max}]/{width}x{height}?logo=false&attribution=false&access_token={api_token}"
    return _request(url)

def _extract_content_from_reponse(response: requests.models.Response) -> Union[Tuple[JpegImageFile, str], Tuple[None, None]]:
    """image extraction from API response

    Args:
        response (requests.models.Response): API response

    Returns:
        Union[Tuple[JpegImageFile, str], Tuple[None, None]]: image with its format
    """
    result = (None, None)
    if response:
        content_type = response.headers["content-type"]
        format = content_type.split('/')[-1]
        image  = Image.open(io.BytesIO(response.content))
        result = (image, format)
    return result

In [7]:
def download_images(coord: list, pixels: int, api_token: str, loc: str, i: int) -> None:
    """download satellite image locally

    Args:
        coord (list): coordinate box 
        pixels (int): image size
        api_token (str): API token
        loc (str): localisation inside local directory 
        i (int): image number 
    """

    response = _request_image_box(coord[0], coord[1], coord[2], coord[3], pixels, pixels, api_token)
    image = _extract_content_from_reponse(response)[0]

    #to_modify
    #endroit de sauvegarde des images satellites data/images/
    image.save(f'../data/data_crop_group/images/{loc}/image_{i}.jpg')

In [8]:
def clip_shapefile(coord: list, shapefile: gpd.geodataframe.GeoDataFrame, crs: str) -> gpd.geodataframe.GeoDataFrame:
    """clip shapefile to manage parcels on image 

    Args:
        coord (list): coordinate box 
        shapefile (gpd.geodataframe.GeoDataFrame): parcels 
        crs (str): georeferenced name

    Returns:
        gpd.geodataframe.GeoDataFrame: parcel present only in the box 
    """

    polygone_box = box(coord[0], coord[1], coord[2], coord[3])
    gdf_box = gpd.GeoDataFrame(geometry=[polygone_box], crs=crs)
    
    shapefile_clip = gpd.overlay(shapefile, gdf_box, how='intersection', make_valid=True)
    
    return shapefile_clip

In [9]:
def create_save_labels(shapefile_clipped: gpd.geodataframe.GeoDataFrame, coord: list, loc: str, i: int) -> None:
    """_summary_

    Args:
        shapefile_clipped (gpd.geodataframe.GeoDataFrame): parcel present only in the box
        coord (list): coordinate box
        loc (str): localisation inside local directory 
        i (int): image number
    """

    polygones = shapefile_clipped.geometry

    lon_min, lat_min, lon_max, lat_max = coord
    x_scaled, y_scaled = [], []
    
    for polygone in polygones:
        try :
            x, y =np.stack(polygone.exterior.xy, axis=0)
            x_scaled.append(((x - lon_min) / (lon_max - lon_min)))
            y_scaled.append(1 - ((y - lat_min) / (lat_max - lat_min)))
        except AttributeError:
            continue

    polygones_points = [list(zip(x, y)) for x, y in zip(x_scaled, y_scaled)]
    
    #to_modify
    #endroit de sauvegarde des labels texte data/labels/
    with open(f'../data/data_crop_group/labels/{loc}/image_{i}.txt', 'w') as fichier:
        for polygones_points, code_group in zip(polygones_points, shapefile_clipped.CODE_GROUP):
            line = f'{code_group} ' #mettre code_group pour prendre en compte la culture sinon 0
            for lon, lat in polygones_points:
                line += f'{lon:.3f} {lat:.3f} '
            fichier.write(f'{line}\n')
        fichier.close()

In [10]:
def __main__():
    #to_modify
    #endroit où ce situe les shapefiles à utiliser ainsi que le token de l'api mapbox
    directory = "../data/shapefiles_filtered/"
    api_token = 'TOKEN_MAPBOX'
    offset = [0.0096, 0.0064] #2m pour 512p
    crs = "EPSG:4326"
    pixels = 512

    #to_modify
    #peut ajouter des valeurs dans la list pour faire tout d'un coup 
    localisations = ['train'] #,'train','val','test'
    nb_images = [2000] #4000,200,200
    
    
    filenames = get_filenames(directory)

    for nb_image, loc in zip(nb_images, localisations):
        print(f'##--- {loc} ---#')
        
        #to_modify
        #premier numero pour l'enregistrement
        #si c'est pour cérer un nouveau dataset mettre 0
        #si c'est pour compléter un ancien dataset avec de nouvelles données mettre le numéro du dernier label
        #risque de probleme pour les resultats si les nombre de ne suivent pas 
        image_count = 4190 
        nb_image_region = image_region(filenames, nb_image)

        for filename, nb_image_r in zip(filenames, nb_image_region):
            
            print(f' region : {filename[0][:-4]}, pour {nb_image_r} images')
    
            if nb_image <= 0:
                break

            shapefile = gpd.read_file(f'{directory}{filename[0]}')
            shapefile = transform_shapefile(shapefile, crs)

            #afficahe d'une barre de progression
            start = time.time()
            f = IntProgress(min=0, max=nb_image_r)
            display(f)
    
            for i in range(nb_image_r):

                coord = random_coordinate(shapefile, offset)
                
                download_images(coord, pixels, api_token, loc, image_count)
    
                shapefile_clipped = clip_shapefile(coord, shapefile, crs)
    
                create_save_labels(shapefile_clipped, coord, loc, image_count)
    
                f.value += 1
                image_count += 1
                
            print(f'{loc}, {filename[0][:-4]}, {nb_image_r}, temps : {time.time() - start}s')
            del(shapefile)

In [None]:
__main__()