In [29]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.client import IngestClient
import os
import json

In [30]:
client = Elasticsearch()

index_name = 'moredata'

In [31]:
def make_documents(data, index_name):
    for event in data:
        doc = {
                '_index': index_name,
                '_type': 'document',
                '_id': event['_id'],
                'pipeline': 'testpipeline',
                '_source': { 
                    '@timestamp': event['source']['date'], 
                    'tag': event['source']['tags'],
                    'context': event['source']['context']
                }
        }
        yield(doc) 

In [32]:

ic = IngestClient(client)
pipeline_id = 'testpipeline'
pipeline_body = {
              "description": "hourly date-time index naming",
              "processors" : [
                {
                  "date_index_name" : {
                    "field" : "@timestamp",
                    "index_name_prefix" : "{{ _index }}-",
                    "date_rounding" : "w",
                  }
                }
              ]
}
ic.put_pipeline(id=pipeline_id, body=pipeline_body)



{'acknowledged': True}

In [33]:
ic.get_pipeline(id='testpipeline')



{'testpipeline': {'description': 'hourly date-time index naming',
  'processors': [{'date_index_name': {'date_rounding': 'w',
     'field': '@timestamp',
     'index_name_prefix': '{{ _index }}-'}}]}}

In [34]:
destdir = './testdata'
files = [ f for f in os.listdir(destdir) if os.path.isfile(os.path.join(destdir,f)) ]
result = ''
for filename in files:
    with open(os.path.join(destdir,filename)) as f:
        data = json.load(f)
        result = bulk(client, make_documents(data, index_name))

print(result)    


(17832, [])


In [46]:
client.indices.exists('morsdsdedata')

False