This notebook exports Landsat satellite image composites from Google Earth Engine saved in gzipped TFRecord format (*.tfrecord.gz).

For this project, I download satellite images corresponding to 2 different datasets:

DHS: 19,669 clusters from DHS surveys, for which we predict cross-sectional (i.e., static in time) cluster-level asset wealth
DHSNL: 260,415 locations sampled near DHS survey locations, for which we train transfer learning models to predict nightlights values

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%cd c:/users/hp/desktop/kenya_poverty

c:\users\hp\desktop\kenya_poverty


# Downloading TFRecords

In [3]:
from __future__ import annotations

import math
import time 
import ee
import numpy as np
import pandas as pd
from collections.abc import Mapping
from typing import Any, Optional
from tqdm.auto import tqdm

In [4]:
ee.Authenticate()

Enter verification code: 4/1AX4XfWhpzjz_y-lUuSqHhkgEj_0MaWDJfZCHsGvWyTKi3cnq8iogrxftugI

Successfully saved authorization token.


In [5]:
ee.Initialize()

In [6]:
# export location parameters
DHS_EXPORT_FOLDER = 'dhs_tfrecords_raw'
DHSNL_EXPORT_FOLDER = 'dhsnl_tfrecords_raw'
#LSMS_EXPORT_FOLDER = 'lsms_tfrecords_raw'

# input data paths
DHS_CSV_PATH = 'data/ke_dhs_clusters.csv'
DHSNL_CSV_PATH = 'data/ke_dhsnl_locs.csv'
#LSMS_CSV_PATH = 'data/lsms_clusters.csv'

# band names
MS_BANDS = ['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2', 'TEMP1']

# image parameters
PROJECTION = 'EPSG:3857'  # see https://epsg.io/3857
SCALE = 30                # export resolution: 30m/px
EXPORT_TILE_RADIUS = 127  # image dimension = (2*EXPORT_TILE_RADIUS) + 1 = 255px

# Setting up utils for downloading data from Google Earth Engine API

In [7]:
def df_to_fc(df, lat_colname='lat', lon_colname='lon'):
    '''Create a ee.FeatureCollection from a pd.DataFrame.
    Args
    - csv_path: str, path to CSV file that includes at least two columns for
        latitude and longitude coordinates
    - lat_colname: str, name of latitude column
    - lon_colname: str, name of longitude column
    Returns: ee.FeatureCollection, contains one feature per row in the CSV file
    '''
    df = df.astype('object')
    ee_features = []
    for i in range(len(df)):
        props = df.iloc[i].to_dict()

        _geometry = ee.Geometry.Point([props[lon_colname], props[lat_colname],])
        ee_feat = ee.Feature(_geometry, props)
        ee_features.append(ee_feat)
        
    return ee.FeatureCollection(ee_features)


def surveyyear_to_range(survey_year, nl=False):
    '''Returns the start and end dates for filtering satellite images for a survey beginning in the specified year.
    Args
    - survey_year: int, year that survey was started
    - nl: bool, whether to use special range for night lights
    Returns
    - start_date: str, start date for filtering satellite images (yyyy-mm-dd)
    - end_date: str, end date for filtering satellite images (yyyy-mm-dd)
    '''
    if 2009 <= survey_year and survey_year <= 2011:
        start_date = '2009-1-1'
        end_date = '2011-12-31'
    elif 2012 <= survey_year and survey_year <= 2014:
        start_date = '2012-1-1'
        end_date = '2014-12-31'
    elif 2015 <= survey_year and survey_year <= 2017:
        start_date = '2015-1-1'
        end_date = '2017-12-31'
    else:
        raise ValueError(f'Invalid survey_year: {survey_year}.\nMust be between 2009 and 2017 (inclusive)')
    return start_date, end_date

