In [None]:
!pip install sdv

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import pickle
import seaborn as sns
import hashlib
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp


from sdv.metadata import SingleTableMetadata
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.sampling import Condition

In [None]:
class TrafficDataDC:
    """Placeholder class so ``pickle`` can resolve the object stored in the file.

    Presumably the pickle references ``__main__.TrafficDataDC``; only the
    attributes restored by unpickling (e.g. ``.df``) are used below.
    """


# SECURITY: pickle.load can execute arbitrary code - only load trusted files.
with open('./TrafficDataDC.pickle', 'rb') as pickle_file:
    s3_traffic = pickle.load(pickle_file)

In [None]:
# Peek at the first rows of the loaded traffic frame.
s3_traffic.df.head(n=5)

In [None]:
# Fraction of rows (in file order) used for fitting; 1 keeps the whole frame.
train_percentage = 1
split_index = int(train_percentage * len(s3_traffic.df))
train_data = s3_traffic.df.iloc[:split_index]

In [None]:
# Inspect column dtypes before SDV metadata detection runs on this frame.
train_data.dtypes

In [None]:
# Let SDV infer a per-column sdtype from the training frame.
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(data=train_data)

In [None]:
# Force these columns to be modelled as categorical labels, overriding
# whatever detect_from_dataframe inferred for them.
for categorical_column in ('container_id', 'chunk_id', 'disk_capacity_tb'):
    metadata.update_column(column_name=categorical_column, sdtype='categorical')

In [None]:
# Display the final metadata (after the categorical overrides).
metadata

In [None]:
synthesizer = GaussianCopulaSynthesizer(
    metadata,
    enforce_min_max_values=False,  # allow values outside the training min/max
    enforce_rounding=True,         # keep the decimal precision of the training data
    locales='en_US',
)
synthesizer.fit(train_data)

In [None]:
# First rows of the training frame, for comparison with synthetic output.
train_data.head(5)

In [None]:
class DataModel:
    """Notebook-side definition of the pickled row-count predictor.

    Unpickled instances carry ``label_encoder`` (something with
    ``transform``) and ``model`` (something with ``predict``); neither is set
    here - both come from ``SamplePredictor.pickle``.
    """

    def predict(self, day, month, disk_capacity, container_group):
        """Return the predicted row count (truncated to int) for one day.

        Builds a one-row feature frame with cyclical sin/cos encodings of the
        day (period 31) and month (period 12). The column set and order
        presumably must match what the model was trained on - do not reorder.
        """
        encoded_group = self.label_encoder.transform([container_group])[0]
        day_angle = day * (2. * np.pi / 31)
        month_angle = (month - 1) * (2. * np.pi / 12)

        features = pd.DataFrame({
            'disk_capacity_tb': [disk_capacity],
            'container_group_encoded': [encoded_group],
            'month': [month],
            'day': [day],
            'day_sin': [np.sin(day_angle)],
            'day_cos': [np.cos(day_angle)],
            'month_sin': [np.sin(month_angle)],
            'month_cos': [np.cos(month_angle)],
        })

        prediction = self.model.predict(features)
        return int(prediction[0])
# SECURITY: pickle.load can run arbitrary code - only load trusted files.
# The stored object is rebuilt against the DataModel class defined above.
with open('SamplePredictor.pickle', 'rb') as predictor_file:
    data_model = pickle.load(predictor_file)
# Example: data_model.predict(day=1, month=1, disk_capacity=20, container_group='02892102A8F17B5A551466B444222F4C3D9A399F')

In [None]:
# Map each container_group to its *distinct* container_ids in the training data.
# `.agg(list)` would keep one entry per row, letting random.sample draw the
# same container more than once.
grouped_data = (
    train_data.groupby('container_group')['container_id']
    .unique()
    .apply(list)
    .reset_index()
)

container_dict = dict(zip(grouped_data['container_group'], grouped_data['container_id']))

