In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta, SU

from sklearn.preprocessing import StandardScaler
from functools import reduce
from pandas.api.types import is_numeric_dtype
from pandas.api.types import is_datetime64_any_dtype as is_datetime
print("hello")

hello


In [2]:
working_directory = "C:/Users/wachic/OneDrive - Milwaukee School of Engineering/Desktop/Undergrad Research/"

# Naming convention
# MMSD_sewerflow_all_dailyavg_df
# 1.  [MMSD, USGS] Where the source is
# 2.  [sewerflow, precip, streamflow] What the data measures
# 3.  [all, dry, wet] what season it includes
# 4-n What ever operation has been done to the data
# n+1 [df, periods, csv] data type

USGS_stream_flow_all_df = pd.read_csv(working_directory + "USGS 04087030 Streamflow Cleaned.csv")
MMSD_sewerflow_all_df = pd.read_csv(working_directory + "MMSD Sewer Flow Cleaned.csv")
MMSD_flow_and_precip_all_df = pd.read_csv(working_directory + "MMSD Flow and Precipitation Cleaned.csv")
MMSD_precip_all_df = pd.read_csv(working_directory + "MMSD Precipitation Raw Data Cleaned.csv")

df_list = [USGS_stream_flow_all_df,
           MMSD_sewerflow_all_df,
           MMSD_flow_and_precip_all_df,
           MMSD_precip_all_df]

In [3]:
for df in df_list:
    df['Date Time'] = df['Date Time'].apply(pd.to_datetime)

# Removing Diurnal Variation

In [4]:
def moving_avg(df, length=24):
    """ 
    Finds moving average for past 24 hour
    """
    columns_to_edit = df.drop(columns='Date Time').columns
    out_df = df.copy()
    for col in columns_to_edit:
        out_df[col] = df[col].rolling(window=length).mean()
    return out_df

In [5]:
def standardize(df1, df2):
    """
    Standardize 2 df with same columns
    Combines vertically, standardize, then separate and return
    Returns the two seprate std df and 2 df with mean and with std.dev in each column
    """
    scaler = StandardScaler()
    df = pd.concat([df1, df2], ignore_index=True)
    df = df.reset_index(drop=True)
    
    df_to_standardize = df.drop(columns=['Date Time'], inplace=False)
    standardized_values = scaler.fit_transform(df_to_standardize)

    # Create standardized DataFrame
    df = pd.concat([df['Date Time'], pd.DataFrame(standardized_values, columns=df_to_standardize.columns)], axis=1)

    # Compute standard deviation of each column
    mean_df = pd.DataFrame(df_to_standardize.mean(), columns=['Mean']).T
    std_dev_df = pd.DataFrame(df_to_standardize.std(), columns=['Standard Deviation']).T

    df.columns = df.columns
    df1_out = df.iloc[:len(df1)].reset_index(drop=True)
    df2_out = df.iloc[len(df1):].reset_index(drop=True)
    
    return df1_out, df2_out, std_dev_df, mean_df
    
def revert_standardize(df, std_dev_df, mean_df):
    """
    From a standardized df and its means and std.dev, reverts back to normal scale values
    """
    columns = df.drop(columns=['Date Time'], inplace=False).columns
    df_original = df.copy()
    
    # Apply the inverse transformation
    df_original[columns] = (df_original[columns] * std_dev_df.loc['Standard Deviation', columns]) + mean_df.loc['Mean', columns]
    return df_original
    

In [6]:
# Make a total column, used primarily for precipitation
def create_total_col(df):
    """
    Adds on a 'total' column
    Used for precipitaiton df when calculating dry period
    """
    df['total'] = df.loc[:, df.columns[df.columns.get_loc('Date Time') + 1:]].sum(axis=1)
    return df

In [7]:
def find_consecutive_dry_period(df, dry_length = 8, days_after = 7, precip_threshold = 0):  
    """
    Finds dry period from precipitation df. 
    Can select the length of days there must be without rain to be "dry"(dry_length)
    The grace period where data isnt used. Only days after this period is used (days_after)
    and what amount of precipitation is considered a rainfall (precip_threshold)
    """
    zero_ranges = []
    in_zero_sequence = False
    start_date = None
    
    for index, row in df.iterrows():
        if row['total'] <= precip_threshold:  
            if not in_zero_sequence:
                # Start of a new zero sequence
                in_zero_sequence = True
                start_date = row['Date Time']
        else:  # Found a non-zero value
            if in_zero_sequence:
                # End of the zero sequence
                end_date = row['Date Time'] - timedelta(days=1) 
                date_diff = (end_date - start_date).days
                if date_diff >= dry_length:  
                    zero_ranges.append((start_date + timedelta(days=days_after), end_date))
                in_zero_sequence = False
                start_date = None
    
    # Handle case where the last sequence of zeros extends to the last row
    if in_zero_sequence:
        end_date = row['Date Time'] 
        date_diff = (end_date - start_date).days
        if date_diff >= dry_length:
            zero_ranges.append((start_date + timedelta(days=days_after), end_date))
    return zero_ranges