def decode_qamask(img: ee.Image) -> ee.Image:
    '''
    Args
    - img: ee.Image, Landsat 5/7/8 image containing 'pixel_qa' band
    Returns
    - masks: ee.Image, contains 5 bands of masks
    Pixel QA Bit Flags (universal across Landsat 5/7/8)
    Bit  Attribute
    0    Fill
    1    Clear
    2    Water
    3    Cloud Shadow
    4    Snow
    5    Cloud
    '''
    qa = img.select('pixel_qa')
    clear = qa.bitwiseAnd(2).neq(0)  # 0 = not clear, 1 = clear
    clear = clear.updateMask(clear).rename(['pxqa_clear'])

    water = qa.bitwiseAnd(4).neq(0)  # 0 = not water, 1 = water
    water = water.updateMask(water).rename(['pxqa_water'])

    cloud_shadow = qa.bitwiseAnd(8).eq(0)  # 0 = shadow, 1 = not shadow
    cloud_shadow = cloud_shadow.updateMask(cloud_shadow).rename(['pxqa_cloudshadow'])

    snow = qa.bitwiseAnd(16).eq(0)  # 0 = snow, 1 = not snow
    snow = snow.updateMask(snow).rename(['pxqa_snow'])

    cloud = qa.bitwiseAnd(32).eq(0)  # 0 = cloud, 1 = not cloud
    cloud = cloud.updateMask(cloud).rename(['pxqa_cloud'])

    masks = ee.Image.cat([clear, water, cloud_shadow, snow, cloud])
    return masks

def mask_qaclear(img: ee.Image) -> ee.Image:
    '''
    Args
    - img: ee.Image, Landsat 5/7/8 image containing 'pixel_qa' band
    Returns
    - img: ee.Image, input image with cloud-shadow, snow, cloud, and unclear
        pixels masked out
    '''
    qam = decode_qamask(img)
    cloudshadow_mask = qam.select('pxqa_cloudshadow')
    snow_mask = qam.select('pxqa_snow')
    cloud_mask = qam.select('pxqa_cloud')
    return img.updateMask(cloudshadow_mask).updateMask(snow_mask).updateMask(cloud_mask)


def add_latlon(img: ee.Image) -> ee.Image:
    '''Creates a new ee.Image with 2 added bands of longitude and latitude coordinates named 'LON' and 'LAT', respectively
    '''
    latlon = ee.Image.pixelLonLat().select(opt_selectors=['longitude', 'latitude'], opt_names=['LON', 'LAT'])
    return img.addBands(latlon)


def composite_nl(year: int) -> ee.Image:
    '''Creates a median-composite nightlights (NL) image.
    Args
    - year: int, start year of survey
    Returns: ee.Image, contains a single band named 'NIGHTLIGHTS'
    '''
    img_col = ee.ImageCollection('NOAA/VIIRS/DNB/MONTHLY_V1/VCMSLCFG')

    start_date, end_date = surveyyear_to_range(year, nl=True)
    return img_col.filterDate(start_date, end_date).median().select([0], ['NIGHTLIGHTS'])


def tfexporter(collection, export, prefix, fname, selectors=None,dropselectors=None, bucket=None):
    '''Creates and starts a task to export a ee.FeatureCollection to a TFRecord file in Google Drive or Google Cloud 
        Storage.
    Args
    - collection: ee.FeatureCollection
    - export: str, 'drive' for Drive, 'gcs' for GCS
    - prefix: str, folder name in Drive or GCS to export to, no trailing '/'
    - fname: str, filename
    - selectors: None or ee.List of str, names of properties to include in
        output, set to None to include all properties
    - dropselectors: None or ee.List of str, names of properties to exclude
    - bucket: None or str, name of GCS bucket, only used if export=='gcs'
    Returns
    - task: ee.batch.Task
    '''
    if dropselectors is not None:
        if selectors is None:
            selectors = collection.first().propertyNames()
        selectors = selectors.removeAll(dropselectors)

    if export == 'gcs':
        task = ee.batch.Export.table.toCloudStorage(collection=collection, description=fname, bucket=bucket,
                                                    fileNamePrefix=f'{prefix}/{fname}', fileFormat='TFRecord',
                                                    selectors=selectors)
    elif export == 'drive':
        task = ee.batch.Export.table.toDrive(collection=collection, description=fname, folder=prefix, fileNamePrefix=fname,
                                             fileFormat='TFRecord', selectors=selectors)
    else:
        raise ValueError(f'export "{export}" is not one of ["gcs", "drive"]')

    task.start()
    return task


def sample_patch(point, patches_array, scale):
    '''Extracts an image patch at a specific point.
    Args
    - point: ee.Feature
    - patches_array: ee.Image, Array Image
    - scale: int or float, scale in meters of the projection to sample in
    Returns: ee.Feature, 1 property per band from the input image
    '''
    arrays_samples = patches_array.sample(region=point.geometry(), scale=scale, projection='EPSG:3857', factor=None,
                                          numPixels=None, dropNulls=False, tileScale=12)
    return arrays_samples.first().copyProperties(point)


