<a href="https://colab.research.google.com/github/Bhekmuzi/water-usage-norm/blob/main/src/data_retrieval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# !pip freeze > requirements.txt

In [2]:
# !pip install influxdb

In [3]:
# !pip install pandas pymongo

In [4]:
import numpy as np
from sklearn.cluster import KMeans
# pip install influxdb
# !python --version
import numpy as np
import pandas as pd
import datetime
import time
from influxdb import InfluxDBClient
from pymongo import MongoClient
import matplotlib.pyplot as plt
from google.colab import files
from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler
from datetime import datetime, timedelta
# from scipy.stats import entropy

# Connection client for the 'db0' InfluxDB database (10-second request timeout).
# Keyword arguments are used on purpose: the sixth positional parameter of
# InfluxDBClient is `ssl`, not a time format, so the 'rfc3339' string that used to
# be passed positionally was bound to `ssl`. It is replaced by an explicit ssl=False.
# NOTE(review): host and credentials are hardcoded here — consider reading them from
# environment variables or a secrets store before sharing the notebook.
client = InfluxDBClient(
    host='59.120.114.133',
    port=8086,
    username='telegraf',
    password='telegraf',
    database='db0',
    ssl=False,
    timeout=10,
)

In [5]:
def calculate_previous_day_times(reference_time=None):
    """Return the first and last instant of the day before ``reference_time``.

    Args:
        reference_time: ``datetime`` that defines "today". When omitted,
            ``datetime.now()`` is used, which matches the previous behaviour.

    Returns:
        A ``(start, end)`` tuple: ``start`` is 00:00:00.000000 of the previous
        day and ``end`` is one microsecond before midnight of the reference day.
    """
    if reference_time is None:
        reference_time = datetime.now()

    # Midnight of the reference day anchors both bounds.
    today_midnight = reference_time.replace(hour=0, minute=0, second=0, microsecond=0)
    start_of_previous_day = today_midnight - timedelta(days=1)
    end_of_previous_day = today_midnight - timedelta(microseconds=1)

    return start_of_previous_day, end_of_previous_day

In [6]:
# Query the previous day's readings of one meter topic from InfluxDB.
# NOTE(review): the bounds come from a naive datetime.now() but are formatted with a
# literal 'Z' (UTC) suffix while the query asks for TZ('Asia/Taipei') — confirm that
# the window covers the intended calendar day.
start_time, end_time = calculate_previous_day_times()

timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
window_start = start_time.strftime(timestamp_format)
window_end = end_time.strftime(timestamp_format)

sql_string = (
    'SELECT DISTINCT("value") AS value FROM mbMQTT6'
    ' WHERE "topic" = \'mbMQTT2/home2127/C2BDF8/TH20\''
    f' AND time >= \'{window_start}\' AND time <= \'{window_end}\''
    ' GROUP BY time(10s) FILL(previous) ORDER BY time ASC TZ(\'Asia/Taipei\');'
)

result = client.query(sql_string)

In [7]:

# One raw-readings DataFrame per home, each built from the 'mbMQTT6' series of the query result.
# NOTE(review): both entries are built from the same `result`, whose query filters on the
# home2127 topic, so 'home2128' currently holds a copy of home2127's readings — confirm.
homes_water_data = {
    home_name: pd.DataFrame(result['mbMQTT6'])
    for home_name in ('home2127', 'home2128')
}

In [8]:
def fill_missing_values(home_data):
    """Put a home's readings on a gap-free 10-second grid, forward-filling 'value'.

    The input frame is left untouched (the previous version replaced its 'time'
    column and index in place, so calling it twice on the same frame failed).

    Args:
        home_data: DataFrame with a 'time' column parseable by ``pd.to_datetime``
            and a 'value' column.

    Returns:
        A new DataFrame with one row every 10 seconds between the first and the
        last timestamp. The timestamps are in a column named 'index' (the
        result of ``reset_index`` on the unnamed grid); missing 'value' entries
        carry the last known reading.
    """
    indexed = home_data.assign(time=pd.to_datetime(home_data['time'])).set_index('time')

    # A Timedelta avoids the frequency-alias spelling ('10S' vs '10s') that differs
    # between pandas versions.
    expected_time_intervals = pd.date_range(
        start=indexed.index.min(),
        end=indexed.index.max(),
        freq=pd.Timedelta(seconds=10),
    )

    filled = indexed.reindex(expected_time_intervals)
    # .ffill() replaces the deprecated fillna(method='pad').
    filled['value'] = filled['value'].ffill()
    return filled.reset_index()

