In [1]:
import numpy as np
import os, sys
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import seaborn as sns
from collections import Counter
from tqdm.auto import tqdm, trange
from sklearn.preprocessing import MinMaxScaler
import re
import concurrent.futures
from sklearn.metrics import silhouette_score
from scipy.stats import pearsonr
import copy
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, DistributedSampler
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sdv.metadata import SingleTableMetadata
from sdv.evaluation.single_table import evaluate_quality
from sdv.single_table import CTGANSynthesizer
from sdv.sampling import Condition
from sdv.evaluation.single_table import get_column_plot

import dask.dataframe as dpd
import dask_geopandas as dgpd
from dask.diagnostics import ProgressBar
from dask.distributed import Client

import warnings
# Suppress every Python warning for the rest of the session (this also hides deprecation notices).
warnings.filterwarnings('ignore')

# Add the parent directory to the import path before importing the local `utils` package
# (presumably where `utils/` lives relative to this notebook — confirm).
sys.path.append('../')
from utils.logger import Logger

import gc
# Run one garbage-collection pass right after the imports.
gc.collect()

# Seed NumPy's legacy global RNG. Note that the event-sampling cell reseeds it with 3.
np.random.seed(0)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Dask distributed client with 100 workers (the original note says 192 are available in total).
N_DASK_WORKERS = 100
client = Client(n_workers=N_DASK_WORKERS)

In [3]:
# Report the installed PyTorch build and whether CUDA is usable from this kernel.
torch_version, cuda_ok = torch.__version__, torch.cuda.is_available()
print(torch_version, cuda_ok)

2.0.1+cu118 True


In [4]:
# List every CUDA device visible to PyTorch, or say that none is available.
if not torch.cuda.is_available():
    print("CUDA is not available.")
else:
    print("CUDA is available. Detected CUDA Devices:")
    for device_idx in range(torch.cuda.device_count()):
        device_name = torch.cuda.get_device_name(device_idx)
        print(f"CUDA Device {device_idx}: {device_name}")

CUDA is available. Detected CUDA Devices:
CUDA Device 0: NVIDIA RTX A6000


In [5]:
# Make GPU 0 the current CUDA device for this process (the single device reported by the
# device-listing cell's output). This call requires a CUDA-enabled environment.
torch.cuda.set_device(0)

In [6]:
# Load the coordinate shapefile and drop five columns that are not needed downstream.
unused_coord_cols = ['cell_rmse1', 'cell_r21', 'cell_rmse2', 'cell_r22', 'depth']
coord_gdf = gpd.read_file('../src/coord/coord_gdf.shp').drop(columns=unused_coord_cols)
coord_gdf

Unnamed: 0,x,y,ter,HUC12,region,channel,geometry
0,2.933766e+06,1.396557e+07,301.388702,Cypress Creek,0,0,"POLYGON ((2934366.000 13964974.635, 2933003.17..."
1,2.934966e+06,1.396557e+07,301.594696,Cypress Creek,0,0,"POLYGON ((2934366.000 13967369.160, 2934380.33..."
2,2.933766e+06,1.396437e+07,294.629181,Cypress Creek,0,0,"POLYGON ((2934366.000 13964974.635, 2934366.00..."
3,2.934966e+06,1.396437e+07,298.529877,Cypress Creek,0,0,"POLYGON ((2935566.000 13963774.635, 2934366.00..."
4,2.936166e+06,1.396437e+07,294.815002,Cypress Creek,0,0,"POLYGON ((2936766.000 13963774.635, 2935566.00..."
...,...,...,...,...,...,...,...
26296,3.039069e+06,1.385008e+07,54.643570,Whiteoak Bayou-Buffalo Bayou,2,1,"POLYGON ((3039427.707 13849492.726, 3038745.86..."
26297,3.039053e+06,1.385088e+07,59.625050,Addicks Reservoir,3,1,"POLYGON ((3039399.212 13851153.541, 3039405.50..."
26298,3.038396e+06,1.385006e+07,60.055576,Whiteoak Bayou-Buffalo Bayou,2,0,"POLYGON ((3038723.769 13850469.724, 3038724.68..."
26299,3.038392e+06,1.385087e+07,59.625050,Addicks Reservoir,3,0,"POLYGON ((3038721.900 13851266.014, 3038723.76..."


