# Parameter Setup

In [1]:
from elasticsearch_dsl import analyzer, Document, Date, Text, Integer, Keyword, Double
from elasticsearch_dsl.connections import connections
from elasticsearch.helpers import scan

from elastic_enterprise_search import AppSearch

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType, FloatType

import pandas as pd
import numpy as np
import math
import logging
import json
import os

from datetime import datetime, timezone
from random import sample 

In [2]:
# Initialize Elasticsearch (host overridable via the ES_HOST environment variable)
es = connections.create_connection(hosts=[os.environ.get("ES_HOST", "localhost")])

In [3]:
# Initialize Elastic App Search.
# The private key is read from the environment rather than hard-coded: a key written
# into the notebook leaks through version control and shared copies (rotate any key
# that was ever committed).
APP_SEARCH_URL = os.environ.get("APP_SEARCH_URL", "http://localhost:3002")
APP_SEARCH_PRIVATE_KEY = os.environ.get("APP_SEARCH_PRIVATE_KEY")
if not APP_SEARCH_PRIVATE_KEY:
    raise RuntimeError(
        "Set the APP_SEARCH_PRIVATE_KEY environment variable to the App Search private API key"
    )

app_search = AppSearch(
    APP_SEARCH_URL,
    http_auth=APP_SEARCH_PRIVATE_KEY,
)

In [4]:
# Cities whose "airbnb_history_<city>" indices are processed below.
# TODO: intended to cover the top-10 cities; currently only Barcelona is listed.
locations = ["barcelona"]

# Get Document Distributions over Time

In [5]:
# Count documents per crawled_date snapshot for each city's historical index.
def get_snapshot_distribution(index_name):
    """Return the number of documents per ``crawled_date`` in ``index_name``.

    Streams every document with ``scan()``, but uses source filtering so that only
    the ``crawled_date`` field is transferred (nothing else is needed for the counts).

    :param index_name: Elasticsearch index to scan
    :returns: pandas Series indexed by crawled_date (ascending) whose values are
        document counts; empty if the index contains no documents
    """
    hits = scan(
        es,
        index=index_name,
        query={
            "query": {"match_all": {}},
            # Fetch only the counted field instead of whole documents
            "_source": ["crawled_date"],
        },
    )
    # Consume the scroll generator once; building a Series (not a DataFrame column)
    # also works when the index is empty.
    crawled_dates = pd.Series(
        [hit["_source"]["crawled_date"] for hit in hits],
        name="crawled_date",
    )
    return crawled_dates.value_counts().sort_index(ascending=True)


doc_dist = {}
for city in locations:
    index_name = "airbnb_history_" + city
    doc_dist[index_name] = get_snapshot_distribution(index_name)

In [6]:
# Display the per-index document counts for each crawled_date snapshot
doc_dist

