## Analysis of pipeline execution time across different branches

In [78]:
# dependencies for this notebook that aren't already in the 'emission' environment
!pip install plotly
!pip install nbformat

#%env DB_HOST=mongodb://localhost/openpath_prod_ccebikes

from __future__ import annotations
import subprocess
import time
import pandas as pd
import plotly.express as px
import arrow
import emission.core.timer as ect
import emission.storage.decorations.stats_queries as esds
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt



In [79]:
# Branches to benchmark. Each name maps to its own dict, which later receives
# that branch's measurements.
branches = {name: {} for name in ('sect_opt', 'master')}

# Example timeline (a file under emission/tests/data/real_examples/) and the
# opcode that is passed to the load and intake scripts.
day = 'shankari_2016-07-27'
opcode = 'nrelop_dev-emulator-study_0'

# Remember which branch is checked out right now so that it can be restored
# once the measurements are done.
_head_query = subprocess.run(
    ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
    stdout=subprocess.PIPE,
    check=True,
)
curr_git_branch_name = _head_query.stdout.decode('utf-8').strip()

# for branch_name in branches.keys():
#     print(f"Checking out {branch_name} to measure pipeline runtime")
#     !git checkout $branch_name
#     !./e-mission-py.bash ./bin/reset_pipeline.py --all

#     with ect.Timer() as t:
#         !./e-mission-py.bash ./bin/intake_multiprocess.py 5
 
    
#     now = time.time()
#     tq = estt.TimeQuery("data.ts", now - t.elapsed, now)
#     print(f"Ran pipeline on {branch_name}, {tq}")
#     branches[branch_name]['tq'] = tq

#     print(f"Switching back to {curr_git_branch_name}")
#     !git checkout $curr_git_branch_name

for branch_name in branches.keys():
    print(f"Checking out {branch_name} to measure pipeline runtime")
    !git checkout $branch_name
    !./e-mission-py.bash ./bin/reset_pipeline.py --all
    !./e-mission-py.bash bin/debug/load_timeline_for_day_and_user.py emission/tests/data/real_examples/$day $opcode
    with ect.Timer() as t:
        !./e-mission-py.bash bin/debug/intake_single_user.py -e $opcode
    
    now = time.time()
    tq = estt.TimeQuery("data.ts", now - t.elapsed, now)
    print(f"Ran pipeline on {branch_name}, {tq}")
    branches[branch_name]['tq'] = tq

    print(f"Switching back to {curr_git_branch_name}")
    !git checkout $curr_git_branch_name


# !./e-mission-py.bash ./bin/reset_pipeline.py --all
# if db_host:=os.environ.get('DB_HOST'):
#     with ect.Timer() as t:
#         !./e-mission-py.bash ./bin/intake_multiprocess.py 5
#     esds.store_pipeline_time(None, f"{curr_git_branch_name}_intake_multiprocess/{db_host}", time.time(), t.elapsed)

Checking out sect_opt to measure pipeline runtime
Already on 'sect_opt'
Config file not found, returning a copy of the environment variables instead...
Retrieved config: {'DB_HOST': None, 'DB_RESULT_LIMIT': None}
URL not formatted, defaulting to "Stage_database"
Connecting to database URL localhost
Namespace(all=True, platform=None, user_list=None, email_list=None, date=None, dry_run=False)
INFO:root:About to delete 2072 analysis results
INFO:root:About to delete entries with keys ['analysis/cleaned_place', 'analysis/cleaned_section', 'analysis/cleaned_stop', 'analysis/cleaned_trip', 'analysis/composite_trip', 'analysis/confirmed_place', 'analysis/confirmed_trip', 'analysis/expected_trip', 'analysis/inferred_labels', 'analysis/inferred_section', 'analysis/inferred_trip', 'analysis/recreated_location', 'analysis/smoothing', 'inference/labels', 'inference/prediction', 'segmentation/raw_place', 'segmentation/raw_section', 'segmentation/raw_stop', 'segmentation/raw_trip']
INFO:root:About t

In [80]:
# Time series object used to read the stored stats entries.
ts = esta.TimeSeries.get_aggregate_time_series()