In [7]:
# Bounding box of all geometries in coord_gdf, as (minx, miny, maxx, maxy).
total_bounds = coord_gdf.total_bounds
total_bounds

array([ 2921166.      , 13769377.      ,  3202957.156923, 13967374.634684])

In [8]:
# Draw a reproducible random subset of event ids, without repetition.
np.random.seed(3)
sample_event_num = 50
candidate_event_ids = np.arange(593)  # candidate ids 0..592
event_indices = np.random.choice(candidate_event_ids, size=sample_event_num, replace=False)

In [9]:
# Display the sampled event ids.
event_indices

array([349, 402,  37, 345, 340, 375, 134, 268, 535,  10,  53, 161, 513,
       247, 107, 410, 428, 192, 362, 450, 378,  14, 280,  58, 312, 323,
       237, 438, 467,  48, 288, 328, 130, 137, 131, 239,  45, 194, 592,
       127, 102, 448, 488, 522, 124, 575, 492, 315, 132, 524])

In [10]:
# Fit a min-max scaler on the coordinates in coord_gdf. `scaler` is reused later to map
# synthetic coordinates back to the original units; `xy_scaled` holds the scaled (x, y) pairs.
scaler = MinMaxScaler()
xy_scaled = scaler.fit_transform(coord_gdf[['x', 'y']])

def load_and_scale(file_path, scale=False):
    """Read one event table and optionally overwrite its coordinates with the scaled ones.

    Parameters
    ----------
    file_path : str
        Path of the event parquet file.
    scale : bool, default False
        When True, the ``x``/``y`` columns are replaced by the rows of ``xy_scaled``.

    Returns
    -------
    pandas.DataFrame
        Columns ``x, y, channel, ter, cumu_rain, peak_int, duration, depth``.

    Raises
    ------
    ValueError
        If ``scale`` is True and the table's row count differs from ``xy_scaled``.
    """
    df = pd.read_parquet(file_path)[['x', 'y', 'channel', 'ter', 'cumu_rain', 'peak_int', 'duration', 'depth']]
    if scale:
        # The assignment below is positional: it assumes every event table lists the same
        # rows, in the same order, as coord_gdf — TODO confirm against how the tables are built.
        if len(df) != len(xy_scaled):
            raise ValueError(
                f'{file_path}: {len(df)} rows, but {len(xy_scaled)} scaled coordinates are available'
            )
        df[['x', 'y']] = xy_scaled
    return df

file_paths = [f'../src/tables/data{i}.parquet' for i in event_indices]
selected_events = [load_and_scale(file, scale=True) for file in file_paths]
# dpd.concat only builds a lazy graph; the work happens in .compute(). The original ran
# compute() after the ProgressBar context had closed, so the bar could never track it.
# NOTE(review): with a distributed Client active, the local ProgressBar may still not render;
# dask.distributed.progress is the distributed counterpart.
result = dpd.concat(selected_events, axis=0)
with ProgressBar():
    selected_events_df = result.compute()

In [11]:
# Keep only the columns the synthesizer is trained on (x, y, cumu_rain, peak_int, duration).
# errors='ignore' makes the cell safe to re-run: without it a second execution raises
# KeyError, because the columns have already been removed from selected_events_df.
selected_events_df = selected_events_df.drop(columns=['channel', 'ter', 'depth'], errors='ignore')

In [12]:
# Show how many rows carry each duration value.
selected_events_df['duration'].value_counts()

duration
1     289311
2     210408
3     184107
6     184107
4     105204
7      78903
9      52602
8      52602
11     26301
16     26301
14     26301
5      26301
15     26301
10     26301
Name: count, dtype: int64

