In [13]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler
import networkx as nx
import torch
from torch_geometric.data import Data
from torch_geometric.utils import from_networkx

# 加载数据
file_path_clean = '/home/sdong/data/taxi/yellow_tripdata_sample.csv'
file_path_origi = '/home/sdong/data/taxi/yellow_tripdata_missing.csv'

data = pd.read_csv(file_path_clean)
data_dirty = pd.read_csv(file_path_origi)


# 保留指定的列
columns_to_keep = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
    "trip_distance"
]
# columns_to_keep = [
#     "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
#     "trip_distance", "pickup_longitude", "pickup_latitude", "RateCodeID",
#     "store_and_fwd_flag", "dropoff_longitude"
# ]
data = data[columns_to_keep]
data_dirty = data_dirty[columns_to_keep]
# 设置显示所有列和部分行
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 10)  
# 显示数据集的前几行和数据结构
print(data.info())
print(data_dirty.info())
# 保存原始DataFrame的列名
column_names = data_dirty.columns


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000000 entries, 0 to 9999999
Data columns (total 5 columns):
 #   Column                 Dtype  
---  ------                 -----  
 0   VendorID               int64  
 1   tpep_pickup_datetime   object 
 2   tpep_dropoff_datetime  object 
 3   passenger_count        int64  
 4   trip_distance          float64
dtypes: float64(1), int64(2), object(2)
memory usage: 381.5+ MB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000000 entries, 0 to 9999999
Data columns (total 5 columns):
 #   Column                 Dtype  
---  ------                 -----  
 0   VendorID               float64
 1   tpep_pickup_datetime   object 
 2   tpep_dropoff_datetime  object 
 3   passenger_count        float64
 4   trip_distance          float64
dtypes: float64(3), object(2)
memory usage: 381.5+ MB
None


In [14]:

data.fillna(0, inplace=True)
# 检查非数值列
non_numeric_cols = data.select_dtypes(include=['object']).columns
print("Non-numeric columns:", non_numeric_cols)

# 将非数值列转换为数值类型（使用标签编码）
label_encoders = {}
for col in non_numeric_cols:
    le = LabelEncoder()
    data[col] = le.fit_transform(data[col].astype(str))
    label_encoders[col] = le

# 确保所有特征列都是数值类型
print("Data types after encoding:\n", data.dtypes)


data_dirty.fillna(0, inplace=True)
# 检查非数值列
non_numeric_cols = data_dirty.select_dtypes(include=['object']).columns
print("Non-numeric columns:", non_numeric_cols)

# 将非数值列转换为数值类型（使用标签编码）
label_encoders = {}
for col in non_numeric_cols:
    le = LabelEncoder()
    data_dirty[col] = le.fit_transform(data_dirty[col].astype(str))
    label_encoders[col] = le

# 确保所有特征列都是数值类型
print("Data types after encoding:\n", data_dirty.dtypes)

Non-numeric columns: Index(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], dtype='object')
Data types after encoding:
 VendorID                   int64
tpep_pickup_datetime       int64
tpep_dropoff_datetime      int64
passenger_count            int64
trip_distance            float64
dtype: object
Non-numeric columns: Index(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], dtype='object')
Data types after encoding:
 VendorID                 float64
tpep_pickup_datetime       int64
tpep_dropoff_datetime      int64
passenger_count          float64
trip_distance            float64
dtype: object


In [15]:
# 标准化数值特征
scaler = MinMaxScaler()
data = scaler.fit_transform(data)

# 标准化数值特征
scaler = MinMaxScaler()
data_dirty = scaler.fit_transform(data_dirty)


In [16]:
data = pd.DataFrame(data, columns=column_names)
data_dirty = pd.DataFrame(data_dirty, columns=column_names)

In [17]:
# 检查两个数据集中是否所有值都在0到1之间
def check_values_in_range(df, lower=0, upper=1):
    return ((df >= lower) & (df <= upper)).all().all()

is_data_in_range = check_values_in_range(data)
is_data_dirty_in_range = check_values_in_range(data_dirty)

print(f"Clean data values are within [0, 1]: {is_data_in_range}")
print(f"Dirty data values are within [0, 1]: {is_data_dirty_in_range}")

if not is_data_in_range:
    print("Clean data contains values out of range [0, 1].")

if not is_data_dirty_in_range:
    print("Dirty data contains values out of range [0, 1].")

# 如果需要，打印出不在范围内的值和对应的索引
def find_out_of_range_values(df, lower=0, upper=1):
    out_of_range = df[(df < lower) | (df > upper)]
    return out_of_range.dropna(how='all')

