In [None]:
%env DB_HOST=mongodb://localhost/openpath_stage
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

import emission.core.get_database as edb
import emission.storage.timeseries.aggregate_timeseries as esta
import emission.storage.timeseries.builtin_timeseries as estb
import emission.core.get_database as gdb

In [None]:
# Fetch every pipeline-timing stat document. The cursor is materialised into a
# list once so the documents can be printed here and reused further down.
pipeline_docs_cursor = gdb.get_timeseries_db().find({"metadata.key": "stats/pipeline_time"})
pipeline_docs = list(pipeline_docs_cursor)

# Display a sample of the documents
import pprint

if not pipeline_docs:
    print("No documents found for 'stats/pipeline_time'.")
else:
    # Show the first document in full so the record layout is visible.
    single_doc = pipeline_docs[0]
    print("Single Document:")
    pprint.pprint(single_doc)

# Then up to the first five documents (an empty result prints only the header).
pipeline_docs_sample = pipeline_docs[:5]
print("\nMultiple Documents:")
for doc in pipeline_docs_sample:
    pprint.pprint(doc)


In [None]:
import pandas as pd
from datetime import datetime, timedelta
import pytz

# Flatten the nested Mongo documents into one column per leaf field, with
# nested keys joined by dots (e.g. 'data.name', 'metadata.write_ts').
df = pd.json_normalize(data=pipeline_docs)
df.describe()


In [None]:
# Column names, dtypes and non-null counts of the flattened frame.
df.info()

In [None]:
# Distinct pipeline step names, in order of first appearance.
name = pd.unique(df['data.name'])
print(name)

# NOTE: time-range filtering could be pushed into the DB query with
# `estt.TimeQuery` (not imported in this notebook); the cells below filter in pandas.

In [None]:
import pandas as pd

# Step 1: Filter for rows where data.name is "USERCACHE".
# .copy() gives an independent frame, so adding the 'datetime' column below
# writes to usercache_df itself rather than into a filtered slice of df
# (which pandas flags with SettingWithCopyWarning).
usercache_df = df[df['data.name'] == "USERCACHE"].copy()

# Step 2: Convert metadata.write_ts (epoch seconds) to tz-naive, UTC-based datetimes
usercache_df['datetime'] = pd.to_datetime(usercache_df['metadata.write_ts'], unit='s')

# Step 3: Define the start date for filtering
start_date = pd.Timestamp('2024-11-08')  # Adjust as needed
# Human-readable label derived from start_date (e.g. "November 8"), so the
# messages below stay accurate when start_date is changed.
start_label = f"{start_date.month_name()} {start_date.day}"

# Step 4: Filter for rows since the start date
usercache_df = usercache_df[usercache_df['datetime'] >= start_date]

# Step 5: Group by hour and count executions
hourly_execution_counts = usercache_df.groupby(usercache_df['datetime'].dt.floor('H')).size()

# Step 6: Output the results
if hourly_execution_counts.empty:
    print(f"No executions of 'USERCACHE' since {start_label}.")
else:
    print(f"Hourly execution counts since {start_label}:")
    print(hourly_execution_counts)


In [None]:
import os


# Step 0: Define the list of 'data.name' entries to exclude.
# These are the 'Parent' functions. NOTE(review): presumably their timings
# enclose those of their child steps, so keeping them would double-count — confirm.
exclude_data_names = [
    'TRIP_SEGMENTATION/segment_into_trips',
    'TRIP_SEGMENTATION/segment_into_trips_dist/loop'
]

# Quantile separating the "top 20%" from the "bottom 80%" in the splits below.
TOP_QUANTILE = 0.80


def _split_at_quantile(frame, column, q):
    """Split `frame` at the q-quantile of `column`.

    Returns (threshold, top, bottom): `top` holds the rows with
    column >= threshold and `bottom` the rows with column < threshold,
    each sorted by `column` in descending order.
    """
    threshold = frame[column].quantile(q)
    top = frame[frame[column] >= threshold].sort_values(by=column, ascending=False)
    bottom = frame[frame[column] < threshold].sort_values(by=column, ascending=False)
    return threshold, top, bottom


# Step 1: Keep function-level rows only (a '/' in 'data.name'), minus the parents.
# na=False treats a missing name as "not function-level" instead of putting
# NaN into the boolean mask, which pandas refuses to index with.
function_level_df = df[
    df['data.name'].str.contains('/', na=False) &
    ~df['data.name'].isin(exclude_data_names)
].copy()