{'airbnb_history_barcelona': 20150430    12033
 20150717    14642
 20150903    14703
 20151002    14539
 20160103    14855
 20161106    17036
 20161208    17369
 20170104    17412
 20170209    17323
 20170306    17539
 20170407    17653
 20170507    17929
 20170605    18362
 20170706    18284
 20170806    19060
 20170912    18284
 20171007    18126
 20171113    18380
 20171209    18690
 20180116    18760
 20180206    18531
 20180412    19168
 20180514    18919
 20180609    17221
 20180710    17788
 20180814    19261
 20180911    19200
 20181010    18473
 20181107    18346
 20181210    18871
 20190114    18033
 20190206    17763
 20190308    17807
 20190410    17899
 20190514    18302
 20190607    18837
 20190710    19833
 20190812    20556
 20190917    20404
 20191016    20147
 20191109    20428
 20191210    20843
 20200110    20708
 20200216    20981
 20200316    21116
 20200416    20838
 20200511    20858
 20200613    20864
 20200717    20517
 20200824    20703
 20200912    20337
 20

# Simulate Searches over Time

In [7]:
# Time window of interest: YYYYMMDD strings, start inclusive / end exclusive
start_date, end_date = "20190101", "20210101"

# Number of results requested per simulated search (first page only)
page_size = 40

# Candidate filter values; one option per list is randomly chosen for every search
valid_accommodates = [1, 2, 3, 4, 5]                       # min. number of guests
valid_room_type = ['Entire home/apt', 'Private room', 'Shared room']
valid_availability_30 = [1, 2, 3, 5, 7, 14, 30]            # requested number of nights
# Each option is the list of accepted flag values: ['t'] restricts the filter,
# ['t', 'f'] accepts both (NOTE(review): presumably 't'/'f' = true/false)
valid_superhost = [['t'], ['t', 'f']]
valid_instant_bookable = [['t'], ['t', 'f']]

# Query parameters not simulated yet:
# is_business_travel_ready = ['t', 'f']
# price: require a distribution analysis
# cancellation_policy

In [8]:
def dump_to_txt(data, filepath):
    """Write a string to a text file, replacing any existing content.

    :param data: text to write verbatim
    :param filepath: path of the file to create or overwrite
    """
    with open(filepath, mode="w") as out_file:
        out_file.write(data)

In [9]:
def get_search_events(index_name, start_ts, end_ts, recent_searches, paging_offset, log_dir="../log"):
    """Export recent App Search API log events to JSON files.

    Requests ``ceil(recent_searches / 100)`` pages of API logs (100 events per page,
    newest first) for the engine between ``start_ts`` and ``end_ts``. Each page is
    written to ``<log_dir>/<engine>_search_log_<start_ts>_<n>.json``, where ``n``
    continues the numbering from ``paging_offset``.

    :param index_name: index name; '_' is replaced by '-' to get the App Search engine name
    :param start_ts: start of the log window (also embedded in the output file names)
    :param end_ts: end of the log window
    :param recent_searches: number of events expected in the window (sizes the page count)
    :param paging_offset: number of pages already written for this engine
    :param log_dir: output directory; created if it does not exist
    :returns: ``(num_rounds, recent_searches)`` — pages written and the input event count
    """
    print("in get_search_events ... paging_offset: " + str(paging_offset))

    # Replace '_' with '-' due to index naming convention in App Search
    engine_name = index_name.replace("_", "-")

    # The Log API returns at most 100 events per request
    max_event = 100
    num_rounds = math.ceil(recent_searches / max_event)

    output_ts = str(start_ts).replace(" ", "_")
    os.makedirs(log_dir, exist_ok=True)

    for p in range(1, num_rounds + 1):
        r = app_search.get_api_logs(
            engine_name=engine_name,
            from_date=start_ts,
            to_date=end_ts,
            current_page=p,
            page_size=max_event,
            sort_direction="desc")

        output_file = os.path.join(
            log_dir,
            engine_name + "_search_log_" + output_ts + "_" + str(paging_offset + p) + ".json",
        )
        # Previously the target path was not passed, raising TypeError on every export
        dump_to_txt(json.dumps(r), output_file)

    return num_rounds, recent_searches

In [10]:
def simulate_searches(index_name, page_size, num_searches, crawled_date=None):
    """Issue ``num_searches`` randomized searches against an App Search engine.

    For every search, one value is sampled from each ``valid_*`` list of the
    query-parameter cell, and the results are restricted to a single crawled snapshot.
    Only the first page (``page_size`` results) is requested. Responses are not used
    here; presumably the searches exist to populate the engine's API logs, which
    ``get_search_events`` exports later.

    :param index_name: index name; '_' is replaced by '-' to get the App Search engine name
    :param page_size: number of results requested per search
    :param num_searches: number of searches to issue
    :param crawled_date: snapshot to filter on. When omitted, the notebook-level
        ``crawled_date`` variable is used (the original behavior); prefer passing it.
    :raises NameError: if no ``crawled_date`` is given and none is defined at notebook level
    """
    if crawled_date is None:
        # Backward compatibility with callers relying on the notebook loop variable
        try:
            crawled_date = globals()["crawled_date"]
        except KeyError:
            raise NameError(
                "simulate_searches() needs a crawled_date argument "
                "(no notebook-level 'crawled_date' is defined)"
            ) from None

    # Replace '_' with '-' due to index naming convention in App Search
    engine_name = index_name.replace("_", "-")

    # Exactly num_searches searches, matching the count added to recent_searches by
    # the caller (range(1, num_searches) issued one search fewer).
    for _ in range(num_searches):
        num_nights = sample(valid_availability_30, 1)[0]

        # The stay must satisfy minimum_nights <= num_nights (allowed set 1..num_nights)
        # and maximum_nights >= num_nights (values 0..num_nights-1 are excluded).
        valid_nights = list(range(1, num_nights + 1))
        invalid_nights = list(range(0, num_nights))

        superhost = sample(valid_superhost, 1)[0]
        instant = sample(valid_instant_bookable, 1)[0]
        persons = sample(valid_accommodates, 1)[0]
        room = sample(valid_room_type, 1)[0]

        # Retrieve the top-`page_size` results from the first page only
        app_search.search(
            engine_name=engine_name,
            body={
                "query": "",
                "page": {
                    "current": 1,
                    "size": page_size
                },
                "filters": {
                    "all": [
                        {"crawled_date": crawled_date},
                        {"host_is_superhost": superhost},
                        {"instant_bookable": instant},
                        {"room_type": room},
                        {"minimum_nights": valid_nights},
                        {"availability_30": {"from": num_nights}},
                        {"accommodates": {"from": persons}}
                    ],
                    "none": [
                        {"maximum_nights": invalid_nights}
                    ]
                },
                "sort": [
                    {"_score": "desc"},
                    {"overall_rating": "desc"}
                ]
            }
        )

In [11]:
# Iterate over each city's historical index: simulate searches for every snapshot in
# [start_date, end_date), exporting the pending search logs from App Search whenever
# the next snapshot would push the number of un-exported searches past max_request.
max_request = 10000          # max. number of pending (not yet exported) searches
avg_occupation_rate = 0.234  # simulated searches per document in a snapshot

for index, snapshots in doc_dist.items():
    total_pages = 0      # log pages exported so far for this index (numbers the output files)
    recent_searches = 0  # searches simulated since the last export

    # Start of the current export window
    start_ts = datetime.now(timezone.utc).astimezone()

    for crawled_date, num_doc in snapshots.items():
        # Assumes crawled_date values are YYYYMMDD strings like start_date/end_date,
        # so string comparison is chronological.
        if not (start_date <= crawled_date < end_date):
            continue

        # Number of searches to simulate for the current snapshot
        num_searches = math.ceil(avg_occupation_rate * num_doc)

        if max_request - recent_searches < num_searches:
            # 1) Export the searches simulated since the last export
            end_ts = datetime.now(timezone.utc).astimezone()
            last_paging, _ = get_search_events(index, start_ts, end_ts, recent_searches, total_pages)
            total_pages += last_paging

            # 2) Open a new export window
            recent_searches = 0
            start_ts = datetime.now(timezone.utc).astimezone()

        # Simulate searches for the current snapshot (simulate_searches filters on the
        # notebook-level `crawled_date` loop variable)
        simulate_searches(index, page_size, num_searches)
        recent_searches += num_searches

    # Export the remaining search events. end_ts must be refreshed here, otherwise the
    # window would end before the pending searches were issued.
    if recent_searches > 0:
        end_ts = datetime.now(timezone.utc).astimezone()
        get_search_events(index, start_ts, end_ts, recent_searches, total_pages)
        

KeyboardInterrupt: 