# One frame of 'stats/pipeline_time' entries per branch, restricted to the time
# query recorded for that branch and tagged with the branch name.
pipeline_stats_dfs = [
    ts.get_data_df('stats/pipeline_time', time_query=info['tq']).assign(branch_name=name)
    for name, info in branches.items()
]
pipeline_stats_df = pd.concat(pipeline_stats_dfs)

# Keep only entries whose name is entirely upper-case and contains no '/'.
stat_names = pipeline_stats_df['name']
is_all_caps = stat_names.str.isupper()
has_slash = stat_names.str.contains('/')
pipeline_stages_df = pipeline_stats_df[is_all_caps & ~has_slash]

# Horizontal grouped bars: one bar per (entry name, branch).
fig = px.bar(pipeline_stages_df, x="reading", y="name", color="branch_name", orientation="h")
fig.update_layout(
    title=f"Pipeline stage runtimes ({day})",
    barmode='group',
    yaxis={"dtick": 1},
    legend={"yanchor": "bottom", "xanchor": "right"},
)
fig.show()

In [81]:
# Every stats entry whose name mentions SECTION_SEGMENTATION (substring match,
# so this is not limited to the exact stage name).
is_section_segmentation = pipeline_stats_df['name'].str.contains('SECTION_SEGMENTATION')
trip_segmentation_df = pipeline_stats_df[is_section_segmentation]

# Horizontal grouped bars: one bar per (entry name, branch).
fig = px.bar(trip_segmentation_df, x="reading", y="name", color="branch_name", orientation="h")
fig.update_layout(
    title=f"Section segmentation runtimes ({day})",
    barmode='group',
    legend={"yanchor": "bottom", "xanchor": "right"},
)
fig.show()

In [None]:
# Do not truncate cell contents or the number of rows when frames are printed.
# These are global pandas display options and stay in effect for later cells.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)

# Stats entries recorded under the name 'get_entries_for_timeseries'.
db_calls_df = pipeline_stats_df.loc[
    pipeline_stats_df['name'].eq('get_entries_for_timeseries')
]

# NOTE(review): the cutoff below is a hard-coded epoch timestamp, presumably
# taken from one specific run -- confirm/update it before re-running, otherwise
# the selection may be empty or include unrelated runs.
at_or_after_cutoff = db_calls_df['ts'].ge(1740517903.8663223)

# Keep the rows at/after the cutoff and drop the bookkeeping columns
# (_id, user_id, metadata_write_ts, name, ts).
filtered_df = db_calls_df.loc[at_or_after_cutoff].drop(
    columns=['_id', 'user_id', 'metadata_write_ts', 'name', 'ts']
)
# Text that identifies the part of a reading we want to keep.
marker = "in segment_into_sections"

def extract_from_marker(item):
    """Return the part of ``item`` that starts at ``marker``, or None.

    Parameters
    ----------
    item : str, list/tuple, or anything else
        A string is searched as-is. A list/tuple is joined with newlines first
        (elements are converted with ``str`` so non-string entries do not break
        the join). Any other value (None, NaN, numbers, ...) is treated as
        "marker not found".

    Returns
    -------
    str or None
        The text from the first occurrence of ``marker`` to the end, or None
        when the marker is absent or ``item`` is not textual. None lets callers
        filter the row out afterwards.
    """
    if isinstance(item, (list, tuple)):
        text = "\n".join(str(part) for part in item)
    elif isinstance(item, str):
        text = item
    else:
        # Non-textual readings (e.g. NaN or a numeric value) cannot contain the
        # marker; previously these raised TypeError on the `in` check.
        return None

    start = text.find(marker)
    if start == -1:
        return None
    return text[start:]

# Replace each 'reading' with the text from the marker onwards; readings that
# do not contain the marker become null.
filtered_df = filtered_df.assign(
    reading=filtered_df['reading'].apply(extract_from_marker)
)

# Keep only the rows in which the marker was found.
filtered_df = filtered_df.loc[filtered_df['reading'].notna()]

# Print the whole remaining frame (all columns that were not dropped earlier).
print(filtered_df)