def generate_conditions(start_date, end_date, disk_capacity, container_groups):
    """Build SDV sampling conditions whose row counts follow ``data_model``.

    For each container group, ``container_groups[group]`` distinct container
    ids are sampled once from ``container_dict``. Then, for every day in
    ``[start_date, end_date]`` (inclusive, one-day steps), ``data_model``
    predicts that day's row count. The count is split across the sampled
    containers with the remainder spread over the first ones, so the per-day
    total equals the prediction.

    Args:
        start_date, end_date: datetime bounds, both inclusive.
        disk_capacity: capacity passed to the predictor. It is not part of the
            condition; the notebook sets ``disk_capacity_tb`` afterwards.
        container_groups: mapping container_group -> number of distinct
            container ids to sample for that group.

    Returns:
        list of ``Condition`` objects, one per (day, container) pair that is
        assigned at least one row.

    Raises:
        ValueError: if a group has fewer distinct containers than requested.
    """
    conditions = []
    total_samples = 0
    delta = timedelta(days=1)

    for container_group, container_id_types in container_groups.items():
        # Deduplicate: the lookup may list a container once per data row.
        available_ids = list(dict.fromkeys(container_dict[container_group]))
        if container_id_types > len(available_ids):
            raise ValueError(
                f'Requested {container_id_types} containers for group {container_group!r}, '
                f'but only {len(available_ids)} distinct containers exist'
            )
        container_ids = random.sample(available_ids, container_id_types)
        if not container_ids:
            continue

        current_date = start_date
        while current_date <= end_date:
            num_rows = data_model.predict(day=current_date.day, month=current_date.month,
                                          disk_capacity=disk_capacity, container_group=container_group)
            total_samples += num_rows

            # divmod + remainder spreading avoids dropping up to
            # len(container_ids) - 1 rows per day to integer truncation.
            base, extra = divmod(int(num_rows), len(container_ids))
            for position, container_id in enumerate(container_ids):
                rows_for_container = base + (1 if position < extra else 0)
                if rows_for_container <= 0:
                    continue  # SDV conditions need a positive row count
                conditions.append(Condition(num_rows=rows_for_container,
                                            column_values={'datetime': current_date,
                                                           'container_group': container_group,
                                                           'container_id': container_id
                                                           }
                                            ))

            current_date += delta

    print(f'Total predicted rows: {total_samples}')
    return conditions

# Sampling window (both ends inclusive) and the disk capacity to condition on.
start_date = datetime(2022, 1, 27)
end_date = datetime(2022, 1, 28)
disk_capacity_tb = 16
# Number of distinct container_ids to draw per container_group:
# 2 from the first group, 3 from the second.
container_groups = {
    '02892102A8F17B5A551466B444222F4C3D9A399F': 2,
    'CC21F742BC91C1A0ED11A719D5C2CE74690BCD44': 3,
}

conditions = generate_conditions(start_date=start_date, end_date=end_date,
                                 disk_capacity=disk_capacity_tb, container_groups=container_groups)

In [None]:
# Draw synthetic rows satisfying every condition built above.
csd = synthesizer.sample_from_conditions(conditions)

In [None]:
# First synthetic rows (head defaults to 5).
csd.head()

In [None]:
# Total number of synthetic rows generated.
csd.shape[0]

In [None]:
# Non-null counts of every column, per synthetic container.
csd.groupby(by='container_id').count()

In [None]:
# Rows generated per (container_group, container_id), and per-group totals.
temp = csd.groupby(['container_group', 'container_id'])['chunk_id'].count().reset_index()
tr = dict(temp.groupby('container_group')['chunk_id'].sum())

# For each sampled container, list the location/server/config/disk ids it has
# in the training data. Looping over the synthetic containers defines
# `container` in this cell, so it runs on a fresh kernel.
for container in csd['container_id'].unique():
    container_rows = train_data[train_data['container_id'] == container]
    print(container)
    for id_column in ('location_id', 'server_id', 'config_id', 'disk_id'):
        print(f'  {id_column}:', container_rows[id_column].unique())

