In [1]:
import numpy as np
import pandas as pd
from typing import Optional
from datetime import datetime, timezone, timedelta
import sys
import argparse
import os
import warnings
from typing import Optional, Literal, NewType
import json

import logging
from database_generator.logging_configuration import setup_logging_for_this_script
setup_logging_for_this_script()

# Logger for this notebook; its handlers/format are presumably the ones set up by
# setup_logging_for_this_script() above.
logger = logging.getLogger(name=__name__)

from database_generator.helpers import (
    get_config_path,
    load_and_process_params,
    append_and_concatenate_dataframes
)

from database_generator.get_data import (
    generate_stable_toy_data,
    introduce_exponential_anomalies,
    simulate_broken_sensor,
)

from database_generator.evaluate import (
    overlaid_plots_with_plotly,
)

from database_generator.evaluate import (
    overlaid_plots_with_plotly,
)

from database_generator.db_operations import(
    create_sql_alchemy_engine,
    get_last_timestamp,
    query_data_by_datetime,
    store_pandas_dataframe_into_postegre,
)

# Accessing and reading the config file

In [2]:
# Resolve the JSON config path (get_config_path presumably reads it from the environment)
# and echo it, so the saved output records which config file this run used.
path_for_the_json_file = get_config_path()
path_for_the_json_file


'/home/aldo/Repositories/general_projects/database_generator/config_file.json'

In [3]:
# Load the JSON config; load_and_process_params logs each parameter it reads (see output below).
config_dict = load_and_process_params(path_for_the_json_file)
seed_for_the_stable_dataset = config_dict['seed_for_the_stable_dataset']


[database_generator.helpers] 2024-10-05 16:01:44 - INFO: number_of_rows_for_stable_toy_data = 10000
[database_generator.helpers] 2024-10-05 16:01:44 - INFO: seed_for_the_stable_dataset = 300
[database_generator.helpers] 2024-10-05 16:01:44 - INFO: start_date_for_the_toy_dataset = 2024-09-08 10:00:00+00:00
[database_generator.helpers] 2024-10-05 16:01:44 - INFO: variable_of_interest = pH
[database_generator.helpers] 2024-10-05 16:01:44 - INFO: path_to_save_the_outcomes = /home/aldo/Repositories/general_projects/database_generator/experiments
[database_generator.helpers] 2024-10-05 16:01:44 - INFO: table_name_to_be_created_on_postgresql = raw_data_version_1


# Create the stable data

In [4]:
# Anchor the simulation at the current UTC time; the stable baseline covers the
# STABLE_HISTORY_HOURS that precede it, and the anomaly windows follow it.
# NOTE(review): Timestamp.now() makes every run produce different timestamps. The config
# also appears to carry 'start_date_for_the_toy_dataset' (see the helpers log above) —
# consider using it instead if reproducible runs are wanted.
STABLE_HISTORY_HOURS = 24  # length of the stable baseline window, in hours

main_datetime_in_utc = pd.Timestamp.now(tz='UTC')
start_datetime_in_utc = main_datetime_in_utc - timedelta(hours=STABLE_HISTORY_HOURS)

In [5]:
# Accumulator for every generated segment (stable baseline + each anomaly); concatenated at the end.
dfs_list = []

In [6]:
# Stable (anomaly-free) baseline spanning [start_datetime_in_utc, main_datetime_in_utc],
# seeded from the config so the generated values are repeatable.
df_stable = generate_stable_toy_data(
    start_datetime=start_datetime_in_utc,
    end_datetime=main_datetime_in_utc,
    seed_for_random=seed_for_the_stable_dataset,
)


[database_generator.helpers] 2024-10-05 16:02:01 - INFO: Datetime '2024-10-04 23:01:52.016261+00:00' is already in the correct format.
[database_generator.helpers] 2024-10-05 16:02:01 - INFO: Datetime '2024-10-05 23:01:52.016261+00:00' is already in the correct format.


In [24]:
# Scratch check copied from a unit test: generate a one-minute window sampled every 120 s
# (the displayed frame below has a single row).
# NOTE(review): the saved run of this cell raised
#   TypeError: generate_stable_toy_data() missing 1 required positional argument: 'end_datetime'
# although end_datetime is passed by keyword — possibly a stale import in the kernel.
# Re-check after Restart & Run All, or move this check into the test suite.
start_time = pd.Timestamp(year=2024, month=10, day=5, hour=16, tz='utc')
end_time = start_time + timedelta(minutes=1)
frequency = '120s'
random_state = 42

