# 1. \[40pts\] Implement the various functions for common Elasticsearch operations

In [1]:
!pip install pyyaml elasticsearch elasticsearch_dsl



In [2]:
# load packages
from src.elasticsearch_client import ElasticsearchClient
from data.index_mapping import mapping
import json

INDEX_NAME = "movie_reviews_homework8"

In [3]:
# Connect to Elasticsearch
es_client = ElasticsearchClient()
print(es_client.client.info())

{'name': 'es01', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'wHtB8J4jTvyxGs6FJBICDQ', 'version': {'number': '8.10.4', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'b4a62ac808e886ff032700c391f45f1408b2538c', 'build_date': '2023-10-11T22:04:35.506990650Z', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


In [4]:
# Delete index if exists before creating a new one
try:
    ###### EDIT HERE ######
    es_client.delete_index(index_name=INDEX_NAME)
    ###### EDIT HERE ######
except Exception as e:
    print(f"Error {str(e)}")

Index deleted


In [5]:
# create index based on mapping
###### EDIT HERE ######
mapping = {
    "mappings": {
        "properties": {
            "movieId": {
                "type": "integer" # Fill in the blank
            },
            "title": {
                "type": "text"
            },
            "genres": {
                "type": "text"
            },
            "imdbId": {
                "type": "integer"
            },
            "tmdbId": {
                "type": "integer"
            },
            "userId": {
                "type": "integer"
            },
            "rating": {
                "type": "float"
            },
            "timestamp": {
                "type": "date"
            }
        }
    }
}
es_client.create_index(index_name=INDEX_NAME, mapping=mapping)
###### EDIT HERE ######

In [6]:
# load data
with open("./data/movie_data_big.json", "r") as f:
    movie_data = json.load(f)

In [7]:
# insert only one document with doc_id == 0
doc_id = 0

###### EDIT HERE ######
for idx, document in enumerate(movie_data):
    if idx == doc_id:
        es_client.insert_one_document(index_name=INDEX_NAME, body=document, doc_id=doc_id)
        break
###### EDIT HERE ######

In [8]:
# delete document with doc_id == 0
doc_id = 0

###### EDIT HERE ######
es_client.delete_document(index_name=INDEX_NAME, doc_id=doc_id)
###### EDIT HERE ######

In [9]:
# insert all documents using bulk indexing
actions = []
for id_doc, doc in enumerate(movie_data):
    
###### EDIT HERE ######
    action = {
        "_op_type": "index",
        "_index": INDEX_NAME,
        "_id": id_doc,
        "_source": doc
    }
    actions.append(action)

es_client.bulk_request(actions=actions)
###### EDIT HERE ######

print(len(actions)) # should be 100789

100789


In [10]:
# get document with doc_id == 85453
doc_id = 85453

###### EDIT HERE ######
try:
    doc = es_client.get_document(index_name=INDEX_NAME, doc_id=doc_id)
except Exception as e:
    print(f"Error: {str(e)}")
    
###### EDIT HERE ######

print(f"document with id 85453: {doc}")

document with id 85453: {'movieId': 54259, 'title': 'Stardust (2007)', 'genres': 'Adventure|Comedy|Fantasy|Romance', 'imdbId': 486655, 'tmdbId': 2270.0, 'userId': 414, 'rating': 3.5, 'timestamp': 1203130241000}


In [11]:
# get count of all documents in the index
count = 0

###### EDIT HERE ######
count = es_client.count(index_name=INDEX_NAME)
###### EDIT HERE ######

print(count) # should be 100789

100789


In [12]:
# search for documents with title containing "star wars"
query = {
    "query": {
        "match_phrase": {
            "title": "star wars"
        }
    }
}
results = es_client.search(index_name=INDEX_NAME, query=query)
###### EDIT HERE ######
for res in results: # should be 10 documents
    print(f"ID: {res['_id']}, Title: {res['_source']['title']}, userID: {res['_source']['userId']}")

ID: 87925, Title: Star Wars: The Clone Wars (2008), userID: 21
ID: 87926, Title: Star Wars: The Clone Wars (2008), userID: 220
ID: 87927, Title: Star Wars: The Clone Wars (2008), userID: 298
ID: 87928, Title: Star Wars: The Clone Wars (2008), userID: 380
ID: 87929, Title: Star Wars: The Clone Wars (2008), userID: 414
ID: 87930, Title: Star Wars: The Clone Wars (2008), userID: 489
ID: 87931, Title: Star Wars: The Clone Wars (2008), userID: 534
ID: 98955, Title: The Star Wars Holiday Special (1978), userID: 514
ID: 100626, Title: Star Wars: The Last Jedi (2017), userID: 62
ID: 100627, Title: Star Wars: The Last Jedi (2017), userID: 98


# 2. \[10pts\] Compare the difference in execution time between basic and helper operations and report the result and your findings.

In [13]:
import time
from tqdm import tqdm

In [14]:
def delete_and_create_index(es_client, index_name, mapping):
    try:
        es_client.delete_index(index_name=index_name)
        es_client.create_index(index_name=index_name, mapping=mapping)
    except Exception as e:
        print(f"Error {str(e)}")

In [15]:
delete_and_create_index(es_client, INDEX_NAME, mapping)

Index deleted


In [16]:
# insert all documents not using bulk
start = time.time()
for id_doc, doc in tqdm(enumerate(movie_data)):
    es_client.insert_one_document(index_name=INDEX_NAME, body=doc, doc_id=id_doc)
end = time.time()
print(f"Inserting all documents without bulk took {end - start} seconds")

100789it [23:38, 71.07it/s]

Inserting all documents without bulk took 1418.2433474063873 seconds





In [17]:
# insert all documents using bulk
delete_and_create_index(es_client, INDEX_NAME, mapping)

start = time.time()
actions = []
for id_doc, doc in enumerate(movie_data):
###### EDIT HERE ######
    action = {
        "_op_type": "index",
        "_index": INDEX_NAME,
        "_id": id_doc,
        "_source": doc
    }
    actions.append(action)

es_client.bulk_request(actions=actions)
###### EDIT HERE ######

end = time.time()
print(f"Inserting all documents with bulk took {end - start} seconds")

Index deleted
Inserting all documents with bulk took 8.524917602539062 seconds


In [18]:
from elasticsearch import helpers

In [19]:
query1 = {
    "query": {
        "match_all": {}
    },
    "size": 1
}

query2 = {
    "query": {
        "bool": {
            "must": [
                {"match": {"genres": "Action"}},
                {"match": {"genres": "Crime"}}
            ]
        }
    },
    "size": 1
}

In [20]:
page_size=[10, 1000]

# retrieve all documents with scan
for query in [query1, query2]:
    for size in page_size:
        scroll = es_client.scan_index(query=query, index_name=INDEX_NAME, size=size)

        start = time.time()
        count = 0
        for hit in scroll:
            count += 1
        print(count)
        end = time.time()
        print(f"Retrieving all documents with {size} took {end - start} seconds")

87571
Retrieving all documents with 10 took 17.140448570251465 seconds
100789
Retrieving all documents with 1000 took 0.8979315757751465 seconds
6781
Retrieving all documents with 10 took 1.2961006164550781 seconds
6781
Retrieving all documents with 1000 took 0.12001395225524902 seconds


In [21]:
# retrieve all documents with search and scroll

for query in [query1, query2]:
    start = time.time()
    count = 0
    
    response = es_client.client.search(index=INDEX_NAME, body=query)
    scroll_response = es_client.client.search(index=INDEX_NAME, body=query, scroll='1m')
    
    scroll_id = scroll_response['_scroll_id']
    for hit in scroll_response['hits']['hits']:
        count += 1

    while True:
        scroll_response = es_client.client.scroll(scroll_id=scroll_id, scroll='1m')
        if len(scroll_response['hits']['hits']) == 0:
            break
        for hit in scroll_response['hits']['hits']:
            count += 1
    print(count)

    end = time.time()
    print(f"Retrieving all documents with search and scroll took {end - start} seconds")

100789
Retrieving all documents with search and scroll took 246.6975953578949 seconds
6781
Retrieving all documents with search and scroll took 16.07894802093506 seconds