In [None]:
# Stamp every synthetic row with the capacity used for prediction
# (the sampling conditions do not set this column).
csd = csd.assign(disk_capacity_tb=disk_capacity_tb)

In [None]:
# Summary statistics of the numeric synthetic columns.
csd.describe()

In [None]:
class DataQValidation:
    """Quality checks comparing a synthetic traffic frame against the original.

    Both frames are copied on construction, so the caller's DataFrames are not
    modified. Their ``datetime`` column is coerced with ``pd.to_datetime``
    (unparseable values become NaT), and both are sorted by ``datetime``.

    Args:
        original: reference DataFrame (e.g. the training data).
        synthetic: generated DataFrame to validate.
        start_date, end_date: optional inclusive window the synthetic data was
            conditioned on; needed for the date-range and count checks.
        disk_capacity_tb: optional disk capacity the data was conditioned on.
        container_groups: optional mapping container_group -> number of
            containers requested for that group.
    """

    def __init__(self, original, synthetic, start_date=None, end_date=None,
                 disk_capacity_tb=None, container_groups=None):
        # Copy so that coercing/sorting below never mutates the caller's frames
        # (and avoids SettingWithCopy issues when `original` is a slice).
        self.original_df = original.copy()
        self.synthetic_df = synthetic.copy()

        self.start_date = start_date
        self.end_date = end_date
        self.disk_cap = disk_capacity_tb
        self.container_groups = container_groups

        self.original_df['datetime'] = pd.to_datetime(self.original_df['datetime'], errors='coerce')
        self.synthetic_df['datetime'] = pd.to_datetime(self.synthetic_df['datetime'], errors='coerce')

        self.original_df = self.original_df.sort_values(by='datetime')
        self.synthetic_df = self.synthetic_df.sort_values(by='datetime')

    @staticmethod
    def _report(passed, passed_msg, failed_msg):
        """Print ``passed_msg`` when ``passed`` is truthy, otherwise ``failed_msg``."""
        print(passed_msg if passed else failed_msg)

    def unique_data(self):
        """Report exact duplicate rows between and within the two frames.

        Each row is hashed with SHA-256 over ``row.to_string()``. Two rows
        therefore only match if their column names, column order and rendered
        values all agree.
        """

        def row_hash(row):
            return hashlib.sha256(row.to_string().encode('utf-8')).hexdigest()

        # Hash each frame exactly once, with a progress bar.
        tqdm.pandas(desc="Hashing original rows")
        original_hashes = self.original_df.progress_apply(row_hash, axis=1)

        tqdm.pandas(desc="Hashing synthetic rows")
        synthetic_hashes = self.synthetic_df.progress_apply(row_hash, axis=1)

        original_set = set(original_hashes)
        synthetic_set = set(synthetic_hashes)

        exact_matches = original_set.intersection(synthetic_set)

        print("Number of exact matches with original data:", len(exact_matches))

        print("Number of exact matches within synthetic data:", len(synthetic_hashes) - len(synthetic_set))

    def check_capacity(self):
        """Check that no container's running stored size exceeds the disk limit.

        Rows are processed in datetime order:
        - WRITE adds ``chunk_size`` to the container's running total.
        - DELETE_PERFORMED subtracts it.
        - Any other operation leaves the total unchanged.

        Also checks that no ``chunk_size`` is negative. NaN chunk sizes are
        skipped in the running totals; ``null_check`` reports them.
        Prints the final per-container totals and the limit used.
        """
        capacity_tb = self.disk_cap if self.disk_cap is not None else 16
        # NOTE(review): the 1e9 scale factor is inherited from the original
        # hard-coded `16 * 1e9`; confirm it matches the units of `chunk_size`.
        fixed_disk_cap_gb = capacity_tb * 1e9

        df = self.synthetic_df.reset_index(drop=True)
        positive_chunk = not bool((df['chunk_size'] < 0).any())

        chunk = df['chunk_size'].to_numpy()
        operation = df['operation']
        signed = pd.Series(
            np.where(operation == 'WRITE', chunk,
                     np.where(operation == 'DELETE_PERFORMED', -chunk, 0)),
            index=df.index,
        )

        # Running total per container, in the frame's (datetime-sorted) order.
        by_container = signed.groupby(df['container_id'], sort=False, dropna=False, observed=True)
        running = by_container.cumsum()
        cap_cumulative_test = not bool((running > fixed_disk_cap_gb).any())

        final = running.groupby(df['container_id'], sort=False, dropna=False, observed=True).last()
        cumulative_sizes = dict(zip(final.index.tolist(), final.tolist()))

        self._report(cap_cumulative_test,
                     "Cumulative Data Test Passed : Size did not exceed disk capacity for any container at any given time",
                     "Cumulative Data Test Failed : Size exceeded disk capacity for some container at a given time")

        self._report(positive_chunk,
                     "Positive Chunk Size Test Passed",
                     "Positive Chunk Size Test Failed")

        print(cumulative_sizes)
        print(fixed_disk_cap_gb)

    def null_check(self):
        """Report whether the synthetic frame contains any null values."""
        if self.synthetic_df.isnull().values.any():
            print("Null Test Failed : Null values found in the generated data")
        else:
            print("Null Test Passed")

    def range_check(self):
        """Check categorical ranges, counts, id consistency and the date window.

        Checks and their required inputs:
        - Transaction-number and container-count checks need
          ``container_groups``, ``start_date``, ``end_date`` and
          ``disk_capacity_tb``. They also load the predictor pickle.
        - The date-range check needs both dates.

        Checks whose inputs were not provided are reported as skipped.
        """
        COUNT_THRESHOLD = 3

        container_group_master = set(self.original_df['container_group'].unique())

        operations = set(self.synthetic_df['operation'])
        ops_master = set(self.original_df['operation'])
        if not operations.issubset(ops_master):
            print('Operation Range Test Failed : Unknown Operation found')
        else:
            print(f'Operation Range Test Passed : Unique operations found in synthetic dataset are\n{operations}')

        print('\n')
        if self.disk_cap:
            disk_caps = set(self.synthetic_df['disk_capacity_tb'])
            if len(disk_caps) == 1:
                self._report(next(iter(disk_caps)) == self.disk_cap,
                             "Disk Capacity Range Test Passed : Synthetic data generated for the conditioned disk capacity",
                             "Disk Capacity Range Test Failed : Synthetic data generated has different disk capacity than the condition")
            else:
                print("Disk Capacity Range Test Failed : 0 or more than 1 unique disk capacities found in synthetic data")

        print('\n')
        if self.container_groups:
            requested_groups = set(self.container_groups.keys())
            if not requested_groups.issubset(container_group_master):
                print("Container Group Range Test cannot be performed, synthetic data generated on different container groups than original data")
            else:
                synthetic_cg_master = set(self.synthetic_df['container_group'].unique())
                if not requested_groups.issubset(synthetic_cg_master):
                    print("Container Group Range Test Failed : synthetic data is missing some container groups")
                elif not synthetic_cg_master.issubset(requested_groups):
                    print("Container Group Range Test Failed : synthetic data has extra container groups not passed in condition")
                else:
                    print("Container Group Range Test Passed : synthetic data and conditions have exact container groups")

        # observed=True: with categorical columns, only count combinations
        # that actually occur in the synthetic data.
        synthetic_cg_metadf = (self.synthetic_df
                               .groupby(['container_group', 'container_id'], observed=True)['chunk_id']
                               .count().reset_index())
        transaction_counts = dict(synthetic_cg_metadf.groupby('container_group', observed=True)['chunk_id'].sum())
        container_counts = dict(synthetic_cg_metadf.groupby('container_group', observed=True)['container_id'].count())

        can_compare_counts = (bool(self.container_groups)
                              and self.start_date is not None
                              and self.end_date is not None
                              and self.disk_cap is not None)
        if can_compare_counts:
            predictor = self.load_predictor_model()
            delta = timedelta(days=1)

            transactions_test = True
            container_count_test = True
            for cg, ccount in self.container_groups.items():
                current_date = self.start_date
                predicted_count = 0
                while current_date <= self.end_date:
                    predicted_count += predictor.predict(day=current_date.day, month=current_date.month,
                                                         disk_capacity=self.disk_cap, container_group=cg)
                    current_date += delta

                # .get: a requested group may be entirely absent from the synthetic data.
                if abs(predicted_count - transaction_counts.get(cg, 0)) > COUNT_THRESHOLD:
                    transactions_test = False

                if ccount != container_counts.get(cg, 0):
                    container_count_test = False

            print('\n')
            self._report(transactions_test,
                         "Transaction Number Test Passed : Number of samples generated is in line with the model's prediction",
                         "Transaction Number Test Failed : Number of samples generated is not in line with the model's prediction")

            print('\n')
            self._report(container_count_test,
                         'Container Count Test Passed : Number of unique containers found in synthetic data is equal to condition',
                         'Container Count Test Failed : Number of unique containers found in synthetic data is not equal to condition')
        else:
            print('\n')
            print('Transaction Number / Container Count Tests skipped : container_groups, start_date, end_date and disk_capacity_tb are required')

        containers = list(synthetic_cg_metadf['container_id'])

        lid_test = True
        sid_test = True
        cid_test = True
        did_test = True

        # Every id a container uses in synthetic data must also appear for that
        # same container in the original data.
        for co in containers:
            uog = self.original_df[self.original_df['container_id'] == co]
            us = self.synthetic_df[self.synthetic_df['container_id'] == co]

            if not set(us['location_id'].unique()).issubset(set(uog['location_id'].unique())):
                lid_test = False
            if not set(us['server_id'].unique()).issubset(set(uog['server_id'].unique())):
                sid_test = False
            if not set(us['config_id'].unique()).issubset(set(uog['config_id'].unique())):
                cid_test = False
            if not set(us['disk_id'].unique()).issubset(set(uog['disk_id'].unique())):
                did_test = False

        print('\n')
        self._report(lid_test,
                     'Location Uniqueness Test Passed : Container in synthetic data and original data belong to same location id',
                     'Location Uniqueness Test Failed : Container in synthetic data and original data do not belong to same location id')

        print('\n')
        self._report(sid_test,
                     'Server Uniqueness Test Passed : Container in synthetic data and original data belong to same server id',
                     'Server Uniqueness Test Failed : Container in synthetic data and original data do not belong to same server id')

        print('\n')
        self._report(cid_test,
                     'Config Uniqueness Test Passed : Container in synthetic data and original data belong to same config id',
                     'Config Uniqueness Test Failed : Container in synthetic data and original data do not belong to same config id')

        print('\n')
        self._report(did_test,
                     'Disk Uniqueness Test Passed : Container in synthetic data and original data belong to same disk id',
                     'Disk Uniqueness Test Failed : Container in synthetic data and original data do not belong to same disk id')

        print('\n')
        if self.start_date is not None and self.end_date is not None:
            outside_range = self.synthetic_df[(self.synthetic_df['datetime'] < self.start_date) |
                                              (self.synthetic_df['datetime'] > self.end_date)]
            self._report(outside_range.empty,
                         'Date Range Test Passed : All dates are within the provided start and end date',
                         'Date Range Test Failed : Some dates are outside start and end date')
        else:
            print('Date Range Test skipped : start_date and end_date are required')

    def load_predictor_model(self, path='SamplePredictor.pickle'):
        """Unpickle and return the row-count predictor stored at ``path``.

        ``pickle`` resolves classes by module and qualified name, so a class
        defined locally inside this method is never used. The notebook-level
        ``DataModel`` class must already be defined.
        SECURITY: only load trusted pickle files.
        """
        with open(path, 'rb') as predictor_file:
            return pickle.load(predictor_file)

    def stat_dist(self, col, isCat=False):
        """Compare ``col`` between frames.

        - Categorical (``isCat``): print normalised value counts.
        - Numeric: plot overlaid histograms with a summary-statistics box.
        """
        if isCat:
            print(f"Original {col} distribution:\n", self.original_df[col].value_counts(normalize=True))
            print(f"Synthetic {col} distribution:\n", self.synthetic_df[col].value_counts(normalize=True))
        else:
            fig, ax = plt.subplots(figsize=(10, 6))

            # Plotting histograms
            sns.histplot(self.original_df[col], color="skyblue", label='Original', kde=True, ax=ax)
            sns.histplot(self.synthetic_df[col], color="red", label='Synthetic', kde=True, ax=ax, alpha=0.6)

            plt.legend()
            plt.title(f'Distribution of {col}')

            # Summary statistics
            original_stats = self.original_df[col].describe()
            synthetic_stats = self.synthetic_df[col].describe()

            stats_df = pd.DataFrame({'Original': original_stats, 'Synthetic': synthetic_stats})
            stats_text = stats_df.to_string()

            # Text box placed just right of the axes (axes coordinates).
            plt.text(1.05, 0.95, stats_text, transform=ax.transAxes, fontsize=10, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))

            plt.show()

    def unique_data_by_col(self, col):
        """Print the number of distinct values of ``col`` in each frame."""
        print(f"Unique values in original data {col}:", self.original_df[col].nunique())
        print(f"Unique values in synthetic data {col}:", self.synthetic_df[col].nunique())

    @staticmethod
    def parse_datetime(df):
        """Parse ``df['datetime']`` in place using the format ``%Y-%m-%d %H:%M:%S``, and return ``df``."""
        df['datetime'] = pd.to_datetime(df['datetime'], format='%Y-%m-%d %H:%M:%S')
        return df

    def compare_numerical_distributions(self, col):
        """
        Compare the distributions of a numerical column using histograms and the Kolmogorov-Smirnov test.
        NaN values are dropped before the test (ks_2samp does not handle them).
        """
        plt.figure(figsize=(12, 6))
        sns.histplot(self.original_df[col], color="skyblue", label="Original", kde=True, stat="density", bins=30)
        sns.histplot(self.synthetic_df[col], color="red", label="Synthetic", kde=True, stat="density", bins=30)
        plt.legend()
        plt.title(f'Distribution Comparison for {col}')
        plt.show()

        stat, p = ks_2samp(self.original_df[col].dropna(), self.synthetic_df[col].dropna())
        print(f"Kolmogorov-Smirnov test for {col}: Statistic={stat:.4f}, P-value={p:.4g}")

    def compare_categorical_distributions(self, col):
        """
        Compare the distributions of a categorical column using count plots on the same graph
        for both original and synthetic datasets.
        :param col: The column name for the categorical data.
        """
        original_df_copy = self.original_df.copy()
        synthetic_df_copy = self.synthetic_df.copy()
        original_df_copy['Dataset'] = 'Original'
        synthetic_df_copy['Dataset'] = 'Synthetic'

        combined_df = pd.concat([original_df_copy, synthetic_df_copy], ignore_index=True)

        # Plotting
        plt.figure(figsize=(10, 6))
        sns.countplot(x=col, hue='Dataset', data=combined_df, palette='viridis')
        plt.title(f'Comparison of {col} Distribution Between Original and Synthetic Data')
        plt.xticks(rotation=45)  # Rotate the x-axis labels for better readability
        plt.legend(title='Dataset')
        plt.tight_layout()
        plt.show()

    def temporal_distribution_check(self):
        """
        Validate the consistency of event distribution over time (monthly row counts).
        """
        # NOTE: pandas >= 2.2 prefers the 'ME' alias; 'M' is kept for older pandas.
        original_timeseries = self.original_df.set_index('datetime').resample('M').size()
        synthetic_timeseries = self.synthetic_df.set_index('datetime').resample('M').size()

        plt.figure(figsize=(12, 6))
        original_timeseries.plot(label='Original', color='blue')
        synthetic_timeseries.plot(label='Synthetic', color='red')
        plt.legend()
        plt.title('Temporal Distribution Comparison')
        plt.xlabel('Month')
        plt.ylabel('Count')
        plt.show()

    def plot_cdf(self, col):
        """
        Plots the empirical CDF for a numerical column for both original and synthetic datasets.
        NaN values are dropped so they do not distort the curve.
        :param col: The column name to analyze.
        """
        x_original = np.sort(self.original_df[col].dropna())
        y_original = np.arange(1, len(x_original)+1) / len(x_original)
        x_synthetic = np.sort(self.synthetic_df[col].dropna())
        y_synthetic = np.arange(1, len(x_synthetic)+1) / len(x_synthetic)

        plt.figure(figsize=(10, 6))
        plt.plot(x_original, y_original, marker='.', linestyle='none', label='Original')
        plt.plot(x_synthetic, y_synthetic, marker='.', linestyle='none', label='Synthetic')
        plt.legend()
        plt.title(f'Cumulative Distribution Function (CDF) of {col}')
        plt.xlabel(col)
        plt.ylabel('CDF')
        plt.show()

    def run_test_suite(self):
        """Run the main checks and plots in sequence, separated by dividers."""
        self.null_check()
        print('\n')
        print('-'*50)
        self.stat_dist(col='chunk_size')
        print('\n')
        print('-'*50)
        self.unique_data()
        print('\n')
        print('-'*50)
        self.range_check()
        print('\n')
        print('-'*50)
        self.check_capacity()
        print('\n')
        print('-'*50)
        self.compare_numerical_distributions(col='chunk_size')
        print('\n')
        print('-'*50)
        self.plot_cdf(col='chunk_size')
        print('\n')
        print('-'*50)
        self.temporal_distribution_check()