df = generate_stable_toy_data(
    start_datetime=start_time,
    end_datetime=end_time,
    frequency=frequency,
    seed_for_random=random_state,
)

TypeError: generate_stable_toy_data() missing 1 required positional argument: 'end_datetime'

In [23]:
# Display the scratch frame `df` from the generate_stable_toy_data check above.
# NOTE(review): this cell's execution count (23) precedes that check's (24), so the shown
# output comes from an earlier run, not the failing one.
df

Unnamed: 0_level_0,Temperature_C,Pressure_MPa,Vibration_mm_s,Flow_Rate_l_min,Humidity_%
Timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2024-10-05 16:00:00+00:00,75.496714,2.98812,3.086912,303.357246,38.829233


In [13]:
# Total number of cells (rows × columns) in the stable frame.
df_stable.size

14405

In [14]:
# (rows, columns) of the stable frame.
df_stable.shape

(2881, 5)

In [15]:
# Number of rows (first element of .shape).
df_stable.shape[0]

2881

In [10]:
# Row count again — same value as df_stable.shape[0]; one of these inspection cells is redundant.
len(df_stable)

2881

In [7]:
# Register the stable baseline as the first segment of the combined dataset.
dfs_list.extend([df_stable])

### Visualize the generated data

In [8]:
# Plot the stable baseline with overlaid_plots_with_plotly; the figure is not written to disk.
# Optional arguments tried earlier: scatter_variables=['Vibration_mm_s', 'Flow_Rate_l_min'],
# variable_of_interest='Temperature_C'.
fig_stable = overlaid_plots_with_plotly(
    df=df_stable,
    save_plot=False,
)

In [None]:
# Render the stable-baseline figure.
fig_stable.show()

# Create the two types of anomaly to evaluate

### Problem 1: Bearing Wear
Description: Over time, the bearings in the pump might wear out, causing an increase in vibration levels.


### Problem 5: Broken Temperature Sensor
Description: The temperature sensor might malfunction or break, leading to inaccurate or stuck readings.

- Stuck Readings: The sensor gets "stuck" at a constant value, providing the same reading for a period of time.

- Sudden Jumps: The sensor might suddenly jump to an unusually high or low value, remaining there for some time.

- Intermittent Spikes: The sensor occasionally produces spikes of incorrect readings, either very high or very low.

- Dropouts: The sensor might stop reporting data altogether, which could be simulated as missing values (NaN).

### Bearing Wear Anomaly

In [10]:
# The bearing-wear window starts where the stable baseline ends (main_datetime_in_utc) and lasts 4 hours.
start_time_anomaly_exponential = main_datetime_in_utc
end_time_anomaly_exponential = start_time_anomaly_exponential + timedelta(hours=4)

In [None]:
# Introduce bearing wear on Vibration_mm_s over the anomaly window
# (presumably an exponential rise controlled by increase_rate), then register the segment.
df_with_anomaly_exponential = introduce_exponential_anomalies(
    variable='Vibration_mm_s',
    start_datetime=start_time_anomaly_exponential,
    end_datetime=end_time_anomaly_exponential,
    increase_rate=0.01,
)

dfs_list.append(df_with_anomaly_exponential)

In [12]:
# Plot the bearing-wear segment with Vibration_mm_s as the variable of interest (not saved).
# scatter_variables=['Vibration_mm_s'] was tried earlier and left disabled.
fig_anomaly_exponential = overlaid_plots_with_plotly(
    df_with_anomaly_exponential,
    variable_of_interest='Vibration_mm_s',
    save_plot=False,
)



In [None]:
# Render the bearing-wear figure.
fig_anomaly_exponential.show()

### Stuck Readings

In [None]:
# Stuck-reading fault: a 3-hour window immediately after the bearing-wear window.
start_datetime_stuck_sensor = end_time_anomaly_exponential
end_datetime_stuck_sensor = start_datetime_stuck_sensor + timedelta(hours=3)

# Simulate a Temperature_C sensor stuck at a constant value (mode='stuck').
df_with_sensor_issue_stuck = simulate_broken_sensor(
    variable='Temperature_C',
    start_datetime=start_datetime_stuck_sensor,
    end_datetime=end_datetime_stuck_sensor,
    mode='stuck',
)

dfs_list.append(df_with_sensor_issue_stuck)

In [None]:
# Plot the stuck-sensor segment with Temperature_C as the variable of interest (not saved).
fig_anomaly_stuck = overlaid_plots_with_plotly(
    df_with_sensor_issue_stuck,
    variable_of_interest='Temperature_C',
    save_plot=False,
)
fig_anomaly_stuck.show()

