In [None]:
import csv
import json
import os
from datetime import datetime
from datetime import timedelta

import gpudb
import pytz
from arcgis import GIS
from arcgis import geometry

In [None]:
dte_format = '%Y-%m-%d %H:%M:%S'

# kinetica
kinetica_cred_file = '.env'
kinetica_host = 'admin.citizenscience.host'

# kinetica table query options
table_name = 'plastics_db'
limit = 1000000
encoding = 'json'
query_options = {
    'sort_by': 'captured_time',
    'sort_order': 'DESC'
}

# width of the sync window: each run selects records captured in
# (now - query_window_minutes, now]
query_window_minutes = 15

# use for testing
# ex: '2020-02-02 00:00:00'
# NOTE(review): any non-empty value pins "now" to a fixed past time; set to None for scheduled runs
QUERY_TIMESTAMP_OVERRIDE = '2020-03-05 16:00:48'

dte_now = datetime.now(tz=pytz.timezone('GMT'))
if QUERY_TIMESTAMP_OVERRIDE:
    dte_now = datetime.strptime(QUERY_TIMESTAMP_OVERRIDE, dte_format)

query_timestamp_now = datetime.strftime(dte_now, dte_format)

dte_past = dte_now - timedelta(minutes=query_window_minutes)
query_timestamp_past = datetime.strftime(dte_past, dte_format)

log_file = f'logs/{query_timestamp_now.replace(":", "")}.csv'

# Bound the window on both sides: with only a lower bound, the override returned every
# record after the override time, and back-to-back runs could select the same record twice.
query_options['expression'] = (
    f"captured_time > '{query_timestamp_past}'"
    f" and captured_time <= '{query_timestamp_now}'"
)

# ArcGIS Online config options
conn_profile_id = 'ago_ec2020_py'
plastics_layer_id = '08878e5ab81d4074932a1069db4ded75'
plastics_perimeter_layer_id = '5c952389060d4f199931f7c0622541bc'

### Set up the log file

In [None]:
# Create the run's log file (and its directory on a fresh checkout) and write the CSV header.
# newline='' is what the csv module requires for file objects; without it rows are
# separated by blank lines on Windows.
os.makedirs(os.path.dirname(log_file) or '.', exist_ok=True)
with open(log_file, 'w', newline='') as csv_file:
    csv.writer(csv_file).writerow(['datetime', 'message'])


def log_message(message):
    """Append one timestamped row to the run's CSV log file.

    Parameters
    ----------
    message : str
        Text written to the ``message`` column. The ``datetime`` column is the
        current GMT time formatted with ``dte_format``.
    """
    dte = datetime.now(tz=pytz.timezone('GMT'))
    log_ts = datetime.strftime(dte, dte_format)
    with open(log_file, 'a', newline='') as csv_file:
        csv.writer(csv_file).writerow([log_ts, message])

## Helper functions
### Parse Inventory Data for each database record

In [None]:
def parse_inventory_data(recordID, inv_data, cleanup_workflow):
    """Flatten a record's inventory JSON into feature-layer attributes.

    Parameters
    ----------
    recordID : str
        Identifier of the source record; used only in log messages.
    inv_data : str
        JSON text taken from ``inventory_data`` or ``inventory_data_pct``.
    cleanup_workflow : str or None
        ``'sample'`` or ``None`` (user selected "wander" in the app) produce
        ``sample_<id>_<childId>`` fields from each entry's ``childrens`` list.
        ``'report'`` produces ``report_<type>`` fields, where the type has
        spaces removed and its first letter lower-cased. Any other value yields
        no attributes.

    Returns
    -------
    dict
        Attribute name -> value. Empty when ``inv_data`` is not valid JSON or is
        JSON ``null``.
    """
    inv_data_atts = {}
    try:
        inv_data_json = json.loads(inv_data)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers None / non-string input
        msg = f'unable to parse inventory_data values as JSON for recordID :: {recordID}'
        print(msg)
        log_message(msg)
        return inv_data_atts

    if inv_data_json is None:
        # JSON "null": nothing to flatten (iterating None would raise TypeError)
        return inv_data_atts

    # if cleanup_workflow is None; user selected "wander" in the app
    if cleanup_workflow == 'sample' or cleanup_workflow is None:
        for att in inv_data_json:
            for child in att['childrens']:
                field_name = f"sample_{att['id']}_{child['id']}"
                inv_data_atts[field_name] = child['value']
    elif cleanup_workflow == 'report':
        for att in inv_data_json:
            in_name = att['type'].replace(' ', '')
            if not in_name:
                # a blank type has no usable field name (and in_name[0] would raise IndexError)
                continue
            field_name = f'report_{in_name[0].lower()}{in_name[1:]}'
            inv_data_atts[field_name] = att['value']

    return inv_data_atts