In [13]:
# Cast the integer-valued duration column to float.
# NOTE(review): presumably so the synthesizer models it as a continuous value — confirm.
selected_events_df['duration'] = selected_events_df['duration'].astype(float)

# cCTGAN events augmentation

In [14]:
# Build SDV single-table metadata by auto-detecting each column's sdtype from the training frame.
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(selected_events_df)

In [15]:
# Display the detected metadata to check the inferred sdtypes.
metadata

{
    "columns": {
        "x": {
            "sdtype": "numerical"
        },
        "y": {
            "sdtype": "numerical"
        },
        "cumu_rain": {
            "sdtype": "numerical"
        },
        "peak_int": {
            "sdtype": "numerical"
        },
        "duration": {
            "sdtype": "numerical"
        }
    },
    "METADATA_SPEC_VERSION": "SINGLE_TABLE_V1"
}

In [16]:
# Learning-rate grid: each entry is [generator_lr, discriminator_lr].
lr_sets = [
    list(lr_pair)
    for lr_pair in (
        (1e-5, 1e-5),
        (2e-5, 1e-5),
        (1e-4, 1e-4),
        (2e-4, 1e-4),
        (1e-3, 1e-3),
        (2e-3, 1e-3),
    )
]

In [21]:
def _unit_range_constraint(column):
    """Return a ScalarRange spec bounding `column` by 0.0 and 1.0 (non-strict boundaries)."""
    return {
        'constraint_class': 'ScalarRange',
        'constraint_parameters': {
            'column_name': column,
            'low_value': 0.0,
            'high_value': 1.0,
            'strict_boundaries': False,
        },
    }


def _positive_constraint(column):
    """Return a Positive spec for `column` with non-strict boundaries."""
    return {
        'constraint_class': 'Positive',
        'constraint_parameters': {
            'column_name': column,
            'strict_boundaries': False,
        },
    }


# The scaled coordinates are bounded by 0 and 1.
x_bounds_constraints = _unit_range_constraint('x')
y_bounds_constraints = _unit_range_constraint('y')

# Custom constraint class over the three rainfall columns; the class itself is loaded
# from an external file where the synthesizer is configured.
peak_int_constraints = dict(
    constraint_class='PeakIntConstraintClass',
    constraint_parameters=dict(column_names=['cumu_rain', 'peak_int', 'duration']),
)

# One Positive spec per rainfall amount column.
positive_constraints = [_positive_constraint(name) for name in ('cumu_rain', 'peak_int')]

# Inequality spec with peak_int as the low column and cumu_rain as the high column.
inequalty_constraints = dict(
    constraint_class='Inequality',
    constraint_parameters=dict(
        low_column_name='peak_int',
        high_column_name='cumu_rain',
    ),
)

In [22]:
def filter_rows_by_condition(df):
    """Keep the rows where ``cumu_rain / duration <= peak_int <= cumu_rain``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``cumu_rain``, ``peak_int`` and ``duration``.

    Returns
    -------
    pandas.DataFrame
        The matching rows of ``df``, with their original index labels.
    """
    mean_intensity = df['cumu_rain'] / df['duration']
    peak_not_below_mean = df['peak_int'] >= mean_intensity
    peak_not_above_total = df['cumu_rain'] >= df['peak_int']
    return df[peak_not_below_mean & peak_not_above_total]

In [23]:
# Project logger used by the training/sampling loop (constructed with a path under ../logs).
ctgan_logger = Logger('../logs/cCTGAN_model.log')

In [25]:
# Grid search over learning-rate pairs and epoch budgets. For every combination: train a
# constrained CTGAN, checkpoint it, sample synthetic rows, map the coordinates back to
# their original units, keep only points inside the study region that satisfy the rainfall
# consistency rule, and write the result to parquet.