In [None]:
validation = DataQValidation(
    original=train_data,
    synthetic=csd,
    start_date=start_date,
    end_date=end_date,
    disk_capacity_tb=disk_capacity_tb,
    container_groups=container_groups,
)

In [None]:
# Categorical-range, count, id-consistency and date-window checks.
validation.range_check()

In [None]:
# Full battery of checks and plots.
validation.run_test_suite()

In [None]:
# Per synthetic container, compare the server ids seen in training vs. synthetic data.
containers = list(csd['container_id'].unique())

for co in containers:
    original_servers = set(train_data.loc[train_data['container_id'] == co, 'server_id'].unique())
    synthetic_servers = set(csd.loc[csd['container_id'] == co, 'server_id'].unique())
    print(co)
    print(original_servers)
    print()
    print(synthetic_servers)
    print(synthetic_servers.issubset(original_servers))
    print('-' * 50)

In [None]:
# Running per-container size vs. disk limit, plus non-negative chunk sizes.
validation.check_capacity()

In [None]:
validation.stat_dist(col='chunk_size')
# Cardinality of the identifier columns in both frames.
for id_column in ('container_id', 'container_group'):
    print("------------------------------")
    validation.unique_data_by_col(col=id_column)

In [None]:
# Exact-duplicate rows across and within the frames.
validation.unique_data()

