-
Notifications
You must be signed in to change notification settings - Fork 0
/
aggregation.py
114 lines (95 loc) · 5.15 KB
/
aggregation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from datetime import datetime
from typing import List, Any
import pandas as pd
from elasticsearch import Elasticsearch
# Aggregation names handed to do_query_with_aggregation by the get_prs_*
# helpers below; each name is also used as the count column of the DataFrame
# that call returns.
PRS_REVIEWED_AND_MERGED: str = "prs_reviewed_and_merged"
PRS_AUTHORED_AND_MERGED: str = "prs_authored_and_merged"
PRS_AUTHORED: str = "prs_authored"
def do_query_with_aggregation(elastic_search: Elasticsearch, pull_request_index: str, aggregation_name: str,
                              query: dict[str, Any],
                              date_histogram: dict[str, Any]) -> pd.DataFrame:
    """Run a count-only search and return its date-histogram buckets as a DataFrame.

    Args:
        elastic_search: Client whose ``search`` method executes the request.
        pull_request_index: Name of the index to search.
        aggregation_name: Name given to the aggregation in the request; it is
            also the name of the count column in the returned frame.
        query: Query body selecting the documents to bucket.
        date_histogram: Body of the ``date_histogram`` aggregation.

    Returns:
        A DataFrame indexed by ``event_date`` (each bucket's ``key_as_string``
        parsed into a naive ``datetime``) with a single column named
        ``aggregation_name`` holding the bucket's ``doc_count``. When the
        response contains no buckets the frame is empty but still carries the
        same index name and column, so callers see one schema in both cases.

    Raises:
        ValueError: If a bucket's ``key_as_string`` does not match
            ``%Y-%m-%dT%H:%M:%S.%fZ``.
    """
    event_date_field: str = "event_date"

    # size=0: only the aggregation buckets are needed, not the matching hits.
    search_results: dict[str, dict] = elastic_search.search(
        index=pull_request_index,
        size=0,
        query=query,
        aggs={aggregation_name: {"date_histogram": date_histogram}},
    )
    buckets: List[dict[str, Any]] = search_results["aggregations"][aggregation_name]["buckets"]

    if not buckets:
        # Keep the index name and count column even without data so that
        # downstream joins / column lookups do not need a special case.
        empty_index = pd.DatetimeIndex([], name=event_date_field)
        return pd.DataFrame(
            {aggregation_name: pd.Series([], index=empty_index, dtype="int64")},
            index=empty_index,
        )

    results: List[dict[str, Any]] = [
        {
            event_date_field: datetime.strptime(bucket["key_as_string"], "%Y-%m-%dT%H:%M:%S.%fZ"),
            aggregation_name: bucket["doc_count"],
        }
        for bucket in buckets
    ]
    return pd.DataFrame(results).set_index(event_date_field)
def get_prs_reviewed_and_merged(es: Elasticsearch, pull_request_index: str, user_login: str,
                                calendar_interval: str = "month") -> pd.DataFrame:
    """Count, per calendar interval, pull requests merged by ``user_login`` for other logins.

    A document is selected when ``merged_by.login`` matches ``user_login`` and
    ``user.login`` does not. Buckets are built on the ``merged_at`` field.

    NOTE(review): ``match`` is a full-text query; whether it compares logins
    exactly depends on how the login fields are mapped -- confirm against the
    index mapping.

    Args:
        es: Client used to run the search.
        pull_request_index: Name of the index to search.
        user_login: Login to count merges for.
        calendar_interval: ``calendar_interval`` of the date histogram.

    Returns:
        The frame produced by ``do_query_with_aggregation`` for the
        ``PRS_REVIEWED_AND_MERGED`` aggregation.
    """
    merged_by_user = {"match": {"merged_by.login": user_login}}
    opened_by_user = {"match": {"user.login": user_login}}
    selection = {"bool": {"must": merged_by_user, "must_not": opened_by_user}}
    histogram = {"field": "merged_at", "calendar_interval": calendar_interval}
    return do_query_with_aggregation(es, pull_request_index, PRS_REVIEWED_AND_MERGED,
                                     query=selection, date_histogram=histogram)