In [9]:
def process_home_data(home_water_df, min_volume=0.2):
    """Turn raw meter readings into per-sample consumption volumes.

    Steps: fill the readings onto the regular grid via ``fill_missing_values``,
    convert 'value' to numbers and rescale it (``/ 100000 * 1000``), take the
    difference between consecutive readings as 'volume', and zero out every
    volume below ``min_volume`` (this also removes negative differences).

    Args:
        home_water_df: raw readings with 'time' and 'value' columns.
        min_volume: smallest per-sample volume that is kept; smaller values
            become 0. Defaults to 0.2, the previously hard-coded threshold.

    Returns:
        DataFrame with 'time', 'value' and 'volume' columns.
    """
    filled_home_data = fill_missing_values(home_water_df)

    # Non-numeric readings become NaN instead of raising.
    readings = pd.to_numeric(filled_home_data['value'], errors='coerce') / 100000 * 1000
    filled_home_data['value'] = readings

    # The first row has no predecessor, so its difference (NaN) counts as 0.
    volume = readings.diff().fillna(0)
    # Vectorised replacement for the former row-by-row lambda.
    filled_home_data['volume'] = volume.where(volume >= min_volume, 0)

    # fill_missing_values leaves the timestamps in a column called 'index'.
    filled_home_data = filled_home_data.reset_index(drop=True).rename(columns={'index': 'time'})
    return filled_home_data



In [10]:
# Run the cleaning pipeline on every home's raw readings, keyed by home name.
homes_data_processed = {
    home_name: process_home_data(home_water_df)
    for home_name, home_water_df in homes_water_data.items()
}


In [11]:

import pandas as pd
import matplotlib.pyplot as plt
from datetime import timedelta
import numpy as np
from scipy.stats import entropy

def process_events(df, quiet_samples=6, end_offset_seconds=60):
    """Detect usage events in a frame of 'time' / 'volume' samples.

    An event opens at the first sample with a non-zero volume and closes once
    ``quiet_samples`` zero-volume samples in a row have been seen. Negative or
    NaN volumes count as zero. An event that is still open when the data ends
    is not reported.

    Args:
        df: DataFrame with a datetime 'time' column and a numeric 'volume'
            column, in chronological order.
        quiet_samples: number of consecutive zero-volume samples that closes
            an event (default 6, the previously hard-coded value).
        end_offset_seconds: seconds subtracted from the closing sample's time
            to obtain the event's end (default 60, as before).

    Returns:
        DataFrame with 'Start Time' and 'End Time' columns, one row per event.
        When nothing is detected, a message is printed and an empty frame with
        those two datetime columns is returned, so callers can still subtract
        the columns and use ``.dt`` on the result.
    """
    end_offset = timedelta(seconds=end_offset_seconds)
    event_start = None
    consecutive_zeros = 0
    events = []  # collected as records; the frame is built once at the end

    for sample_time, raw_volume in zip(df['time'], df['volume']):
        # max(0, nan) evaluates to 0, so NaN is treated like "no flow".
        vol = max(0, float(raw_volume))

        if vol != 0:
            if event_start is None:
                event_start = sample_time
            consecutive_zeros = 0
            continue

        consecutive_zeros += 1
        if event_start is not None and consecutive_zeros == quiet_samples:
            events.append({
                'Start Time': event_start,
                'End Time': sample_time - end_offset,
            })
            event_start = None  # ready for the next event

    if not events:
        print("No events detected.")
        return pd.DataFrame({
            'Start Time': pd.Series(dtype='datetime64[ns]'),
            'End Time': pd.Series(dtype='datetime64[ns]'),
        })

    return pd.DataFrame(events, columns=['Start Time', 'End Time'])

In [12]:
def process_and_mark_usage(home_df, min_duration_seconds=10):
    """Add a 0/1 'usage' column flagging samples that fall inside a usage event.

    Events come from ``process_events``. Events whose start and end coincide,
    or that last less than ``min_duration_seconds``, are ignored.

    Args:
        home_df: DataFrame with 'time' and 'volume' columns. It is modified in
            place: the 'usage' column is (re)created on it.
        min_duration_seconds: shortest event duration that is marked
            (default 10, the previously hard-coded value).

    Returns:
        The same ``home_df`` object, with 'usage' set to 1 for samples whose
        time lies within a kept event (bounds included) and 0 elsewhere.
    """
    event_df = process_events(home_df)

    home_df['usage'] = 0

    # No events at all: nothing to mark. This also covers an event frame that
    # has no 'Start Time' / 'End Time' columns.
    if event_df.empty:
        return home_df

    durations = (event_df['End Time'] - event_df['Start Time']).dt.total_seconds()
    kept_events = event_df[(durations != 0) & (durations >= min_duration_seconds)]

    for event_start, event_end in zip(kept_events['Start Time'], kept_events['End Time']):
        in_event = (home_df['time'] >= event_start) & (home_df['time'] <= event_end)
        home_df.loc[in_event, 'usage'] = 1

    return home_df