# Step 2: Select the relevant columns
selected_columns = function_level_df[['data.reading', 'data.name']].copy()

# Step 3: Data cleaning.
# Coerce readings to numbers (unparseable values become NaN) so the sums and
# means below are numeric even if the column arrived with object dtype, then
# drop rows missing either field.
selected_columns['data.reading'] = pd.to_numeric(selected_columns['data.reading'], errors='coerce')
selected_columns = selected_columns.dropna(subset=['data.reading', 'data.name'])

# Step 4: Aggregate 'data.reading' by 'data.name' (sum and mean per step)
grouped_readings = selected_columns.groupby('data.name', as_index=False)['data.reading']
aggregated_sum = grouped_readings.sum().rename(columns={'data.reading': 'total_reading'})
aggregated_mean = grouped_readings.mean().rename(columns={'data.reading': 'average_reading'})

# Step 5: Split into top 20% / bottom 80% at the TOP_QUANTILE threshold
threshold_sum, top20_sum, bottom80_sum = _split_at_quantile(
    aggregated_sum, 'total_reading', TOP_QUANTILE)
threshold_mean, top20_mean, bottom80_mean = _split_at_quantile(
    aggregated_mean, 'average_reading', TOP_QUANTILE)
# Per-record split: individual readings, not per-step aggregates.
threshold_total, top20_total, bottom80_total = _split_at_quantile(
    selected_columns, 'data.reading', TOP_QUANTILE)

# Step 6: Define the base directory and file paths
base_dir = os.getcwd()  # Current working directory

# Paths for Sum Aggregation
aggregated_sum_path = os.path.join(base_dir, 'aggregated_sum_function_level.csv')
top20_sum_path = os.path.join(base_dir, 'top20_function_level_sum_sorted.csv')
bottom80_sum_path = os.path.join(base_dir, 'bottom80_function_level_sum_sorted.csv')
# Per-record splits get their own file names, distinct from the sum-based
# files, so saving one never overwrites the other.
top20_total_path = os.path.join(base_dir, 'top20_function_level_total_sorted.csv')
bottom80_total_path = os.path.join(base_dir, 'bottom80_function_level_total_sorted.csv')

# Paths for Mean Aggregation
aggregated_mean_path = os.path.join(base_dir, 'aggregated_mean_function_level.csv')
top20_mean_path = os.path.join(base_dir, 'top20_function_level_mean_sorted.csv')
bottom80_mean_path = os.path.join(base_dir, 'bottom80_function_level_mean_sorted.csv')

# Step 7: Save the aggregated and categorized DataFrames to CSV files

# Saving Sum Aggregation (plus the per-record split)
aggregated_sum.to_csv(aggregated_sum_path, index=False)
top20_sum.to_csv(top20_sum_path, index=False)
bottom80_sum.to_csv(bottom80_sum_path, index=False)
top20_total.to_csv(top20_total_path, index=False)
bottom80_total.to_csv(bottom80_total_path, index=False)

print(f"Aggregated Sum Function-Level Data saved to {aggregated_sum_path}")
print(f"Top 20% (Sum) function-level data saved to {top20_sum_path}")
print(f"Bottom 80% (Sum) function-level data saved to {bottom80_sum_path}")
print(f"Top 20%  function-level data saved to {top20_total_path}")
print(f"Bottom 80% function-level data saved to {bottom80_total_path}")

# Saving Mean Aggregation
aggregated_mean.to_csv(aggregated_mean_path, index=False)
top20_mean.to_csv(top20_mean_path, index=False)
bottom80_mean.to_csv(bottom80_mean_path, index=False)

print(f"\nAggregated Mean Function-Level Data saved to {aggregated_mean_path}")
print(f"Top 20% (Mean) function-level data saved to {top20_mean_path}")
print(f"Bottom 80% (Mean) function-level data saved to {bottom80_mean_path}")

# Step 8: Verify the splits
print(f"\nSum Aggregation - Top 20% row count: {len(top20_sum)}")
print(f"Sum Aggregation - Bottom 80% row count: {len(bottom80_sum)}")

print(f"\nMean Aggregation - Top 20% row count: {len(top20_mean)}")
print(f"Mean Aggregation - Bottom 80% row count: {len(bottom80_mean)}")

# Step 9: Inspect some entries
print("\nSample Top 20% Sum Aggregation Entries:")
print(top20_sum.head())