### Create and re-project geometry of collection point

In [None]:
def create_feature_geometry(lat, lng, record_id=None):
    """Project a WGS84 (wkid 4326) collection point to Web Mercator (wkid 3857).

    Parameters
    ----------
    lat, lng : float or None
        Collection-point coordinates in WGS84.
    record_id : str, optional
        Record identifier for the failure log message. When omitted, the
        notebook-level ``feature`` currently being staged is consulted, which
        is what this function implicitly relied on before the parameter
        existed. The lookup no longer raises NameError when ``feature`` is
        undefined.

    Returns
    -------
    The first geometry returned by ``arcgis.geometry.project``. Returns
    ``None`` when either coordinate is missing or the projection fails.
    """
    if lat is None or lng is None:
        # nothing to project; skip the round trip to the geometry service
        return None

    try:
        return geometry.project([{'x': lng, 'y': lat, 'spatialReference': {'wkid': 4326}}], in_sr=4326, out_sr=3857)[0]
    except Exception:
        if record_id is None:
            staged = globals().get('feature') or {}
            record_id = staged.get('attributes', {}).get('recordID')
        msg = f'unable to project geometry of collection point for recordID :: {record_id}'
        print(msg)
        log_message(msg)
        return None

### Create and re-project geometry for perimeter feature

In [None]:
def create_perimeter_feature(attributes, perimeter_json):
    """Build a perimeter feature that carries a copy of the observation's attributes.

    Parameters
    ----------
    attributes : dict
        Attributes of the collection feature. They are shallow-copied, so the
        two features do not share one dict.
    perimeter_json : dict
        Perimeter geometry parsed from ``polygon_perimeter``. It is projected
        from wkid 4326 to wkid 3857.

    Returns
    -------
    dict
        ``{'attributes': ..., 'geometry': ...}``. ``geometry`` stays ``None``
        when projection fails; the failure is printed and logged.
    """
    feature = {
        'attributes': dict(attributes),
        'geometry': None
    }

    # project geographic coords to web mercator
    try:
        feature['geometry'] = geometry.project([perimeter_json], in_sr=4326, out_sr=3857)[0]
    except Exception:
        # .get: a missing recordID must not turn the error report itself into a KeyError
        msg = f'unable to project geometry of perimeter geometry for recordID :: {feature["attributes"].get("recordID")}'
        print(msg)
        log_message(msg)

    return feature

### Break up a list into fixed-size chunks

In [None]:
def chunk_it(in_list, chunk_size):
    """Split a sequence into consecutive slices of at most ``chunk_size`` items.

    Parameters
    ----------
    in_list : sequence
        Items to split; each chunk is a slice of it, so lists yield lists.
    chunk_size : int
        Maximum items per chunk; must be >= 1.

    Returns
    -------
    list
        The chunks in order. Only the last chunk may be shorter. Empty input
        yields ``[]``.

    Raises
    ------
    ValueError
        If ``chunk_size`` is less than 1. The previous formula raised
        ZeroDivisionError for 0.
    """
    if chunk_size < 1:
        raise ValueError(f'chunk_size must be a positive integer, got {chunk_size!r}')
    return [in_list[start:start + chunk_size] for start in range(0, len(in_list), chunk_size)]

### Connect to Kinetica

In [None]:
# Kinetica credentials are read as a JSON object (despite the .env file name)
# containing "user" and "password" keys.
user = pw = None
with open(kinetica_cred_file) as cred_file:
    config = json.load(cred_file)
    user = config['user']
    pw = config['password']

In [None]:
# create the Kinetica client used by the query cell
h_db = gpudb.GPUdb(host=kinetica_host, username=user, password=pw)

msg = f'connected to kinetica db @ {kinetica_host}'
print(msg)
log_message(msg)

### Query the Plastics table

In [None]:
qmsg = f'querying kinetica :: {query_options["expression"]}'
print(qmsg)
log_message(qmsg)

recs = h_db.get_records(table_name=table_name, limit=limit, encoding=encoding, options=query_options)

query_record_count = recs['total_number_of_records']
resmsg = f'({query_record_count}) records returned from query'
print(resmsg)
log_message(resmsg)

if query_record_count == 0:
    msg = 'no records returned from Kinetica. exiting ...'
    log_message(msg)
    raise SystemExit(msg)