def get_prs_authored(elastic_search: Elasticsearch, pull_request_index: str, user_login: str,
                     calendar_interval: str = "month") -> pd.DataFrame:
    """Count, per calendar interval, pull requests opened by ``user_login`` and not self-merged.

    A document is selected when ``user.login`` matches ``user_login`` and
    ``merged_by.login`` does not. Buckets are built on the ``created_at``
    field.

    NOTE(review): ``match`` is a full-text query; whether it compares logins
    exactly depends on how the login fields are mapped -- confirm against the
    index mapping.

    Args:
        elastic_search: Client used to run the search.
        pull_request_index: Name of the index to search.
        user_login: Login to count authored pull requests for.
        calendar_interval: ``calendar_interval`` of the date histogram.

    Returns:
        The frame produced by ``do_query_with_aggregation`` for the
        ``PRS_AUTHORED`` aggregation.
    """
    opened_by_user = {"match": {"user.login": user_login}}
    merged_by_user = {"match": {"merged_by.login": user_login}}
    selection = {"bool": {"must": opened_by_user, "must_not": merged_by_user}}
    histogram = {"field": "created_at", "calendar_interval": calendar_interval}
    return do_query_with_aggregation(elastic_search, pull_request_index, PRS_AUTHORED,
                                     query=selection, date_histogram=histogram)
def get_prs_authored_and_merged(es: Elasticsearch, pull_request_index: str, user_login: str,
                                calendar_interval: str = "month") -> pd.DataFrame:
    """Count, per calendar interval, merged pull requests opened by ``user_login`` and merged by others.

    A document is selected when ``user.login`` matches ``user_login``,
    ``merged`` matches ``"true"``, and ``merged_by.login`` does not match
    ``user_login``. Buckets are built on the ``merged_at`` field.

    NOTE(review): ``match`` is a full-text query; whether it compares logins
    exactly depends on how the login fields are mapped -- confirm against the
    index mapping.

    Args:
        es: Client used to run the search.
        pull_request_index: Name of the index to search.
        user_login: Login to count authored-and-merged pull requests for.
        calendar_interval: ``calendar_interval`` of the date histogram.

    Returns:
        The frame produced by ``do_query_with_aggregation`` for the
        ``PRS_AUTHORED_AND_MERGED`` aggregation.
    """
    selection = {
        "bool": {
            "must": [
                {"match": {"user.login": user_login}},
                {"match": {"merged": "true"}},
            ],
            "must_not": {"match": {"merged_by.login": user_login}},
        }
    }
    histogram = {"field": "merged_at", "calendar_interval": calendar_interval}
    return do_query_with_aggregation(es, pull_request_index, PRS_AUTHORED_AND_MERGED,
                                     query=selection, date_histogram=histogram)
def get_all_mergers(es: Elasticsearch, pull_request_index: str, max_mergers: int = 10) -> List[str]:
    """Return the logins found in the ``merged_by.login.keyword`` terms aggregation.

    The request is count-only (``size=0``) and asks for a ``terms``
    aggregation named ``frequent_mergers``; the key of every returned bucket
    is collected in response order. The resulting list is also printed to
    stdout.

    Args:
        es: Client used to run the search.
        pull_request_index: Name of the index to search.
        max_mergers: Maximum number of term buckets requested. The default of
            10 equals the limit Elasticsearch applies when no ``size`` is
            given, so existing callers get the same result; pass a larger
            value when more distinct mergers must be returned.

    Returns:
        The bucket keys (merger logins), at most ``max_mergers`` of them.
    """
    aggregation_name: str = "frequent_mergers"
    search_results: dict[str, dict] = es.search(
        index=pull_request_index,
        size=0,
        aggs={
            aggregation_name: {
                "terms": {
                    "field": "merged_by.login.keyword",
                    # Explicit bucket limit; previously left to the server default.
                    "size": max_mergers,
                }
            }
        },
    )
    mergers: List[str] = [
        bucket["key"] for bucket in search_results["aggregations"][aggregation_name]["buckets"]
    ]
    print("mergers", mergers)
    return mergers