# Purpose

This code downloads Sentinel 2 Images belonging to DHS Survey clusters to your Google Drive.
To download the images for a specific DHS-Survey you need the Geodata zipfile belonging to it. This zipfile needs to have a dbf-file which is used to extract the location and the residence type (urban or rural).
Please note that cluster belonging to the refugee type will be ignored (only very few cluster belong to this type). Furthermore, to avoid that the Google Drive runs out of memory space, you may use the gdrive programs (also uploaded to github).



In [2]:
import sys
print(sys.version)


3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0]


In [3]:
import ee
#Authenticate may be require the first time, afterwards usually (hence at the beginning uncommenting may be desired)
#ee.Authenticate()
ee.Initialize()

In [4]:
import geemap
import os
import functools
from zipfile import ZipFile
from dbfread import DBF
from dbfread import FieldParser
import shutil
import pandas as pd
from csv import DictReader
import time

#to dos:
#please add more comments what a function or a group of functions does
#generalize as a script with variables at the top
#ensure a whole year of data gets used for 2015 - use 2015-06-01 until 2016-07-01

In [5]:
# Function to get a square around point of interest
def bounding_box(loc:ee.geometry.Geometry, urban_rural:str, urban_radius:int, rural_radius:int):
    #Different size of the box for getting the sentinel images depending on the residence type due to different scattering (see DHS Survey)
    if urban_rural == 'U'or  urban_rural == 'u':
        size = urban_radius
    elif urban_rural == 'R' or urban_rural =='r':
        size = rural_radius

    intermediate_buffer = loc.buffer(size) #buffer radius, half your box width in m
    intermediate_box = intermediate_buffer.bounds() #Draw a bounding box around the circle
    
    return(intermediate_box)

In [6]:
#Masking of clouds -> Filters pixel w.r.t to their probability of being cloudy or not 
def maskClouds(img:ee.image.Image, MAX_CLOUD_PROBABILITY:int):
    
    clouds = ee.Image(img.get('cloud_mask')).select('probability')
    isNotCloud = clouds.lt(MAX_CLOUD_PROBABILITY)
    
    return img.updateMask(isNotCloud)

In [7]:
#Masking of edges
def maskEdges(s2_img:ee.image.Image):
    return s2_img.updateMask(s2_img.select('B8A').mask().updateMask(s2_img.select('B9').mask()))

In [8]:
def get_image(cluster:dict, survey_name:str, urban_radius:int, rural_radius:int, MAX_CLOUD_PROBABILITY:int):
    '''Main function to extract a single Sentinel2  image given a cluster dictionary containing the 
    latidude, longitude; the output is an image constructed from multiple Sentinel images such that the output image
    is as cloudless as possible given a certain timeframe'''
    
    #Get images collections
    s2Sr = ee.ImageCollection('COPERNICUS/S2')
    s2Clouds = ee.ImageCollection('COPERNICUS/S2_CLOUD_PROBABILITY')

    #Get time span -> if survey year is before 2016 we use a predefined time frame as given below, because the acquistion
    # of sentinel data started in May 2015
    year_uncut = str(cluster["year"])
    year = year_uncut[:year_uncut.rfind('.')]
    if int(year)<2016:
        START_DATE = ee.Date('2015-06-01')
        END_DATE = ee.Date('2016-07-01')
    else:
        START_DATE = ee.Date(year+'-01-01')
        END_DATE = ee.Date(year+'-12-31')
    
   
    #Point of interest (longitude, latidude); this is the center of the output image 
    lat_float = float(cluster["latidude"])
    lon_float = float(cluster["longitude"])                 
    loc = ee.Geometry.Point([lon_float, lat_float])
    #Region of interest; around the point of interest we define the region (size depends on rural or urban type)
    # the region of interest is a square
    region = bounding_box(loc, cluster['urban_rural'], urban_radius, rural_radius)

    # Filter input collections by desired data range and region.
    s2Sr = s2Sr.filterBounds(region).filterDate(START_DATE, END_DATE).map(maskEdges)
    s2Clouds = s2Clouds.filterBounds(region).filterDate(START_DATE, END_DATE)

    # Join S2 with cloud probability dataset to add cloud mask.
    s2SrWithCloudMask = ee.Join.saveFirst('cloud_mask').apply(
      primary =  s2Sr, 
      secondary = s2Clouds, 
      condition = ee.Filter.equals(
          leftField =  'system:index', rightField = 'system:index') 
        )
    
    #Masking the image
    maskCloudsWithProb = functools.partial(maskClouds, MAX_CLOUD_PROBABILITY = MAX_CLOUD_PROBABILITY)
    s2CloudMasked = ee.ImageCollection(s2SrWithCloudMask).map(maskCloudsWithProb).median()
    #Select which bands to keep
    s2CloudMasked = s2CloudMasked.select(['B1','B2','B3', 'B4', 'B5', 'B6', 'B7', 'B8','B8A', 'B9', 'B10'\
                                         ,'B11','B12']).clip(region)
    
    #Saving location/directory to Google Drive (#commented part if you want to save it locally directly, but the
    #size of the images has it maximum size much lower compared to downloading it to Google Drive first)
    #-> name of the img is the surveyname+0's+cluster_number
    #out_dir = os.path.join(survey_dir, cluster["ID-cluster"]+'.tif')
    #geemap.ee_export_image(s2CloudMasked, filename=out_dir, scale=10)
    filename = cluster["ID-cluster"]
    filename = filename.replace(filename[:6], survey_name)
    task = ee.batch.Export.image.toDrive(s2CloudMasked, description = filename, folder = 'sentinel', scale = 10)
    task.start()
    print('Created', filename)
    
    
    return loc