def get_array_patches(img: ee.Image, scale, ksize, points, export, prefix, fname, selectors=None, dropselectors=None,
                      bucket=None):
    '''Creates and starts a task to export square image patches in TFRecord format to Google Drive or Google Cloud Storage
    (GCS). The image patches are sampled from the given ee.Image at specific coordinates.
    Args
    - img: ee.Image, image covering the entire region of interest
    - scale: int or float, scale in meters of the projection to sample in
    - ksize: int or float, radius of square image patch
    - points: ee.FeatureCollection, coordinates from which to sample patches
    - export: str, 'drive' for Google Drive, 'gcs' for GCS
    - prefix: str, folder name in Drive or GCS to export to, no trailing '/'
    - fname: str, filename for export
    - selectors: None or ee.List, names of properties to include in output,
        set to None to include all properties
    - dropselectors: None or ee.List, names of properties to exclude
    - bucket: None or str, name of GCS bucket, only used if export=='gcs'
    Returns: ee.batch.Task
    '''
    kern = ee.Kernel.square(radius=ksize, units='pixels')
    patches_array = img.neighborhoodToArray(kern)

    # ee.Image.sampleRegions() does not cut it for larger collections,
    # using mapped sample instead
    samples = points.map(lambda pt: sample_patch(pt, patches_array, scale))

    # export to a TFRecord file which can be loaded directly in TensorFlow
    return tfexporter(collection=samples, export=export, prefix=prefix, fname=fname, selectors=selectors,
                      dropselectors=dropselectors, bucket=bucket)