In [13]:
import pandas as pd
from pymongo import MongoClient

def process_resample_insert(home_df, collection):
    """Mark usage, downsample to 15-minute maxima and store the rows in MongoDB.

    Args:
        home_df: processed readings for one home ('time', 'value', 'volume').
            ``process_and_mark_usage`` adds a 'usage' column to it.
        collection: object exposing ``insert_many`` (a pymongo collection in
            this notebook) that receives one document per 15-minute interval.

    Returns:
        The resampled DataFrame (one row per 15-minute interval, holding the
        maximum of every column within that interval).
    """
    # Step 1: Process and mark usage
    processed_home_df = process_and_mark_usage(home_df)
    processed_home_df['time'] = pd.to_datetime(processed_home_df['time'])

    # Step 2: 15-minute maxima. '15min' is the spelling accepted by both old and
    # current pandas; the '15T' alias is deprecated.
    resampled_df = processed_home_df.set_index('time').resample('15min').max().reset_index()

    # Step 3: Insert the rows as documents. insert_many rejects an empty list,
    # so the insert is skipped when there is nothing to store.
    # NOTE(review): no home identifier is added to the documents here — confirm
    # that records of different homes can be told apart in the collection.
    data_dict = resampled_df.to_dict(orient='records')
    if data_dict:
        collection.insert_many(data_dict)

    return resampled_df



In [14]:
# Connect to MongoDB and select the collection that receives the resampled usage data.
# A dedicated name is used so that the InfluxDB `client` created earlier is not
# overwritten; otherwise the InfluxDB query cell could not be re-run after this one.
# NOTE(review): the server address is hardcoded — consider moving it to configuration.
mongo_client = MongoClient('mongodb://34.81.144.96:27017/')
db = mongo_client['Taipower']
collection = db['SmartWaterMeterActiveHistory']

In [15]:
# Mark usage, resample and store each home's data; keep the resampled frames by home name.
resampled_home_data = {}

for home_name, home_frame in homes_data_processed.items():
    resampled_home_data[home_name] = process_resample_insert(home_frame, collection)
    print(f"\nResampled Data for {home_name}:\n{resampled_home_data[home_name]}")


ServerSelectionTimeoutError: ignored

In [None]:

# Build, per home, the usage-marked frame, its 15-minute resample and the resulting
# list of records — without inserting anything into MongoDB.
processed_home_df = {}
resampled_df = {}
data_dict = {}
for home_name, filtered_renamed_home_data in homes_data_processed.items():
    marked_home_df = process_and_mark_usage(filtered_renamed_home_data)
    # Convert 'time' to datetime if not already
    marked_home_df['time'] = pd.to_datetime(marked_home_df['time'])
    processed_home_df[home_name] = marked_home_df

    # 15-minute maxima with 'time' restored as a regular column.
    # '15min' replaces the deprecated '15T' alias.
    resampled_df[home_name] = marked_home_df.set_index('time').resample('15min').max().reset_index()
    data_dict[home_name] = resampled_df[home_name].to_dict(orient='records')

    print(f"\n{data_dict}")




In [None]:
# data_dict maps each home name to a list of records, while insert_many expects one flat,
# non-empty list of documents — so the per-home lists are concatenated first.
all_home_records = [record for home_records in data_dict.values() for record in home_records]
if all_home_records:
    collection.insert_many(all_home_records)

In [None]:
# Debug check of the container type: data_dict is created as a dict keyed by home name.
print(type(data_dict))

In [None]:
# data_dict is a dict of per-home record lists, so it is flattened into the single list
# of documents that insert_many requires. The outcome is stored under its own name so the
# InfluxDB query `result` is not overwritten.
# NOTE(review): the preceding cell also inserts these records; running both stores them twice.
documents_to_insert = [record for home_records in data_dict.values() for record in home_records]
insert_result = collection.insert_many(documents_to_insert)

# Print the IDs of the inserted documents
print("Inserted document IDs:", insert_result.inserted_ids)

In [None]:
# Display the per-home record lists (one list of 15-minute records per home name).
data_dict

