# 0. Installing requirements and importing packages

In [None]:
%pip install pyproj
%pip install boto3
%pip install rasterio
%pip install pystac_client
%pip install pyspark
%pip install numpy
%pip install matplotlib
%pip install -U attr
%pip install -U attrs
%pip install -U certifi
%pip install opencv-python

In [1]:
import boto3
import rasterio as rio
from rasterio.session import AWSSession
from json import load
from pystac_client import Client
from pyspark.sql import SparkSession
from rasterio.features import bounds
from pyproj import Transformer
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import cv2
import math
import os
from matplotlib.backends.backend_agg import FigureCanvasAgg
from io import BytesIO
os.environ["AWS_REQUEST_PAYER"] = "requester"
%matplotlib inline

In [2]:
aws_access_key_id=""                            #TODO: paste there your aws_access_key_id
aws_secret_access_key=""                        #TODO: paste there your aws_secret_access_key
output_bucket=""                                #TODO: paste there output bucket name

# 1. Functions

## 1.1. Creating area of land to fetch

In [33]:
def create_AOF():
    AOF = []
    for lon_deg in range(0,1):
        for lat_deg in range(0,1):
            area = {}
            area["type"] = "Polygon"
            area["coordinates"] = [[     
                    [
                      17.324752807617188+(lon_deg*0.802),
                      54.686335776526214-(0.584*lat_deg)
                    ],
                    [
                      17.573318481445312+(lon_deg*0.802),
                      54.686335776526214-(0.584*lat_deg)
                    ],
                    [
                      17.573318481445312+(lon_deg*0.802),
                      54.773563612845834-(0.584*lat_deg)
                    ],
                    [
                      17.324752807617188+(lon_deg*0.802),
                      54.773563612845834-(0.584*lat_deg)
                    ],
                    [
                      17.324752807617188+(lon_deg*0.802),
                      54.686335776526214-(0.584*lat_deg)
                    ]
                  ]]
            AOF.append(area)
    return AOF

## 1.2. Calculations used to classify water, desert, urban areas and forests

In [80]:
def create_images(bands):
    nir = bands[3].astype(float)
    green = bands[2].astype(float)
    blue = bands[1].astype(float)
    red = bands[0].astype(float)
    swir = bands[4].astype(float)
    bbox = bands[5]
    
    # water
    ndwi = (green - nir) / (green + nir)
    mndwi = (blue - nir) / (blue + nir)
    ndwi = np.nan_to_num(ndwi)
    mndwi = np.nan_to_num(mndwi)
    water_mask = ndwi + mndwi
    water_mask = (water_mask[:] > 0).astype(np.uint8) * 255
    water_mask = np.repeat(water_mask[..., None], 3, axis=2)
    
    # desert    
    ndsai = (swir - red) / (swir + red)
    ndsai = np.nan_to_num(ndsai)
    ndsai[ndsai < 0] = 0
    ndsai[ndsai > 0.1] = 0
    desert_mask = (ndsai[:] > 0).astype(np.uint8) * 255
    desert_mask = np.repeat(desert_mask[..., None], 3, axis=2)
    
    # forest
    ndvi = (nir - red)/(nir + red)
    ndvi[ndvi < 0.3] = 0
    ndvi = (ndvi[:] > 0).astype(np.uint8) * 255
    forest_mask = np.repeat(ndvi[..., None], 3, axis=2)

    # urban
    ndbi = (swir - nir) / (swir + nir)
    ndbi = np.nan_to_num(ndbi)
    ndbi[ndbi > 0.05] = 0
    ndbi[ndbi < -0.1] = 0
    urban_mask = (ndbi[:] != 0).astype(np.uint8) * 255
    urban_mask = np.repeat(urban_mask[..., None], 3, axis=2)

    # rgb
    red_scaled = ((red - np.min(red))/(np.max(red) - np.min(red)))
    green_scaled = ((green - np.min(green))/(np.max(green) - np.min(green)))
    blue_scaled = ((blue - np.min(blue))/(np.max(blue) - np.min(blue)))

    rgb = np.dstack((red_scaled, green_scaled, blue_scaled))
    
    # result
    mask = rgb.copy()
    mask[(desert_mask==255).all(-1)] = [255,255,0]
    mask[(urban_mask==255).all(-1)] = [255,0,0]
    mask[(forest_mask==255).all(-1)] = [0,255,0]
    mask[(water_mask==255).all(-1)] = [0,0,255]

    return (mask, rgb, bbox)

