In [1]:
from elasticsearch import Elasticsearch, helpers
import json, time
from dateutil import parser

#### Elasticsearch Connection

In [2]:
# Client for a single Elasticsearch node on localhost; no credentials are passed.
es_connection = Elasticsearch(hosts="http://localhost:9200")

#### Mapping

In [None]:
# Request body for `indices.create`: an explicit mapping for the five fields
# that `gen_data` emits for every tweet.
mappings = {
    "mappings": {
        "properties": {
            "id_str": {"type": "keyword"},
            "hashtags": {"type": "keyword"},
            "text": {"type": "text"},
            "created_at": {"type": "date", "format": "strict_date_optional_time"},
            "coordinates": {"type": "geo_shape"},
        }
    }
}

##### Recreate the index with the mapping (this also verifies the connection)

In [None]:
# Drop any previous copy of the index so it is rebuilt with the mapping above.
# `ignore_unavailable=True` makes this a no-op when the index does not exist
# yet (e.g. on a fresh cluster); without it the delete call raises instead.
es_connection.indices.delete(index="tweets_with_mapping2", ignore_unavailable=True)
# Create the index with the explicit mapping. Errors (e.g. a rejected mapping)
# are deliberately not ignored: swallowing them would let the later bulk load
# write into an index without this mapping.
es_connection.indices.create(index="tweets_with_mapping2", body=mappings)

##### Use bulk insertion to load the documents into the index

In [3]:
file_name = "./boulder_flood_geolocated_tweets.json"


def gen_data(path=None, index_name="tweets_with_mapping2",
             throttle_every=2000, throttle_seconds=10):
    """Yield one ``helpers.bulk`` action per tweet in a line-delimited JSON file.

    Each non-blank line is parsed as one JSON tweet object. Only the five
    fields declared in the index mapping are kept: ``id_str``, ``text``,
    ``created_at``, ``coordinates`` and ``hashtags``.

    Args:
        path: File to read. Defaults to the module-level ``file_name``,
            which is looked up at call time.
        index_name: Index that every action targets. It must be the index
            created with the explicit mapping; otherwise the documents are
            written to an index that does not have that mapping.
        throttle_every: Pause after every ``throttle_every`` documents.
            A falsy value disables throttling.
        throttle_seconds: Length of each pause, in seconds.

    Yields:
        dict: ``{"_index": index_name, <source fields>}`` for ``helpers.bulk``.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a tweet lacks one of the fields read below.
    """
    if path is None:
        path = file_name
    emitted = 0
    # `with` closes the file once the generator is exhausted or closed,
    # instead of leaving the handle open until garbage collection.
    with open(path, "r", encoding="utf8") as tweets_file:
        for line in tweets_file:
            if not line.strip():
                continue  # tolerate blank lines, e.g. extra newlines at EOF
            tweet = json.loads(line)
            # Re-serialize the timestamp in ISO 8601 to match the mapping's
            # `strict_date_optional_time` format. isoformat() keeps the UTC
            # offset when the parsed datetime carries one.
            created_at = parser.parse(tweet["created_at"]).isoformat()
            yield {
                "_index": index_name,
                "id_str": tweet["id_str"],
                "text": tweet["text"],
                "created_at": created_at,
                "coordinates": tweet["coordinates"],
                "hashtags": [tag["text"] for tag in tweet["entities"]["hashtags"]],
            }
            emitted += 1
            # Sleeping here also pauses the bulk upload that consumes this
            # generator. Presumably this paces ingestion (e.g. to mimic a live
            # stream); confirm the intent before removing it.
            if throttle_every and emitted % throttle_every == 0:
                time.sleep(throttle_seconds)
        

In [None]:
# Hand the action generator to the bulk helper, which sends the documents to
# Elasticsearch in chunks of 2000 actions.
helpers.bulk(
    client=es_connection,
    actions=gen_data(),
    chunk_size=2000,
)