In [None]:
# Capacity check again (same inputs as the earlier run).
validation.check_capacity()

In [None]:
validation.compare_categorical_distributions(col='operation')

In [None]:
# Monthly row counts, original vs. synthetic.
validation.temporal_distribution_check()

In [None]:
validation.compare_numerical_distributions(col='chunk_size')

In [None]:
from statsmodels.graphics.gofplots import qqplot

# Q-Q plot of training chunk sizes with a standardized reference line.
fig, ax = plt.subplots()
qqplot(train_data['chunk_size'], line='s', ax=ax)
ax.set_title('Q-Q Plot of chunk_size')
plt.show()

In [None]:
validation.plot_cdf(col='chunk_size')

In [None]:
# Map each container_group to its *distinct* container_ids; `.agg(list)` would
# repeat an id once per row and let random.sample draw it twice.
grouped_data = (
    train_data.groupby('container_group')['container_id']
    .unique()
    .apply(list)
    .reset_index()
)
container_dict = dict(zip(grouped_data['container_group'], grouped_data['container_id']))

In [None]:
def generate_conditions(start_date, end_date, disk_capacity, container_groups, num_rows):
    """Build fixed-size SDV conditions for every day and sampled container.

    For each day in ``[start_date, end_date]`` (inclusive, one-day steps) and
    each container group, ``container_groups[group]`` distinct container ids
    are drawn afresh from ``container_dict``. One ``Condition`` requesting
    ``num_rows`` rows is created per drawn container.

    Args:
        start_date, end_date: datetime bounds, both inclusive.
        disk_capacity: value placed in the ``disk_capacity_tb`` condition column.
        container_groups: mapping container_group -> number of containers to draw.
        num_rows: rows requested per (day, container) condition.

    Returns:
        list of ``Condition`` objects.

    Raises:
        ValueError: from ``random.sample`` if a group has fewer distinct
            containers than requested.
    """
    conditions = []

    current_date = start_date
    delta = timedelta(days=1)

    while current_date <= end_date:
        for container_group, container_id_types in container_groups.items():
            # The lookup may repeat an id once per row; sample from distinct ids
            # so the same container is never drawn twice for one day.
            available_ids = list(dict.fromkeys(container_dict[container_group]))
            container_ids = random.sample(available_ids, container_id_types)

            conditions.extend([Condition(num_rows=num_rows,
                                         column_values={'datetime': current_date,
                                                        'disk_capacity_tb': disk_capacity,
                                                        'container_group': container_group,
                                                        'container_id': container_id
                                                        }
                                         )
                               for container_id in container_ids])

        current_date += delta

    return conditions