In [8]:
def select_dry_period(df, dry_period):
    """
    Selects rows that are within the given dry_period
    """
    mask = reduce(lambda x, y: x | y, [(df['Date Time'].between(start, end)) for start, end in dry_period])
    
    filtered_df = df[mask].reset_index(drop=True)
    return filtered_df

In [9]:
def avg_days_of_week(df):
    """
    Calculates the average of each hour and day of week, and returns values. [Sun 0:00AM to Sat 23:00PM]
    Subtracts min() from each column, so min is 0
    """
    # Group by weekday (Sunday=0, Saturday=6) and hour
    df['weekday'] = df['Date Time'].dt.weekday  # Monday=0, Sunday=6
    df['hour'] = df['Date Time'].dt.hour
    
    # Adjust weekday order to start from Sunday (moving Sunday=6 to 0)
    df['weekday'] = (df['weekday'] + 1) % 7  # Convert Monday=0 → Sunday=0
    
    hourly_avg = df.groupby(['weekday', 'hour']).mean()
    df = df.drop(columns=['weekday', 'hour'], errors='ignore')
    hourly_avg = hourly_avg.drop(columns=['weekday', 'hour'], errors='ignore')
    hourly_avg.columns = df.columns
    
    # # Sort the values correctly from Sunday 00:00 to Saturday 23:00
    hourly_avg = hourly_avg.sort_values(by=['weekday', 'hour'])

    # just a refernce datetime from Sun 0:00 to Sat 23:00
    date_range = pd.date_range(start="2025-02-02 00:00", end="2025-02-08 23:00", freq="h")
    
    hourly_avg['Date Time'] = date_range
    
    num_cols = [col for col in hourly_avg.columns if col != 'Date Time']
    hourly_avg[num_cols] = hourly_avg[num_cols] - hourly_avg[num_cols].min()
    
    return hourly_avg[['Date Time'] + [col for col in hourly_avg.columns if col != 'Date Time']]

In [10]:
def subtract_diurnal(orig_df, correction_df): 
    """
    For each datapoint in orig_df, it subtracts the corresponding value from correction_df that has the same hour and day of week from.     
    """
    columns_to_rename = correction_df.drop(columns='Date Time').columns
    correction_df.columns = [f'{col} new' if col in columns_to_rename else col for col in correction_df.columns]
    
    orig_df['weekday'] = orig_df['Date Time'].dt.weekday
    correction_df['weekday'] = correction_df['Date Time'].dt.weekday
    
    orig_df['hour'] = orig_df['Date Time'].dt.hour
    correction_df['hour'] = correction_df['Date Time'].dt.hour
    
    correction_df.set_index(['weekday', 'hour'], inplace=True)

    # Set index for main DataFrame to match correction DataFrame
    orig_df.set_index(['weekday', 'hour'], inplace=True)
    
    # Reindex correction DataFrame to match the index of main DataFrame
    correction_df = correction_df.reindex(orig_df.index)
    
    # Concatenate both DataFrames to align corrections with the main DataFrame
    removed_df = pd.concat([orig_df, correction_df], axis=1)
    for col in orig_df.drop(columns='Date Time').columns:      
        removed_df[col] = removed_df[col] - removed_df[f'{col} new']
    
    removed_df.reset_index(drop=True, inplace=True)
    difference_df = removed_df.drop(columns=orig_df.columns)
    removed_df = removed_df[orig_df.columns]
    return removed_df.iloc[:, [i for i in range(removed_df.shape[1]) if i != 1]]

In [11]:
def repeat_df(time_series_df, df, num):
    """
    repeats the daily avg df so it's easier to compare when aligned with other df
    """
    extended_df = pd.concat([df] * num, ignore_index=True)
    extended_df['Date Time'] = time_series_df['Date Time']
    return extended_df

# Applying Lyne-Hollick Filter

