Skip to content

Commit

Permalink
More perf optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
dyang415 committed Sep 2, 2023
1 parent 67984de commit caa3a78
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions backend/app/insight/services/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,20 @@ def parAnalyzeHelper(
agg_method: Dict[str, str],
metrics_name: Dict[str, str],
columns_list: List[List[str]],
expected_value: float):
po_agg_method = [build_polars_agg(name, method) for name, method in agg_method.items()]
expected_value: float,
all_columns: List[str]):
overall_agg_methods = [build_polars_agg(name, method) for name, method in agg_method.items()]
baseline_df = baseline_df.groupby(all_columns).agg(overall_agg_methods)
comparison_df = comparison_df.groupby(all_columns).agg(overall_agg_methods)

res = polars.DataFrame()
sub_df_agg_methods = [build_polars_agg(name, "sum") for name, method in agg_method.items()]

def func(columns):
columns = list(columns)
baseline = baseline_df.groupby(columns).agg(po_agg_method).rename(metrics_name)
comparison = comparison_df.groupby(columns).agg(po_agg_method).rename(metrics_name)
baseline = baseline_df.groupby(columns).agg(sub_df_agg_methods).rename(metrics_name)
comparison = comparison_df.groupby(columns).agg(sub_df_agg_methods).rename(metrics_name)

joined: polars.DataFrame = comparison.join(
joined: polars = comparison.join(
baseline,
on=columns,
suffix='_baseline',
Expand Down Expand Up @@ -199,10 +202,8 @@ def func(columns):
func, columns
) for columns in columns_list]
wait(futures)
for future in futures:
res = polars.concat([res, future.result()])

return res
return polars.concat([future.result() for future in futures])


class NpEncoder(json.JSONEncoder):
Expand Down Expand Up @@ -361,11 +362,18 @@ def slice(self):

def getDimensions(self) -> Dict[str, Dimension]:
# Build a mapping from each column in self.columns_of_interest to a
# Dimension carrying that column's distinct values.
#
# NOTE(review): this span comes from a diff view whose +/- markers were
# stripped, so lines from before and after the commit appear together.
# The annotations below guess which lines belong to which revision --
# confirm against the repository before relying on them.
dimensions = {}
# One lazy query per column: the column's unique values are cast to str
# under the name "value" and tagged with the column name under "key".
# The per-column queries are concatenated and collected in a single call.
unique_values_df = polars.concat([
self.df.lazy()
.select(polars.col(column).unique().cast(str).alias("value"))
.with_columns(polars.lit(column).alias("key"))
for column in self.columns_of_interest
]).collect()

for column in self.columns_of_interest:
# NOTE(review): the next four lines look like the pre-commit version
# (eager unique() per column, then dropping None and the np.NaN object
# by identity) -- presumably the lines this commit removes; TODO confirm.
values = self.df[column].unique()
values = list(
filter(lambda x: x is not None and x is not np.NaN, values))
dimensions[column] = Dimension(name=column, values=values)
# Select this column's rows from the combined frame, keeping only
# non-null "value" entries, and read them back as dicts.
rows = unique_values_df.filter(
(polars.col("key").eq(column)) & (polars.col("value").is_not_null())
).select("value").rows(named=True)
# This assignment overwrites any earlier entry for the same column.
dimensions[column] = Dimension(name=column, values=[row['value'] for row in rows])
return dimensions

def getTopDrivingDimensionSliceKeys(self,
Expand All @@ -391,13 +399,13 @@ def getTopDrivingDimensionSliceKeys(self,
for key in childSliceKey if key in slice_info_dict]
return list(map(lambda slice: slice.serializedKey, slice_info[:topN]))

def buildMetrics(self, metricsName: str, skip_details: bool) -> Metric:
def buildMetrics(self, metricsName: str) -> Metric:
metrics = Metric()
metrics.name = metricsName
metrics.baselineNumRows = self.aggs['count_baseline'].sum()
metrics.comparisonNumRows = self.aggs['count'].sum()
metrics.dimensions = self.getDimensions()
metrics.totalSegments = len(self.slices_df.rows())
metrics.totalSegments = self.slices_df.select(polars.count()).row(0)[0]
metrics.keyDimensions = self.keyDimensions

# Build dimension slice info
Expand Down Expand Up @@ -442,7 +450,7 @@ def getSlices(self):
def getMetrics(self) -> str:
logger.info(f'Building metrics for {self.metrics_name}')
ret = {
k: self.buildMetrics(k, idx > 0)
k: self.buildMetrics(k)
for idx, (k, v) in enumerate(self.metrics_name.items())
if k != self.date_column
}
Expand Down Expand Up @@ -481,7 +489,6 @@ def par_analyze(self,
joined = baseline.join(comparison, on=['dimension_value', 'dimension_name'], how='outer')
joined.fill_nan(0).fill_null(0)


single_dimension_df = polars.concat([single_dimension_df, joined])

multi_dimension_grouping_result = parAnalyzeHelper(
Expand All @@ -490,7 +497,8 @@ def par_analyze(self,
self.agg_method,
metrics_name,
columns,
self.expected_value
self.expected_value,
self.columns_of_interest
)

return multi_dimension_grouping_result, find_key_dimensions(single_dimension_df.to_pandas())

0 comments on commit caa3a78

Please sign in to comment.