In [8]:
class LandsatSR:
    def __init__(self, filterpoly, start_date, end_date):
        '''
        Args
        - filterpoly: ee.Geometry
        - start_date: str, string representation of start date
        - end_date: str, string representation of end date
        '''
        self.filterpoly = filterpoly
        self.start_date = start_date
        self.end_date = end_date
        self.l8 = self.init_coll('LANDSAT/LC08/C01/T1_SR').map(self.rename_l8).map(self.rescale_l8)
        self.l7 = self.init_coll('LANDSAT/LE07/C01/T1_SR').map(self.rename_l57).map(self.rescale_l57)
        self.l5 = self.init_coll('LANDSAT/LT05/C01/T1_SR').map(self.rename_l57).map(self.rescale_l57)
        self.merged = self.l5.merge(self.l7).merge(self.l8).sort('system:time_start')

    def init_coll(self, name: str) -> ee.ImageCollection:
        '''
        Creates a ee.ImageCollection containing images of desired points
        between the desired start and end dates.
        Args
        - name: str, name of collection
        Returns: ee.ImageCollection
        '''
        return (ee.ImageCollection(name).filterBounds(self.filterpoly).filterDate(self.start_date, self.end_date))

    @staticmethod
    def rename_l8(img: ee.Image) -> ee.Image:
        '''
        Args
        - img: ee.Image, Landsat 8 image
        Returns
        - img: ee.Image, with bands renamed
        See: https://developers.google.com/earth-engine/datasets/catalog/LANDSAT_LC08_C01_T1_SR

        Name       Scale Factor Description
        B1         0.0001       Band 1 (Ultra Blue) surface reflectance, 0.435-0.451 um
        B2         0.0001       Band 2 (Blue) surface reflectance, 0.452-0.512 um
        B3         0.0001       Band 3 (Green) surface reflectance, 0.533-0.590 um
        B4         0.0001       Band 4 (Red) surface reflectance, 0.636-0.673 um
        B5         0.0001       Band 5 (Near Infrared) surface reflectance, 0.851-0.879 um
        B6         0.0001       Band 6 (Shortwave Infrared 1) surface reflectance, 1.566-1.651 um
        B7         0.0001       Band 7 (Shortwave Infrared 2) surface reflectance, 2.107-2.294 um
        B10        0.1          Band 10 brightness temperature (Kelvin), 10.60-11.19 um
        B11        0.1          Band 11 brightness temperature (Kelvin), 11.50-12.51 um
        sr_aerosol              Aerosol attributes, see Aerosol QA table
        pixel_qa                Pixel quality attributes, see Pixel QA table
        radsat_qa               Radiometric saturation QA, see Radsat QA table
        '''
        newnames = ['AEROS', 'BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2', 'TEMP1', 'TEMP2', 'sr_aerosol', 'pixel_qa', 
                    'radsat_qa']
        return img.rename(newnames)

    @staticmethod
    def rescale_l8(img: ee.Image) -> ee.Image:
        '''
        Args
        - img: ee.Image, Landsat 8 image, with bands already renamed
            by rename_l8()
        Returns
        - img: ee.Image, with bands rescaled
        '''
        opt = img.select(['AEROS', 'BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])
        therm = img.select(['TEMP1', 'TEMP2'])
        masks = img.select(['sr_aerosol', 'pixel_qa', 'radsat_qa'])

        opt = opt.multiply(0.0001)
        therm = therm.multiply(0.1)

        scaled = ee.Image.cat([opt, therm, masks]).copyProperties(img)
        # system properties are not copied
        scaled = scaled.set('system:time_start', img.get('system:time_start'))
        return scaled

    @staticmethod
    def rename_l57(img: ee.Image) -> ee.Image:
        '''
        Args
        - img: ee.Image, Landsat 5/7 image
        Returns
        - img: ee.Image, with bands renamed
        
        See: https://developers.google.com/earth-engine/datasets/catalog/LANDSAT_LT05_C01_T1_SR
             https://developers.google.com/earth-engine/datasets/catalog/LANDSAT_LE07_C01_T1_SR

        Name             Scale Factor Description
        B1               0.0001       Band 1 (blue) surface reflectance, 0.45-0.52 um
        B2               0.0001       Band 2 (green) surface reflectance, 0.52-0.60 um
        B3               0.0001       Band 3 (red) surface reflectance, 0.63-0.69 um
        B4               0.0001       Band 4 (near infrared) surface reflectance, 0.77-0.90 um
        B5               0.0001       Band 5 (shortwave infrared 1) surface reflectance, 1.55-1.75 um
        B6               0.1          Band 6 brightness temperature (Kelvin), 10.40-12.50 um
        B7               0.0001       Band 7 (shortwave infrared 2) surface reflectance, 2.08-2.35 um
        sr_atmos_opacity 0.001        Atmospheric opacity; < 0.1 = clear; 0.1 - 0.3 = average; > 0.3 = hazy
        sr_cloud_qa                   Cloud quality attributes, see SR Cloud QA table. Note:
                                          pixel_qa is likely to present more accurate results
                                          than sr_cloud_qa for cloud masking. See page 14 in
                                          the LEDAPS product guide.
        pixel_qa                      Pixel quality attributes generated from the CFMASK algorithm,
                                          see Pixel QA table
        radsat_qa                     Radiometric saturation QA, see Radiometric Saturation QA table
        '''
        newnames = ['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'TEMP1', 'SWIR2', 'sr_atmos_opacity', 'sr_cloud_qa', 
                    'pixel_qa', 'radsat_qa']
        return img.rename(newnames)

    @staticmethod
    def rescale_l57(img: ee.Image) -> ee.Image:
        '''
        Args
        - img: ee.Image, Landsat 5/7 image, with bands already renamed
            by rename_157()
        Returns
        - img: ee.Image, with bands rescaled
        '''
        opt = img.select(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])
        atmos = img.select(['sr_atmos_opacity'])
        therm = img.select(['TEMP1'])
        masks = img.select(['sr_cloud_qa', 'pixel_qa', 'radsat_qa'])

        opt = opt.multiply(0.0001)
        atmos = atmos.multiply(0.001)
        therm = therm.multiply(0.1)

        scaled = ee.Image.cat([opt, therm, masks, atmos]).copyProperties(img)
        # system properties are not copied
        scaled = scaled.set('system:time_start', img.get('system:time_start'))
        return scaled


In [9]:
def export_images(df, country, year, export_folder):
    '''
    Args
    - df: pd.DataFrame, contains columns ['lat', 'lon', 'country', 'year']
    - country: str, together with `year` determines the survey to export
    - year: int, together with `country` determines the survey to export
    - export_folder: str, name of folder for export
    Returns: dict, maps task name tuple (export_folder, country, year) to ee.batch.Task
    '''
    subset_df = df[(df['country'] == country) & (df['year'] == year)].reset_index(drop=True)
    
    tasks = {}

    fc = df_to_fc(subset_df)
    start_date, end_date = surveyyear_to_range(year)

    # create 3-year Landsat composite image
    roi = fc.geometry()
    imgcol = LandsatSR(roi, start_date=start_date, end_date=end_date).merged
    imgcol = imgcol.map(mask_qaclear).select(MS_BANDS)
    img = imgcol.median()

    # add nightlights, latitude, and longitude bands
    img = add_latlon(img)
    img = img.addBands(composite_nl(year))

    fname = f'{country}_{year}'
    tasks[(export_folder, country, year)] = get_array_patches(img=img, scale=SCALE, ksize=EXPORT_TILE_RADIUS, points=fc, 
                                                              export='drive', fname=fname, bucket=None, prefix=export_folder)
    return tasks

In [10]:
tasks = {}

In [11]:
dhs_df = pd.read_csv(DHS_CSV_PATH, float_precision='high', index_col=False)
dhs_surveys = list(dhs_df.groupby(['country', 'year']).groups.keys())

for country, year in dhs_surveys:
    new_tasks = export_images(df=dhs_df, country=country, year=year, export_folder=DHS_EXPORT_FOLDER)
    tasks.update(new_tasks)

"dhs_df = pd.read_csv(DHS_CSV_PATH, float_precision='high', index_col=False)\ndhs_surveys = list(dhs_df.groupby(['country', 'year']).groups.keys())\n\nfor country, year in dhs_surveys:\n    new_tasks = export_images(df=dhs_df, country=country, year=year, export_folder=DHS_EXPORT_FOLDER)\n    tasks.update(new_tasks)"

In [12]:
dhsnl_df = pd.read_csv(DHSNL_CSV_PATH, float_precision='high', index_col=False)
dhsnl_surveys = list(dhsnl_df.groupby(['country', 'year']).groups.keys())

for country, year in dhsnl_surveys:
    new_tasks = export_images(df=dhsnl_df, country=country, year=year, export_folder=DHSNL_EXPORT_FOLDER)
    tasks.update(new_tasks)

"dhsnl_df = pd.read_csv(DHSNL_CSV_PATH, float_precision='high', index_col=False)\ndhsnl_surveys = list(dhsnl_df.groupby(['country', 'year']).groups.keys())\n\nfor country, year in dhsnl_surveys:\n    new_tasks = export_images(df=dhsnl_df, country=country, year=year, export_folder=DHSNL_EXPORT_FOLDER)\n    tasks.update(new_tasks)"

# Downloading shapefiles

In [None]:
#!/bin/bash

# This Bash script downloads shapefiles from GADM v3.6 into the
# data/shapefiles/ directory.
#
# Run this script from within the preprocessing/ directory.
#
# Prerequisites: None.


africa_country_codes=("KEN") # Kenya


dhs_country_codes=("KEN")  # Kenya)

echo "Getting shapefiles for KEN"

# download ZIP'ed shapefiles from GADM v3.6
wget --no-verbose --show-progress "https://biogeo.ucdavis.edu/data/gadm3.6/shp/gadm36_KEN_shp.zip"

unzip -o "gadm36_KEN_shp.zip" *_2.* -d "gadm36_KEN_shp"

# delete the zip file
rm "gadm36_${code}_shp.zip"


In [16]:
import os
#os.mkdir('data/shapefiles')

In [2]:
%cd data/shapefiles

C:\Users\HP\Desktop\kenya_poverty\data\shapefiles


In [3]:
%pwd

'C:\\Users\\HP\\Desktop\\kenya_poverty\\data\\shapefiles'

In [25]:
%pip install wget

Collecting wget
  Downloading wget-3.2.zip (10 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py): started
  Building wheel for wget (setup.py): finished with status 'done'
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9680 sha256=2f000c0a56a09737cd61747e7756e5cd272c5fbbdd997ced9abf69373bb5de95
  Stored in directory: c:\users\hp\appdata\local\pip\cache\wheels\bd\a8\c3\3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
Note: you may need to restart the kernel to use updated packages.


In [4]:
import wget

url = "https://biogeo.ucdavis.edu/data/gadm3.6/shp/gadm36_KEN_shp.zip"
wget.download(url)

100% [........................................................................] 20994263 / 20994263

'gadm36_KEN_shp.zip'

In [5]:
import zipfile

with zipfile.ZipFile('gadm36_KEN_shp.zip', 'r') as zipf:
    zipf.extractall()