if not is_data_in_range:
    out_of_range_clean = find_out_of_range_values(data)
    print("Out of range values in clean data:")
    print(out_of_range_clean)
    out_of_range_clean_indices = out_of_range_clean.index
    data = data.drop(out_of_range_clean_indices)

if not is_data_dirty_in_range:
    out_of_range_dirty = find_out_of_range_values(data_dirty)
    print("Out of range values in dirty data:")
    print(out_of_range_dirty)
    out_of_range_dirty_indices = out_of_range_dirty.index
    data_dirty = data_dirty.drop(out_of_range_dirty_indices)
    # 获取不在范围内的值的索引



# 删除不在范围内的行



Clean data values are within [0, 1]: True
Dirty data values are within [0, 1]: True


In [18]:
import torch
from torch import nn
from torch.nn import functional as F
from sklearn.model_selection import train_test_split

# 假设 data 已经是一个经过预处理的 DataFrame
data_array = data.values.astype(np.float32)  # 转换为浮点数类型的 NumPy 数组

# 分割数据为训练集和临时测试集（包括真正的测试集和验证集）
train_data, val_test_data = train_test_split(data_array, test_size=0.5, random_state=42)

# 将训练验证集进一步分割为训练集和验证集
val_data, test_data = train_test_split(val_test_data, test_size=0.5, random_state=42)  # 0.25 x 0.8 = 0.2


# 转换为PyTorch张量


# 创建数据加载器
from torch.utils.data import DataLoader, TensorDataset

batch_size = 1280  # 或者任何适合你GPU的大小

train_tensor = torch.tensor(train_data) #0.6
train_dataset = TensorDataset(train_tensor, train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_tensor = torch.tensor(val_data) #0.2
val_dataset = TensorDataset(val_tensor, val_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

test_tensor = torch.tensor(test_data)  #20%
test_dataset = TensorDataset(test_tensor, test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)  # 50个批次
#len(test_dataset)// 50

data_dirty_array = data_dirty.values.astype(np.float32)  # 转换为浮点数类型的 NumPy 数组
test_dirty_tensor = torch.tensor(data_dirty_array)  #20%
test_dirty_dataset = TensorDataset(test_dirty_tensor, test_dirty_tensor)
test_dirty_loader = DataLoader(test_dirty_dataset, batch_size=batch_size, shuffle=False)  # 50个批次


In [19]:
import torch
from torch import nn
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        # Encoder
        self.fc1 = nn.Linear(5, 128)  # Input layer
        self.fc2 = nn.Linear(128, 64)  # Hidden layer
        self.fc31 = nn.Linear(64, 20)  # Output layer for mu
        self.fc32 = nn.Linear(64, 20)  # Output layer for logvar

        # Decoder
        self.fc4 = nn.Linear(20, 64)   # Input layer
        self.fc5 = nn.Linear(64, 128)  # Hidden layer
        self.fc6 = nn.Linear(128, 5)  # Output layer

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        return self.fc31(h2), self.fc32(h2)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar) + 1e-8  # Adding a small constant for numerical stability
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h3 = F.relu(self.fc4(z))
        h4 = F.relu(self.fc5(h3))
        return torch.sigmoid(self.fc6(h4))  # Use sigmoid to ensure output is between 0 and 1

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# Instantiate the model
model = VAE()
print(model)


VAE(
  (fc1): Linear(in_features=5, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc31): Linear(in_features=64, out_features=20, bias=True)
  (fc32): Linear(in_features=64, out_features=20, bias=True)
  (fc4): Linear(in_features=20, out_features=64, bias=True)
  (fc5): Linear(in_features=64, out_features=128, bias=True)
  (fc6): Linear(in_features=128, out_features=5, bias=True)
)


In [20]:
#device = torch.device("cpu")
model = VAE().to(device)

import torch.optim as optim

# 设置优化器
optimizer = optim.Adam(model.parameters(), lr=1e-3)

def loss_function(recon_x, x, mu, logvar):
    # 确保目标张量也是浮点类型且维度匹配
    recon_x = torch.clamp(recon_x, 0, 1)  # 确保输出值在[0, 1]范围内
    BCE = F.binary_cross_entropy(recon_x, x, reduction='none')  # 保留每个样本的损失
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1, keepdim=True)  # 保持维度
    return BCE + KLD



In [21]:
import torch

