## Package importing

In [1]:
# import packages
import os
from PIL import Image
import numpy as np
import pandas as pd
import pickle
import fnmatch
import json
import pyproj
import geopandas as gpd
import geojsonio
from datetime import datetime
from shapely.geometry import box,Point, Polygon
from shapely.geometry import mapping, shape

In [2]:
# import more packages
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col , column
import geopyspark as gps
from pyspark import SparkContext

## File Setup

In [4]:
file_location = '/mnt/sdb/nightlights/output/'
match_string_stable = '*stable_lights.avg_vis.tif'
match_string_raw = '*web.avg_vis.tif'
match_string_cf = '*web.cf_cvg.tif'
def file_list_find(match_string, file_location):
    list_files = []
    for r,d,f_list in os.walk(file_location):
        for f in f_list:
            if fnmatch.fnmatch(f,match_string):
                list_files.append((r[28:35],os.path.join(r,f)))
    
    list_files.sort()
    
    return(list_files)
list_files_raw = file_list_find(match_string_raw, file_location)
list_files_stable = file_list_find(match_string_stable, file_location)
list_files_cf = file_list_find(match_string_cf, file_location)

In [5]:
countries = gpd.read_file('countries.geojson')
print(countries.head())

         ADMIN ISO_A3                                           geometry