In [12]:
def insert_mirrored_rows(df, num_rows=30):
    """
    Insert chronologically mirrored data point at head and tail of df, so Lyne-Hollick doesnt lose affect at ends of data
    """
    mirrored_rows_head = df.iloc[:num_rows].copy()
    mirrored_rows_head = mirrored_rows_head.iloc[::-1].reset_index(drop=True)

    mirrored_rows_tail = df.iloc[-num_rows:].copy()
    mirrored_rows_tail = mirrored_rows_tail.iloc[::-1].reset_index(drop=True)

    # Concatenate the mirrored rows at the bottom of the original DataFrame
    df_extended = pd.concat([mirrored_rows_head, df, mirrored_rows_tail], ignore_index=True)
    return df_extended

In [23]:
def lyne_hollick(df, passes=1, alpha=0.925):
    """
    Applies Lyne-Hollick filter recursively
    This method takes a super long time to compile, so if anyone has any idea on improvement, please let me know
    """
    if passes < 1:
        return df

    out_df = df.copy()

    if passes % 2 == 0:
        out_df = out_df.iloc[::-1].reset_index(drop=True)

    # Handle MultiIndex by flattening if necessary
    if isinstance(out_df.columns, pd.MultiIndex):
        out_df.columns = [' '.join(col).strip() if isinstance(col, tuple) else col for col in out_df.columns]
    print("pass")
    # Select columns excluding 'Date Time'
    columns_to_rename = [col for col in out_df.columns if col != 'Date Time']
    # This portion takes REALLY long. If anyone has insight on improving it, I would love to discuss
    for col in columns_to_rename: 
        out_df[f'{col} new'] = out_df[col] 
        for i in range(1, len(out_df)):
            out_df.at[i, f'{col} new'] = out_df.at[i, col] - np.maximum(
                alpha*(out_df.at[i-1,col] - out_df.at[i-1,f'{col} new']) + ((1+alpha)/2)*(out_df.at[i,col] - out_df.at[i-1,col]),
                0)
            if pd.isna(out_df.at[i, f'{col} new']):
                out_df.at[i, f'{col} new'] = out_df.at[i, col]
            elif out_df.at[i, f'{col} new'] <= 0: #need to make value dynamic
                out_df.at[i, f'{col} new'] = out_df.at[i-1, f'{col} new']
    print(passes)
    new_columns = [f'{col} new' for col in columns_to_rename]
    out_df = out_df[['Date Time'] + new_columns]

    # Match original column names for compatibility
    out_df.columns = df[['Date Time'] + columns_to_rename].columns

    if passes % 2 == 0:
        out_df = out_df.iloc[::-1].reset_index(drop=True)

    if passes > 1:
        return lyne_hollick(out_df, passes - 1, alpha)
    else:
        return out_df.iloc[30:-30]


In [24]:
# actually using the methods
USGS_stream_flow_all_df
MMSD_sewerflow_all_df
MMSD_flow_and_precip_all_df
MMSD_precip_all_df

# Create total precipitation level column to find dry season
MMSD_precip_all_df = create_total_col(MMSD_precip_all_df)

# Find dry period from precipitation data
MMSD_precip_dry_periods = find_consecutive_dry_period(MMSD_precip_all_df, 10, 9, 0.0)

# Extract dry season data from sewer flow to get dry season diurnal data
MMSD_sewerflow_dry_df = select_dry_period(MMSD_sewerflow_all_df, MMSD_precip_dry_periods)

# Get avg from [Sunday 0:00 to Saturday 23:00] of the dry season sewer flow
MMSD_sewerflow_dry_dailyavg_df = avg_days_of_week(MMSD_sewerflow_dry_df)

# Subtract the average from the full period sewer flow
MMSD_sewerflow_all_removed_df = subtract_diurnal(MMSD_sewerflow_all_df, MMSD_sewerflow_dry_dailyavg_df)

# Apply 7 passes of Lyne-Hollick
MMSD_sewerflow_all_removed_7pass_df = lyne_hollick(insert_mirrored_rows(MMSD_sewerflow_all_removed_df), 7, 0.925)


0.32
1
7
1
6
1
5
1
4
1
3
1
2
1
1


In [25]:
MMSD_sewerflow_all_removed_7pass_csv = MMSD_sewerflow_all_removed_7pass_df.to_csv('MMSD_sewerflow_all_removed_7pass_df.csv', index = False) 

In [None]:
# to do
# rewrite code for removing diurnal variation -> use FFT
# have to use processed precip data instead of day cumulative -> find hourly cumulation instead of daily

# for removing diurnal variation
# during dry season when theres no rain, is there no base water flow? -> assume diurnal is 0 at min
# lyne hollick
# maybe need another function since precipitation lingers quite longer -> apply other filter before lh
# the shape of orig function is very different