def train(epoch):
    model.train()
    total_BCE = 0
    total_KLD = 0
    total_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):  # 使用TensorDataset，数据被重复用作输入和标签
        data = data.to(device)
        # 检查输入数据的范围
        if (data < 0).any() or (data > 1).any():
            raise ValueError("Input data contains values out of range [0, 1]")
        
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        if (recon_batch < 0).any() or (recon_batch > 1).any():
            raise ValueError("Warning: recon_batch contains values out of range [0, 1]")
        loss = loss_function(recon_batch, data, mu, logvar)  # 这里loss是每个样本的损失
        loss.mean().backward()  # 使用.mean()在所有样本上平均损失然后进行反向传播
        total_loss += loss.sum().item()  # 更新总损失
        optimizer.step()
        
        # if batch_idx % 100 == 0:
        #     print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
        #         epoch, batch_idx * len(data), len(train_loader.dataset),
        #         100. * batch_idx / len(train_loader), loss.mean().item()))

    # 打印每个epoch的平均损失
    print(f'Epoch: {epoch} Total Loss: {total_loss / len(train_loader.dataset)}')

# 训练模型
num_epochs = 10  # 可根据需要调整
for epoch in range(1, num_epochs + 1):
    train(epoch)

# 保存模型的状态字典
torch.save(model.state_dict(), 'vae_model_ny_graph_scal.pth')
print("Model saved to vae_model_ny_graph_scal.pth")

# 添加调试信息
if torch.cuda.is_available():
    torch.cuda.synchronize()
    torch.cuda.empty_cache()
    print("CUDA cache cleared.")


Epoch: 1 Total Loss: 2.5664610731933593
Epoch: 2 Total Loss: 2.559936175830078
Epoch: 3 Total Loss: 2.559724727282715
Epoch: 4 Total Loss: 2.5596493733764647
Epoch: 5 Total Loss: 2.559626724243164
Epoch: 6 Total Loss: 2.5596165051757813
Epoch: 7 Total Loss: 2.5596081761230467
Epoch: 8 Total Loss: 2.5596041731445314
Epoch: 9 Total Loss: 2.5595967587646484
Epoch: 10 Total Loss: 2.5595948743408203
Model saved to vae_model_ny_graph_scal.pth
CUDA cache cleared.


In [10]:
def evaluate_model(model, data_loader):
    model.eval()  # 切换到评估模式
    total_loss = 0
    with torch.no_grad():  # 关闭梯度计算
        for inputs, _ in data_loader:  # 假设 data_loader 返回 inputs 和 targets，这里我们不需要 targets
            inputs = inputs.to(device)  # 确保将 inputs 转移到正确的设备
            recon, mu, logvar = model(inputs)
            loss = loss_function(recon, inputs, mu, logvar)  # 每个样本的损失列表
            total_loss += loss.sum().item()  # 累计所有样本的损失

    average_loss = total_loss / len(data_loader.dataset)
    return average_loss

# 加载模型
model = VAE().to(device)
model.load_state_dict(torch.load('vae_model_ny_graph_scal.pth'))

# 计算测试集和验证集上的平均损失
val_loss = evaluate_model(model, val_loader)
test_loss = evaluate_model(model, test_loader)
test_dirty_loss = evaluate_model(model, test_dirty_loader)

print(f"Average loss on validation data: {val_loss}")
print(f"Average loss on test data: {test_loss}")
print(f"Average loss on test dirty data: {test_dirty_loss}")


Average loss on validation data: 5.411014973925782
Average loss on test data: 5.4115777953125
Average loss on test dirty data: 9.254327900585938


In [22]:
import numpy as np

def collect_reconstruction_errors(model, data_loader):
    model.eval()
    reconstruction_errors = []
    with torch.no_grad():
        for inputs, _ in data_loader:  # 假设 data_loader 返回的是 inputs 和 labels，这里我们忽略 labels
            inputs = inputs.to(device)  # 将输入数据移动到正确的设备
            recon, mu, logvar = model(inputs)
            loss = loss_function(recon, inputs, mu, logvar)  # 每个样本的损失列表
            loss_per_sample = loss.sum(dim=1)
            # print("Type of loss:", type(loss_per_sample))
            # print("Shape of loss:", loss_per_sample.shape)
            #print("First few loss values:", loss[:10])  # 打印前10个损失值
            reconstruction_errors.extend(loss_per_sample.tolist())  

    return reconstruction_errors

# 收集验证集的重构误差
val_errors = collect_reconstruction_errors(model, val_loader)

threshold = np.quantile(val_errors, 0.95)  # 计算95%分位数作为阈值
print(f"Loss threshold for detecting data quality issues: {threshold}")