# [1740517069.9262366, 1740517141.0189838)
# db_calls_counts_df = db_calls_df['branch_name'].value_counts(sort=False)

# fig = px.bar(
#     db_calls_counts_df,
#     color=db_calls_counts_df.index,
#     orientation="h",
# )
# fig.update_layout(
#     title=f"Calls to _get_entries_for_timeseries during pipeline ({day})",
#     barmode='group',
#     legend=dict(
#         yanchor="bottom",
#         xanchor="right",
#     )
# )
# fig.show()

               ts  \
743  1.740518e+09   
744  1.740518e+09   
745  1.740518e+09   
746  1.740518e+09   
747  1.740518e+09   
748  1.740518e+09   
753  1.740518e+09   
754  1.740518e+09   
755  1.740518e+09   
756  1.740518e+09   
757  1.740518e+09   
758  1.740518e+09   
763  1.740518e+09   
764  1.740518e+09   
765  1.740518e+09   
766  1.740518e+09   
767  1.740518e+09   
768  1.740518e+09   
773  1.740518e+09   
774  1.740518e+09   
775  1.740518e+09   
776  1.740518e+09   
777  1.740518e+09   
778  1.740518e+09   
783  1.740518e+09   
784  1.740518e+09   
785  1.740518e+09   
786  1.740518e+09   
787  1.740518e+09   
788  1.740518e+09   
793  1.740518e+09   
794  1.740518e+09   
795  1.740518e+09   
796  1.740518e+09   
797  1.740518e+09   
798  1.740518e+09   
803  1.740518e+09   
804  1.740518e+09   
805  1.740518e+09   
806  1.740518e+09   
807  1.740518e+09   
808  1.740518e+09   
813  1.740518e+09   
814  1.740518e+09   
815  1.740518e+09   
816  1.740518e+09   
817  1.740518

In [83]:
# For each branch, collect the 'get_entries_for_timeseries' entries recorded
# while that branch's SECTION_SEGMENTATION stage was running, then plot the
# number of such entries per branch.
trip_segmentation_db_calls_dfs = []

# Never populated in this cell; kept in case other cells reference the name.
branches_trip_segmentation_tqs = []
for branch_name in branches.keys():
    branch_stage_rows = pipeline_stages_df[
        (pipeline_stages_df['name'] == 'SECTION_SEGMENTATION') &
        (pipeline_stages_df['branch_name'] == branch_name)
    ]
    if branch_stage_rows.empty:
        # Fail with an actionable message instead of an opaque IndexError from
        # .iloc[0] when the stage was never recorded for this branch.
        raise ValueError(
            f"No SECTION_SEGMENTATION entry found for branch {branch_name!r}; "
            "did the pipeline run to completion on that branch?"
        )
    # If several rows match, only the first one is used.
    trip_segmentation = branch_stage_rows.iloc[0]

    # The window is (ts - reading, ts); this presumes 'ts' marks the end of the
    # stage and 'reading' is its duration in seconds -- TODO confirm.
    stage_end = trip_segmentation['ts']
    stage_start = stage_end - trip_segmentation['reading']
    trip_segmentation_db_calls = db_calls_df[
        # Match the branch explicitly instead of relying on the branches' time
        # windows never overlapping.
        (db_calls_df['branch_name'] == branch_name)
        & (db_calls_df['ts'] > stage_start)
        & (db_calls_df['ts'] < stage_end)
    ]
    trip_segmentation_db_calls_dfs.append(trip_segmentation_db_calls)

# Number of matching entries per branch, in order of first appearance.
trip_segmentation_db_calls_counts_df = pd.concat(trip_segmentation_db_calls_dfs)['branch_name'].value_counts(sort=False)
fig = px.bar(
    trip_segmentation_db_calls_counts_df,
    color=trip_segmentation_db_calls_counts_df.index,
    orientation="h",
)
fig.update_layout(
    title=f"Calls to _get_entries_for_timeseries during section segmentation ({day})",
    barmode='group',
    legend=dict(
        yanchor="bottom",
        xanchor="right",
    )
)
fig.show()