print("\nSample Bottom 80% Sum Aggregation Entries:")
print(bottom80_sum.head())

print("\nSample Top 20% Mean Aggregation Entries:")
print(top20_mean.head())

print("\nSample Bottom 80% Mean Aggregation Entries:")
print(bottom80_mean.head())


In [None]:


def pipeline_time_distribution(combined_df, step_name):
    """Plot a histogram (30 bins, KDE overlay) of 'data.reading' for one step.

    :param combined_df: frame with 'data.name' and 'data.reading' columns
    :param step_name: exact 'data.name' value whose readings are plotted
    Prints a message and returns early if no rows match step_name.
    """
    readings = combined_df.loc[combined_df['data.name'] == step_name, 'data.reading']
    if len(readings) == 0:
        print(f"No data found for step: {step_name}")
        return

    fig, ax = plt.subplots(figsize=(10, 6))
    sns.histplot(readings, bins=30, kde=True, ax=ax)
    ax.set_title(f"Distribution of Pipeline Times for {step_name}")
    ax.set_xlabel("Pipeline Time (seconds)")
    ax.set_ylabel("Frequency")
    plt.show()


pipeline_time_distribution(df, 'TRIP_SEGMENTATION/segment_into_trips')

In [None]:
def pipeline_time_trends(combined_df, step_name, freq='D'):
    """
    Plots the trend of pipeline times over time for a specific step.
    :param combined_df: frame with 'data.name', 'data.reading' and
        'metadata.write_ts' (epoch seconds) columns
    :param step_name: exact 'data.name' value to plot
    :param freq: Resampling frequency ('D' for daily, 'W' for weekly, 'M' for monthly;
        pandas >= 2.2 deprecates 'M' in favour of 'ME')
    Prints a message and returns early if no rows match step_name.
    """
    matching = combined_df[combined_df['data.name'] == step_name]
    if matching.empty:
        print(f"No data found for step: {step_name}")
        return

    # Index readings by write time, then average them within each freq bucket.
    resampled = (
        matching
        .assign(datetime=pd.to_datetime(matching['metadata.write_ts'], unit='s'))
        .set_index('datetime')['data.reading']
        .resample(freq)
        .mean()
    )

    fig, ax = plt.subplots(figsize=(12, 6))
    resampled.plot(ax=ax)
    ax.set_title(f"Trend of Pipeline Times Over Time for {step_name}")
    ax.set_xlabel("Time")
    ax.set_ylabel("Average Pipeline Time (seconds)")
    plt.show()


pipeline_time_trends(df, 'TRIP_SEGMENTATION', 'W')

In [None]:
def identify_bottlenecks(combined_df, top_n=5):
    """Print and bar-plot the top_n steps with the highest mean 'data.reading'.

    :param combined_df: frame with 'data.name' and 'data.reading' columns
    :param top_n: number of steps to report, slowest first
    """
    bottlenecks = (
        combined_df.groupby('data.name')['data.reading']
        .mean()
        .rename('average_time')
        .reset_index()
        .sort_values(by='average_time', ascending=False)
        .head(top_n)
    )

    print(f"\nTop {top_n} Bottleneck Pipeline Steps:")
    print(bottlenecks)

    # Horizontal bars: one per step, length = average time.
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.barplot(x='average_time', y='data.name', data=bottlenecks, palette='viridis', ax=ax)
    ax.set_title(f"Top {top_n} Bottleneck Pipeline Steps by Average Time")
    ax.set_xlabel("Average Pipeline Time (seconds)")
    ax.set_ylabel("Pipeline Step")
    plt.show()


identify_bottlenecks(function_level_df)