### Sudden Jumps

In [None]:
# Sudden-jump fault: a 3-hour window immediately after the stuck-sensor window.
start_datetime_jump_sensor = end_datetime_stuck_sensor
end_datetime_jump_sensor = start_datetime_jump_sensor + timedelta(hours=3)

# Simulate a Temperature_C sensor that suddenly jumps to an abnormal level (mode='jump').
df_with_sensor_issue_jump = simulate_broken_sensor(
    variable='Temperature_C',
    start_datetime=start_datetime_jump_sensor,
    end_datetime=end_datetime_jump_sensor,
    mode='jump',
)

dfs_list.append(df_with_sensor_issue_jump)

In [None]:
# Plot the sudden-jump segment with Temperature_C as the variable of interest (not saved).
# Bound to its own name: the previous code reused `fig_anomaly_stuck`, silently
# overwriting the stuck-sensor figure.
fig_anomaly_jump = overlaid_plots_with_plotly(df_with_sensor_issue_jump,
                                              variable_of_interest='Temperature_C',
                                              save_plot=False)
fig_anomaly_jump.show()

### Intermittent Spikes

In [None]:
# Intermittent-spike fault: a 3-hour window immediately after the sudden-jump window.
start_datetime_spike_sensor = end_datetime_jump_sensor
end_datetime_spike_sensor = start_datetime_spike_sensor + timedelta(hours=3)

# Simulate a Temperature_C sensor that occasionally emits spikes of wrong readings (mode='spike').
df_with_sensor_issue_spike = simulate_broken_sensor(
    variable='Temperature_C',
    start_datetime=start_datetime_spike_sensor,
    end_datetime=end_datetime_spike_sensor,
    mode='spike',
)

dfs_list.append(df_with_sensor_issue_spike)

In [None]:
# Plot the intermittent-spike segment with Temperature_C as the variable of interest (not saved).
# Bound to its own name: the previous code reused `fig_anomaly_stuck`, silently
# overwriting the stuck-sensor figure.
fig_anomaly_spike = overlaid_plots_with_plotly(df_with_sensor_issue_spike,
                                               variable_of_interest='Temperature_C',
                                               save_plot=False)
fig_anomaly_spike.show()

In [None]:
# Dropout fault: a 3-hour window immediately after the intermittent-spike window.
start_datetime_dropout_sensor = end_datetime_spike_sensor
end_datetime_dropout_sensor = start_datetime_dropout_sensor + timedelta(hours=3)

# Simulate a Temperature_C sensor that stops reporting (mode='dropout'; per the notes above,
# presumably missing values/NaN).
df_with_sensor_issue_dropout = simulate_broken_sensor(
    variable='Temperature_C',
    start_datetime=start_datetime_dropout_sensor,
    end_datetime=end_datetime_dropout_sensor,
    mode='dropout',
)

dfs_list.append(df_with_sensor_issue_dropout)

In [None]:
# Plot the dropout segment with Temperature_C as the variable of interest (not saved).
# Bound to its own name: the previous code reused `fig_anomaly_stuck`, silently
# overwriting the stuck-sensor figure.
fig_anomaly_dropout = overlaid_plots_with_plotly(df_with_sensor_issue_dropout,
                                                 variable_of_interest='Temperature_C',
                                                 save_plot=False)
fig_anomaly_dropout.show()

In [24]:
# Merge every simulated segment into one frame. Duplicate indices are resolved with
# method='first' — presumably the first occurrence wins (NOT a mean); confirm in the helper.
combined_df = append_and_concatenate_dataframes(
    dfs_list,
    method='first',
)

In [None]:
# Store the combined frame in PostgreSQL.
# NOTE(review): no engine or table name is passed, so the helper's defaults are used —
# presumably 'table_name_to_be_created_on_postgresql' from the config; confirm.
# Re-running this cell may write the same rows again — verify how the helper treats existing data.
store_pandas_dataframe_into_postegre(df=combined_df)

# Service Architecture Overview


## A. Components

- Data Simulation Module
  - Purpose: Generate synthetic datasets that represent normal operating conditions, plus injected faults.
  - Functions:
    - `generate_stable_toy_data`: Generates the baseline dataset.
    - `introduce_exponential_anomalies`: Simulates anomalies such as bearing wear.
    - `simulate_broken_sensor`: Introduces faults such as sensor failures.