In [9]:
def get_dbf(listOfFileNames:list, newpath:str, zipObject:zipfile.ZipFile):
    
    # Extract solely the dbf-file from the zip and save them into a seperate folder defined before (newpath)
    for fileName in listOfFileNames:
        if fileName.endswith('.dbf') or fileName.endswith('.DBF'):
            # Extract a single file from zip
            zipObject.extract(fileName, newpath)
            
#For the case if a zip folder includes zip folders: Take zip folder to get dbf within main zip folder which is bigger in size (bytes)           
def extract_bigger_zip(zip_dir:str,filenames:str,newpath:str):
    
    #Create temporary directory for this survey
    zips_dir = zip_dir[:zip_dir.find('.')]
    if not os.path.exists(zips_dir):
        os.makedirs(zips_dir)
        
    #Extract the zip folders into the directory    
    with ZipFile(zip_dir, 'r') as zipObj:
    #Extract all the contents of zip file in different directory
        zipObj.extractall(zips_dir)

    big_size = 0
    big_zip = None
    
    #Compare zip folders and take the biggest one
    for item in filenames:
        if item.endswith('.zip') or item.endswith('.ZIP'):
            file_dir = os.path.join(zips_dir, item)
            curr_size= os.stat(file_dir).st_size
            if curr_size >= big_size:
                big_size = curr_size
                big_zip = file_dir
    
    #Now check via check_dbf if dbf-file is given or if you have again zip-files within the current zip file
    check_dbf(big_zip,newpath)    
    
#Second mainpart for single zip file        
def check_dbf(zip_dir:str, newpath:str):
    
    big_zip = None

    #Get list of all elements within the zipfile
    with ZipFile(zip_dir, 'r') as zipObject:
        listOfFileNames = zipObject.namelist()

        #Check if any element within the zip is a dbf file if yes jump into function get_dbf
        if any((element.endswith('.dbf') or element.endswith('.DBF')) for element in listOfFileNames):
            get_dbf(listOfFileNames, newpath, zipObject)
        
        #If "if-clause" not true, check here if any element is a zip file if yes jump into extract_bigger_zip
        elif any((element.endswith('.zip') or element.endswith('.ZIP')) for element in listOfFileNames):
            extract_bigger_zip(zip_dir, listOfFileNames, newpath)  

def delete_zip_folders(dir_name,newpath):
    #Delete all directories which may be created during the process to figure out which zip folder is bigger             
    list_subfolders_with_paths = [f.path for f in os.scandir(dir_name) if f.is_dir()]
    for folder in list_subfolders_with_paths:
        if not folder == newpath:
            shutil.rmtree(folder)
            