In [None]:
def heatmap_pipeline_times(combined_df, step_name):
    """Plot a day-of-week x hour-of-day heatmap of mean 'data.reading' for one step.

    :param combined_df: frame with 'data.name', 'data.reading' and
        'metadata.write_ts' (epoch seconds) columns
    :param step_name: exact 'data.name' value to plot
    Prints a message and returns early if no rows match step_name.

    The grid always shows all 7 days (Monday first) and all 24 hours, so days or
    hours without data appear as blank cells instead of silently disappearing
    from the axes. Times are tz-naive, UTC-based conversions of write_ts
    (NOTE(review): confirm UTC is the intended reference for "hour of day").
    """
    step_df = combined_df[combined_df['data.name'] == step_name].copy()

    if step_df.empty:
        print(f"No data found for step: {step_name}")
        return

    # Convert timestamp to datetime
    step_df['datetime'] = pd.to_datetime(step_df['metadata.write_ts'], unit='s')

    # Extract hour and day of week
    step_df['hour'] = step_df['datetime'].dt.hour
    step_df['day_of_week'] = step_df['datetime'].dt.day_name()

    # Mean reading per (day, hour) cell
    pivot = step_df.pivot_table(values='data.reading', index='day_of_week', columns='hour', aggfunc='mean')

    # Fix both axes: calendar order for the days and every hour 0-23 for the
    # columns; cells with no data become NaN, which the heatmap leaves blank.
    days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    pivot = pivot.reindex(index=days_order, columns=range(24))

    # Plot heatmap
    fig, ax = plt.subplots(figsize=(15, 7))
    sns.heatmap(pivot, cmap='YlGnBu', annot=True, fmt=".2f", ax=ax)
    ax.set_title(f"Heatmap of Average Pipeline Times for {step_name}")
    ax.set_xlabel("Hour of Day")
    ax.set_ylabel("Day of Week")
    plt.show()


heatmap_pipeline_times(df, 'TRIP_SEGMENTATION')

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import combinations

def define_pipeline_executions(df):
    """Summarise pipeline-timing records into one row per (user_id, metadata.key).

    :param df: frame with 'user_id', 'metadata.key', 'metadata.write_ts',
        'data.name' and 'data.reading' columns
    :returns: DataFrame with columns user_id, pipeline_key, start_time and
        end_time (min/max write_ts), total_time (sum of readings), num_steps
        (row count), and steps / steps_reading (names and readings ordered by
        write_ts). An input with no rows yields an empty frame that still has
        these columns.

    NOTE(review): groups are keyed on metadata.key, not on an individual run.
    When df holds a single key (e.g. only 'stats/pipeline_time'), each
    "execution" is every record for one user across all runs — confirm intent.
    """
    result_columns = [
        'user_id', 'pipeline_key', 'start_time', 'end_time',
        'total_time', 'num_steps', 'steps', 'steps_reading',
    ]
    pipeline_executions = []

    for (user_id, key), group in df.groupby(['user_id', 'metadata.key']):
        # Sort once per group and reuse it for both ordered lists.
        ordered = group.sort_values('metadata.write_ts')
        pipeline_executions.append({
            'user_id': user_id,
            'pipeline_key': key,
            'start_time': group['metadata.write_ts'].min(),
            'end_time': group['metadata.write_ts'].max(),
            'total_time': group['data.reading'].sum(),
            'num_steps': group.shape[0],
            'steps': list(ordered['data.name']),
            'steps_reading': list(ordered['data.reading']),
        })

    # Passing columns keeps the schema stable even when there are no groups.
    return pd.DataFrame(pipeline_executions, columns=result_columns)


# Build one summary row per (user_id, metadata.key) group from df, which holds
# every user's pipeline-time records.
executions_df = define_pipeline_executions(df)
print("Pipeline Executions Defined:")
print(executions_df.head())


In [None]:
def execution_time_variability_per_step(df):
    """Summarise and plot the spread of 'data.reading' for each 'data.name'.

    Prints a per-step table of mean, standard deviation and variance (sorted by
    standard deviation, largest first), shows a horizontal box plot per step,
    and writes the table to 'execution_time_variability_per_step.csv' in the
    current working directory.

    :param df: frame with 'data.name' and 'data.reading' columns
    """
    stats = (
        df.groupby('data.name')['data.reading']
        .agg(['mean', 'std', 'var'])
        .reset_index()
        .rename(columns={'mean': 'average_time_sec', 'std': 'std_dev_sec', 'var': 'variance_sec2'})
        .sort_values(by='std_dev_sec', ascending=False)
    )

    print("\nExecution Time Variability per Pipeline Step:")
    print(stats)

    # One box per step shows the full distribution, not just the summary stats.
    fig, ax = plt.subplots(figsize=(14, 10))
    sns.boxplot(x='data.reading', y='data.name', data=df, palette='coolwarm', ax=ax)
    ax.set_title("Execution Time Variability per Pipeline Step")
    ax.set_xlabel("Execution Time (seconds)")
    ax.set_ylabel("Pipeline Step")
    fig.tight_layout()
    plt.show()

    stats.to_csv('execution_time_variability_per_step.csv', index=False)
    print("Saved execution time variability to 'execution_time_variability_per_step.csv'")


execution_time_variability_per_step(function_level_df)