# ElasticSearch (Userway)

https://dom.tutu.ru/pages/viewpage.action?pageId=89036396 - примеры запросов<br>
https://dom.tutu.ru/pages/viewpage.action?pageId=73920579 - пример на агрегацию по разнице во времени<br>
https://kibana-analytics-v2.infra.tutu.ru - Кибана Userway<br>
https://kibana.logs.elc.tutu.ru/ - Кибана Эластик логов

## Импорты

In [1]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q, A, __version__
from elasticsearch_dsl.connections import connections
import time
import pandas as pd
import datetime

#client_uw = Elasticsearch(hosts=["http://analytics-3-6.elc.tutu.ru:9200"], timeout=600)
client_uw = Elasticsearch(hosts=["http://analytics-query.elc.tutu.ru:9200"], timeout=600)

dt_str = ['2018-06-20 00:00:00', '2018-06-21 00:00:00']

def datetime_to_utc (strtime, format='%Y-%m-%d %H:%M:%S'):
    return int(time.mktime(time.strptime(strtime, format)) * 1000)

dt = [datetime_to_utc(i) for i in dt_str]

def from_timestamp(t):
    t = int(t/1000)
    return datetime.datetime.fromtimestamp(t)#.date()

In [2]:
dt

[1529442000000, 1529528400000]

## Пример запроса с использованием query_string

In [4]:
Q('query_string', query=
                                     'title:%s AND time:[%s TO %s] AND train_departure_station_code: (%s OR %s)' % (
                                         'train_schedule_with_date', dt[0], dt[1], '2000000', '2004000'))

QueryString(query='title:train_schedule_with_date AND time:[1529442000000 TO 1529528400000] AND train_departure_station_code: (2000000 OR 2004000)')

In [3]:
s_e = Search(using=client_uw).index('userway_alias').doc_type('event')

In [8]:
schedule_msk_spb_query = s_e.query(Q('query_string', query=
                                     'title:%s AND time:[%s TO %s] AND train_departure_station_code: (%s OR %s)' % (
                                         'train_schedule_with_date', dt[0], dt[1], '2000000', '2004000')))

schedule_msk_spb = schedule_msk_spb_query.execute()

In [11]:
schedule_msk_spb_query.execute()