min_val_error = min(val_errors)
max_val_error = max(val_errors)
mean_val_error = sum(val_errors) / len(val_errors)
print(f"Min validation error: {min_val_error}")
print(f"Max validation error: {max_val_error}")
print(f"Mean validation error: {mean_val_error}")
print(f"95th percentile of validation errors: {np.quantile(val_errors, 0.95)}")
print(f"Maximum validation error (100th percentile): {np.quantile(val_errors, 1)}")


Loss threshold for detecting data quality issues: 3.086783230304718
Min validation error: 2.1996965408325195
Max validation error: 12.267314910888672
Mean validation error: 2.559661284045696
95th percentile of validation errors: 3.086783230304718
Maximum validation error (100th percentile): 12.267314910888672


In [23]:
import torch
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import time

def detect_quality_issues(model, data_loader, threshold):
    model.eval()
    current_issues_count = 0
    total_samples = 0  # 累积处理的样本总数

    start_time = time.time()  # 开始计时
    with torch.no_grad():
        for inputs, _ in data_loader:
            inputs = inputs.to(device)
            recon, mu, logvar = model(inputs)
            losses = loss_function(recon, inputs, mu, logvar)  # 获取每个样本的损失
            loss_per_sample = losses.sum(dim=1)
            # 检查每个样本是否有问题
            for loss in loss_per_sample:
                if loss.item() > threshold:
                    current_issues_count += 1
            total_samples += inputs.size(0)  # 更新处理的样本总数
    end_time = time.time()  # 结束计时

    run_time = end_time - start_time  # 计算运行时间
    return current_issues_count, total_samples, run_time

def test_random_samples(model, data, threshold, data_sizes):
    for size in data_sizes:
        sample_data = data[:size]  # 从数据中获取指定大小的样本
        sample_data_array = sample_data.values.astype(np.float32)  # 转换为浮点数类型的 NumPy 数组
        sample_tensor = torch.tensor(sample_data_array)  # 转换为tensor
        sample_dataset = TensorDataset(sample_tensor, sample_tensor)
        test_dirty_loader = DataLoader(sample_dataset, batch_size=len(sample_tensor), shuffle=False)  # 使用整个样本作为一个批次
        current_issues_count, total_samples, run_time = detect_quality_issues(model, test_dirty_loader, threshold)
        print(f"样本大小: {size}, 运行时间: {run_time:.3f} 秒")
        if current_issues_count > total_samples * 0.1:
            print(f"Random sample size {size} is problematic: {current_issues_count} out of {total_samples} samples are faulty ({(current_issues_count / total_samples * 100):.2f}%).")
        else:
            print(f"Random sample size {size} is ok: {current_issues_count} out of {total_samples} samples are faulty ({(current_issues_count / total_samples * 100):.2f}%).")
        # 保存结果
        results.append((size, run_time))
# 假设 model, data_dirty, threshold, device 已经被正确定义和设置

results = []
data_sizes = [1000, 5000, 10000, 20000, 50000, 100000, 200000, 500000, 1000000]
test_random_samples(model, data_dirty, threshold, data_sizes)

# 将结果保存到文本文件
with open("/home/sdong/experiments/results/results.txt", "a") as file:
    file.write(f"NY taxi My approach scalability:\n")
    for size, time in results:
        file.write(f"Data Size: {size}, Time Taken: {time} seconds\n")


样本大小: 1000, 运行时间: 0.016 秒
Random sample size 1000 is ok: 58 out of 1000 samples are faulty (5.80%).
样本大小: 5000, 运行时间: 0.070 秒
Random sample size 5000 is ok: 266 out of 5000 samples are faulty (5.32%).
样本大小: 10000, 运行时间: 0.140 秒
Random sample size 10000 is ok: 579 out of 10000 samples are faulty (5.79%).
样本大小: 20000, 运行时间: 0.409 秒
Random sample size 20000 is ok: 1158 out of 20000 samples are faulty (5.79%).
样本大小: 50000, 运行时间: 0.979 秒
Random sample size 50000 is ok: 2979 out of 50000 samples are faulty (5.96%).
样本大小: 100000, 运行时间: 1.998 秒
Random sample size 100000 is ok: 6028 out of 100000 samples are faulty (6.03%).
样本大小: 200000, 运行时间: 3.812 秒
Random sample size 200000 is ok: 12092 out of 200000 samples are faulty (6.05%).
样本大小: 500000, 运行时间: 9.392 秒
Random sample size 500000 is ok: 31151 out of 500000 samples are faulty (6.23%).
样本大小: 1000000, 运行时间: 18.073 秒
Random sample size 1000000 is ok: 61718 out of 1000000 samples are faulty (6.17%).
