In [None]:
from elasticsearch import Elasticsearch
import tweepy
import os
import json
from elasticsearch import helpers

In [None]:
# Connect to the local Elasticsearch node.
# The client takes its target through `hosts`; the upper-case HOST/PORT
# keywords used previously are not client options, so the connection only
# appeared to work because localhost:9200 is also the client's default.
es = Elasticsearch(hosts=["http://localhost:9200"])

In [None]:
# Fail fast: stop the notebook here if the cluster does not answer a ping.
if es.ping():
    print("elasticsearch connection successful")
else:
    raise ValueError("elasticsearch connection failed")

In [None]:
# Index definition sent to Elasticsearch when the index is created:
# index-level settings plus explicit mappings for the fields that must not be
# left to dynamic type detection.

# Shared definition for every timestamp field; the format string matches
# values shaped like "Wed Oct 10 20:19:24 +0000 2018".
_TWITTER_DATE_FIELD = {
    "type": "date",
    "format": "EEE MMM dd HH:mm:ss Z yyyy",
}

_index_settings = {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.mapping.total_fields.limit": 2000,
    "analysis": {
        # Custom analyzer: pattern tokenizer followed by lower-casing.
        "analyzer": {
            "nlp_analyzer": {
                "type": "custom",
                "tokenizer": "tweeter_tokenizer",
                "filter": ["lowercase"],
            },
        },
        "tokenizer": {
            "tweeter_tokenizer": {
                "type": "pattern",
                "pattern": "(\\w+|\\S*[\\S*])",
                "group": 1,
            },
        },
    },
}

_field_mappings = {
    "sentiment_score": {"type": "float"},
    "sentiment": {"type": "keyword"},
    # Each date field gets its own copy of the shared definition.
    "created_at": dict(_TWITTER_DATE_FIELD),
    "retweeted_status.created_at": dict(_TWITTER_DATE_FIELD),
    "user.created_at": dict(_TWITTER_DATE_FIELD),
    "retweeted_status.user.created_at": dict(_TWITTER_DATE_FIELD),
    "coordinates.coordinates": {"type": "geo_point"},
    "place.bounding_box": {
        "type": "geo_shape",
        "coerce": True,
        "ignore_malformed": True,
    },
}

mapping = {
    "settings": _index_settings,
    "mappings": {"properties": _field_mappings},
}

In [None]:
def createIndex(name,mappings):
    """Create the Elasticsearch index ``name`` unless it already exists.

    Uses the module-level client ``es``.

    Parameters
    ----------
    name : str
        Name of the index to create.
    mappings : dict
        Request body for the create-index call (index settings and mappings).

    Raises
    ------
    ValueError
        If the response does not acknowledge the creation of ``name``. The
        message carries the error payload returned by the server, or the
        whole response when there is none.
    """
    if es.indices.exists(index=name):
        print("index exists")
        return

    # HTTP 400 is ignored at the transport level so that the error payload is
    # returned as `res` and can be inspected below.
    res = es.indices.create(index=name, body=mappings, ignore=400)
    print(res)

    # `.get` keeps a response without "acknowledged"/"index" on the ValueError
    # path instead of surfacing as a KeyError.
    if res.get("acknowledged") is not True or res.get("index") != name:
        raise ValueError("index creating failed: %s" % (res.get("error", res),))
    print(name,":index created successfully")

In [None]:
# Target index (the "database" the tweets are written to).
# Edit the name below to write to a different index.
index_name = "twitter-corona-test-amogh"
createIndex(name=index_name, mappings=mapping)

In [None]:
# Twitter API credentials used by tweepy.
# Read from environment variables so secrets never have to be typed into (and
# saved with) the notebook. Each value falls back to "" when its variable is
# unset.
CONSUMER_KEY = os.environ.get("TWITTER_CONSUMER_KEY", "")
CONSUMER_SECRET = os.environ.get("TWITTER_CONSUMER_SECRET", "")
OAUTH_TOKEN = os.environ.get("TWITTER_OAUTH_TOKEN", "")
OAUTH_TOKEN_SECRET = os.environ.get("TWITTER_OAUTH_TOKEN_SECRET", "")

In [None]:
# Authenticate against the Twitter API with the OAuth 1a user credentials.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
# The client is used for bulk tweet look-ups; let tweepy sleep until the
# rate-limit window resets instead of raising in the middle of a long run.
api = tweepy.API(auth, wait_on_rate_limit=True)

In [None]:
# Load the lines of the tweet-ID file found in data/.
# NOTE(review): `ids` is overwritten on every iteration, so when data/ holds
# several files only the last one (in sorted order) is kept. Confirm whether
# all files were meant to be ingested.
DATA_DIR = 'data/'
files = sorted(os.listdir(DATA_DIR))  # sorted: os.listdir order is arbitrary
print(files)
ids = []  # stays an empty list (rather than undefined) when data/ is empty
for f in files:
    path = os.path.join(DATA_DIR, f)
    if not os.path.isfile(path):
        continue  # skip sub-directories, which open() cannot read
    with open(path) as handle:  # the handle is closed once the lines are read
        ids = handle.readlines()

In [None]:
# Keep every line except the first one (presumably a header row -- TODO confirm
# against the data files).
ids_list = ids[1:]

In [None]:
# Number of lines read from the file; this counts the first line as well,
# which ids_list leaves out.
print(len(ids))

In [None]:
# Hydrate the tweet IDs through the Twitter API in batches and bulk-index the
# returned tweet JSON documents, each tagged with its sentiment score.
#
# Each entry of `ids_list` is expected to look like "<tweet_id>,<score>".


def _parse_sentiment_line(line):
    """Split one "<tweet_id>,<score>" line into ``(int, float)``."""
    fields = line.strip().split(",")
    return int(fields[0]), float(fields[1])


def _index_batch(scores_by_id):
    """Look up one batch of tweets and bulk-index them with their scores.

    `scores_by_id` maps tweet ID -> sentiment score. Nothing is requested or
    indexed when it is empty. Returns the number of documents handed to
    Elasticsearch.
    """
    if not scores_by_id:
        return 0
    statuses = api.statuses_lookup(list(scores_by_id))
    actions = []
    for status in statuses:
        doc = status._json
        # Pair by tweet ID, never by position: the lookup may omit tweets that
        # are no longer available and does not promise to preserve the
        # request order.
        doc["sentiment_score"] = scores_by_id[doc["id"]]
        doc["_index"] = index_name
        actions.append(doc)
    res = helpers.bulk(es, actions)
    print(res)
    return len(actions)


max_count = 100  # tweet IDs sent per lookup request
total_count = 0  # input lines consumed so far
batch_scores = {}
for line in ids_list:
    if not line.strip():
        continue  # tolerate blank lines, e.g. a trailing newline at end of file
    tweet_id, sentiment = _parse_sentiment_line(line)
    batch_scores[tweet_id] = sentiment
    total_count = total_count + 1
    if len(batch_scores) == max_count:
        _index_batch(batch_scores)
        print(total_count,"done")
        batch_scores = {}

# Flush the final, partially filled batch (a no-op when it is empty).
_index_batch(batch_scores)
print("done")

In [None]:
# DANGER: the line below deletes the whole index. Please don't run it recklessly.
#es.indices.delete(index='twitter-corona-test-amogh', ignore=[400, 404])