# No visible cell defines `coord_union_gdf` (execution counts jump 16 -> 21 and 23 -> 25).
# On a fresh kernel the sjoin below therefore raised NameError only AFTER a full training
# run, and the broad `except` merely logged it, so no output was ever written.
# Rebuild it when missing, and keep any value already present in the kernel.
# NOTE(review): assumed to be the union of all coord_gdf geometries — confirm.
if 'coord_union_gdf' not in globals():
    # union_all() is the newer GeoPandas spelling; fall back to unary_union on older versions.
    _study_area = coord_gdf.union_all() if hasattr(coord_gdf, 'union_all') else coord_gdf.unary_union
    coord_union_gdf = gpd.GeoDataFrame(geometry=[_study_area], crs=coord_gdf.crs)

for lr_id, lrs in tqdm(enumerate(lr_sets), total=len(lr_sets)):
    # Epoch budgets 50, 100, ..., 300; each budget trains a fresh synthesizer from scratch.
    for epoch in range(50, 301, 50):
        try:
            # NOTE(review): the log tag and the checkpoint name use `epoch+1`, while training
            # runs for `epoch` epochs and the parquet output uses `epoch`. The names are kept
            # as-is so existing artifacts still match — confirm which suffix is intended.
            ctgan_logger.log_info(f'[{lr_id+1}/{len(lr_sets)}] [Epoch: {epoch}]: {lrs[0]}_{lrs[1]}_{epoch+1}')
            ctgan_synthesizer = CTGANSynthesizer(
                metadata, epochs=epoch,
                cuda=True, verbose=True, enforce_rounding=False,
                batch_size=500, generator_lr=lrs[0], discriminator_lr=lrs[1]
            )
            # The custom constraint class must be loaded before a spec can reference it by name.
            ctgan_synthesizer.load_custom_constraint_classes(
                filepath = '../models/cCTGAN.py',
                class_names = ['PeakIntConstraintClass']
            )
            ctgan_synthesizer.add_constraints(
                constraints = [x_bounds_constraints, y_bounds_constraints, inequalty_constraints] + positive_constraints
                + [peak_int_constraints]
            )
            # train ctgan
            ctgan_logger.log_info('Start training...')
            ctgan_logger.log_info(ctgan_synthesizer.get_metadata())
            ctgan_synthesizer.fit(selected_events_df)
            ctgan_synthesizer.save(f'../checkpoints/cCTGAN/{lrs[0]}_{lrs[1]}_{epoch+1}.pkl')
            ctgan_logger.log_info('Generating events...')
            filtered_df_path = f'../outputs/augmented_data/{lrs[0]}_{lrs[1]}_{epoch}.parquet'
            ctgan_synthetic_df = ctgan_synthesizer.sample(num_rows=5000000, batch_size=500)
            # Undo the min-max scaling so the points are in the CRS units of coord_gdf.
            ctgan_synthetic_df[['x', 'y']] = scaler.inverse_transform(ctgan_synthetic_df[['x', 'y']])
            ctgan_synthetic_gdf = gpd.GeoDataFrame(
                ctgan_synthetic_df,
                geometry=gpd.points_from_xy(ctgan_synthetic_df.x, ctgan_synthetic_df.y),
                crs=coord_gdf.crs
            )
            ctgan_logger.log_info('Filtering...')
            # Spatial filter: keep the points lying within coord_union_gdf, then drop the join helper columns.
            ctgan_synthetic_inbound_df = ctgan_synthetic_gdf.sjoin(coord_union_gdf, predicate='within').drop(columns=['geometry', 'index_right'])
            # Consistency filter: cumu_rain / duration <= peak_int <= cumu_rain.
            ctgan_synthetic_inbound_df = filter_rows_by_condition(ctgan_synthetic_inbound_df).reset_index(drop=True)
            ctgan_synthetic_inbound_df.to_parquet(filtered_df_path)
        except Exception as e:
            # Best effort: log the failure and continue with the next configuration.
            # KeyboardInterrupt is not an Exception subclass, so a manual interrupt still stops the loop.
            ctgan_logger.log_error(e)

  0%|                                                                                                                                   | 0/6 [00:14<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# Inspect the constraints registered on the most recently built synthesizer.
# `ctgan_synthesizer` exists only after the training loop has constructed at least one.
ctgan_synthesizer.get_constraints()