# total_number_of_records counts every matching row, but at most `limit` rows come back
# in records_json; surface truncation instead of silently dropping the remainder.
returned_record_count = len(recs['records_json'])
if returned_record_count < query_record_count:
    msg = (f'only ({returned_record_count}) of ({query_record_count}) matching records were returned; '
           f'increase limit to fetch the rest')
    print(msg)
    log_message(msg)

### Connect to ArcGIS Online and set up feature layers

In [None]:
# Connect with the saved ArcGIS profile and resolve both target layers. Any failure here
# is fatal (SystemExit) because nothing can be uploaded without them; the underlying
# exception is included in the message instead of being swallowed by a bare except.
ecgis = None
try:
    ecgis = GIS(profile=conn_profile_id)
except Exception as e:
    msg = f'unable to connect to ec2020 ArcGIS Online Org :: {e}'
    print(msg)
    log_message(msg)
    raise SystemExit(msg)

plastics_layer = None
plastics_perimeter_layer = None
try:
    plastics_layer = ecgis.content.get(plastics_layer_id).layers[0]
    plastics_perimeter_layer = ecgis.content.get(plastics_perimeter_layer_id).layers[0]
except Exception as e:
    msg = f'unable to connect to feature layer(s) :: {e}'
    print(msg)
    log_message(msg)
    raise SystemExit(msg)

msg = 'successfully connected to EC2020 ArcGIS Online and connected to feature layer(s)'
print(msg)
log_message(msg)

### Loop through each record from Kinetica; create and stage features to add to the feature services

In [None]:
# Convert each Kinetica record into a collection feature (and, when the record has a
# perimeter, a perimeter feature) staged in `adds` / `perimeter_adds` for upload.
adds = []
adds_attachments = {}
perimeter_adds = []
msg = 'processing query results ...'
print(msg)
log_message(msg)


def _optional_int(value):
    """Return ``int(value)``, or ``None`` for a falsy value (None, '', 0)."""
    return int(value) if value else None


def _optional_float(value):
    """Return ``float(value)``, or ``None`` when ``value`` is None."""
    return float(value) if value is not None else None


def _parse_bag_count(recordID, raw_bag_count):
    """Parse the bag_count JSON into (small, medium, large) integers.

    Returns (None, None, None) when the field is empty or cannot be parsed. Parse
    failures are printed and logged rather than aborting the whole run.
    """
    if not raw_bag_count:
        return None, None, None
    try:
        counts = json.loads(raw_bag_count)
        return int(counts['small']), int(counts['medium']), int(counts['large'])
    except (TypeError, ValueError, KeyError) as e:
        msg = f'unable to parse bag_count for recordID :: {recordID} :: {e}'
        print(msg)
        log_message(msg)
        return None, None, None


for rec in recs['records_json']:
    # load the record as a JSON object
    rec_json = json.loads(rec)
    recordID = rec_json['recordID']
    cleanup_workflow = rec_json['cleanup_workflow']

    image_filepath = rec_json['image_filepath'] if rec_json['image_filepath'] else None

    # adding attachments from URLs won't work. only from local disk
    # we'd have to download each image locally, then reference it in the attachment upload, then delete after
    # high LOE, not much payoff. leave sample code in for now
#     if image_filepath is not None:
#         adds_attachments[recordID] = image_filepath

    # inventory_data: the report workflow reads inventory_data_pct, all others inventory_data
    inv_data_atts = {}
    if not rec_json['inventory_data'] and not rec_json['inventory_data_pct']:
        msg = f'no values in either inventory_data or inventory_data_pct for recordID :: {recordID}'
        print(msg)
        log_message(msg)
    else:
        inventory_key = 'inventory_data_pct' if cleanup_workflow == 'report' else 'inventory_data'
        inv_data_atts = parse_inventory_data(recordID, rec_json[inventory_key], cleanup_workflow)

    # polygon_perimeter: a missing, empty or malformed value means no perimeter feature
    polygon_perimeter = None
    try:
        polygon_perimeter = json.loads(rec_json.get('polygon_perimeter'))
    except (TypeError, ValueError):
        print(f'no perimeter found for recordID: {recordID}')

    bag_count_sm, bag_count_md, bag_count_lg = _parse_bag_count(recordID, rec_json['bag_count'])

    lat = _optional_float(rec_json['lat'])
    lng = _optional_float(rec_json['long'])

    # the loop variable keeps the name `feature`, and its attributes are set before
    # any geometry call
    feature = {
        'attributes': {
            'recordID': recordID,
            'individualID': rec_json['individualID'],
            'appInstallID': rec_json['appInstallID'],
            'captured_time': rec_json['captured_time'],
            'submitted_time': rec_json['submitted_time'],
            'image_filepath': image_filepath,
            'action_taken': rec_json['action_taken'],
            'plastics_mode': rec_json['plastics_mode'],
            'cleanup_workflow': cleanup_workflow,
            'report_workflow': rec_json['report_workflow'],
            'cleanup_zone_status': rec_json['cleanup_zone_status'],
            'terrain': rec_json['terrain'],
            'bag_count_sm': bag_count_sm,
            'bag_count_md': bag_count_md,
            'bag_count_lg': bag_count_lg,
            'lat': lat,
            'lng': lng,
            'cleanup_event_id': _optional_int(rec_json['cleanup_event_id']),
            'cleanup_zone_id': _optional_int(rec_json['cleanup_zone_id']),
            'cleanup_grid_id': _optional_int(rec_json['cleanup_grid_id']),
        },
        'geometry': None,
    }

    # combine inventory_data with base feature (inventory fields win on name clashes)
    feature['attributes'].update(inv_data_atts)

    if lat is None or lng is None:
        # don't send an incomplete point to the geometry service
        msg = f'no lat/long for recordID :: {recordID}; staging feature without geometry'
        print(msg)
        log_message(msg)
    else:
        feature['geometry'] = create_feature_geometry(lat, lng)

    adds.append(feature)

    # create associated perimeter feature and stage
    if polygon_perimeter is not None:
        perimeter_adds.append(create_perimeter_feature(feature['attributes'], polygon_perimeter))

