Helper notebook to load a pandas DataFrame into Elasticsearch in chunks.

https://www.analyticsvidhya.com/blog/2017/05/beginners-guide-to-data-exploration-using-elastic-search-and-kibana/

In [8]:
####### Libraries & settings
import os
#import sys
from contextlib import contextmanager
from time import time
#import datetime
import pandas as pd

from elasticsearch import Elasticsearch #needs to be version 6 max : https://stackoverflow.com/questions/48139986/elastichttperror-406-elastic-search-error-while-indexing-data
from elasticsearch.helpers import bulk #https://elasticsearch-py.readthedocs.io/en/master/helpers.html


######## functions

@contextmanager
def timer(name):
    """Context manager that prints how long the enclosed block took.

    Prints a "starting" banner on entry and a "done in ..." line on exit.
    Durations under 120 seconds are printed in seconds, longer ones in
    minutes, both rounded to one decimal.

    The duration is printed even when the enclosed block raises; the
    exception then propagates unchanged.

    Args:
        name: label shown in the "starting" banner.
    """
    t0 = time()
    print('\nstarting :', name)
    try:
        yield
    finally:
        # Measure once so the threshold test and the printed value agree.
        elapsed = time() - t0
        if elapsed < 120:
            print('done in', round(elapsed, 1), 's')
        else:
            print('done in ', round(elapsed / 60, 1), 'm')
 

def gendata(index_name, lst):
    """Lazily wrap every record of ``lst`` in a bulk action for ``index_name``.

    Helper for index_data(). Each yielded action is a dict holding the
    target index under "_index" and the untouched record under "doc".
    """
    # NOTE(review): the bulk helper conventionally reads the document body
    # from "_source"; confirm that nesting records under "doc" is intended.
    yield from ({"_index": index_name, "doc": record} for record in lst)

def index_data(data_path, chunksize, index_name):
    """Load a CSV file into a freshly created Elasticsearch index, chunk by chunk.

    Any existing index called ``index_name`` is deleted and recreated. The
    CSV at ``data_path`` is then read ``chunksize`` rows at a time and each
    chunk is sent with the bulk helper. A chunk that fails to index is
    reported and skipped so the remaining chunks are still loaded.

    Args:
        data_path: path of the CSV file to read.
        chunksize: number of CSV rows per chunk / bulk request.
        index_name: name of the index to (re)create and fill.
    """
    # The with-block guarantees the file handle is closed, even on errors.
    with open(data_path) as f:
        csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize)
        es = Elasticsearch()
        # Start from an empty index; a missing index is not an error here.
        # Other failures (e.g. no connection) are no longer hidden.
        es.indices.delete(index=index_name, ignore=[400, 404])
        es.indices.create(index=index_name, ignore=400)
        for i, df in enumerate(csvfile):
            # Cast to object first so missing values really become None:
            # without it, recent pandas keeps NaN in float columns, and NaN
            # is not valid JSON.
            list_records = (
                df.astype(object)
                .where(pd.notnull(df), None)
                .to_dict(orient="records")
            )
            try:
                bulk(es, gendata(index_name, list_records))
                print(f'sending chunk {i} of {index_name}')
            except Exception as exc:
                # Best effort: report the failure and move on to the next
                # chunk. Unlike a bare except, Ctrl-C still interrupts.
                print(f"error!, skipping chunk {i} of {index_name}: {exc!r}")
        
        
######### variables & load
# Paths are built from the current working directory.
now = os.getcwd()
subdirs = dict(data='data')

# CSV file names, keyed by the role of each file.
files = dict(
    train='train_ctrUa4K.csv',
    test='test_lAUu6dG.csv',
    sample_sub='sample_submission_49d68Cx.csv',
)

# Full paths of the train and test files.
train_data_path, test_data_path = (
    os.path.join(now, subdirs['data'], files[key]) for key in ('train', 'test')
)

# Chunk size and Elasticsearch index names.
CHUNKSIZE = 100
index_name_train, index_name_test = 'loan_prediction_train', 'loan_prediction_test'



starting : hello there
done in 0.0 s
(614, 13)
(367, 12)


In [9]:
# Index both CSV files, using the shared CHUNKSIZE constant (100) instead of a repeated literal.
index_data(train_data_path, CHUNKSIZE, index_name_train)  # indexing train data
index_data(test_data_path, CHUNKSIZE, index_name_test)  # indexing test data

sending chunk! 0 of loan_prediction_train
sending chunk! 1 of loan_prediction_train
sending chunk! 2 of loan_prediction_train
sending chunk! 3 of loan_prediction_train
sending chunk! 4 of loan_prediction_train
sending chunk! 5 of loan_prediction_train
sending chunk! 6 of loan_prediction_train
sending chunk! 0 of loan_prediction_test
sending chunk! 1 of loan_prediction_test
sending chunk! 2 of loan_prediction_test
sending chunk! 3 of loan_prediction_test


In [None]:
# End of the notebook proper - the code below does not need to be run.

# Preliminary work: read both CSV files in full and print their shapes.
with timer('hello there'): 
    train = pd.read_csv(train_data_path)
    test = pd.read_csv(test_data_path)
    
print(train.shape)
print(test.shape)

# Step-by-step replay of the chunk handling inside index_data(), for debugging.
# NOTE(review): f is never closed in the visible code.
f = open(train_data_path)
chunksize = 100
csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize)  # a pandas.io.parsers.TextFileReader

for i,df in enumerate(csvfile): 
    # Replace missing values with None, then build one dict per row, keyed by row label.
    # NOTE(review): on recent pandas, where(..., None) may leave NaN in float columns - confirm.
    records=df.where(pd.notnull(df), None).T.to_dict()
    # Flatten the {row label: row dict} mapping into a plain list of row dicts.
    list_records=[records[it] for it in records]