# Querying Elasticsearch

Delvin So

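This notebook queries our Elasticsearch instance for high-engagement tweets related to vaccines. A tweet counts as high-engagement if its author has at least 5,000 followers, or if the tweet has at least 5,000 favorites or retweets. Matching tweets are exported in chunks to CSV files under `data_out/`. (An earlier `*mask*` wildcard query is kept commented out below for reference.)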

In [20]:
import os
import time
from pprint import pprint
from typing import Iterable, Iterator

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

def make_if_not_exists(directory):
    """
    Create ``directory`` (and any missing parents) if it does not already exist.

    Uses ``exist_ok=True`` instead of a separate existence check, so there is no
    window between "check" and "create" in which another process could create
    the directory and make ``os.makedirs`` raise.
    """
    os.makedirs(directory, exist_ok=True)
        
        
def add_doc_md(hit) -> dict:
    """
    Flatten an Elasticsearch hit into a plain dict suitable for one CSV row.

    Starts from a copy of the document fields (``hit.to_dict()``) and adds:
      - ``status_id``: the document id (``hit.meta.id``)
      - ``es_score``: the hit's score (``hit.meta.score``)
    The ``date`` field, if present, is removed from the returned dict.
    ``hit`` itself is not modified.

    NOTE(review): hits from ``Search.scan()`` are usually sorted by ``_doc``,
    in which case Elasticsearch may not compute a score and ``es_score`` will
    be ``None`` -- confirm whether the score is needed downstream.
    """
    d = hit.to_dict()
    d['status_id'] = hit.meta.id
    d['es_score'] = hit.meta.score

    # Remove from the returned copy, not from `hit`: `d` is already a separate
    # dict, so deleting from `hit` would leave `date` in the output row.
    d.pop('date', None)

    return d


from itertools import islice
import csv

# https://stackoverflow.com/questions/63298676/processing-csv-iteratively-3-rows-at-a-time-in-python       
def chunks(iterable: Iterable, n: int) -> Iterator[tuple]:
    """
    Split ``iterable`` into consecutive tuples of at most ``n`` items.

    The input is consumed lazily, so only one chunk is materialised at a time.
    The final chunk may be shorter than ``n``. An empty iterable yields nothing.

    Raises:
        ValueError: if ``n`` < 1. A size of 0 would otherwise silently yield
            no chunks and drop all of the data.
    """
    if n < 1:
        raise ValueError(f"chunk size must be >= 1, got {n}")
    it = iter(iterable)
    # Two-argument iter(): keep taking n-item slices until one comes back empty.
    return iter(lambda: tuple(islice(it, n)), ())

    

In [3]:
    
# No hosts are passed, so the client uses its default connection (presumably the
# same local node at localhost:9200 used elsewhere in this notebook -- confirm).
# The timeout and retries are generous because the wildcard queries below are slow.
client = Elasticsearch(
    timeout=60,
    max_retries=10,
    retry_on_timeout=True,
)

In [4]:
# # https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460
# from multiprocessing import Pool

# SLICES = 4                                                       

# def dump_slice(slice_no):                                                       
#     s = Search(using=client, index="twitter").query('match', **{'text': 'mask'})#.filter('term', **{'verified':True})                                                               
#     s = s.extra(slice={"id": slice_no, "max": SLICES})                          
#     #for d in s.scan():                                                          
#     #   print(d.meta.id)
#     df = pd.DataFrame([add_doc_id(hit) for hit in s.scan()])
#     return df
# pool = Pool(SLICES)                                                             

## Setting up the query

In [4]:
#s = Search(using=client, index = 'twitter').query('query_string', **{"default_field" : "text", "query" : "*vaccin* OR *vax*"})
# s = Search(using=client, index = 'twitter').query('query_string', **{"default_field" : "text", "query" : "*vaccin* OR *vax*"})
# s = Search(using=client, index = 'twitter').query('wildcard', **{"text" :  {"value" : "*vaccin*"}}) # 36 minutes
# s = Search(using=client, index = 'twitter').query('wildcard', **{"text" :  {"value" : "*vax*"}}) # few minutes
# s = Search(using=client, index = 'twitter').query('wildcard', **{"text" :  {"value" : "*mask*"}}) # 23 minutes


# Active query: tweets mentioning vaccines that clear at least one engagement bar.
VACCINE_QUERY = "*vaccin* OR *vax*"
ENGAGEMENT_THRESHOLD = 5000
# Each field becomes a `range: {field: {gte: ENGAGEMENT_THRESHOLD}}` clause.
# `minimum_should_match=1` means any one of them is enough.
# Other clauses tried previously (kept for reference, not active):
#   quote_count >= 500, reply_count >= 500, verified == True
ENGAGEMENT_FIELDS = ["followers_count", "favorite_count", "retweet_count"]

q = Q(
    "bool",
    must=[Q("query_string", default_field="text", query=VACCINE_QUERY)],
    should=[Q("range", **{field: {"gte": ENGAGEMENT_THRESHOLD}}) for field in ENGAGEMENT_FIELDS],
    minimum_should_match=1,
)

s = Search(using=client, index="twitter").query(q)

# sanity check: the rendered query body sent to Elasticsearch
s.to_dict()

{'query': {'bool': {'must': [{'query_string': {'default_field': 'text',
      'query': '*vaccin* OR *vax*'}}],
   'should': [{'range': {'followers_count': {'gte': 5000}}},
    {'range': {'favorite_count': {'gte': 5000}}},
    {'range': {'retweet_count': {'gte': 5000}}}],
   'minimum_should_match': 1}}}

In [16]:
import requests
import json 

# Count matching documents through the configured client rather than a raw HTTP
# call. This reuses the client's host, timeout and retry settings, and raises
# on HTTP errors instead of returning an error body.
response = client.count(index="twitter", body=s.to_dict())

response

{'count': 3764161,
 '_shards': {'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0}}

In [None]:
# for hit in s[:10]:
#     print(add_doc_md(hit))
#     break

The following code takes an iterable (`s.scan()`) and splits it into chunks, each of which is processed and written to its own CSV file. This is far more memory-efficient than iterating through the entire result set, holding every hit in memory, and only then writing everything out to a CSV.

In [22]:
make_if_not_exists('data_out')


def _write_chunk_csv(path, records):
    """
    Write a list of dicts to ``path`` as a CSV file with a header row.

    The header is the union of keys across *all* records, in first-seen order.
    A record that has a field the first record lacked therefore no longer makes
    ``csv.DictWriter`` raise ``ValueError``; missing fields are written as ''.
    """
    fieldnames = list(dict.fromkeys(key for record in records for key in record))
    # newline='' is required by the csv module (otherwise blank lines appear on
    # Windows); utf-8 so emoji / non-Latin tweet text encodes on every platform.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)


start_time = time.time()
chunk_size = 100000
for i, chunk in enumerate(chunks(s.scan(), chunk_size)):
    if i % 10 == 0:
        print(i)
    # The chunk is already materialised as a tuple, so converting it in full
    # lets the header cover every field seen in this chunk.
    records = [add_doc_md(hit) for hit in chunk]
    _write_chunk_csv(os.path.join('data_out', f"wildcard_{i:02d}.csv"), records)
end_time = time.time()
print('Done! Took {} minutes'.format((end_time - start_time) / 60))

0
10
20
30
Done! Took 11.916052277882894 minutes
