In [None]:
from elasticsearch import Elasticsearch
from random import randrange
import zipfile
import json

# Connection settings for the target deployment. CLOUD_ID and PASSWORD are
# intentionally left blank here and must be filled in before running.
CLOUD_ID = ""
USER = "elastic"
PASSWORD = ""

# Name of the index that is dropped, recreated and loaded by this script.
INDEX = "tweets"

# Explicit field mapping used when the index is created.
MAPPING = {
    "mappings": {
        "properties": {
            "country": {
                "type": "keyword"
            },
            "location": {
                "type": "geo_point",
                # Keep documents whose coordinates cannot be parsed instead
                # of rejecting the whole document.
                "ignore_malformed": "true"
            },
            "sourceapp": {
                "type": "text"
            },
            "user": {
                "type": "text"
            },
            "hashtags": {
                "type": "keyword"
            },
            "created_at": {
                "type": "text"
            },
            "text": {
                "type": "text"
            },
            "@timestamp": {
                # Snake-case format name: the camelCase spelling
                # ("dateOptionalTime") is deprecated in Elasticsearch 7.x
                # and no longer accepted by 8.x.
                "format": "date_optional_time",
                "type": "date"
            }
        }
    }
}

# Client for the Elastic Cloud deployment identified by CLOUD_ID, using
# basic authentication and HTTP compression for request bodies.
es = Elasticsearch(
    cloud_id=CLOUD_ID,
    http_auth=(USER, PASSWORD),
    http_compress=True
)

# Start from a clean slate: drop any previous copy of the index.
# ignore_unavailable=True is passed so that a missing index is not an error.
# The index name is given by keyword because newer client releases no longer
# accept it positionally.
es.indices.delete(index=INDEX, ignore_unavailable=True)

# Recreate the index with the explicit field mapping.
es.indices.create(index=INDEX, body=MAPPING)

# Load every NDJSON member of the zip archive into the index using bulk
# requests of BATCH_SIZE documents each.
data_path = "../data/tweets.ndjson.zip"
BATCH_SIZE = 1000  # documents per bulk request
bulk_data = []     # pending bulk body: alternating action line / document
ix = 0             # documents currently buffered in bulk_data
total = 0          # documents read from the archive and submitted
failed = 0         # documents rejected by Elasticsearch


def _send_bulk(actions):
    """Send one bulk request and return how many of its items were rejected.

    The bulk API reports per-document failures in the response body rather
    than by raising, so the response has to be inspected explicitly.
    """
    response = es.bulk(body=actions)
    if not response["errors"]:
        return 0
    # Every action submitted by this script is an "index" operation.
    rejected = [item["index"] for item in response["items"] if "error" in item["index"]]
    if rejected:
        print("Documentos rechazados en el lote: {count} (primer error: {error})".format(
            count=len(rejected), error=rejected[0]["error"]))
    return len(rejected)


with zipfile.ZipFile(data_path) as zipped_data:
    for f in zipped_data.namelist():
        with zipped_data.open(f) as ndjson:
            # Iterate lazily instead of readlines() so a large member is not
            # held in memory all at once.
            for line in ndjson:
                if not line.strip():
                    # Blank lines carry no document and would break json.loads.
                    continue
                data = json.loads(line.decode("utf-8"))
                # The tweet's own id is used as the document id, so reloading
                # the same data overwrites instead of duplicating.
                bulk_data.append({"index": {"_index": INDEX, "_id": data["id"]}})
                bulk_data.append(data)
                ix += 1
                total += 1

                if ix < BATCH_SIZE:
                    continue

                failed += _send_bulk(bulk_data)
                ix = 0
                bulk_data = []

                # Report progress after roughly one in five batches.
                if randrange(5) == 0:
                    try:
                        indexed = es.count(index=INDEX)["count"]
                    except Exception as error:
                        # Progress reporting is best-effort: a failed count
                        # must not abort the load, but it should be visible.
                        # KeyboardInterrupt is deliberately not caught.
                        print("No se pudo consultar el progreso: {error}".format(error=error))
                    else:
                        print("Indexado hasta el momento: {indexed}".format(indexed=indexed))
    # Flush the final, partially filled batch.
    if ix > 0:
        failed += _send_bulk(bulk_data)
        ix = 0
        bulk_data = []

# Report only the documents Elasticsearch actually accepted.
print("Documentos indexados: {total}".format(total=total - failed))
if failed:
    print("Documentos rechazados: {failed}".format(failed=failed))
