# Ingest CORINE Land Cover Data

This notebook will ingest CORINE land cover data onto the DL platform. CORINE is the European Economic Area's land cover dataset and is available for the years 1990, 2000, 2006, 2012, and 2018. The dataset uses 44 classes to describe land cover, ranging from the natural to built environment. The dataset is built from vector data collected from the devolved EEA member states. The vector land cover data is developed using high- and medium-resolution earth observation imagery as well as in-situ data. The minimum mapping unit for the dataset is 0.25 square kilometers for areal phenomena and 100m for linear phenomena. See https://land.copernicus.eu/pan-european/corine-land-cover.

CORINE land cover data is provided as vector or 100m raster files. For various purposes, it would be useful to have CORINE land cover data as 10m rasters. Let's grab the CORINE vector data and store it as a DL product at 10m resolution.

**Load libraries**

In [10]:
import os, json, time, logging, sys, geojson, pickle

import pandas as pd
import geopandas as gpd
from shapely import geometry
import numpy as np
from shapely.affinity import affine_transform
from shapely import wkt
from skimage.measure import block_reduce
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
from osgeo import ogr

import descarteslabs as dl
from descarteslabs.catalog import Product
from descarteslabs.catalog import Image as dl_Image
from descarteslabs.catalog import ClassBand, DataType, Resolution, ResolutionUnit

In [None]:
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

**Fetch the data**

The following steps will save the CORINE data to this local node for processing.
- Make a Copernicus account 
- For your selected year, generate a download link
- download the zip file: `wget <http://path/to/your/zipfile.zip>`
- open a terminal and navigate to your zipfile
- unzip the zipfile and its nested zipfile: 2x `unzip <path/to/your/zipfile.zip>`
- set the `corine_path` variable below to the root of the unzipped directory

**Workflow**

The workflow for creating the CORINE data will be as follows:

- Load the CORINE class encoding
- Create a DL product and single band
- Load the CORINE scope countries
- fetch the tiles for the scope countries
- load the vector database 
- for each tile:
  - filter the database and push a geojson FeatureCollection to DL Storage
  - deploy the rasterising function on DL Tasks which
    - fetches the FeatureCollection for the tile
    - rasterises it at 1m (for nice crisp lines between objects)
    - aggregates to 10m 
    - pushes the image to the DL product

**Parameters**

In [4]:
params = {}
params['corine_path'] = '/home/jovyan/solar-pv-global-inventory/data/CORINE/clc2012_clc2006_v2018_20_fgdb'  # path to the geodatabase
params['year'] = '2006' 
params['gdb_fname'] = 'CLC2012_CLC2006_V2018_20.gdb'
params['tilesize'] = 5000 # size of the tiles to be used for rasterisation
params['product_params'] = {'_id':'corine-land-cover',
                            'name':'CORINE land cover product for re-rasterised CORINE vector layers'}
params['band_params'] = {'name':'CLC_class',
                         'data_range':(0,255),
                         'display_range':(0,50),
                         'resolution':10, 
                         'index':0}

### Class Encoding

In [5]:
# get classes from the legend csv included with the data
classes = pd.read_csv(os.path.join(params['corine_path'],'Legend','CLC_legend.csv'), sep=';')

In [6]:
class_labels = (classes['CLC_CODE'].astype(str) +': '+ classes['LABEL3']).values.tolist()

In [7]:
class_labels

