In [1]:
import gc
import glob
import os
import hashlib
import requests
import warnings
import numpy as np
import pandas as pd

from json.encoder import JSONEncoder

# Folder holding one processed CSV per data layer. Each CSV is read for its
# 'long', 'lat' and 'norm_value' columns.
DATA_DIR = 'C:/Users/liams/Pictures/capstoneGIStiff/processed'

# Number of decimal places coordinates are rounded to before layers are aligned.
COORD_PRECISION = 4


def load_documents(paths, precision=COORD_PRECISION):
    """Read every CSV in ``paths`` and align them into one wide frame.

    Each file contributes a single column named after the file (without its
    extension), taken from that file's 'norm_value' column. Rows are keyed by
    the ('long', 'lat') pair rounded to ``precision`` decimals; coordinates
    that are missing from a layer are filled with 0.

    Parameters
    ----------
    paths : iterable of str
        CSV file paths to read.
    precision : int
        Decimal places used when rounding 'long' and 'lat'.

    Returns
    -------
    pandas.DataFrame
        One row per distinct rounded coordinate, one column per file.
        An empty frame when ``paths`` is empty.
    """
    frames = []
    for path in paths:
        file_name = os.path.basename(path)
        print('reading csv: ' + file_name)
        layer_name = os.path.splitext(file_name)[0]

        df = pd.read_csv(path).rename(columns={'norm_value': layer_name})
        df['long'] = df['long'].round(precision)
        df['lat'] = df['lat'].round(precision)
        # NOTE(review): if rounding makes two rows of one file share a
        # coordinate, the column-wise concat below may be unable to align the
        # duplicated index entries - confirm inputs are unique at this precision.
        frames.append(df.set_index(['long', 'lat']))

    if not frames:
        # pd.concat raises on an empty list; an empty frame keeps the rest of
        # the cell working when no CSV files are found.
        return pd.DataFrame()

    # Concatenate once instead of once per file: growing the frame inside the
    # loop re-copies everything accumulated so far on every iteration.
    return pd.concat(frames, axis=1).fillna(0)


def point_id(point):
    """Return the MD5 hex digest of ``'<long>, <lat>'`` used as the document id."""
    return hashlib.md5((str(point[0]) + ', ' + str(point[1])).encode('utf8')).hexdigest()


files = glob.glob(DATA_DIR + '/*.csv')
documents = load_documents(files)

print('formatting points')
# format coords as [long, lat] lists
points = [list(coordinate) for coordinate in documents.index.tolist()]
documents['point'] = points

print('generating ids')
# hash points as doc ids
documents['id'] = [point_id(point) for point in points]

# collect garbage
del points
gc.collect()

documents = documents.set_index(['id'])
documents

reading csv: WA_red_alder_stand_density.csv
reading csv: WA_sitka_spruce_stand_density.csv
formatting points
generating ids


Unnamed: 0_level_0,WA_red_alder_stand_density,WA_sitka_spruce_stand_density,point
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
4b2b4bfc8e196ded06b5c7d53ad7f8c3,0.218944,0.000000,"[-122.6325, 49.001]"
5996cc725cbfdd201993368134a0e75f,0.187888,0.000000,"[-122.6074, 49.001]"
1e985a35ab080756c10a315d9ec54e02,0.001553,0.000000,"[-122.1345, 49.001]"
a08be0f67b987554b8d4a7a55db0ec69,0.001553,0.000000,"[-122.1342, 49.001]"
8b20752a1dd72bfe75d565f5adebb048,0.068323,0.000000,"[-123.0655, 49.0009]"
...,...,...,...
81f91257f19cd02b2ecf100890effe47,0.000000,0.189655,"[-123.7689, 45.5719]"
b5755ba8524b74e0ef0ef9bb0dd442de,0.000000,0.189655,"[-123.7686, 45.5719]"
4d70d9369abcf0aefba7565d07525f48,0.000000,0.018678,"[-123.7209, 45.5719]"
4ed00ebb93fb9b04dfb39b6bb057aea0,0.000000,0.035920,"[-123.6794, 45.5719]"


In [10]:
# Bulk-index `documents` into the search cluster in fixed-size batches.
# Each batch is written to 'temp_request_body' as newline-delimited JSON
# (one action line followed by one document line per row) and POSTed to the
# index's _bulk endpoint. The file is removed after a successful upload and is
# left on disk when the upload fails.

# if a previous run left a request body behind, clean it up
if os.path.exists('temp_request_body'):
    os.remove('temp_request_body')

index = 'data'
in_url = 'https://a5c25b2a219ff4e22a90c7e856963769-1337752311.us-west-2.elb.amazonaws.com'
batch_size = 500
num_batches = 1  # a negative value means "index every document"

if num_batches < 0:
    print('calculating batches')
    num_batches = int(np.ceil(documents.shape[0] / batch_size))

# loop-invariant request settings
url = in_url + ':9200/' + index + '/_bulk'
headers = {'Content-Type': 'application/json'}
# Credentials can be supplied through the environment; the previous hard-coded
# values remain the fallback so existing runs behave the same.
auth = (os.environ.get('ES_USER', 'admin'), os.environ.get('ES_PASSWORD', 'admin'))

for batch in range(1, num_batches + 1):
    print(f'indexing batch {batch}/{num_batches}', end='\r')

    # Derive the window from the batch number so every batch is exactly
    # batch_size rows wide (the running "next_index += last_index" doubled it).
    last_index = (batch - 1) * batch_size
    next_index = last_index + batch_size
    batch_documents = documents.iloc[last_index:next_index]

    if batch_documents.empty:
        # num_batches was larger than the data; nothing left to send
        break

    # iterrows() yields each row's own index label, so the _id always belongs
    # to the document it is paired with, for every batch.
    with open('temp_request_body', 'w', encoding='utf8', newline='\n') as request_body:
        for doc_id, series in batch_documents.iterrows():
            action = JSONEncoder().encode({"index": {"_index": index, "_id": doc_id}})
            request_body.write(action + '\n' + series.to_json() + '\n')

    # post to server
    with open('temp_request_body', 'rb') as request_body:
        data = request_body.read()

    # verify=False triggers an insecure-request warning on every call; silence
    # it only for the duration of the request and restore the filters after.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        response = requests.post(url=url, headers=headers, data=data, verify=False, auth=auth)

    # NOTE(review): a bulk request can return HTTP 200 while individual items
    # failed; consider also inspecting the "errors" field of the response body.
    if not response.ok:
        raise RuntimeError('The file was unable to upload successfully got status code: '
                           + str(response.status_code))

    # clean up request body
    del data
    os.remove('temp_request_body')

print('Done!')

indexing batch 1/1