Skip to content

Commit

Permalink
fix prefill
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyl committed Sep 3, 2023
1 parent 1c994e7 commit 8234a5e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 73 deletions.
97 changes: 60 additions & 37 deletions backend/app/insight/services/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class Metric:
def toDimensionSliceInfo(df: polars.DataFrame, metrics_name, baselineCount: int, comparisonCount, expected_value):
# calculate the change and std
df = df.with_columns(
(polars.col(metrics_name) - polars.col(f"{metrics_name}_baseline")).abs().alias("impact")
(polars.col(metrics_name) -
polars.col(f"{metrics_name}_baseline")).abs().alias("impact")
)

df = df.with_columns(
Expand Down Expand Up @@ -142,16 +143,20 @@ def parAnalyzeHelper(
columns_list: List[List[str]],
expected_value: float,
all_columns: List[str]):
overall_agg_methods = [build_polars_agg(name, method) for name, method in agg_method.items()]
overall_agg_methods = [build_polars_agg(
name, method) for name, method in agg_method.items()]
baseline_df = baseline_df.groupby(all_columns).agg(overall_agg_methods)
comparison_df = comparison_df.groupby(all_columns).agg(overall_agg_methods)

sub_df_agg_methods = [build_polars_agg(name, "sum") for name, method in agg_method.items()]
sub_df_agg_methods = [build_polars_agg(
name, "sum") for name, method in agg_method.items()]

def func(columns):
columns = list(columns)
baseline = baseline_df.groupby(columns).agg(sub_df_agg_methods).rename(metrics_name)
comparison = comparison_df.groupby(columns).agg(sub_df_agg_methods).rename(metrics_name)
baseline = baseline_df.groupby(columns).agg(
sub_df_agg_methods).rename(metrics_name)
comparison = comparison_df.groupby(columns).agg(
sub_df_agg_methods).rename(metrics_name)

joined: polars = comparison.join(
baseline,
Expand All @@ -164,33 +169,38 @@ def func(columns):
.drop(columns)

analyzing_metric_name = next(iter(metrics_name.values()))
metric_value_sum, baseline_metric_value_sum = joined.select((polars.col(analyzing_metric_name, f"{analyzing_metric_name}_baseline").sum())).row(0)
metric_value_sum, baseline_metric_value_sum = joined.select((polars.col(
analyzing_metric_name, f"{analyzing_metric_name}_baseline").sum())).row(0)

joined = joined \
.with_columns((polars.lit(metric_value_sum) + polars.lit(baseline_metric_value_sum)).alias("sum")) \
.with_columns(
((polars.col(analyzing_metric_name) + polars.col(f"{analyzing_metric_name}_baseline")) / (
((polars.col(analyzing_metric_name) + polars.col(f"{analyzing_metric_name}_baseline")) / (
polars.lit(metric_value_sum) + polars.lit(baseline_metric_value_sum))
).alias("weight"),
polars.when(
polars.col(f"{analyzing_metric_name}_baseline") == 0
).then(
).alias("weight"),
polars.when(
polars.col(analyzing_metric_name) > 0
).then(polars.lit(1)).otherwise(polars.lit(-1))
).otherwise(
(polars.col(analyzing_metric_name) - polars.col(f"{analyzing_metric_name}_baseline")) / polars.col(
f"{analyzing_metric_name}_baseline")
).alias("change")
).with_columns(
(polars.col("change") - polars.lit(expected_value)).alias("calibrated_change")
).with_columns(
(polars.col("weight") * polars.col("calibrated_change")).alias("weighted_change")
)
polars.col(f"{analyzing_metric_name}_baseline") == 0
).then(
polars.when(
polars.col(analyzing_metric_name) > 0
).then(polars.lit(1)).otherwise(polars.lit(-1))
).otherwise(
(polars.col(analyzing_metric_name) - polars.col(f"{analyzing_metric_name}_baseline")) / polars.col(
f"{analyzing_metric_name}_baseline")
).alias("change")
).with_columns(
(polars.col("change") - polars.lit(expected_value)
).alias("calibrated_change")
).with_columns(
(polars.col("weight") * polars.col("calibrated_change")
).alias("weighted_change")
)

weighted_change_mean = joined.select(polars.col("weighted_change").sum()).row(0)
weighted_change_mean = joined.select(
polars.col("weighted_change").sum()).row(0)
weighted_std = (joined.select(
(polars.col("weight") * (polars.col("weighted_change") - polars.lit(weighted_change_mean)).pow(2)).sum().sqrt()
(polars.col("weight") * (polars.col("weighted_change") -
polars.lit(weighted_change_mean)).pow(2)).sum().sqrt()
)).row(0)
joined = joined.with_columns(
polars.lit(weighted_std).alias("weighted_std")
Expand Down Expand Up @@ -252,7 +262,8 @@ def process_single_dimension_df(df: pd.DataFrame) -> pd.DataFrame:
df = df.reset_index().set_index('dimension_name')
df = df.reset_index().join(sum_df, how='inner', rsuffix='_sum', on='dimension_name')

df['weight'] = (df['metric_value_comparison'] + df['metric_value_baseline']) / (df['metric_value_comparison_sum'] + df['metric_value_baseline_sum'])
df['weight'] = (df['metric_value_comparison'] + df['metric_value_baseline']) / \
(df['metric_value_comparison_sum'] + df['metric_value_baseline_sum'])
df['change'] = np.where(df['metric_value_baseline'] == 0, 0,
(df['metric_value_comparison'] - df['metric_value_baseline']) / df['metric_value_baseline'])
# df['change'] = np.where(df['metric_value_baseline'] == 0, df['change'].max(),
Expand All @@ -262,13 +273,16 @@ def process_single_dimension_df(df: pd.DataFrame) -> pd.DataFrame:
df_with_weighted_mean_change = df.groupby(['dimension_name']).agg({
'weighted_change': 'sum', # sum of weight always = 1, no need to divide
}).rename(columns={'weighted_change': 'weighted_change_mean'})
merged_df = pd.merge(df, df_with_weighted_mean_change, on='dimension_name', how='inner')
merged_df['weighted_change_std_input'] = merged_df['weight'] * np.power(merged_df['change'] - merged_df['weighted_change_mean'], 2)
merged_df = pd.merge(df, df_with_weighted_mean_change,
on='dimension_name', how='inner')
merged_df['weighted_change_std_input'] = merged_df['weight'] * \
np.power(merged_df['change'] - merged_df['weighted_change_mean'], 2)

grouped_merged_df = merged_df.groupby(['dimension_name']).agg({
'weighted_change_std_input': 'sum'
})
grouped_merged_df['result'] = np.sqrt(np.array(grouped_merged_df['weighted_change_std_input'], dtype=np.float64))
grouped_merged_df['result'] = np.sqrt(
np.array(grouped_merged_df['weighted_change_std_input'], dtype=np.float64))

return grouped_merged_df.reset_index()

Expand Down Expand Up @@ -333,8 +347,10 @@ def __init__(self,

def gen_agg_df(self):
po_agg_method = self.po_agg_method()
baseline = self.baseline_df.select(po_agg_method).rename(self.metrics_name)
comparison = self.comparison_df.select(po_agg_method).rename(self.metrics_name)
baseline = self.baseline_df.select(
po_agg_method).rename(self.metrics_name)
comparison = self.comparison_df.select(
po_agg_method).rename(self.metrics_name)

return comparison.join(baseline, suffix='_baseline', how='cross').fill_nan(0).fill_null(0)

Expand All @@ -358,7 +374,8 @@ def slice(self):
columns_list = []
for i in range(1, min(4, len(self.columns_of_interest) + 1)):
columns_list.extend(combinations(self.columns_of_interest, i))
self.slices_df, self.keyDimensions = self.par_analyze(columns_list, self.metrics_name)
self.slices_df, self.keyDimensions = self.par_analyze(
columns_list, self.metrics_name)

def getDimensions(self) -> Dict[str, Dimension]:
dimensions = {}
Expand All @@ -371,9 +388,11 @@ def getDimensions(self) -> Dict[str, Dimension]:

for column in self.columns_of_interest:
rows = unique_values_df.filter(
(polars.col("key").eq(column)) & (polars.col("value").is_not_null())
(polars.col("key").eq(column)) & (
polars.col("value").is_not_null())
).select("value").rows(named=True)
dimensions[column] = Dimension(name=column, values=[row['value'] for row in rows])
dimensions[column] = Dimension(
name=column, values=[row['value'] for row in rows])
return dimensions

def getTopDrivingDimensionSliceKeys(self,
Expand Down Expand Up @@ -432,8 +451,10 @@ def buildMetrics(self, metricsName: str) -> Metric:

metrics.aggregationMethod = self.agg_method[metricsName]
metrics.expectedChangePercentage = self.expected_value
metrics.baselineValueByDate = self.gen_value_by_date(self.baseline_df, metricsName)
metrics.comparisonValueByDate = self.gen_value_by_date(self.comparison_df, metricsName)
metrics.baselineValueByDate = self.gen_value_by_date(
self.baseline_df, metricsName)
metrics.comparisonValueByDate = self.gen_value_by_date(
self.comparison_df, metricsName)

metrics.baselineDateRange = [self.baseline_date_range[0].strftime(
"%Y-%m-%d"), self.baseline_date_range[1].strftime("%Y-%m-%d")]
Expand Down Expand Up @@ -465,7 +486,8 @@ def par_analyze(self,
metrics_name: Dict[str, str]):
single_dimension_df = polars.DataFrame()
for column in self.columns_of_interest:
metric_name, aggregation_method = next(iter(self.agg_method.items()))
metric_name, aggregation_method = next(
iter(self.agg_method.items()))

baseline = self.baseline_df.rename({
column: 'dimension_value',
Expand All @@ -486,7 +508,8 @@ def par_analyze(self,
build_polars_agg('metric_value_comparison', aggregation_method)
)

joined = baseline.join(comparison, on=['dimension_value', 'dimension_name'], how='outer')
joined = baseline.join(
comparison, on=['dimension_value', 'dimension_name'], how='outer')
joined.fill_nan(0).fill_null(0)

single_dimension_df = polars.concat([single_dimension_df, joined])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,10 @@ const sampleDataPrefills: PrefillConfig = {
aggregationOption: "distinct",
expectedValue: 0.03,
fieldType: "VARCHAR",
},
eventTime: {
type: "date",
fieldType: "TIMESTAMP",
},
country: {
type: "dimension",
fieldType: "VARCHAR",
},
gender: {
type: "dimension",
fieldType: "VARCHAR",
},
majorOsVersion: {
type: "dimension",
fieldType: "VARCHAR",
},
phoneBrand: {
type: "dimension",
fieldType: "VARCHAR",
},
age: {
type: "dimension",
fieldType: "VARCHAR",
},
language: {
type: "dimension",
fieldType: "VARCHAR",
},
platform: {
type: "dimension",
fieldType: "VARCHAR",
},
}
},
dateColumn: "eventTime",
groupByColumns: ["country", "gender", "majorOsVersion", "phoneBrand", "age", "language", "platform"],
baseDateRange: {
from: createNewDateWithBrowserTimeZone("2022-07-01"),
to: createNewDateWithBrowserTimeZone("2022-07-31"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ function ReportConfig({
useEffect(() => {
if (prefilledConfigs) {
setSelectedColumns(prefilledConfigs.selectedColumns);
setDateColumn(prefilledConfigs.dateColumn);
setGroupByColumns(prefilledConfigs.groupByColumns);
}
}, [prefilledConfigs]);

Expand Down Expand Up @@ -394,9 +396,7 @@ function ReportConfig({
(h) => `${h} - ${rowCountByColumn[h]} distinct values`
)}
values={getValidDimensionColumns()}
selectedValues={Object.keys(selectedColumns).filter(
(c) => selectedColumns[c]["type"] === "dimension"
)}
selectedValues={groupByColumns}
onValueChange={onSelectDimension}
instruction={
<Text>
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/types/report-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export interface PrefillConfig {
selectedColumns: {
[key: string]: ColumnConfig;
};
dateColumn: string;
groupByColumns: string[];
baseDateRange: DateRangePickerValue;
comparisonDateRange: DateRangePickerValue;
}

0 comments on commit 8234a5e

Please sign in to comment.