## 1.3. Resizing images and convertet them to bytes

In [72]:
def resize_imgs(bands):
    bbox = bands[2]
    output = cv2.resize(bands[0], (400,400), interpolation = cv2.INTER_AREA)
    rgb = cv2.resize(bands[1], (400,400), interpolation = cv2.INTER_AREA)
    return (output, rgb, bbox)
    

## 1.4. Reading tiff files from aws bucket

In [61]:
def readtiff(geotiff, bbox):
    with rio.Env(aws_session):
        with rio.open(geotiff) as geo_fp:
            Transf = Transformer.from_crs("epsg:4326", geo_fp.crs)
            lat_north, lon_west = Transf.transform(bbox[3], bbox[0])
            lat_south, lon_east = Transf.transform(bbox[1], bbox[2])
            x_top, y_top = geo_fp.index(lat_north, lon_west)
            x_bottom, y_bottom = geo_fp.index(lat_south, lon_east)
            # Define window in RasterIO
            window = rio.windows.Window.from_slices((x_top, x_bottom), (y_top, y_bottom))
            subset = geo_fp.read(1, window=window)
        return subset
        
    

## 1.5. Count black pixels

In [62]:
def assign_nonzero_value(img):
    img = img.astype(np.uint8)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    count = cv2.countNonZero(gray)
    return count

# 2. Creating spark app and spark context

In [38]:
#creating spark app and spark context
spark = (
    SparkSession
    .builder
    .appName("Landsat_detection")
#     .config("spark.jars.packages", "com.twelvemonkeys.imageio:imageio-tiff:3.5,com.twelvemonkeys.imageio:imageio-core:3.5")
    .config('spark.jars.packages', 'com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:3.3.0')
    .config("spark.driver.memory", "32g")
    .config("spark.driver.maxResultSize", "16g")
    .getOrCreate())
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3.S3")
spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", aws_access_key_id)
spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3.endpoint", "s3.us-east-1.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")

sc = spark.sparkContext

# 3. Accessing and reading landsat data

## 3.1 Opening Landsatstac server and aws session in order to fetch landsat images

In [39]:
LandsatSTAC = Client.open("https://landsatlook.usgs.gov/stac-server", headers=[])
aws_session = rio.session.AWSSession(boto3.Session(), requester_pays=True)

## 3.2. Creating an area of intrest and time period from which we want to extract satellite images from

In [40]:
AOF = create_AOF()
timeRange = '2021-02-01/2022-08-09'

## 3.3. Loading dataset

In [41]:
bands = []
for i, geometry in enumerate(AOF):
    LandsatSearch = LandsatSTAC.search(
        intersects=geometry,
        datetime=timeRange,
        query=['eo:cloud_cover<1'],
        collections=["landsat-c2l2-sr"])
    Landsat_items = [i.to_dict() for i in LandsatSearch.items()]
    bbox = bounds(geometry)
    for i, item in enumerate(Landsat_items):
        try: 
            red = item['assets']['red']['alternate']['s3']['href']
            blue = item['assets']['blue']['alternate']['s3']['href']
            green = item['assets']['green']['alternate']['s3']['href']
            nir08 = item['assets']['nir08']['alternate']['s3']['href']
            swir16 = item['assets']['swir16']['alternate']['s3']['href']

            red_arr = readtiff(red, bbox)
            blue_arr = readtiff(blue, bbox)
            green_arr = readtiff(green, bbox)
            nir08_arr = readtiff(nir08, bbox)
            swir16_arr = readtiff(swir16, bbox)
            bands.append((red_arr, blue_arr, green_arr, nir08_arr, swir16_arr, bbox))
        except Exception as e:
            print(e)