<Response: [<Hit(userway_alpha/event/AWQaOuLH24KhZwUVgVnS): {'mtime': 1529448424136, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQaq3n124KhZwUVhuRk): {'mtime': 1529455802871, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQbm1TS24KhZwUVpaGf): {'mtime': 1529471522000, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQb9wjQ24KhZwUVxP67): {'mtime': 1529477531856, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQcVFQVbBWT9FmHmMnt): {'mtime': 1529483645975, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQcVFUabBWT9FmHmMtE): {'mtime': 1529483646236, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQcVHLebBWT9FmHmQiP): {'mtime': 1529483653856, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQcUoEebBWT9FmHl5Wk): {'mtime': 1529483526432, 'type': 'page', 'title': 'train_sch...}>, <Hit(userway_alpha/event/AWQcUr9MbBWT9FmHl-8E): {'mtime': 1529483542

In [15]:
schedule_msk_spb.to_dict()#['hits']['total']

{'took': 255,
 'timed_out': False,
 '_shards': {'total': 144, 'successful': 144, 'failed': 0},
 'hits': {'total': 297422,
  'max_score': 4.0956903,
  'hits': [{'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQaOuLH24KhZwUVgVnS',
    '_score': 4.0956903,
    '_routing': '40200618-8631-434f-b161-fddf90ead226',
    '_parent': '40200618-8631-434f-b161-fddf90ead226',
    '_source': {'mtime': 1529448424136,
     'type': 'page',
     'title': 'train_schedule_with_date',
     'time': 1529448376000,
     'ip': '185.89.101.48',
     'page_id': '5rMpPc8zMhU',
     'route_type': 'oneway',
     'search_date_timestamp': 1529528400000,
     'site_version': 'full',
     'train_arrival_station_code': 2024750,
     'train_departure_station_code': 2004000,
     'trains_available_count': 1,
     'trains_count': 1,
     'trains_elreg_count': 0,
     'zone': 'main'}},
   {'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQaq3n124KhZwUVhuRk',
    '_score': 4.0956903,
    '_routing

In [13]:
schedule_msk_spb.hits.total

297422

## Пример запроса с использованием term

In [16]:
s_g = Search(using=client_uw).index('userway_alias').doc_type('goal')

In [17]:
time_query = Q('range', time={'gt':dt_str[0], 'lte':dt_str[1], 
                                                'format':'yyyy-MM-dd HH:mm:ss', 'time_zone':'+03:00'})

In [18]:
dt_str[0]

'2018-06-20 00:00:00'

In [19]:
cheap_orders_query = s_g.query(Q('term', type='zhd_order')&
                               time_query&
                               Q('terms', count=[2, 3, 4]))

## Scan

In [20]:
cheap_orders = pd.DataFrame(columns=['object_id', 'session_id'])

i = 0
for res in cheap_orders_query.scan():
    cheap_orders.loc[i] =[res.object, res.meta.parent]
    i += 1

In [21]:
cheap_orders.head()

Unnamed: 0,object_id,session_id
0,18373587,2c200618-9515-47ff-a828-3cfd86b4dca4
1,18375992,ad180618-e7c2-4d24-9219-54f78724ecb4
2,18368199,9d200618-472a-4bac-bb7f-de986de860d3
3,18370462,6f230816-dcbd-461f-866a-4aa05ac78ac7
4,18372739,5b211216-d570-45e7-9c71-d17783982744


In [13]:
with open('cheap_orders.csv','w') as file:
    for res in cheap_orders_query.scan():
        res_d = res.to_dict()
        print('\t'.join(map(str,[res.object, res.meta.parent, res_d.get('profit', 'None')])),file=file)

In [14]:
pd.read_csv('cheap_orders.csv', sep='\t', names=['object_id', 'session_id']).head()

Unnamed: 0,object_id,session_id
0,18373587,2c200618-9515-47ff-a828-3cfd86b4dca4
1,18375992,ad180618-e7c2-4d24-9219-54f78724ecb4
2,18368199,9d200618-472a-4bac-bb7f-de986de860d3
3,18370462,6f230816-dcbd-461f-866a-4aa05ac78ac7
4,18372739,5b211216-d570-45e7-9c71-d17783982744


## Запросы parent-child

In [15]:
schedule_msk_spb_query = s_e.query(Q('prefix', title='train_schedule')&
                                   Q('range', time={'gte':dt_str[0], 'lte':dt_str[1], 
                                                    'format':'yyyy-MM-dd HH:mm:ss', 'time_zone':'+03:00'})&
                                   Q('terms', train_departure_station_code=[2000000, 2004000])&
                                   Q('has_parent', query=
                                     Q('has_child', query=
                                       Q('term', device_type='smartphone'), 
                                       type='client'), 
                                     type='session'))

schedule_msk_spb = schedule_msk_spb_query.execute()

In [None]:
ss.query(~Q('has_child', query=
          Q('term', title='tours_jarvel'),
          type='event')|
         Q('has_child', query=
          Q('term', title='tours_hotel'),
          type='event'))

In [16]:
schedule_msk_spb.hits.total

165023

In [None]:
s_e.query(Q('term', title='tours_search')&
                                   Q('has_parent', query=
                                     Q('has_child', query=
                                       Q('term', title='tours_order'), 
                                       type='event'), 
                                     type='session'))

In [17]:
# Выгрузить session id по запросу выше

## Агрегации

In [22]:
dt_str_1 = ['2018-06-20 00:00:00', '2018-06-21 23:59:59']

#### Агрегация по времени

In [24]:
schedule_smartphone_query = s_e.query(Q('term', title='train_schedule_with_date')&
                                      Q('range', time={'gte':dt_str_1[0], 'lte':dt_str_1[1], 
                                                       'format':'yyyy-MM-dd HH:mm:ss', 'time_zone':'+03:00'})&
                                      Q('has_parent', query=
                                        Q('has_child', query=
                                          Q('term', device_type='smartphone'), 
                                          type='client'), 
                                        type='session'))

In [25]:
schedule_smartphone_query.aggs.bucket('per_day', 'date_histogram', field='time', interval='day', time_zone = 'Europe/Moscow')\
               .metric('visitors','cardinality', field = '_parent')

DateHistogram(aggs={'visitors': Cardinality(field='_parent')}, field='time', interval='day', time_zone='Europe/Moscow')

In [26]:
schedule_smartphone = schedule_smartphone_query.execute()

In [27]:
schedule_smartphone.to_dict()

{'took': 15935,
 'timed_out': False,
 '_shards': {'total': 144, 'successful': 144, 'failed': 0},
 'hits': {'total': 869182,
  'max_score': 4.3910675,
  'hits': [{'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQZ9F_O24KhZwUVekQo',
    '_score': 4.3910675,
    '_routing': '36311017-2542-4435-b01f-28bf6fc0c0bf',
    '_parent': '36311017-2542-4435-b01f-28bf6fc0c0bf',
    '_source': {'mtime': 1529443803087,
     'type': 'page',
     'title': 'train_schedule_with_date',
     'time': 1529443709000,
     'ip': '37.168.26.178',
     'page_id': '5rMls-WGrBk',
     'route_type': 'oneway',
     'search_date_timestamp': 1533330000000,
     'site_version': 'mobile',
     'train_arrival_station_code': 8700400,
     'train_departure_station_code': 2000000,
     'trains_available_count': 0,
     'trains_count': 0,
     'trains_elreg_count': 0,
     'zone': 'main'}},
   {'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQZ9GBk24KhZwUVekUO',
    '_score': 4.3910675,
    '_rou

In [29]:
print('date', 'count', 'unique')
for i in schedule_smartphone.aggregations.per_day.buckets:
    print(from_timestamp(i.key), i.doc_count, i.visitors.value)

date count unique
2018-06-20 00:00:00 439200 144937
2018-06-21 00:00:00 429982 135971


#### Агрегация по другим полям

In [31]:
schedule_msk_spb_1_query = s_e.query(Q('term', title='train_schedule_with_date')&
                                   Q('range', time={'gte':dt_str[0], 'lte':dt_str[1], 
                                                    'format':'yyyy-MM-dd HH:mm:ss', 'time_zone':'+03:00'})&
                                   Q('terms', train_departure_station_code=[2000000, 2004000])&
                                   Q('has_parent', query=
                                     Q('has_child', query=
                                       Q('term', device_type='smartphone'), 
                                       type='client'), 
                                     type='session'))

In [32]:
schedule_msk_spb_1_query.aggs.bucket('arr_stations', 'terms', field ='train_arrival_station_code', size=0)

Terms(field='train_arrival_station_code', size=0)

In [33]:
schedule_msk_spb_1 = schedule_msk_spb_1_query.execute()

In [34]:
schedule_msk_spb_1.to_dict()

{'took': 16544,
 'timed_out': False,
 '_shards': {'total': 144, 'successful': 144, 'failed': 0},
 'hits': {'total': 120544,
  'max_score': 4.1794524,
  'hits': [{'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQbm1TS24KhZwUVpaGf',
    '_score': 4.1794524,
    '_routing': '9b080118-e535-4c25-8c30-38aabaedd28f',
    '_parent': '9b080118-e535-4c25-8c30-38aabaedd28f',
    '_source': {'mtime': 1529471522000,
     'type': 'page',
     'title': 'train_schedule_with_date',
     'time': 1529471462000,
     'ip': '188.162.65.9',
     'page_id': '5rMLjeWFGbI',
     'route_type': 'oneway',
     'search_date_timestamp': 1534280400000,
     'site_version': 'mobile',
     'train_arrival_station_code': 2064788,
     'train_departure_station_code': 2004000,
     'trains_available_count': 4,
     'trains_count': 4,
     'trains_elreg_count': 4,
     'zone': 'main'}},
   {'_index': 'userway_alpha',
    '_type': 'event',
    '_id': 'AWQcVFQVbBWT9FmHmMnt',
    '_score': 4.1794524,
    '_rout

In [59]:
print('arr_station', 'count')
for i in schedule_msk_spb_1.aggregations.arr_stations.buckets:
    print(i.key, i.doc_count)

arr_station count
2004000 12811
2000000 9992
2060001 4894
2064188 4438
2064150 2627
2060615 2367
2064130 2253
2064110 2012
2024000 1657
2014370 1535
2020500 1406
2020000 1305
2064788 1224
2064030 1163
2000150 1145
2064900 1142
2014000 1134
2060600 1130
2060340 1093
2000427 1017
2064140 951
2024700 890
2064000 856
2024600 847
2064040 804
2000170 801
2000140 783
2060630 782
2010130 774
2100000 763
2004600 762
2010000 754
2078001 734
2030000 718
4200300 672
2060620 658
2200000 622
2044700 619
2004300 615
2030400 588
2040000 582
2060150 526
2014700 517
2004200 504
2060500 503
2000174 484
2024120 477
2000125 475
2010050 475
2020600 470
2024900 463
2010041 462
1000001 447
2010441 421
2004500 414
2058000 414
2064170 408
2040210 401
2064001 390
2060410 367
2014130 358
2044800 356
2204000 327
2024550 317
2044000 310
2000160 308
2064020 308
4200379 298
2004400 297
2060370 281
2064050 279
2004579 275
2064070 272
2010290 269
2060350 266
2010090 265
2024810 253
2210793 252
2004460 245
2000351 244
2

2004549 3
2004558 3
2004577 3
2004663 3
2004683 3
2004714 3
2004826 3
2005048 3
2005309 3
2010011 3
2010053 3
2010113 3
2010183 3
2010186 3
2010201 3
2010313 3
2011229 3
2014499 3
2014532 3
2014743 3
2020748 3
2020780 3
2020896 3
2020899 3
2024514 3
2024630 3
2024690 3
2024713 3
2024730 3
2028033 3
2028052 3
2028064 3
2030148 3
2030264 3
2030290 3
2030314 3
2030653 3
2034650 3
2038175 3
2040357 3
2040580 3
2044677 3
2044850 3
2050350 3
2050418 3
2050490 3
2054320 3
2060075 3
2060341 3
2060480 3
2061534 3
2064038 3
2064098 3
2064141 3
2064265 3
2064789 3
2068400 3
2078775 3
2100014 3
2100025 3
2100066 3
2100145 3
2100153 3
2100210 3
2100225 3
2100253 3
2100254 3
2100276 3
2200062 3
2200144 3
2200198 3
2204580 3
2208002 3
2208410 3
2208460 3
2208640 3
2210821 3
2210865 3
2214200 3
2218060 3
2218070 3
2218095 3
2219095 3
2300145 3
2300578 3
2500480 3
2700001 3
2700152 3
2704817 3
2708700 3
2900970 3
5100022 3
5100158 3
5399100 3
5433295 3
7300415 3
8000102 3
8000600 3
8010110 3
8030000 3


## Exists

In [29]:
s1 = s_e.query(Q('term', title='etrain_buy_button_click')
              & time_query
              & ~Q('exists',field='train_found_train_hash')
              & Q('has_parent',  query = Q('has_child', query = Q('term', is_real='true'), type='client'),\
                  type='session'))

s1.aggs.bucket('hashs','terms', field = 'train_found_train_hash', size=7000)
res1 = s1.execute()

## Фильтры

In [30]:
# пример построения воронки из реальной АБ

In [31]:
dt_str_ab = ['2018-03-27 13:30:59', '2018-04-04 10:45:59']

In [32]:
s_uw = Search(using=client_uw).index('userway_alias').doc_type('session')

In [33]:
visit = A('filter', Q('has_child', query=
                      Q('prefix', title='train')&
                      Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                       'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                      type='event').to_dict())

seats = A('filter', Q('has_child', query=
                      Q('term', title='train_seats')&
                      Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                       'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                      type='event').to_dict())

passengers = A('filter', Q('has_child', query=
                                 Q('term', title='train_passengers')&
                                 Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                                  'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}), 
                           type='event').to_dict())

payment = A('filter', Q('has_child', query=
                                 Q('term', title='train_payment')&
                                 Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                                  'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                        type='event').to_dict())

final = A('filter', Q('has_child', query=
                                 Q('term', title='train_final')&
                                 Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                                  'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                      type='event').to_dict())

billing = A('filter', Q('has_child', query=
                                 Q('term', title='train_billing')&
                                 Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                                  'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                        type='event').to_dict())

zhd_order = A('filter', Q('has_child', query=
                                 Q('term', type='zhd_order')&
                                 Q('range', time={'gte':dt_str_ab[0], 'lte':dt_str_ab[1], 
                                                  'format':'yyyy-MM-dd HH:mm:ss', "time_zone":"+03:00"}),
                          type='goal').to_dict())

In [34]:
ab = ['train_toilets_and_conds_on_v04', 'train_toilets_and_conds_off_v04']

In [35]:
def get_funnel(ab):
    #все сессии не ботов с нужным вариантом АБ
    sq = s_uw.query(Q('has_child', query=
                      Q('term', device_type='smartphone'), 
                      type='client')&
                    Q('term', ab='%s' % (ab[0]))&
                    ~Q('term', ab='%s' % (ab[1]))).params(size=0)
    
    #навешиваем фильтры
    sq.aggs.bucket('train_seats', seats).\
    bucket('train_passengers', passengers).bucket('train_payment', payment).bucket('train_final', final).\
    bucket('train_billing', billing).bucket('zhd_order', zhd_order)
    
    return sq.execute()

In [36]:
#строим воронку для варианта on
res_on = get_funnel(ab)

In [37]:
res_on.to_dict()

{'took': 8227,
 'timed_out': False,
 '_shards': {'total': 144, 'successful': 144, 'failed': 0},
 'hits': {'total': 154329, 'max_score': 0.0, 'hits': []},
 'aggregations': {'train_seats': {'doc_count': 115631,
   'train_passengers': {'doc_count': 20287,
    'train_payment': {'doc_count': 11223,
     'train_final': {'doc_count': 10012,
      'train_billing': {'doc_count': 9156,
       'zhd_order': {'doc_count': 8478}}}}}}}}

In [38]:
#строим воронку для варианта off
res_off = get_funnel(ab[::-1])

In [39]:
res_off.to_dict()

{'took': 10747,
 'timed_out': False,
 '_shards': {'total': 144, 'successful': 144, 'failed': 0},
 'hits': {'total': 153289, 'max_score': 0.0, 'hits': []},
 'aggregations': {'train_seats': {'doc_count': 114846,
   'train_passengers': {'doc_count': 20377,
    'train_payment': {'doc_count': 11252,
     'train_final': {'doc_count': 9988,
      'train_billing': {'doc_count': 9168,
       'zhd_order': {'doc_count': 8514}}}}}}}}

# Запрос к эластику логов 

In [8]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q, A

HOST_LOGS = "http://kibana.logs.elc.tutu.ru:9280"
CLIENT_LOGS = Elasticsearch(hosts=HOST_LOGS, timeout=300)

def logs_search():
    return Search(using=CLIENT_LOGS).doc_type('logs')

dt_str = ['2019-06-01 00:00:00', '2019-06-30 23:59:59']

dt = [datetime_to_utc (i) for i in dt_str]

In [9]:
sms_subsсription_search = logs_search()
local_time_query = sms_subsсription_search.query(
    Q('term', log='usage_log')
    & Q('query_string',
        query='message:train_schedule_local_time AND @timestamp:[{} TO {}]'.format(*dt)
       )
)

In [12]:
i = 0
for res in local_time_query.scan():
    print(type(res['@timestamp']))
    print(res['@timestamp'].strptime())
    i += 1
    if i>1:
        break

<class 'str'>
2019-06-01T12:59:14.372Z
<class 'str'>
2019-06-04T12:24:07.859Z


In [13]:
with open('data/local_time.csv', 'w',  encoding='utf-8') as f:
    start = time.time()
    for res in local_time_query.scan():
        fields = [res.code, res['@timestamp'], res.message]
        print('\t'.join(list(map(str, fields))), file=f)
        i += 1
        if i % 100 == 0 or i == total:
            print('read {} rows ({:.2%}) in {:.2f} sec.'.format(i, i / total, time.time() - start))

NameError: name 'total' is not defined

NameError: name 'ss' is not defined