In [None]:
from google.cloud import bigquery
import pandas as pd
import numpy as np
import warnings
from tqdm.notebook import tqdm
from datetime import datetime, timedelta
from plotly import graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
from plotly.offline import init_notebook_mode, iplot
import plotly.io as pio
import sys
from pathlib import Path
import statsmodels.api as sm
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from prophet import Prophet
from prophet.diagnostics import performance_metrics, cross_validation
from prophet.plot import add_changepoints_to_plot
import itertools
import holidays

sys.path.append('../../src')

warnings.filterwarnings('ignore')

In [None]:
bigquery_client = bigquery.Client(project='analytics-dev-333113', location = 'europe-north1')

output_path = '../../../../..////' # path
output_dir = Path(output_path)
output_dir.mkdir(parents=True, exist_ok=True)

cache_path = 'cache_dir'
cache_dir = Path(cache_path)
cache_dir.mkdir(parents=True, exist_ok=True)

In [3]:
pio.templates.default = 'plotly_white'
pio.templates['plotly_white_custom'] = pio.templates['plotly_white']
pio.templates['plotly_white_custom']['layout']['xaxis'].update(showline=True, linecolor='black', tickfont=dict(size=10)) #, zeroline=True, zerolinecolor='black')
pio.templates['plotly_white_custom']['layout']['yaxis'].update(showline=True, linecolor='black', tickfont=dict(size=10))

pio.templates.default = 'plotly_white_custom'

In [None]:
geo_list = [1234,]

scheduler_dict = {
    1234:{
        'start_dt':'2026-01-01',
        'daily':{2:[(7,8),(17,18)],3:[(7,8),(17,18)],4:[(7,8),(17,18)],5:[(7,8),(17,18)],6:[(7,8),(17,18)],7:[(12,23)],1:[(12,23)]}
    }
}

#### Functions