0        Aruba    ABW  POLYGON ((-69.99694 12.57758, -69.93639 12.531...
1  Afghanistan    AFG  POLYGON ((71.04980 38.40866, 71.05714 38.40903...
2       Angola    AGO  MULTIPOLYGON (((11.73752 -16.69258, 11.73851 -...
3     Anguilla    AIA  MULTIPOLYGON (((-63.03767 18.21296, -63.09952 ...
4      Albania    ALB  POLYGON ((19.74777 42.57890, 19.74601 42.57993...


In [6]:
# Create the SparkContext
conf = gps.geopyspark_conf(appName="geopyspark-example", master="local[*]")
conf.set("spark.executor.memory", '13g')
conf.set('spark.executor.cores', '4')
conf.set('spark.cores.max', '24')
conf.set("spark.driver.memory",'14g')
sc = SparkContext(conf=conf)

In [7]:
sc.getConf().getAll()

[('spark.kryo.registrator',
  'geopyspark.geotools.kryo.ExpandedKryoRegistrator'),
 ('spark.app.id', 'local-1586796903124'),
 ('spark.executor.id', 'driver'),
 ('spark.local.dir', '/mnt/nvme0n1p5/tmp'),
 ('spark.memory.offHeap.enabled', 'true'),
 ('spark.executor.cores', '4'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.driver.maxResultSize', '20g'),
 ('spark.rdd.compress', 'True'),
 ('spark.ui.enabled', 'false'),
 ('spark.driver.port', '35431'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.memory', '13g'),
 ('spark.master', 'local[*]'),
 ('spark.repl.local.jars',
  'file:///opt/anaconda3/lib/python3.7/site-packages/geopyspark/jars/geotrellis-backend-assembly-0.4.3.jar'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.0.137'),
 ('spark.cores.max', '24'),
 ('spark.app.name', 'geopyspark-example'),
 ('spark.driver.memory', '14g'),
 ('spark.jars',
  '/opt/anaconda3/lib/python3.7/site-packages/geopyspark/jars/g

In [8]:
# read in a few files
list_tile = []
years = []
Image.MAX_IMAGE_PIXELS = 933120000
for cam, im_file in list_files_stable[0:1]:
    im = Image.open(im_file)
    imarray=np.array(im)
    list_tile.append(gps.Tile.from_numpy_array(numpy_array=imarray, no_data_value=255))

Have to choose the correct epsg code, which determines the geometry of the data. 
    See Harvard's view of this same data using the epsg 4326 format.
    http://worldmap.harvard.edu/data/geonode:global_nightlights_KpX
    
More info on this geography system can be found here - https://epsg.io/4326 

But this is actually not the epsg format that most maps use. We actually need 3857 if we want to plot the data on a world map that the public is used to seeing https://epsg.io/3857

In [9]:
extent = gps.Extent(-65.0,-180.0, 75.0, 180.0)
extent
projected_extent = gps.ProjectedExtent(extent=extent, epsg=4326)

There are server limitations to the maximum amount of memory available in an individual JVM. The Java heap space must be large enough to deal with the large amounts of memory that we're working with. Unfortunately, even with increasing the executor memory on each worker and trying out different number of cores per worker, combining multiple images together led to Java heap space errors. This happened when there were 6 cores and 20gb of memory per worker. The workaround is to process each image individually due to this potential memory leak error.

In [10]:
#rdd = sc.parallelize([(gps.TemporalProjectedExtent(extent=extent,instant = datetime.now(), proj4='+proj=longlat +datum=WGS84 +no_defs'), x) for x in list_tile])

In [11]:
#rdd = sc.parallelize([(gps.ProjectedExtent(extent=extent, proj4='+proj=longlat +datum=WGS84 +no_defs'), x) for x in list_tile])

In [12]:
#multiband_raster_layer = gps.RasterLayer.from_numpy_rdd(layer_type=gps.LayerType.SPATIAL, numpy_rdd=rdd)
#multiband_raster_layer

In [22]:
#tiledRaster_layer = multiband_raster_layer.tile_to_layout(layout=gps.LocalLayout())

In [13]:
# Main loop and code
t1 = datetime.now()
list_tile = []
df_lights = pd.DataFrame()
for cam, im_file in list_files_stable:
    countries_i = countries.copy()
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=im_file)
    tiledRaster_layer = raster_layer.tile_to_layout(layout=gps.LocalLayout())
    
    countries['mean_light'] = None
    for i in range(0,len(countries)):
        country_shape_i = countries.loc[i,'geometry']
        light_i = tiledRaster_layer.polygonal_mean(country_shape_i)
        countries.loc[i, 'mean_light'] = light_i
        #print(f"{countries.loc[i,'ADMIN']}:{light_i}")
        
    countries_i = countries_i.drop('geometry', axis = 1)
    countries_i['year'] = int(cam[-4:])
    countries_i['cam'] = cam
    df_lights = df_lights.append(countries_i)
    t2 = datetime.now()
    print(f"Done with {cam} in {t2-t1}")
t3 = datetime.now()

Done with F101992 in 0:06:27.145966


of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.


  sort=sort,


Done with F101993 in 0:12:49.401312
Done with F101994 in 0:19:07.701878
Done with F121994 in 0:25:25.263820
Done with F121995 in 0:31:43.764016
Done with F121996 in 0:37:59.883265
Done with F121997 in 0:44:15.901093
Done with F121998 in 0:50:34.886535
Done with F121999 in 0:56:53.154173
Done with F141997 in 1:03:09.765484
Done with F141998 in 1:09:27.610768
Done with F141999 in 1:15:44.604656
Done with F142000 in 1:22:01.475432
Done with F142001 in 1:28:15.239757
Done with F142002 in 1:34:31.898589
Done with F142003 in 1:40:54.958446
Done with F152000 in 1:47:18.700362
Done with F152001 in 1:53:38.370945
Done with F152002 in 2:00:00.861731
Done with F152003 in 2:06:22.880606
Done with F152004 in 2:12:42.384670
Done with F152005 in 2:19:08.309321
Done with F152006 in 2:25:28.357693
Done with F152007 in 2:31:46.020877
Done with F162004 in 2:38:05.793990
Done with F162005 in 2:44:24.228454
Done with F162006 in 2:50:43.503134
Done with F162007 in 2:57:03.877921
Done with F162008 in 3:03:25

In [45]:
print(f'Program finished in: {t3-t1}')

Program finished in: 3:34:59.769137


In [14]:
len(df_lights)

8670

In [21]:
df_lights_pd = pd.DataFrame(df_lights)

In [30]:
df_lights_pd['mean_light'].astype('float').describe()

count    8250.000000
mean        6.720403
std        11.564157
min         0.000000
25%         0.255506
50%         1.680038
75%         7.622372
max        62.947899
Name: mean_light, dtype: float64

In [40]:
print(f'Missing values: {str(sum(df_lights_pd.mean_light.isna()))}')

Missing values: 420


In [41]:
df_lights_pd.to_csv('lights_data.csv')

In [44]:
pickle.dump(df_lights_pd,open('lights_data.pkl','wb'))