In [None]:
# Second sampling round: a fixed number of rows per container per day.
start_date = datetime(2022, 2, 1)
end_date = datetime(2022, 2, 2)
disk_capacity = 16
# Distinct container_ids to draw per container_group: 2 from the first group,
# 3 from the second.
container_groups = {
    '02892102A8F17B5A551466B444222F4C3D9A399F': 2,
    'CC21F742BC91C1A0ED11A719D5C2CE74690BCD44': 3,
}
num_rows = 20_000

In [None]:
conditions = generate_conditions(
    start_date=start_date,
    end_date=end_date,
    disk_capacity=disk_capacity,
    container_groups=container_groups,
    num_rows=num_rows,
)

In [None]:
# Draw the fixed-size synthetic sample.
csd = synthesizer.sample_from_conditions(conditions)

In [None]:
# Preview only; the full frame (num_rows per condition) is too large to display usefully.
csd.head()

In [None]:
# Pass the same window, capacity and groups used to build this round's
# conditions; DataQValidation needs them for its range checks.
validation_conditional = DataQValidation(s3_traffic.df, csd, start_date, end_date,
                                         disk_capacity, container_groups)

In [None]:
validation_conditional.stat_dist(col='chunk_size')
# Cardinality of the identifier columns in both frames.
for id_column in ('container_id', 'container_group'):
    print("------------------------------")
    validation_conditional.unique_data_by_col(col=id_column)

In [None]:
# Exact-duplicate rows across and within the frames.
validation_conditional.unique_data()

In [None]:
# Running per-container size vs. disk limit, plus non-negative chunk sizes.
validation_conditional.check_capacity()

In [None]:
validation_conditional.compare_categorical_distributions(col='operation')

In [None]:
validation_conditional.compare_numerical_distributions(col='chunk_size')

In [None]:
validation_conditional.plot_cdf(col='chunk_size')