In [5]:
def outliers_to_avg(df: pd.DataFrame, metric_cols: list[str]) -> pd.DataFrame:
    """
    Replaces outliers in metrics with the average value of the previous and next day
    metric_cols: List of column names with metrics where outliers need to be replaced
    Returns: pandas.DataFrame with outliers replaced
    """
    df = df.reset_index(drop=True)
    for metric_col in metric_cols:
        Q1 = df[metric_col].quantile(0.25)
        Q3 = df[metric_col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        is_outlier = (df[metric_col] < lower_bound) | (df[metric_col] > upper_bound)

        for idx in df[is_outlier].index:
            if 0 < idx < len(df) - 1:
                prev_value = df.loc[idx - 1, metric_col]
                next_value = df.loc[idx + 1, metric_col]
                avg_value = np.mean([prev_value, next_value])
                df.loc[idx, metric_col] = avg_value
    
    return df

In [6]:
def rolling_avg(df: pd.DataFrame, metric_cols: list[str], n_lag: int) -> pd.DataFrame:
    """
    Applies a rolling average to specified columns of a DataFrame
    n_lag (int): Window size for the rolling average
    Returns: pandas.DataFrame with specified columns replaced by their rolling averages
    """
    df_copy = df.copy()
    for col in metric_cols:
        if col in df_copy.columns:
            df_copy[col] = df_copy[col].rolling(window=n_lag, min_periods=1).mean()
        else:
            raise ValueError(f"Column '{col}' not found in DataFrame")

    return df_copy

In [7]:
def yoy_ratio(data: pd.DataFrame, metric_cols: list[str], metric_type: str) -> pd.DataFrame:
    """
    data: pandas.DataFrame with columns 'dt_part' and specified in metric_cols columns of a DataFrame
    metric_type: absolute ('abs') or relative ('rel')
    Returns: pandas.DataFrame, copy of original one with column 'yoy_ratio'
    """
    cols = ['dt_part'] + metric_cols
    df  = data[cols].sort_values('dt_part').copy()
    df['dt_part'] = df['dt_part'].dt.tz_localize(None)
    cur_year = df['dt_part'].dt.year.max()
    prev_year = cur_year-1

    df = pd.concat([
        df[df['dt_part'].dt.year == cur_year].reset_index(drop=True),
        df[df['dt_part'].dt.year == prev_year].reset_index(drop=True).rename(columns=dict(zip(cols,[_+'_lag' for _ in cols]))),
    ], axis=1).dropna()

    if metric_type == 'abs':
        for col in metric_cols:
            df[f'{col}_yoy'] = df[col] / df[f'{col}_lag'] -1
    elif metric_type == 'rel':
        for col in metric_cols:
            df[f'{col}_yoy'] = df[col] - df[f'{col}_lag']
    else:
        raise ValueError(f"metric_type has to be one of the following: 'abs' for absolute values or 'rel' for relative ones")

    df = df[['dt_part','dt_part_lag']+metric_cols+[_+'_lag' for _ in metric_cols]+[_+'_yoy' for _ in metric_cols]]
    return df

In [8]:
def get_cities_dict(city_ids_list: list[int]) -> dict[int, dict]:
    """
    Returns a dict of format:
    {City ID: {
        'city_name': City name, 
        'country_id': Country ID, 
        'country_name': Country name}
        }
    }
    """
    if not city_ids_list:
        raise ValueError('Empty list found')
    elif not all(isinstance(_, int) for _ in city_ids_list):
        raise TypeError('The list of cities IDs must contain only integers')
    geo_query = f"SELECT country_id, country_name, city_id, city_name FROM `indriver-e6e40.heap.vw_geo_mapping` WHERE city_id IN {tuple(city_ids_list)}"
    geo_dict = {row[2]:{'city_name':row[3], 'country_id':row[0], 'country_name':row[1]} for row in bigquery_client.query(geo_query)}
    return geo_dict

In [9]:
def get_vrect(city_id:int, max_dt:str, figure:go._figure.Figure, scheduler_dict:dict):
    daynum_mapping = dict(zip(
        range(0,7),
        [_ + 2 if _ != 6 else 1 for _ in range(0,7)]
    ))
    start_dt = datetime.strptime(scheduler_dict[city_id]['start_dt'], '%Y-%m-%d')
    end_dt = datetime.strptime(max_dt, '%Y-%m-%d')
    current_dt = start_dt

    while current_dt <= end_dt:
        day_of_week = daynum_mapping[current_dt.weekday()]
        if day_of_week in scheduler_dict[city_id]['daily']:
            for start_time, end_time in scheduler_dict[city_id]['daily'][day_of_week]:
                start_dttm = datetime.combine(current_dt, datetime.min.time()) + timedelta(hours=start_time)
                end_dttm = datetime.combine(current_dt, datetime.min.time()) + timedelta(hours=end_time+1)
                figure.add_vrect(x0=start_dttm, x1=end_dttm, line_width=0, fillcolor='blue', opacity=.05)
        current_dt += timedelta(days=1)
    return

In [10]:
def filter_hours(data:pd.DataFrame, city_id:int, max_dt:str, scheduler_dict:dict):
    df = data.copy()
    start_dt = scheduler_dict[city_id]['start_dt']
    city_schedule = scheduler_dict[city_id]['daily']

    def is_within_schedule(row):
        day, hour = row['weekday'], row['hour']

        if day not in city_schedule:
            return False
        for start, end in city_schedule[day]:
            if start <= hour <= end:
                return True
        return False

    df = df[
        (df['city_id'] == city_id) &
        (df['dt_part'] <= max_dt) &
        df.apply(is_within_schedule, axis=1)
    ]
    return df.sort_values('dt_part').reset_index(drop=True)

In [11]:
def get_seasonality_data(data: pd.DataFrame, metric_cols: list[str], scale:bool) -> pd.DataFrame:
    """
    data: pandas.DataFrame with columns 'dt_part' and specified in metric_cols columns of a DataFrame
    Returns: pandas.DataFrame, copy of original one with column 'yoy_ratio'
    """
    cols = ['dt_part','year'] + metric_cols
    df  = data[cols].sort_values('dt_part').copy()
    df['dt_part'] = df['dt_part'].dt.tz_localize(None)
    years = sorted(df['year'].unique())[::-1]
    last_year = years[0]
    first_year = years[-1]

    res = df[df['year'] == last_year].reset_index(drop=True)
    datapoints = res.shape[0]
    current_year = last_year-1

    # scaling
    if scale:
        for col in metric_cols:
            res[col] = (res[col] - res[col].mean()) / res[col].mean()

    while current_year >= first_year:
        temp_df = df[df['year'] == current_year].reset_index(drop=True)[:datapoints]
        temp_df['dt_part'] = res[res['year'] == last_year]['dt_part'].values
        if scale:
            for col in metric_cols:
                temp_df[col] = (temp_df[col] - temp_df[col].mean()) / temp_df[col].mean()
        res = pd.concat([
            res,
            temp_df
        ], axis =0) # long
        current_year -= 1

    return res

In [12]:
def plot_forecast(df):
    # Продлим линии доверительного интервала и прогноза до конца факта, чтоб они красиво сошлись в одной точке
    df = df.copy().sort_index()
    last_fact_date = df.forecast.dropna().index[0] - pd.DateOffset(1)
    df.loc[last_fact_date, ['forecast', 'low', 'high']] = df.loc[last_fact_date, 'value']
    
    fig = go.Figure()
    
    # Fact line
    fig.add_scatter(x=df.index, y=df.value, name='fact')
    # Confidence interval
    fig.add_traces([
        go.Scatter(x = df.index, y=df.high, mode = 'lines',line_color = 'rgba(0,0,0,0)', showlegend = False),
        go.Scatter(x = df.index, y=df.low, mode = 'lines', line_color = 'rgba(0,0,0,0)', name = 'confidence interval', fill='tonexty', fillcolor = 'rgba(150, 150, 150, 0.5)')
    ])
    # Forecast line
    fig.add_scatter(x=df.index, y=df.forecast, name='forecast')
    
    # Last fact date used for forecast calculation
    fig.add_vline(x=last_fact_date, line_width=1, line_dash='dash', line_color='black')
    return fig

In [13]:
def df_filter_and_markup(data: pd.DataFrame, city_id:int, scheduler_dict:dict, metric_nm:str) -> pd.DataFrame:
    df = data[data['dt_part'].dt.date < datetime(2025,3,3).date()][['dt_part','year','weekday','hour',metric_nm]].copy()
    df['dt_part'] = df['dt_part'].dt.tz_localize(None)
    start_dt = pd.to_datetime(scheduler_dict[city_id]['start_dt']).date()
    city_schedule = scheduler_dict[city_id]['daily']

    df['after_treatment'] = df['dt_part'].dt.date >= start_dt
    df['weekend'] = df['weekday'].isin(range(2,6)) | ((df['weekday'] == 6) & (df['hour'] >= 16))

    def is_within_schedule(row):
        day, hour = row['weekday'], row['hour']
        if day not in city_schedule:
            return False
        for start, end in city_schedule[day]:
            if start <= hour <= end:
                return True
        return False

    df = df[df.apply(is_within_schedule, axis=1)].sort_values('dt_part').reset_index(drop=True)
    return df[['dt_part','after_treatment','weekend',metric_nm]].rename(columns={metric_nm:'metric_val'})

In [14]:
def get_significance(data: pd.DataFrame, weekend: bool):
    df = data[data['weekend'] == weekend]
    before_treatment = df[~df['after_treatment']]['metric_val'].values
    after_treatment = df[df['after_treatment']]['metric_val'].values
    
    ci = (np.quantile(before_treatment, .025), np.quantile(before_treatment, .975))
    p_values = []

    for val in after_treatment:
        p_values.append(val >= ci[0] and val <= ci[1])

    return np.round(ci, 3), np.round(np.mean(p_values), 3), before_treatment, np.round(np.mean(after_treatment), 3)

## Analysis (trasport_type = `any`)

In [None]:
cities_dict = get_cities_dict(geo_list)
cities_dict

### `any` transport type & order type

#### Select

In [None]:
main_metrics_hourly_query = """
"""

main_metrics_hourly_df = utils.read_sql_bq(main_metrics_hourly_query, client=bigquery_client, cache_dir=cache_dir)
print(main_metrics_hourly_df.shape)
main_metrics_hourly_df.head(3)

In [None]:
main_metrics_past_hourly_query = """
;"""

main_metrics_past_hourly_df = utils.read_sql_bq(main_metrics_past_hourly_query, client=bigquery_client, cache_dir=cache_dir)
print(main_metrics_past_hourly_df.shape)
main_metrics_past_hourly_df.head(3)

In [None]:
main_metrics_daily_to_forecast_query = """
;"""

main_metrics_daily_to_forecast_df = utils.read_sql_bq(main_metrics_daily_to_forecast_query, client=bigquery_client, cache_dir=cache_dir)
print(main_metrics_daily_to_forecast_df.shape)
main_metrics_daily_to_forecast_df.head(3)

In [None]:
get_sc_donors_query = """
;"""

sc_donors_dict = {row[0]:[int(_) for _ in row[2].split(',')] for row in bigquery_client.query(get_sc_donors_query)}
sc_donors_list = list(set(sum(sc_donors_dict.values(), [])))
sc_donors_dict

In [None]:
main_metrics_daily_to_sc_query = f"""
;"""

main_metrics_daily_to_sc_df = utils.read_sql_bq(main_metrics_daily_to_sc_query, client=bigquery_client, cache_dir=cache_dir)
print(main_metrics_daily_to_sc_df.shape)
main_metrics_daily_to_sc_df.head(3)

#### Data prep.

In [None]:
t_city = 1234
t_city_nm = cities_dict[t_city]['city_name']
t_city_nm_title = f'<b> {t_city_nm}</b>'
t_start_dt = pd.to_datetime(scheduler_dict[t_city]['start_dt']).date()
plot_start_date = datetime(2025,2,5).date()

# Current data
t_metrics_hourly_df = main_metrics_hourly_df[
    (main_metrics_hourly_df['city_id'] == t_city) &
    (main_metrics_hourly_df['dt_part'].dt.date >= plot_start_date) &
    (main_metrics_past_hourly_df['dt_part'].dt.date < datetime(2026,1,1).date())
].sort_values('dt_part').reset_index(drop=True)

#YoY data
t_past_hourly_df = main_metrics_past_hourly_df[
    (main_metrics_past_hourly_df['city_id']==t_city) &
    (main_metrics_past_hourly_df['dt_part'].dt.date < datetime(2026,1,1).date())
].sort_values('dt_part').reset_index(drop=True)

# Forecast
start_dt = pd.to_datetime(scheduler_dict[t_city]['start_dt']).date()
horizon = (datetime(2026,1,1).date() - start_dt).days

# Forecast - autoregression
ar_forecast_df = main_metrics_daily_to_forecast_df[
    (main_metrics_daily_to_forecast_df['city_id']==t_city)
].sort_values('dt_part').drop(columns=['city_id'])

ar_forecast_df['dt_part'] = pd.to_datetime(ar_forecast_df['dt_part'])
ar_forecast_df = ar_forecast_df.set_index('dt_part').resample('D').sum().astype(float)

# Forecast - SC
sc_forecast_df = main_metrics_daily_to_sc_df[main_metrics_daily_to_sc_df['city_id'].isin(sc_donors_dict[t_city])]
sc_forecast_df['dt_part'] = pd.to_datetime(sc_forecast_df['dt_part'])

print('City:\t\t', f'{t_city}: {t_city_nm}')
print('Start date:\t',start_dt)

#### Commission revenue

In [None]:
title=f'{t_city_nm_title}: Commission revenue hourly'
filename = f'{t_city_nm}: Commission revenue hourly'

fig = px.line(
    t_metrics_hourly_df,
    x='dt_part',
    y='commission_revenue_usd',
    title=title,
    labels={'dt_part': '', 'commission_revenue_usd': ''}
)
get_vrect(t_city, '2025-03-11', fig, scheduler_dict=scheduler_dict)
fig.add_vline(t_start_dt, line_width=1, line_dash='dash', line_color='black') #, annotation_text='start date', annotation_position='top left')

fig.update_layout(width=1400, height=500, hovermode='x')

fig.show()
# fig.write_image(f'{output_path}{filename}.png', scale=2)

#### Done rate

##### Current data

In [None]:
title=f'{t_city_nm_title}: Done rate hourly'
filename = f'{t_city_nm}: Done rate hourly'

fig = px.line(
    t_metrics_hourly_df,
    x='dt_part', 
    y='done_rate', 
    title=title,
    labels={'dt_part': '', 'done_rate':''}
)
get_vrect(t_city, '2025-03-02', fig, scheduler_dict=scheduler_dict)
fig.add_vline(t_start_dt, line_width=1, line_dash='dash', line_color='black')

fig.update_layout(width=1200, height=500, legend=dict(
    orientation='h',
    yanchor='bottom',
    y=1,
    xanchor='left',
    x=0
), hovermode='x')
fig.update_yaxes(tickformat='.0%')

fig.show()
fig.write_image(f'{output_path}{filename}.png', scale=2)

##### YoY

In [None]:
title=f'{t_city_nm_title}: Done rate YoY'
filename = f'{t_city_nm}: Done rate YoY'

fig = px.line(
    yoy_ratio(t_past_hourly_df, ['done_rate'], 'rel'),
    x='dt_part', 
    y='done_rate_yoy', 
    title=title,
    labels={'dt_part': '', 'done_rate_yoy':''}
)
get_vrect(t_city, '2025-03-02', fig, scheduler_dict=scheduler_dict)
fig.add_vline(t_start_dt, line_width=1, line_dash='dash', line_color='black')
fig.add_hline(0, line_width=1, line_dash='dash', line_color='black')

fig.update_layout(width=1200, height=500, legend=dict(
    orientation='h',
    yanchor='bottom',
    y=1,
    xanchor='left',
    x=0
), hovermode='x')
fig.update_yaxes(tickformat='.0%')
fig.update_xaxes(range=[plot_start_date, datetime(2025,3,2).date()])

fig.show()
fig.write_image(f'{output_path}{filename}.png', scale=2)

##### Seasonality

In [None]:
title=f'{t_city_nm_title}: Done rate seasonality'
filename = f'{t_city_nm}: Done rate seasonality'

fig = px.line(
    get_seasonality_data(t_past_hourly_df, ['done_rate'], scale=False),
    x='dt_part', 
    y='done_rate',
    color='year', 
    title=title,
    labels={'dt_part': '', 'done_rate':'', 'year':''}
)

fig.for_each_trace(
    lambda trace: trace.update(line=dict(color='blue', width=2.5)) 
    if trace.name == '2025'
    else trace.update(line=dict(color='gray', dash='dot'))
)

get_vrect(t_city, '2025-03-02', fig, scheduler_dict=scheduler_dict)

fig.update_layout(width=1200, height=500, showlegend=False)
fig.update_yaxes(tickformat='.0%')
fig.update_xaxes(range=[plot_start_date, datetime(2025,3,2).date()])
# fig.update_yaxes(showticklabels=False)

fig.show()
fig.write_image(f'{output_path}{filename}.png', scale=2)

##### Significance

In [52]:
t_metric_nm = 'done_rate'
t_sign_df = df_filter_and_markup(data=main_metrics_past_hourly_df, city_id=t_city, scheduler_dict=scheduler_dict, metric_nm=t_metric_nm)

In [None]:
title = f'{t_city_nm}: Done rate distribution (weekday)'

t_weekend=False
ci, p_value, before_val_array, after_avg_value = get_significance(t_sign_df, weekend=t_weekend)

fig, (ax_box, ax_hist) = plt.subplots(
    2, 1,
    sharex=True,
    gridspec_kw={'height_ratios': [1, 4]},
    figsize=(8, 6)
)

sns.boxplot(before_val_array, ax=ax_box, orient='h')
ax_box.set(xlabel='', ylabel=None, yticks=[])

sns.histplot(before_val_array, kde=False, ax=ax_hist)
ax_hist.set(xlabel='Before treatment', ylabel=None, yticks=[])

ax_box.axvline(x=after_avg_value, color='black', linestyle='--', linewidth=1)
ax_hist.axvline(x=after_avg_value, color='black', linestyle='--', linewidth=1)
ax_hist.text(after_avg_value*1.02, ax_hist.get_ylim()[1]*.95, 'After treatment', color='black', fontsize=9, ha='left', va='center', rotation=0)

plt.suptitle(title, fontsize=12, ha='right')
plt.tight_layout()
plt.savefig(f'{output_path}{title}.png', dpi=300, bbox_inches='tight')

print(f'Confidence interval:\t{ci}')
print(f'P-value:\t\t{p_value}')
plt.show()