msg = f'done processing query results. ({len(adds)}) collection features to add and ({len(perimeter_adds)}) perimeter features to add.'
print(msg)
log_message(msg)

In [None]:
# preview the first staged collection features; slicing (unlike adds[1]) cannot raise
# IndexError when fewer than two features were staged, which would halt the run before upload
adds[:2]

### Chunk up features into batches of 1000

In [None]:
# edit_features requests are sent in batches of this many features
batch_size = 1000

msg = f'chunking features into chunks of {batch_size} ...'
print(msg)
log_message(msg)

feature_chunks, perim_chunks = (chunk_it(staged, batch_size) for staged in (adds, perimeter_adds))

### Send updates to Plastics feature service

In [None]:
# Upload the staged collection features chunk by chunk. A failing chunk is logged and
# skipped so the remaining chunks still get sent.
msg = 'adding features to plastics db ...'
print(msg)
log_message(msg)

chunk_length = len(feature_chunks)
for i, chunk in enumerate(feature_chunks):
    try:
        res = plastics_layer.edit_features(adds=chunk)
        # addResults holds a per-feature result with a `success` flag; failed adds appear
        # there too, so count successes rather than entries
        add_results = res['addResults']
        added = sum(1 for result in add_results if result.get('success'))
        msg = f'({added}) features were added'
        print(msg)
        log_message(msg)

        failed = len(add_results) - added
        if failed:
            first_error = next(result.get('error') for result in add_results if not result.get('success'))
            msg = f'({failed}) features in chunk {i+1} of {chunk_length} failed to add to plastics layer :: {first_error}'
            print(msg)
            log_message(msg)
    except Exception as e:
        msg = f'error adding chunk {i+1} of {chunk_length} to plastics layer :: {e}'
        print(msg)
        log_message(msg)

msg = 'done adding features to plastics layer'
print(msg)
log_message(msg)

### Send updates to Perimeter feature service

In [None]:
# Upload the staged perimeter features chunk by chunk. A failing chunk is logged and
# skipped so the remaining chunks still get sent.
msg = 'adding features to plastics perimeter layer db ...'
print(msg)
log_message(msg)

chunk_length = len(perim_chunks)
for i, chunk in enumerate(perim_chunks):
    try:
        res = plastics_perimeter_layer.edit_features(adds=chunk)
        # addResults holds a per-feature result with a `success` flag; failed adds appear
        # there too, so count successes rather than entries
        add_results = res['addResults']
        added = sum(1 for result in add_results if result.get('success'))
        msg = f'({added}) features were added'
        print(msg)
        log_message(msg)

        failed = len(add_results) - added
        if failed:
            first_error = next(result.get('error') for result in add_results if not result.get('success'))
            msg = f'({failed}) features in chunk {i+1} of {chunk_length} failed to add to plastics perimeter layer :: {first_error}'
            print(msg)
            log_message(msg)
    except Exception as e:
        msg = f'error adding chunk {i+1} of {chunk_length} to plastics perimeter layer :: {e}'
        print(msg)
        log_message(msg)

msg = 'done adding features to plastics perimeter layer'
print(msg)
log_message(msg)

In [None]:
# record completion on the console and in the run's CSV log
msg = 'script completed'
for emit in (print, log_message):
    emit(msg)