In [None]:
# Mark usage for every home: rows whose 'time' lies within an event's
# [Start Time, End Time] range (bounds included) get usage = 1.
# NOTE(review): `filtered_renamed_homes_data` is never assigned in the visible notebook
# (the processed frames are stored in `homes_data_processed`), and `event_atleast_20_df`
# is only built in a later cell, so this cell raises NameError on a fresh top-to-bottom run.
# NOTE(review): 'usage' is not initialised to 0 in this cell — confirm the frames already
# carry that column.
for home_name, filter_renamed_home_data in filtered_renamed_homes_data.items():
    # Homes without an entry in event_atleast_20_df are skipped
    if home_name in event_atleast_20_df:
        # Iterate through rows in event_atleast_20_df and mark corresponding rows in df for the current home as 1
        for index, row in event_atleast_20_df[home_name].iterrows():
            mask = (filter_renamed_home_data['time'] >= row['Start Time']) & (filter_renamed_home_data['time'] <= row['End Time'])
            filter_renamed_home_data.loc[mask, 'usage'] = 1

# # Print or process the updated DataFrames (replace with your specific requirements)
# for home_name, filter_renamed_home_data in filter_renamed_home_data.items():
#     print(f"\nProcessed Data for {home_name} with Usage:\n{filter_renamed_home_data}")


In [None]:
# prompt: process events for all home data
# NOTE(review): `filtered_renamed_homes_data` is not assigned anywhere in the visible
# notebook; presumably `homes_data_processed` was meant — confirm before running.

event_homes_data = {}
for home_name, filtered_home_data in filtered_renamed_homes_data.items():
    # Run event detection on this home's frame and keep the event table by home name
    event_homes_data[home_name] = process_events(filtered_home_data)

# Print each home's event table
for home_name, event_home_data in event_homes_data.items():
    print(f"\nEvent Data for {home_name}:\n{event_home_data}")

In [None]:
# prompt: calculate the duration of each event
# For every home: compute event durations, keep events of at least 20 seconds, and flag
# the matching samples with usage = 1.
# NOTE(review): `filter_renamed_home_data` is not assigned in this cell; it is whatever an
# earlier loop left behind, so the same frame is marked and printed for every home — confirm
# which per-home frame should be used here.
event_atleast_20_df = {}
for home_name, event_home_data in event_homes_data.items():
    # Duration in seconds between start and end, plus 10
    event_home_data['Duration'] = (event_home_data['End Time'] - event_home_data['Start Time']).dt.total_seconds() + 10
    # Drop events whose start and end coincide (Duration exactly 10)
    event_home_data = event_home_data[event_home_data['Duration'] != 10]
    # NOTE(review): this assigns to a filtered selection and may emit SettingWithCopyWarning
    event_home_data['usage'] = 0


    # Keep events whose Duration is at least 20 seconds
    event_atleast_20_df[home_name] = event_home_data[event_home_data['Duration'] >= 20]

    # Reset index to clean up DataFrame (done after the filtered copy above was taken)
    event_home_data.reset_index(drop=True, inplace=True)

    # Always true at this point: the entry for home_name was stored just above
    if home_name in event_atleast_20_df:
        # Iterate through rows in event_atleast_20_df and mark corresponding rows in df for the current home as 1
        for index, row in event_atleast_20_df[home_name].iterrows():
            mask = (filter_renamed_home_data['time'] >= row['Start Time']) & (filter_renamed_home_data['time'] <= row['End Time'])
            filter_renamed_home_data.loc[mask, 'usage'] = 1

    print(f"\n{filter_renamed_home_data}")


In [None]:
# Display the per-home frames after usage marking (a dict keyed by home name).
processed_home_df

In [None]:
# Single-frame version of the usage-marking pipeline.
# NOTE(review): `merged_df` is not assigned anywhere in the visible notebook, and no visible
# cell creates a 'volume_liters' column (process_home_data produces 'volume') — confirm
# where this frame comes from before running the cell.
event_df = process_events(merged_df)

# Calculate the duration of each event
event_df['Duration'] = (event_df['End Time'] - event_df['Start Time']).dt.total_seconds()

# Drop events whose start and end coincide
event_df = event_df[event_df['Duration'] != 0]

# Keep events lasting at least 10 seconds (the variable name still says 20)
event_atleast_20_df = event_df[event_df['Duration'] >= 10]

# event_df = process_events(merged_df)
merged_df['usage'] = 0

# Mark the rows of merged_df that fall inside a kept event (bounds included) with usage = 1
for index, row in event_atleast_20_df.iterrows():
    mask = (merged_df['time'] >= row['Start Time']) & (merged_df['time'] <= row['End Time'])
    merged_df.loc[mask, 'usage'] = 1

# Separate time-indexed copies of the volume and usage columns
vol_df = merged_df[['time', 'volume_liters']].copy()
usage_df = merged_df[['time', 'usage']].copy()

usage_df.set_index('time', inplace=True)# Set 'time' as the index
vol_df.set_index('time', inplace=True)# Set 'time' as the index