- Data Ingestion Service
  - Purpose: Writes the generated/simulated data into a PostgreSQL database.
  - Components: A script or service (e.g., Python with psycopg2 or SQLAlchemy) that connects to the PostgreSQL database and inserts the generated data.

- PostgreSQL Database
  - Purpose: Stores the simulated data. Other services/modules can query this data for anomaly detection and analysis.
  - Schema design:
    - Table structure: Design tables to hold time-series data with columns such as timestamp, vibration_level, pressure, temperature, anomaly_flag, etc.
    - Indexing: Ensure the timestamp column is indexed for efficient querying.

- Anomaly Detection Module (External Service)
  - Purpose: Consumes data from the PostgreSQL database, applies anomaly detection algorithms, and flags potential issues.
  - Data flow: Queries the database periodically or in real time and applies models such as autoencoders, statistical methods, or other machine learning algorithms.

- Monitoring and Logging
  - Purpose: Monitors service performance, logs errors, and ensures data integrity.
  - Components: Tools such as Prometheus for monitoring and Grafana for visualization. Logs can be stored locally or in a logging service.

## B. Workflow
1. **Data Generation and Simulation:** The service periodically or on demand runs the `generate_stable_toy_data` function to create a stable dataset. Anomalies are introduced using the `introduce_exponential_anomalies` and `simulate_broken_sensor` functions.

2. **Data Ingestion:** The simulated data is sent to the Data Ingestion Service, which connects to the PostgreSQL database and inserts the data into the appropriate tables.

3. **Database Storage:** The PostgreSQL database stores the time-series data along with any anomaly flags or metadata that might be useful for downstream analysis.

4. **Anomaly Detection:** The Anomaly Detection Module queries the database, retrieves the data, and applies algorithms to detect anomalies. Detected anomalies are flagged and stored back in the database or sent to an alerting system.

5. **Monitoring:** The entire process is monitored for performance and reliability. Logs are reviewed to ensure data integrity, and alerts are triggered for any unexpected behavior.


## Implementation Steps

- A. Setting Up the PostgreSQL Database
  - Create the database: Install PostgreSQL and create a new database for the service.
  - Design the schema: Define tables for storing the time-series data, ensuring that they are normalized and indexed appropriately.

- B. Implement the Data Simulation Module
Refactor Existing Functions:

Refactor generate_stable_toy_data, introduce_exponential_anomalies, and simulate_broken_sensor to be callable by the service.
Integrate with the Data Ingestion Service:

Implement a Python script or service that runs these functions and writes the results to the PostgreSQL database.

- C. Implement the Data Ingestion Service
Database Connection:
Use libraries like psycopg2 or SQLAlchemy to connect to the PostgreSQL database.
Data Insertion Logic:
Implement the logic to insert the generated data into the database, ensuring proper handling of timestamps and other relevant metadata.

- D. Anomaly Detection Integration
Develop or Integrate Anomaly Detection Algorithms:

Implement or integrate existing anomaly detection algorithms that will consume the data from the PostgreSQL database.
Store Results:

Store the results of the anomaly detection in the same database or send them to a monitoring/alerting system.

- E. Monitoring and Logging
Set Up Monitoring Tools:

Use Prometheus to monitor service metrics and Grafana to visualize them.
Implement Logging:

Ensure that all critical operations are logged, and set up error-handling mechanisms.

## Tools and Technologies
- Python: Core language for scripting, data simulation, and ingestion.
- PostgreSQL: Database for storing and querying time-series data.
- SQLAlchemy/psycopg2: Libraries for database interaction.
- Prometheus/Grafana: Monitoring and visualization.
- Docker (optional): For containerizing the service to ensure consistency across environments.

## Future Considerations
- Scalability: Ensure the system can handle increasing volumes of data as the service expands.
- Real-time processing: Consider integrating real-time data processing pipelines if needed.
- Security: Implement proper security measures for database access and data handling.

# Identify the Services to Containerize
Based on this architecture, the following components can be containerized:

- Data Simulation and Ingestion Service
- PostgreSQL Database
- Anomaly Detection Service
- Monitoring and Logging Tools (Prometheus and Grafana)


### Putting It All Together with Docker Compose
Use Docker Compose to orchestrate all the services listed above. **TODO:** add the `docker-compose.yml` outline here.

### Suggested SQLAlchemy Methods to Build:

- Setup: Connecting to the Database
- Create a Table for Pump Data
- Insert Data into the Table
- Query Data from the Table
- Update Data in the Table
- Delete Data from the Table
- Use SQLAlchemy ORM to Define Models and Perform CRUD Operations