#Main part-> runs through all zip files in the directory  
def extract_dbf(dir_name:str, newpath:str):
    '''The GEO DHS Survey retrieved contain usually a dbf-file from which we can gather all relevant information
    such as location cluster name and residence type; this part retrieves the dbf files, extracts the relevant
    information and saves them as csv file for each survey'''
    
    #Create folder for DBF files
    if not os.path.exists(newpath):
        os.makedirs(newpath)
    #List of all survey zips in the directory dir_name    
    folders = os.listdir(dir_name)
    
    #Extract dbf file (if existing) from each zip file via check_dbf
    for item in folders:
        if item.endswith('.zip') or item.endswith('.ZIP'):
            zip_dir = os.path.join(dir_name, item)
            check_dbf(zip_dir, newpath)
     
    #Delete all directories via the following function
    delete_zip_folders(dir_name,newpath)
    

NameError: name 'zipfile' is not defined

In [None]:
#Needed as a file may contain \x00\ which the standard libary is not able to process, hence an additional class based on the following issue https://github.com/olemb/dbfread/issues/20 is used 
class MyFieldParser(FieldParser):
    def parseN(self, field, data):
        data = data.strip().strip(b'*\x00')  # Had to strip out the other characters first before \x00, as per super function specs.
        return super(MyFieldParser, self).parseN(field, data)

    def parseD(self, field, data):
        data = data.strip(b'\x00')
        return super(MyFieldParser, self).parseD(field, data)
    
    
#Create single CSV file for the current dbf file belonging to a specific survey (dbf_path)   
def get_csv(dbf_path:str, dir_csv:str):

    clusters = list()
    
    #get surveyname
    filename = os.path.basename(dbf_path[:dbf_path.rfind('.')])
    
    #Ignore all filenames ending with SR as they are not containing the necessary information required to retrived the
    #sentinel data later one
    if not filename.endswith('SR'):
        #read in dbf file
        table = DBF(dbf_path, parserclass=MyFieldParser)
        #Extract for each record/cluster of the survey the relevant information and add them to the cluster list
        for record in table:
            cluster = {"ID-survey": filename, "ID-cluster" : record['DHSID'], "cluster": record['DHSCLUST'],\
                        "year": record["DHSYEAR"],"urban_rural": record['URBAN_RURA'], "latidude": record["LATNUM"],\
                      "longitude": record['LONGNUM']}
            clusters.append(cluster)
        
        #Transform cluster list into pandas dataframe
        clust_df = pd.DataFrame(clusters)
        
        #Define export directory; the name of the csv file is the survey name (identically wit the zip file name)
        export_name = filename+'.csv'
        export_dir = os.path.join(dir_csv, export_name)
        #Save the cluster dataframe to the defined export directory
        clust_df.to_csv(export_dir)
    else:
        #Print all surveys/dbf files where no csv file was creates as the dbf files were ending on SR
        print(filename)


def create_csv(dir_dbf:str, dir_csv:str):
    
    '''Create CSV files for each survey (if they have a dbf file); The information for the csv files are extracted
    from the dbf file; the final csv file contains the following columns ID-survey (survey name), ID-cluster (cluster name)
    , cluster (cluster number), urban_rural (the residence type), latitude (latidude geolocation), and longitude
    (the longitudal geolocation); for each cluster of the survey there is a seperate row'''
    
    #Create the general Csv-directory where all csv files are stored
    if not os.path.exists(dir_csv):
        os.makedirs(dir_csv)
    
    #List of all paths to the dbf files
    directory = os.listdir(dir_dbf)
    
    #Iterating through all dbf files and call the function get_csv
    for file in directory: 
        #print("This is the file", file)
        dbf_path = os.path.join (dir_dbf, file)
        get_csv(dbf_path, dir_csv)
        
    return 

In [None]:
#Move all csv-files from surveys which took place before 2013 to a seperate folder
def before_2013 (export_path:str, before_2013:str):
    
    if not os.path.exists(before_2013):
        os.makedirs(before_2013)

    directory = os.listdir(export_path)    
    for file in directory:
        
        if file.endswith('.csv'):
            csv_file = os.path.join(export_path, file)
            survey_year = pd.read_csv(csv_file, usecols = ['year'])
            #Move csv-file to seperate folder if the year <2013
            if survey_year['year'].max()< 2013:
                new_path = os.path.join(before_2013, file)
                os.rename(csv_file, new_path)

In [None]:

def get_survey_images(file_dir:str, survey_name:str, urban_radius:int, rural_radius:int, MAX_CLOUD_PROBABILITY:int):
    
    with open(file_dir, 'r') as read_obj:
    # pass the file object to DictReader() to get the DictReader object
        dict_reader = DictReader(read_obj)
    # get a list of dictionaries from dct_reader
        clusters = list(dict_reader)
    #Iterate through all clusters of the survey and execute get_image for each cluster to retrieve its corresponding sentinel image    
    for cluster in clusters:
        if not (cluster["latidude"] == 0 and cluster["longitude"]):
            loc = get_image(cluster, survey_name, urban_radius, rural_radius, MAX_CLOUD_PROBABILITY)
            #Wait a little bit (currently 45 s) until the code is further executed -> Reason: GEE takes a while to
            #retrieve the images. Consequently, to avoid too many queries to GEE at the same time and hence
            #, the undesirable stopping of the program, we need to implement this waiting function
            time.sleep(45)

#Main functions for getting the sentinel images; here: only the directory for each survey is created     
def sentinel_img_survey(img_dir:str, csv_dir:str, sentinel_done:str, urban_radius:int, rural_radius:int, MAX_CLOUD_PROBABILITY:int):
    '''Here is the main part to retrieve sentinel images (including the subfunctions); please not that with img_dir
    the diretory is created which should contain the zip folders for each survey containing the sentinel images for each
    cluster; however within this program only the directory and the temporary folders for each survey are created;
    the images are saved temporarily in a different directory via the gdrive-scripts as they need to be retrieved from
    Google Drive; furthermore after downloading all imags locally into this seperate directory you need to run 
    the program Sorting_sentinel-img_into_zips.ipynb to group the images w.r.t to their survey and create the zip folders'''
    if not os.path.exists(img_dir):
        os.makedirs(img_dir)
        
    #Create (if not already existing) sentinel_done txt file. This file stores the names of all surveys where
    #the images were already retrieved to skip them if the program is started again
    if not os.path.isfile(sentinel_done):
        open(sentinel_done, 'a').close()
     
    csv_directory = os.listdir(csv_dir)
    img_directory = os.listdir(img_dir)
    
    for file in csv_directory:
        if file.endswith('.csv'):
            filename  = file[:file.rfind('.')]
            #If survey is already done we skip this survey
            with open(sentinel_done) as f:
                if not filename in f.read():
                    #Create survey folder within the final directory of the images (zip folder directory see above)
                    survey_name = file[:file.rfind('.')]
                    survey_dir = os.path.join(img_dir, survey_name)
                    if not os.path.exists(survey_dir):
                        os.makedirs(survey_dir)
                    
                    #Retrieve the images via get_survey_images for current survey
                    file_dir = os.path.join(csv_dir, file)
                    get_survey_images(file_dir, survey_name, urban_radius, rural_radius, MAX_CLOUD_PROBABILITY)
                    
                    #Add survey to txt file which stores all surveys which are done to avoid downloading them again if you reload the program
                    file1 = open(sentinel_done,"w")#write mode
                    file1.write(file+"\n")
                    file1.close()
                    
                    print(file, 'finished')
                    

In [None]:
#Main Part

#Parameter
urban_radius = 1000 # meter
rural_radius = 5000 # meter
MAX_CLOUD_PROBABILITY = 20 # %


#Paths
#Directory of originally zip_files containing the dbf-files
zip_dir = "/mnt/datadisk/shannon/get_sentinel/GPS_Data"
#Directory where dbf files are stored
dbf_dir = os.path.join(zip_dir, "dbf_files")
#Directory where csv files are stored
csv_dir = os.path.join(zip_dir, "gps_csv")
#Subdirectory of csv_dir where all csv files are stored which belongs to surveys carried out before 2013
before_2013_dir = os.path.join(csv_dir, "before_2013")
#if directly to local computer
#img_dir = os.path.join(csv_dir, "tif_data")
#Directory where the final survey zips containing the sentinel images are stores
img_dir = '/mnt/datadisk/shannon/get_sentinel/sentinel_images_zip'
#Directory to txt files which contains all surveys where the images were already retrieved
sentinel_done = os.path.join(zip_dir, "sentinel_done.txt")


#Functions
extract_dbf(zip_dir, dbf_dir)
print('Extracted dbf files')
create_csv(dbf_dir, csv_dir)
print('created csv data')
before_2013(csv_dir, before_2013_dir)
print('Moved all surveys from before 2013 into seperate folder')
sentinel_img_survey(img_dir, csv_dir, sentinel_done, urban_radius, rural_radius, MAX_CLOUD_PROBABILITY)