print(f"Finished loading dataset with {len(bands)} pictures")

Finished loading dataset with 9 pictures


# 4. Parallelize data with RDD and conduct calculations

In [81]:
#print(len(bands))
#na 150 jest EOF 
print(len(bands[0:10]))
oper_length = math.ceil(len(bands)/100)
print(oper_length)
ALL_IMGS = []

9
1


In [82]:
for i in range(oper_length):
    bands_opt = bands[i:(i+1)*100]
    rdd = sc.parallelize(bands_opt)
    imgs = rdd.map(lambda x: create_images(x))
    resize_im = imgs.map(lambda x: resize_imgs(x))
    assign_nonblack = resize_im.map(lambda x: (x, assign_nonzero_value(x[0])))
    filtered = assign_nonblack.filter(lambda x: x[1]/160000 > 0.9)
    imges = filtered.map(lambda x: x[0])
    #byte_imgs = imges.map(lambda x: convert2bytes(x))
    x=imges.collect()
    ALL_IMGS.append(x)

#imgs.saveAsPickleFile("s3://ASIAQFBSKQCM5N5KJE2Z:+mC9hqNGkDIPIBVUZwHA/ai8oJelp4m53UNUeeMh@s180265landsat/")

# 5. Visualization and saving results in the s3 bucket

In [None]:
x = ALL_IMGS[0]

fig, axarr = plt.subplots(1, 2)

axarr[1].set_title('Result image')
axarr[0].set_title('RGB image')
axarr[0].set_xlabel('Latitude')
axarr[0].set_ylabel('Longitude')
axarr[1].set_xlabel('Latitude')
axarr[1].set_ylabel('Longitude')

number_pic = 0

axarr[1].imshow(x[number_pic][0])
axarr[0].imshow(x[number_pic][1])

colors = ["r", "g", "b", "y", "black"]
texts = ["Urban", "Forest", "Water", "Desert", "Unknown"]
patches = [ mpatches.Patch(color=colors[i], label="{:s}".format(texts[i]) ) for i in range(len(texts)) ]
axarr[1].legend(handles=patches, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0., prop={'size': 10})

num = 5

xticks = np.round(np.linspace(0, (x[number_pic][0]).shape[1], num), 2)
yticks = np.round(np.linspace(0, (x[number_pic][0]).shape[0], num), 2)
xticklabels = np.round(np.linspace(x[number_pic][2][0], x[number_pic][2][2], num), 2)
yticklabels = np.round(np.linspace(x[number_pic][2][3], x[number_pic][2][1], num), 2)

plt.setp(axarr, xticks=xticks, xticklabels=xticklabels, yticks=yticks, yticklabels=yticklabels)

plt.rcParams['figure.figsize'] = [20, 10]
plt.rcParams['figure.dpi'] = 200 
plt.show()

In [None]:
session = boto3.Session()
s3 = session.client("s3")
for i,chunk in enumerate(ALL_IMGS):
    for i,img in enumerate(chunk):
        np_bytes = BytesIO()
        np.save(np_bytes, img[0])
        np_bytes_class=np_bytes.getvalue()
        np_bytesy = BytesIO()
        np.save(np_bytesy, img[1])
        np_bytes_rgb=np_bytesy.getvalue()
        s3.put_object(Body = np_bytes_class, Bucket=output_bucket, Key=f'classified/{i}.txt')
        s3.put_object(Body = np_bytes_rgb, Bucket=output_bucket, Key=f'rgb/r{i}.txt')