['111: Continuous urban fabric',
 '112: Discontinuous urban fabric',
 '121: Industrial or commercial units',
 '122: Road and rail networks and associated land',
 '123: Port areas',
 '124: Airports',
 '131: Mineral extraction sites',
 '132: Dump sites',
 '133: Construction sites',
 '141: Green urban areas',
 '142: Sport and leisure facilities',
 '211: Non-irrigated arable land',
 '212: Permanently irrigated land',
 '213: Rice fields',
 '221: Vineyards',
 '222: Fruit trees and berry plantations',
 '223: Olive groves',
 '231: Pastures',
 '241: Annual crops associated with permanent crops',
 '242: Complex cultivation patterns',
 '243: Land principally occupied by agriculture, with significant areas of natural vegetation',
 '244: Agro-forestry areas',
 '311: Broad-leaved forest',
 '312: Coniferous forest',
 '313: Mixed forest',
 '321: Natural grasslands',
 '322: Moors and heathland',
 '323: Sclerophyllous vegetation',
 '324: Transitional woodland-shrub',
 '331: Beaches, dunes, sands',
 '332

### Create Product and Band

**Create Product**

In [None]:
product = Product.get('oxford-university:corine-land-cover')#(params['product_params']['_id'])

In [None]:
if not product:
    product = Product(id=params['product_params']['_id'], 
                      name=params['product_params']['name'])
    product.save()

In [None]:
bands = [bb for bb in product.bands().limit(2)]

**Create Band**

In [None]:
if not bands:
    band = ClassBand(name=params['band_params']['name'], product=product)
    band.data_type = DataType.BYTE
    band.data_range = params['band_params']['data_range']
    band.display_range = params['band_params']['display_range']
    band.resolution = Resolution(unit=ResolutionUnit.METERS, value=params['band_params']['resolution'])
    band.band_index = params['band_params']['index']
    band.class_labels = class_labels
    band.save()

**Delete product and images if necessary**

In [None]:
### delete the product if it needs to be remade
# status = product.delete_related_objects()

In [None]:
# status

In [None]:
# product.delete()

### Load Scope Countries and Fetch Country Shape

In [None]:
# load scope countries from the CLC metadata
scope_countries = pd.read_excel(os.path.join(params['corine_path'],'CLC_country_coverage_v20.xls'), skiprows=4)
scope_countries = scope_countries[scope_countries['ISO code'].str.len()==2] # drop trailing notes

In [None]:
country_shapes = {}
for country in scope_countries.Country.values.tolist():
    res = [r for r in dl.Places().find(country.lower().replace(' ','-')) if r['placetype']=='country']
    if res:
        country_shapes[country] = geometry.shape(dl.Places().shape(res[0]['slug'], geom='medium')['geometry'])
    else: 
        print ('Nothing found!', country)

In [None]:
### add montenegro and france manually
res = [r for r in dl.Places().find('montenegro') if r['placetype']=='country']
country_shapes['Monte Negro'] = geometry.shape(dl.Places().shape(res[0]['slug'], geom='medium')['geometry'])

In [None]:
### add France from natural earth
ne = gpd.read_file(os.path.join(params['corine_path'],'ne_10m_countries.gpkg'))
country_shapes['France'] = ne[ne.ISO_A2=='FR'].iloc[0].geometry

In [None]:
# cast dict to geopandas geoseries
gds_countries = gpd.GeoSeries(data=country_shapes)

In [None]:
gds_countries.plot(figsize=(6,6))

### Fetch tiles for all countries

In [None]:
extents_box = geometry.box(-40,20,60,88) #minx, miny, maxx, maxy

In [None]:
# clip countries to the bounding box of CORINE and reduce country shapes to a multipolygon
countries_mp = gpd.clip(gds_countries,extents_box).geometry.unary_union

In [None]:
# tile = dl.Raster().dltile_from_latlon(lat=51.744674, lon=-1.246004, resolution=10, tilesize=500, pad=0)

In [None]:
# fetch the tiles
tiles = dl.Raster().dltiles_from_shape(
    resolution=params['band_params']['resolution'],
    tilesize=params['tilesize'],
    pad=0,
    shape=countries_mp)
print (len(tiles['features']))

In [None]:
# use geopandas to quickly plot the tiles
tile_polys = [geometry.shape(t['geometry']) for t in tiles['features']]
gds_tiles = gpd.GeoSeries(tile_polys)
# gds_tiles.plot(figsize=(20,20))

In [None]:
gds_tiles.plot(figsize=(12,12))

### Load the gdb

In [None]:
### try loading into a GeoDataFrame...
# gdf = gpd.read_file(os.path.join(params['corine_path'],params['gdb_fname']))
# ... this takes way too long.

In [None]:
def tile2fcstorage(tile, params):
    logger = logging.getLogger(tile['properties']['key'])
    try:
        tic = time.time()
        storage_client = dl.Storage()


        # handle tile
        logger.info(f'handling file {time.time()-tic}')
        tile_shp = geometry.shape(tile.geometry)
        tile_gds = gpd.GeoSeries(tile_shp, crs='EPSG:4326')
        tile_LAEA = tile_gds.to_crs('EPSG:3035').geometry[0]
        tile_utm = tile_gds.to_crs(tile['properties']['cs_code']).geometry[0]

        # get fc
        logger.info(f'filtering gdb {time.time()-tic}')
        driver = ogr.GetDriverByName("OpenFileGDB")
        dataSource = driver.Open(os.path.join(params['corine_path'],params['gdb_fname']), 0)
        layer = dataSource.GetLayer()
        layer.SetSpatialFilter(ogr.CreateGeometryFromWkt(tile_LAEA.wkt))
        logger.info(f'getting features {time.time()-tic}')
        features=[]
        for feature in layer:
            #print (dir(feature))
            #print (feature.items())
            #print (dir (feature.geometry()))
            #print ('lingeom',feature.geometry().GetLinearGeometry())
            try:
                features.append(feature.ExportToJson())
            except Exception as e:
                logger.exception('message')
                logger.info('trying via wkt')
                ft_wkt = feature.geometry().ExportToWkt()
                print (ft_wkt[0:20])
                if ft_wkt.split(' ')[0]=='MULTISURFACE':
                    features.append(json.dumps(geojson.Feature(
                        geometry=wkt.loads(feature.geometry().GetLinearGeometry().ExportToWkt()),
                        properties=feature.items())
                    ))
                else:
                    raise
            
        if len(features)==0:
            logger.info(f'zero features. Write blank to Storage. {time.time()-tic}')
            json.dump(geojson.FeatureCollection([]), open(os.path.join(params['corine_path'],'tile_gj',params['year']+'_'+tile['properties']['key']+'.geojson'), 'w'))
            storage_client.set_file(params['year']+'_'+tile['properties']['key']+'.geojson',os.path.join(params['corine_path'],'tile_gj',params['year']+'_'+tile['properties']['key']+'.geojson'))
        
            logger.info(f'write log {time.time()-tic}')
            with open(os.path.join(params['corine_path'],'write_log.log'),'a+') as f:
                f.write('{}\r\n'.format(tile['properties']['key'],time.time()-tic))
        
            return 0
        # 
        logger.info(f'casting to gdf {time.time()-tic}')
        # LAEA projection
        gdf = gpd.GeoDataFrame.from_features([json.loads(ft) for ft in features], crs = 'EPSG:3035')

        # reproject geometry and buffer to try to fix topology errors
        logger.info(f'reprojecting geometry {time.time()-tic}')
        gdf.geometry = gdf.geometry.to_crs('+proj=longlat +datum=WGS84 +no_defs ').buffer(0)

        # clip to the tile to save space
        logger.info(f'clip to tile {time.time()-tic}')
        gdf.geometry = gdf.geometry.intersection(tile_shp)

        gdf = gdf[~gdf.geometry.is_empty]

        # dump to file
        logger.info(f'dump to file {time.time()-tic}')
        gdf.to_file(os.path.join(params['corine_path'],'tile_gj',params['year']+'_'+tile['properties']['key']+'.geojson'), driver='GeoJSON')

        logger.info(f'push to storage {time.time()-tic}')
        storage_client.set_file(params['year']+'_'+tile['properties']['key']+'.geojson',os.path.join(params['corine_path'],'tile_gj',params['year']+'_'+tile['properties']['key']+'.geojson'))
        
        logger.info(f'write log {time.time()-tic}')
        with open(os.path.join(params['corine_path'],'write_log.log'),'a+') as f:
            f.write('{}\r\n'.format(tile['properties']['key'],time.time()-tic))

        logger.info(f'DONE {time.time()-tic}')

        #print (gdf[725:750])
        #fig, ax = plt.subplots(1,1,figsize=(20,20))
        #gdf.plot(column='Code_18', ax=ax)
    except Exception as e:
        logger.exception("message")

In [None]:
### try running a random tile
tile = tiles['features'][np.random.choice(len(tiles['features']))]
tile2fcstorage(tile,params)

In [None]:
### run an individual tile
# tile = [tt for tt in tiles['features'] if tt['properties']['key']=='5000:0:10.0:28:1:127'][0]
# tile2fcstorage(tile,params)

In [None]:
### check if it made it to storage
storage_client_local = dl.Storage()
storage_client_local.exists(params['year']+'_'+tile['properties']['key']+'.geojson')

### Prep and store GeoJSONs using Python multiprocessing

In [None]:
## see which ones are done
if os.path.exists(os.path.join(params['corine_path'],'write_log.log')):
    with open(os.path.join(params['corine_path'],'write_log.log'),'r') as f:
        done_tiles = f.readlines()
else:
    done_tiles = []
done_tiles = [tt.rstrip() for tt in done_tiles]

In [None]:
len(done_tiles)

In [None]:
import multiprocessing as mp
print (mp.cpu_count())

In [None]:
pool = mp.Pool(4)

#### 

In [None]:
### run tiles that aren't done yet
run_tiles = [tt for tt in tiles['features'] if tt['properties']['key'] not in done_tiles]
print (len(run_tiles))

In [None]:
### multiprocess the feature fetching and storage
pool.starmap(tile2fcstorage, list(zip(run_tiles, [params]*len(run_tiles))))

### Parse Class labels

In [8]:
class_labels_dict = {kk+1:{'code':class_labels[kk].split(': ')[0],'description':class_labels[kk].split(': ')[-1]} for kk in range(len(class_labels))}
print(class_labels_dict)

{1: {'code': '111', 'description': 'Continuous urban fabric'}, 2: {'code': '112', 'description': 'Discontinuous urban fabric'}, 3: {'code': '121', 'description': 'Industrial or commercial units'}, 4: {'code': '122', 'description': 'Road and rail networks and associated land'}, 5: {'code': '123', 'description': 'Port areas'}, 6: {'code': '124', 'description': 'Airports'}, 7: {'code': '131', 'description': 'Mineral extraction sites'}, 8: {'code': '132', 'description': 'Dump sites'}, 9: {'code': '133', 'description': 'Construction sites'}, 10: {'code': '141', 'description': 'Green urban areas'}, 11: {'code': '142', 'description': 'Sport and leisure facilities'}, 12: {'code': '211', 'description': 'Non-irrigated arable land'}, 13: {'code': '212', 'description': 'Permanently irrigated land'}, 14: {'code': '213', 'description': 'Rice fields'}, 15: {'code': '221', 'description': 'Vineyards'}, 16: {'code': '222', 'description': 'Fruit trees and berry plantations'}, 17: {'code': '223', 'descrip

In [12]:
pickle.dump(class_labels_dict, open('./class_labels_CORINE.pkl','wb'))

### Deploy the rasteriser on DL.Tasks()

In [None]:
def tile2img(tile, params, class_labels_dict):
    
    import logging, json, sys
    from shapely import geometry
    
    import geopandas as gpd
    import pandas as pd
    import numpy as np
    
    import descarteslabs as dl
    print (dl.__version__)
    from descarteslabs.catalog import Image, Product
    
    from shapely.affinity import affine_transform
    from skimage.measure import block_reduce
    from PIL import ImageDraw
    from PIL import Image as pilImage
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    storage_client = dl.Storage()
    
    logger = logging.getLogger(tile['properties']['key'])
    logger.info('Get Product')
    
    product = Product.get_or_create('oxford-university:corine-land-cover')
    
    def xyz2xy(coord_seq):
        return [(t[0],t[1]) for t in list(coord_seq)]
    
    logger.info('handle tile...')
    tile_shp = geometry.shape(tile['geometry'])
    tile_gds = gpd.GeoSeries(tile_shp, crs='EPSG:4326')
    tile_LAEA = tile_gds.to_crs('EPSG:3035').geometry[0]
    tile_utm = tile_gds.to_crs(tile['properties']['cs_code']).geometry[0]
    
    BL = geometry.Point(tile_utm.bounds[0], tile_utm.bounds[1]) # bottom left in UTM
    
    logger.info(f'BL {BL}')
    
    logger.info('get GDF')
    # load data
    
    features  = json.loads(storage_client.get(params['year']+'_'+tile['properties']['key']+'.geojson'))['features']
    if len(features)==0:
        logger.info('No features!')
        return {'tile':tile['properties']['key'],
               'n_features':0}
    
    df = pd.DataFrame.from_records([ft['properties'] for ft in features])
    geometries = [geometry.shape(ft['geometry']) for ft in features]
  
    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs = {'init':'epsg:4326'})
    #gdf = gpd.read_file(os.path.join(params['corine_path'],'gdf_'+tile['properties']['key']+'.geojson'))
    
    # convert to utm
    gdf = gdf.to_crs(tile['properties']['cs_code'])
    print (gdf)
    
    logger.info('class array')
    class_array = np.zeros((params['tilesize'], params['tilesize'],50),dtype = np.int8)
    
    slice_tilesize=int(params['tilesize']/10)   # 500px -> 5km
    
    for ii_x in range(int(params['tilesize']/slice_tilesize)):
        for ii_y in range(int(params['tilesize']/slice_tilesize)):
            # create the mask for the slice
            
            logger.info(f'ii_x, ii_y: {ii_x}, {ii_y}')
            slice_mask = np.zeros((slice_tilesize, slice_tilesize, 50), dtype = np.int8)
            big_slice_mask = np.zeros((slice_tilesize*10, slice_tilesize*10, 50), dtype = np.int8) # make slice tilesize 10x larger to 1m resolution
            
            # get the slice affine transform
            a = e = 1/1 # stretch along-axis 1m/px
            b = d = 0 # rotate across-axis 0m/px
            x_off = - (BL.x + ii_x*slice_tilesize*10) / 1 # offset from cartesian origin in pixel coordinates
            y_off = - (BL.y + ii_y*slice_tilesize*10) / 1 # offset from cartesian origin in pixel coordinates
            GT = [a,b,d,e,x_off,y_off] # GeoTransform Matrix
            
            ### draw all objects
            for class_code in list(class_labels_dict.keys()):
                # create Image object
                im = pilImage.fromarray(big_slice_mask[:,:,int(class_code)], mode='L')

                # create ImageDraw object
                draw = ImageDraw.Draw(im)
                
                # draw objects
                geometries = gdf[gdf.Code_06==class_labels_dict[class_code]['code']].geometry.values
                
                for geom_utm in geometries:
                    geom_px = affine_transform(geom_utm, GT)
                    
                    if geom_px.type=='Polygon':
                        # draw the polygon
                        draw.polygon(xyz2xy(geom_px.exterior.coords), fill=1)

                        # un-draw any holes in the polygon...
                        for hole in geom_px.interiors:
                            draw.polygon(xyz2xy(hole.coords), fill=0)
                        
                    elif geom_px.type=='MultiPolygon':
                        for subgeom in list(geom_px):
                            # draw the polygon
                            draw.polygon(xyz2xy(subgeom.exterior.coords), fill=1)

                            # un-draw any holes in the polygon...
                            for hole in subgeom.interiors:
                                draw.polygon(xyz2xy(hole.coords), fill=0)
                
                # return the image object to the mask array
                big_slice_mask[:,:,int(class_code)] = np.array(im)
                
            ### reduce the mask array
            ### draw all objects
            for class_code in class_labels_dict.keys():
                # create Image object
                slice_mask[:,:,int(class_code)] = block_reduce(big_slice_mask[:,:,int(class_code)], (10,10), np.sum)
                
            class_array[ii_y*slice_tilesize:(ii_y+1)*slice_tilesize,ii_x*slice_tilesize:(ii_x+1)*slice_tilesize,:] = slice_mask #np.swapaxes(slice_mask,0,1)
            
    logger.info(f'Got full array')
    #class_array = np.swapaxes(np.argmax(class_array, axis=-1),0,1)
    class_array = np.flip(np.argmax(class_array, axis=-1), axis=0)
    
    logger.info(f'Get raster info')
    raster_info = {
        'driverShortName':'MEM',
        'driverLongName': 'In Memory Raster',
        'files':[],
        'size':[params['tilesize'],params['tilesize']],
        'coordinateSystem':{'wkt':tile['properties']['wkt']},
        'geotrans': tile['properties']['geotrans'],
        'cs_code':tile['properties']['cs_code'],
         'metadata': {},
         'cornerCoordinates': {'upperLeft': [round(tt) for tt in list(tile_utm.exterior.coords)[3]],
                              'lowerLeft': [round(tt) for tt in list(tile_utm.exterior.coords)[0]],
                              'lowerRight': [round(tt) for tt in list(tile_utm.exterior.coords)[1]],
                              'upperRight': [round(tt) for tt in list(tile_utm.exterior.coords)[2]],
                              'center': [round(tile_utm.centroid.x), round(tile_utm.centroid.y)]},
         'wgs84Extent': geometry.mapping(geometry.box(*tile_shp.bounds)),
     'bands': [{'band': 1}],

    }
    
    
    logger.info(f'Generating image')

    image = Image(product=product, name=params['year']+'_'+tile['properties']['key'].replace(':','_'), acquired=params['year']+"-12-31", acquired_end=params['year']+"-12-31", cs_code=tile['properties']['cs_code'], geotrans=tile['properties']['geotrans'])

    logger.info(f'Uploading image')
    
    image.upload_ndarray(class_array.astype(np.int8),raster_meta=raster_info)

            
    return True

In [None]:
tasks = dl.Tasks()

In [None]:
fn = tasks.create_function(
            tile2img,
            image='us.gcr.io/dl-ci-cd/images/tasks/public/py3.7:v1.1.1',
            name='tile2img-v16-2006',
            requirements=[],
            maximum_concurrency=500,
            memory='3.5Gi',
            retry_count=0,
            task_timeout=21600,
            )

In [None]:
### run a local or test file
# tile = tiles['features'][np.random.choice(len(tiles['features']))]

## local:
# tile2img(tile, params, class_labels_dict)

## remote:
# fn(tile, params, class_labels_dict)

In [None]:
### Deploy all tiles
all_tasks = []
for ii_t, tile in enumerate(tiles['features']):
    if ii_t % 100==0:
        print (ii_t)
    all_tasks.append(fn(tile, params, class_labels_dict))