# Clase Landsat

Vamos a crear la clase para trabajar con imágenes Landsat

In [1]:
import os
import numpy as np
import rasterio
from urllib.request import urlopen

class Landsat:
    
    
    """Clase para trabajar con las Landsat de la nueva Collection 2 Level 2 del USGS. Primer boceto creado
    como ejercicio del curso de Análisis Espacial con Python. Mirad bien los comentarios por favor!!"""
    
    def __init__(self, ruta_escena):
        
        """Definimos los atributos que tendrá nuestro objeto a partir del path a su carpeta"""
        
        #Definimos los paths necesarios
        self.ruta_escena = ruta_escena
        self.sr = os.path.split(self.ruta_escena)[0]
        self.escena = os.path.split(self.ruta_escena)[1]
        self.pro = os.path.join(os.path.split(self.sr)[0], 'PRO')
        
        if not os.path.exists(self.pro):
            os.makedirs(self.pro)
            
        self.pro_escena = os.path.join(self.pro, self.escena)
        if not os.path.exists(self.pro_escena):
            os.makedirs(self.pro_escena)
        
        #Definimos un grupo de variables de la escena
        data_escena = self.escena.split('_')
        self.sat = data_escena[0]
        self.nprocesado = data_escena[1]
        self.path = data_escena[2][:3]
        self.row = data_escena[2][-3:]
        self.escena_date = data_escena[3]
        self.escena_procesado_date = data_escena[4]
        self.collection = data_escena[5]
        self.tier = data_escena[6]
        
        #CReamos un diccionario a partir del MTL
        self.mtl = {}
        for i in os.listdir(self.ruta_escena):
            if i.endswith('MTL.txt'):
                mtl = os.path.join(self.ruta_escena, i)
                
                f = open(mtl, 'r')
                
                for line in f.readlines():
                    if "=" in line:
                        l = line.split("=")
                        self.mtl[l[0].strip()] = l[1].strip()
        
        #Creamos las variables con la franja del espectro de cada banda
        if self.sat in ['LC08', 'LC09']:
            
            for i in os.listdir(self.ruta_escena):
                if i.endswith('.TIF'):
                    banda = os.path.splitext(i)[0].split('_')[-1]
            
                    if banda == 'B2':
                        self.blue = os.path.join(self.ruta_escena, i)
                    elif banda == 'B3':
                        self.green = os.path.join(self.ruta_escena, i)
                    elif banda == 'B4':
                        self.red = os.path.join(self.ruta_escena, i)
                    elif banda == 'B5':
                        self.nir = os.path.join(self.ruta_escena, i)
                    elif banda == 'B6':
                        self.swir1 = os.path.join(self.ruta_escena, i)
                    elif banda == 'B7':
                        self.swir2 = os.path.join(self.ruta_escena, i)
                    elif i.endswith('QA_PIXEL.TIF'):
                        self.qa = os.path.join(self.ruta_escena, i)
                        print(self.qa)
                    else:
                        continue
                
        elif self.sat in ['LE07', 'LT05']:
            
            for i in os.listdir(self.ruta_escena):
                if i.endswith('.TIF'):
                    banda = os.path.splitext(i)[0].split('_')[-1]
            
                    if banda == 'B1':
                        self.blue = os.path.join(self.ruta_escena, i)
                    elif banda == 'B2':
                        self.green = os.path.join(self.ruta_escena, i)
                    elif banda == 'B3':
                        self.red = os.path.join(self.ruta_escena, i)
                    elif banda == 'B4':
                        self.nir = os.path.join(self.ruta_escena, i)
                    elif banda == 'B5':
                        self.swir1 = os.path.join(self.ruta_escena, i)
                    elif banda == 'B7':
                        self.swir2 = os.path.join(self.ruta_escena, i)
                    elif i.endswith('QA_PIXEL.TIF'):
                        self.qa = os.path.join(self.ruta_escena, i)
                        print(self.qa)
                    else:
                        continue
        
        else:
            print('No encuentro ninguna escena Landsat')
            
        
        # Descargamos el quicklook de la escena 
        url_base = 'https://landsatlook.usgs.gov/gen-browse?size=rrb&type=refl&product_id={}'.format(self.mtl['LANDSAT_PRODUCT_ID'].strip('""'))
        qk_name = os.path.join(self.ruta_escena, self.escena + '_Quicklook')
        
        if not os path.exists(qk_name):
            qk = open(qk_name, 'wb')
            qk_open = urlopen(url_base)
            urlimg = qk_open.read()
            qk.write(urlimg)
            qk.close()

            print('QuicKlook descargado')
            
        else:
            print('El Quicklook ya estaba previamente descargado')
        
        #print(url_base)
        print('Landsat iniciada con éxito') 
        
        #Creamos el json para instarlo en la base de datos MongoDB
        self.newesc = {'_id': self.escena, 'usgs_id': self.mtl['LANDSAT_SCENE_ID'], 'tier_id': self.mtl['LANDSAT_PRODUCT_ID'],
                           'lpgs': self.mtl['PROCESSING_SOFTWARE_VERSION'], 'Clouds': {'cloud_scene': self.mtl['CLOUD_COVER']},
                           'Info': {'Tecnico': 'LAST-EBD Auto', 'Iniciada': datetime.now(), 'Pasos': {'rad': '', 'nor': ''}}}

        # Conectamos con MongoDB 
        connection = pymongo.MongoClient("mongodb://localhost")
        # DataBase: teledeteccion, Collection: landsat
        db=connection.teledeteccion
        landsat = db.landsat

        try:

            landsat.insert_one(self.newesc)

        except Exception as e:

            landsat.update_one({'_id':self.escena}, {'$set':{'Info.Iniciada': datetime.now()}})
            #print "Unexpected error:", type(e), se Podria dar un error por clave unica, por eso en
            #ese caso, lo que hacemos es actualizar la fecha en la que tratamos la imagen

        print('Landsat instanciada y subida a la base de datos')
        
    
    def get_refl(self, banda):
        
        
        dbandasoli = {self.nir: ['REFLECTANCE_MULT_BAND_5', 
                        'REFLECTANCE_ADD_BAND_5'], 
           self.red: ['REFLECTANCE_MULT_BAND_4', 
                        'REFLECTANCE_ADD_BAND_4'],
               self.swir1: ['REFLECTANCE_MULT_BAND_6', 
                        'REFLECTANCE_ADD_BAND_6']}

        dbandasnoli = {self.nir: ['REFLECTANCE_MULT_BAND_4', 
                                'REFLECTANCE_ADD_BAND_4'], 
                   self.red: ['REFLECTANCE_MULT_BAND_3', 
                                'REFLECTANCE_ADD_BAND_3'],
                      self.swir1: ['REFLECTANCE_MULT_BAND_5', 
                        'REFLECTANCE_ADD_BAND_5']  }

        
        if self.sat in ['LC08', 'LC09']:
            
            prod = self.mtl[dbandasoli[banda][0]]     
            add = self.mtl[dbandasoli[banda][1]]
            
        else:
            
            prod = self.mtl[dbandasnoli[banda][0]]   
            add = self.mtl[dbandasnoli[banda][1]]            


        return (float(prod), float(add))
    
    
    
    def ndvi(self):
        
        
        #Creamos el archivo que escribiremos luego con el ndvi
        outfile = os.path.join(self.pro_escena, self.escena + '_ndvi_rf_i.tif')
        print(outfile)

        #ndvi = (self.nir - self.red) / (self.nir + self.red)
        #NIR = rasterio.open(self.nir).read() #CReo un obejto raster de rasterio
        #Abrimos y leemos los rasters con rasterio
        with rasterio.open(self.qa) as qa:
            QA = qa.read().astype('float')
            #Creamos una máscara binaria con los píxeles válidos
            QAA = np.where(((QA == 21824) | (QA == 21952)), 1, 0)
            
        with rasterio.open(self.nir) as nir:
            NIR = nir.read().astype('float')
            
            PROD = self.get_refl(self.nir)[0]
            ADD = self.get_refl(self.nir)[1]
            
            print('nir', PROD, type(PROD))
            print('nir', ADD, type(ADD))
            
            NIR = NIR * PROD + ADD

        with rasterio.open(self.red) as red:
            RED = red.read().astype('float')
            
            PROD = self.get_refl(self.red)[0]
            ADD = self.get_refl(self.red)[1]
            
            print('red', PROD, type(PROD))
            print('red', ADD, type(ADD))
            
            RED = (RED + ADD) * PROD
        
        
        ndvi = (NIR - RED) / (NIR + RED) 
        ndvi_ = np.where((QAA == 1), ndvi, -9999)

        #Actualizamos el profile
        profile = nir.meta
        profile.update(dtype=rasterio.float32)
        profile.update(nodata=-9999)

        with rasterio.open(outfile, 'w', **profile) as dst:
            dst.write(ndvi_) #.astype(rasterio.float32))

    def flood(self):
        
        
        #Creamos el archivo que escribiremos luego con el ndvi
        outfile = os.path.join(self.pro_escena, self.escena + '_flood_rf2.tif')
        print(outfile)

        #ndvi = (self.nir - self.red) / (self.nir + self.red)
        #NIR = rasterio.open(self.nir).read() #CReo un obejto raster de rasterio
        #Abrimos y leemos los rasters con rasterio
        with rasterio.open(self.qa) as qa:
            QA = qa.read().astype('float')
            #Creamos una máscara binaria con los píxeles válidos
            QAA = np.where(((QA == 21824) | (QA == 21952)), 1, 0)
            
        with rasterio.open(self.swir1) as swir1:
            SWIR1 = swir1.read()#.astype('float')
            
            PROD = self.get_refl(self.swir1)[0]
            ADD = self.get_refl(self.swir1)[1]
            
            print(PROD, ADD)
            
            SWIR1 = SWIR1 * PROD + ADD

        #ndvi = (NIR - RED) / (NIR + RED) 
        flood = np.where((QAA == 1) & (SWIR1 < 0.15) , 1, 
                         np.where(QAA == 0, -9999, 0))

        #Actualizamos el profile
        profile = swir1.meta
        profile.update(dtype=rasterio.int16)
        profile.update(nodata=-9999)

        with rasterio.open(outfile, 'w', **profile) as dst:
            dst.write(flood) #.astype(rasterio.float32))
            
            
            
    def get_cloud_pn(self, shape):

        """Este método está pensado para obtener el porcentaje de nubes sobre la marisma. Por tanto el 
        shape con la geometría que se le pase debería de ser ese, pero en realidad cualquiera que le
        pasemos funcionará y nos dará el porcentaje de nubes sobre esa geometría"""

        #shape = '/media/diego/Datos3/EBD/Cursos/AETPython/Landsat/Data/marisma_recintos_etrs.shp'
        crop = "-crop_to_cutline"

        if self.sat == 'L7' and self.escena > '20030714':
            ruta = self.gapfill
        else:
            ruta = self.ruta_escena

        cloud = self.qa 
        
        #usamos Gdalwarp para realizar las mascaras, llamandolo desde el modulo subprocess
        cmd = ["gdalwarp", "-dstnodata" , "0" , "-cutline", ]
        path_masks = os.path.join(ruta, 'masks')
        if not os.path.exists(path_masks):
            os.makedirs(path_masks)

        salida = os.path.join(path_masks, 'cloud_PN.TIF')
        cmd.insert(4, shape)
        cmd.insert(5, crop)
        cmd.insert(6, cloud)
        cmd.insert(7, salida)

        proc = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout,stderr=proc.communicate()
        exit_code=proc.wait()

        if exit_code: 
            raise RuntimeError(stderr)


        ds = gdal.Open(salida)
        cloud = np.array(ds.GetRasterBand(1).ReadAsArray())


        mask = (cloud == 21824) | (cloud == 21952)


        cloud_msk = cloud[mask]
        #print(cloud_msk.size)
        clouds = float(cloud_msk.size*900) # número de píxeles por 900 porque es Landsat (30*30). Habría que coger ese valor de la banda
        PN = 340055760.83 #Aqui se debería cambiar esto para que leyera el área de la geometría que se le pase
        pn_cover = round(100 - (clouds/PN)*100, 2)
        ds = None
        cloud = None
        cloud_msk = None
        clouds = None
        #Insertamos la cobertura de nubes en la BD
        connection = pymongo.MongoClient("mongodb://localhost")
        db=connection.teledeteccion
        landsat = db.landsat

        try:

            landsat.update_one({'_id':self.escena}, {'$set':{'Clouds.cloud_PN': pn_cover}},  upsert=True)

        except Exception as e:
            print("Unexpected error:"), type(e), e

        print("El porcentaje de nubes en el Parque Nacional es de " + str(pn_cover))      
        
        

In [2]:
ruta = '/home/last/Escritorio/AEP/Dia2/Landsat/SR/LC08_L2SP_202034_20230602_20230607_02_T1'
#Instanciamos la clase
L = Landsat(ruta)
#Ejecutamos el NDVI
L.ndvi()

/home/last/Escritorio/AEP/Dia2/Landsat/SR/LC08_L2SP_202034_20230602_20230607_02_T1/LC08_L2SP_202034_20230602_20230607_02_T1_QA_PIXEL.TIF
Landsat iniciada con éxito


In [None]:
#Aplicamos el proceso en bucle a un directorio con varias escenas
import os

ruta_sr = '/home/last/Escritorio/AEP/Dia2/Landsat/SR'

for i in os.listdir(ruta_sr):
    nruta = os.path.join(ruta_sr, i)
    print(nruta)
    if os.path.isdir(nruta) and i.startswith('L'):
        try:
            a = Landsat(nruta)
            #a.ndvi()
            a.flood()
        except Exception as e:
            print(e)
            continue    