In [None]:

import gradio as gr
import os
import pandas as pd

def plot(target, forecast, prediction_length, prediction_intervals=(50.0, 90.0), color='g', fname=None):
    """Draw observations, median forecast and quantile bands on a 4x4 grid.

    One subplot is filled per target dimension, up to the 16 axes of the grid.
    Each subplot shows the last ``2 * prediction_length`` rows of ``target``,
    the median forecast, and one shaded band per prediction interval.

    Args:
        target: Two-dimensional table of observations whose ``shape`` unpacks
            to ``(seq_len, target_dim)`` and whose columns are selectable by
            integer ``dim`` (presumably a pandas DataFrame -- TODO confirm).
        forecast: Object providing ``quantile(q)`` (indexed as ``[:, dim]``)
            and an ``index`` used as the x-axis of the forecast.
        prediction_length: Forecast horizon; coerced with ``int`` so values
            delivered by UI widgets as floats or numeric strings also work.
        prediction_intervals: Central interval widths in percent.
        color: Matplotlib color used for the median line and the bands.
        fname: Optional path; when given, the figure is also saved there.

    Returns:
        The matplotlib figure holding the grid of subplots.
    """
    label_prefix = ""
    rows, cols = 4, 4
    # The slice below needs a real integer.
    prediction_length = int(prediction_length)

    fig, axs = plt.subplots(rows, cols, figsize=(24, 24))
    axx = axs.ravel()
    _, target_dim = target.shape

    # Median plus the lower/upper percentile bounding every requested interval.
    ps = [50.0] + [
        50.0 + f * c / 2.0 for c in prediction_intervals for f in [-1.0, +1.0]
    ]
    percentiles_sorted = sorted(set(ps))
    i_p50 = len(percentiles_sorted) // 2

    def alpha_for_percentile(p):
        # Outer (wider) bands use a smaller percentile and so are drawn lighter.
        return (p / 100.0) ** 0.3

    # Neither the quantiles nor the observation window depend on the
    # dimension being drawn, so compute them once instead of once per subplot.
    quantiles = [forecast.quantile(p / 100.0) for p in percentiles_sorted]
    recent_target = target[-2 * prediction_length:]

    for dim in range(min(rows * cols, target_dim)):
        ax = axx[dim]

        recent_target[dim].plot(ax=ax)

        ps_data = [q[:, dim] for q in quantiles]
        p50_data = ps_data[i_p50]
        p50_series = pd.Series(data=p50_data, index=forecast.index)
        p50_series.plot(color=color, ls="-", label=f"{label_prefix}median", ax=ax)

        for i in range(len(percentiles_sorted) // 2):
            ptile = percentiles_sorted[i]
            alpha = alpha_for_percentile(ptile)
            # Pair the i-th lowest percentile with the i-th highest one.
            ax.fill_between(
                forecast.index,
                ps_data[i],
                ps_data[-i - 1],
                facecolor=color,
                alpha=alpha,
                interpolate=True,
            )
            # Hack to create labels for the error intervals.
            # Doesn't actually plot anything, because we only pass a single data point
            pd.Series(data=p50_data[:1], index=forecast.index[:1]).plot(
                color=color,
                alpha=alpha,
                linewidth=10,
                label=f"{label_prefix}{100 - ptile * 2}%",
                ax=ax,
            )

    # Only the first subplot carries a legend.
    legend = ["observations", "median prediction"] + [
        f"{k}% prediction interval" for k in prediction_intervals
    ][::-1]
    axx[0].legend(legend, loc="upper left")

    if fname is not None:
        fig.savefig(fname, bbox_inches='tight', pad_inches=0.05)

    return fig

# def preprocess_data(dataset):
# # Split the dataset into train and test sets: the first 80% for training, the last 20% for testing
# train_data = dataset[:int(len(dataset) * 0.8)]
# test_data = dataset[int(len(dataset) * 0.8):]
# # Convert the univariate series in the dataset into multivariate data
# train_grouper = MultivariateGrouper(max_target_dim=min(1000, dataset.col_count))
# test_grouper = MultivariateGrouper(num_test_dates=int(len(test_data)), max_target_dim=min(1000, dataset.col_count))
# dataset_train = train_grouper(dataset=train_data)
# dataset_test = test_grouper(dataset=test_data)
# return dataset_train, dataset_test

def preprocess_data(dataset):
    """Split ``dataset`` 80/20 and group both parts into multivariate form.

    The first 80% of the rows become the training part and the remaining rows
    the test part. Each part is passed through its own ``MultivariateGrouper``
    capped at ``min(1000, dataset.shape[1])`` target dimensions.

    Args:
        dataset: Sliceable table exposing ``len()`` and ``shape``.

    Returns:
        A ``(dataset_train, dataset_test)`` tuple of grouped datasets.
    """
    split_at = int(len(dataset) * 0.8)
    train_part = dataset[:split_at]
    test_part = dataset[split_at:]

    # Both groupers share the same cap on the number of target dimensions.
    max_dim = min(1000, dataset.shape[1])
    group_train = MultivariateGrouper(max_target_dim=max_dim)
    group_test = MultivariateGrouper(
        num_test_dates=int(len(test_part)),
        max_target_dim=max_dim,
    )

    grouped_train = group_train(dataset=train_part)
    grouped_test = group_test(dataset=test_part)
    return grouped_train, grouped_test

def prediction(dataset_train, dataset_test):
    """Train the module-level ``estimator`` and forecast the test dataset.

    Args:
        dataset_train: Dataset handed to ``estimator.train``.
        dataset_test: Dataset handed to ``make_evaluation_predictions``.

    Returns:
        A ``(forecasts, targets)`` tuple of lists, materialised from the two
        iterators that ``make_evaluation_predictions`` returns.
    """
    trained = estimator.train(dataset_train, num_workers=8)
    forecast_iter, target_iter = make_evaluation_predictions(
        dataset=dataset_test,
        predictor=trained,
        num_samples=100,
    )
    # Forecasts are drained first, then the matching target series.
    return list(forecast_iter), list(target_iter)

def generate_plot(forecasts, targets, prediction_length):
    """Plot the first forecast against the first target series.

    Args:
        forecasts: Sequence of forecasts; only element 0 is used.
        targets: Sequence of target series; only element 0 is used.
        prediction_length: Forecast horizon forwarded to ``plot``.

    Returns:
        The figure object returned by ``plot``.
    """
    first_target = targets[0]
    first_forecast = forecasts[0]
    return plot(
        target=first_target,
        forecast=first_forecast,
        prediction_length=prediction_length,
    )

def generate_text(forecasts):
    """Dump ``forecasts`` to ``prediction_data.csv`` and return that path.

    The data is wrapped in a ``pandas.DataFrame`` and written without the
    index column into the current working directory.
    """
    file_path = 'prediction_data.csv'
    pd.DataFrame(forecasts).to_csv(file_path, index=False)
    return file_path

def rnn_change(choice):
    """Store the selected RNN cell type on the module-level ``estimator``."""
    estimator.cell_type = choice
def step_change(step):
    """Store the number of diffusion steps on the module-level ``estimator``."""
    estimator.diff_steps = step
def lr_change(lr):
    """Store the learning rate on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.learning_rate = lr

def epoch_change(epoch):
    """Store the epoch count on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.epochs = epoch

def batchsize_change(batchsize):
    """Store the batch size on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.batch_size = batchsize

def prediction_length_change(prediction_length):
    """Store the forecast horizon, converted with ``int``, on ``estimator``."""
    horizon = int(prediction_length)
    estimator.prediction_length = horizon

def beta_end_change(beta_end):
    """Store the ``beta_end`` value on the module-level ``estimator``."""
    estimator.beta_end = beta_end

def context_length_change(context_length):
    """Store the context length, converted with ``int``, on ``estimator``."""
    window = int(context_length)
    estimator.context_length = window

def freq_change(freq):
    """Store the selected frequency string on the module-level ``estimator``."""
    estimator.freq = freq

def change_target_dim(dataset):
    """Set ``estimator.target_dim`` to the number of columns in ``dataset``.

    Args:
        dataset: Two-dimensional table exposing ``shape``, such as the
            DataFrame returned by ``load_user_data``.

    Returns:
        None. The module-level ``estimator`` is updated in place.
    """
    # A DataFrame has no ``col_count`` attribute; the column count is
    # ``shape[1]``, which is also what ``preprocess_data`` reads.
    estimator.target_dim = dataset.shape[1]

def load_user_data(file_obj):
    """Read an uploaded CSV, Excel or JSON file into a DataFrame.

    Args:
        file_obj: Either a filesystem path (``str`` or ``os.PathLike``) or an
            object whose ``name`` attribute holds the path of the upload.

    Returns:
        The ``pandas.DataFrame`` parsed from the file.

    Raises:
        ValueError: If ``file_obj`` is ``None`` or the file extension is not
            one of ``.csv``, ``.xls``, ``.xlsx`` or ``.json``.
    """
    if file_obj is None:
        raise ValueError("No file was uploaded")

    # Upload widgets may hand over a plain path instead of a file wrapper.
    if isinstance(file_obj, (str, os.PathLike)):
        file_path = os.fspath(file_obj)
    else:
        file_path = file_obj.name

    # The reader is chosen from the lower-cased extension.
    readers = {
        '.csv': pd.read_csv,
        '.xls': pd.read_excel,
        '.xlsx': pd.read_excel,
        '.json': pd.read_json,
    }
    ext = os.path.splitext(file_path)[1].lower()
    reader = readers.get(ext)
    if reader is None:
        raise ValueError(f"Unsupported file format: {ext}")
    return reader(file_path)

# Module-level TimeGrad estimator. The *_change callbacks in this file
# overwrite its attributes, and ``prediction`` trains it.
estimator = TimeGradEstimator(
    # Data layout.
    freq='d',
    target_dim=10,
    input_size=1484,
    prediction_length=10,
    context_length=20,
    # Network and diffusion settings.
    cell_type='GRU',
    loss_type='l2',
    scaling=True,
    diff_steps=100,
    beta_end=0.1,
    beta_schedule="linear",
    # Optimisation settings.
    trainer=Trainer(
        device=device,
        epochs=1,
        learning_rate=1e-3,
        num_batches_per_epoch=100,
        batch_size=64,
    ),
)

def _plot_with_int_horizon(forecasts, targets, prediction_length):
    """Convert the textbox value (a string) to the int ``generate_plot`` needs."""
    return generate_plot(forecasts, targets, int(prediction_length))


# Gradio UI: upload and control row, parameter panel, then the results row.
# NOTE(review): the source lost its indentation, so the nesting of rows and
# columns below is a reconstruction -- confirm against the intended layout.
# NOTE(review): ``input`` shadows the builtin; the name is kept for compatibility.
with gr.Blocks() as demo:
    with gr.Tab(label='Timegrad'):
        with gr.Row():
            input = gr.File(label='Upload file')
            # Holds the DataFrame parsed from the upload.
            dataset = gr.State()
            input.upload(fn=load_user_data, inputs=input, outputs=dataset)
            with gr.Column():
                button1 = gr.Button(value='Electricity')
                button2 = gr.Button(value='Stocks')
                button3 = gr.Button(value='3')
            with gr.Column():
                generate_button = gr.Button(value='Generate')
                with gr.Row():
                    dropdown = gr.Dropdown(choices=['Image only', 'Image and Text'], label='Format')

        with gr.Row():
            with gr.Column():
                # Section title.
                gr.Markdown('diffusion model parameters')
                with gr.Row():
                    dropdown_rnn = gr.Dropdown(choices=['LSTM', 'GRU'], label='RNN')
                    dropdown_rnn.change(fn=rnn_change, inputs=dropdown_rnn, outputs=None)
                    slider_step = gr.Slider(minimum=1, maximum=1000, step=10, label='Steps')
                    slider_step.change(fn=step_change, inputs=slider_step, outputs=None)
                gr.Markdown('train parameters')
                with gr.Row():
                    slider_lr = gr.Slider(minimum=0.00001, maximum=0.01, step=0.00001, label='Learning rate')
                    slider_lr.change(fn=lr_change, inputs=slider_lr, outputs=None)
                    slider_epoch = gr.Slider(minimum=1, maximum=20, step=1, label='training epoch')
                    slider_epoch.change(fn=epoch_change, inputs=slider_epoch, outputs=None)
                with gr.Row():
                    # A batch size of 0 cannot be trained with, so start at 1.
                    slider_batchsize = gr.Slider(minimum=1, maximum=10, step=1, label='batch_size')
                    slider_batchsize.change(fn=batchsize_change, inputs=slider_batchsize, outputs=None)
                    slider_beta_end = gr.Slider(minimum=0.00001, maximum=0.01, step=0.00001, label='beta_end')
                    slider_beta_end.change(fn=beta_end_change, inputs=slider_beta_end, outputs=None)
                with gr.Row():
                    text_prediction_length = gr.Textbox(label='prediction length')
                    text_prediction_length.change(fn=prediction_length_change, inputs=text_prediction_length, outputs=None)
                    text_context_length = gr.Textbox(label='context length')
                    text_context_length.change(fn=context_length_change, inputs=text_context_length, outputs=None)
                    dropdown_freq = gr.Dropdown(choices=['D', 'H', 'M', 'S'], label='Frequency')
                    dropdown_freq.change(fn=freq_change, inputs=dropdown_freq, outputs=None)

        with gr.Row():
            with gr.Column():
                prediction_img = gr.Plot()
                prediction_text = gr.File(label='prediction data')

                # Named State components carry intermediate results between
                # stages; a fresh ``gr.State()`` written inline in ``inputs`` or
                # ``outputs`` is a different component each time and shares nothing.
                train_state = gr.State()
                test_state = gr.State()
                forecasts_state = gr.State()
                targets_state = gr.State()

                # Run the stages one after another. Separate ``.click``
                # registrations would not wait for each other.
                generate_button.click(
                    fn=change_target_dim, inputs=dataset, outputs=None
                ).then(
                    fn=preprocess_data, inputs=dataset, outputs=[train_state, test_state]
                ).then(
                    fn=prediction,
                    inputs=[train_state, test_state],
                    outputs=[forecasts_state, targets_state],
                ).then(
                    # The component itself is passed; its value is converted to
                    # int when the handler runs, not while the UI is being built.
                    fn=_plot_with_int_horizon,
                    inputs=[forecasts_state, targets_state, text_prediction_length],
                    outputs=prediction_img,
                ).then(
                    fn=generate_text, inputs=forecasts_state, outputs=prediction_text
                )


demo.launch(share=True, server_port=6006)
























#gradio ui establishment
import gradio as gr
import os
import pandas as pd

def plot(target, forecast, prediction_length, prediction_intervals=(50.0, 90.0), color='g', fname=None):
    """Draw observations, median forecast and quantile bands on a 4x4 grid.

    One subplot is filled per target dimension, up to the 16 axes of the grid.
    Each subplot shows the last ``2 * prediction_length`` rows of ``target``,
    the median forecast, and one shaded band per prediction interval.

    Args:
        target: Two-dimensional table of observations whose ``shape`` unpacks
            to ``(seq_len, target_dim)`` and whose columns are selectable by
            integer ``dim`` (presumably a pandas DataFrame -- TODO confirm).
        forecast: Object providing ``quantile(q)`` (indexed as ``[:, dim]``)
            and an ``index`` used as the x-axis of the forecast.
        prediction_length: Forecast horizon; coerced with ``int`` so values
            delivered by UI widgets as floats or numeric strings also work.
        prediction_intervals: Central interval widths in percent.
        color: Matplotlib color used for the median line and the bands.
        fname: Optional path; when given, the figure is also saved there.

    Returns:
        The matplotlib figure holding the grid of subplots.
    """
    label_prefix = ""
    rows, cols = 4, 4
    # The slice below needs a real integer.
    prediction_length = int(prediction_length)

    fig, axs = plt.subplots(rows, cols, figsize=(24, 24))
    axx = axs.ravel()
    _, target_dim = target.shape

    # Median plus the lower/upper percentile bounding every requested interval.
    ps = [50.0] + [
        50.0 + f * c / 2.0 for c in prediction_intervals for f in [-1.0, +1.0]
    ]
    percentiles_sorted = sorted(set(ps))
    i_p50 = len(percentiles_sorted) // 2

    def alpha_for_percentile(p):
        # Outer (wider) bands use a smaller percentile and so are drawn lighter.
        return (p / 100.0) ** 0.3

    # Neither the quantiles nor the observation window depend on the
    # dimension being drawn, so compute them once instead of once per subplot.
    quantiles = [forecast.quantile(p / 100.0) for p in percentiles_sorted]
    recent_target = target[-2 * prediction_length:]

    for dim in range(min(rows * cols, target_dim)):
        ax = axx[dim]

        recent_target[dim].plot(ax=ax)

        ps_data = [q[:, dim] for q in quantiles]
        p50_data = ps_data[i_p50]
        p50_series = pd.Series(data=p50_data, index=forecast.index)
        p50_series.plot(color=color, ls="-", label=f"{label_prefix}median", ax=ax)

        for i in range(len(percentiles_sorted) // 2):
            ptile = percentiles_sorted[i]
            alpha = alpha_for_percentile(ptile)
            # Pair the i-th lowest percentile with the i-th highest one.
            ax.fill_between(
                forecast.index,
                ps_data[i],
                ps_data[-i - 1],
                facecolor=color,
                alpha=alpha,
                interpolate=True,
            )
            # Hack to create labels for the error intervals.
            # Doesn't actually plot anything, because we only pass a single data point
            pd.Series(data=p50_data[:1], index=forecast.index[:1]).plot(
                color=color,
                alpha=alpha,
                linewidth=10,
                label=f"{label_prefix}{100 - ptile * 2}%",
                ax=ax,
            )

    # Only the first subplot carries a legend.
    legend = ["observations", "median prediction"] + [
        f"{k}% prediction interval" for k in prediction_intervals
    ][::-1]
    axx[0].legend(legend, loc="upper left")

    if fname is not None:
        fig.savefig(fname, bbox_inches='tight', pad_inches=0.05)

    return fig

def preprocess_data(dataset):
    """Group the train and test splits of ``dataset`` into multivariate form.

    The number of target dimensions is capped at 2000 and otherwise taken from
    the cardinality of the first static categorical feature in the dataset
    metadata. The test grouper's ``num_test_dates`` is the integer ratio of
    the test split length to the train split length.

    Args:
        dataset: Object exposing ``metadata``, ``train`` and ``test``.

    Returns:
        A ``(dataset_train, dataset_test)`` tuple of grouped datasets.
    """
    max_dim = min(2000, int(dataset.metadata.feat_static_cat[0].cardinality))
    n_test_dates = int(len(dataset.test) / len(dataset.train))

    group_train = MultivariateGrouper(max_target_dim=max_dim)
    group_test = MultivariateGrouper(
        num_test_dates=n_test_dates,
        max_target_dim=max_dim,
    )

    grouped_train = group_train(dataset.train)
    grouped_test = group_test(dataset.test)
    return grouped_train, grouped_test
def prediction(dataset_train, dataset_test):
    """Train the module-level ``estimator`` and forecast the test dataset.

    Args:
        dataset_train: Dataset handed to ``estimator.train``.
        dataset_test: Dataset handed to ``make_evaluation_predictions``.

    Returns:
        A ``(forecasts, targets)`` tuple of lists, materialised from the two
        iterators that ``make_evaluation_predictions`` returns.
    """
    trained = estimator.train(dataset_train, num_workers=8)
    forecast_iter, target_iter = make_evaluation_predictions(
        dataset=dataset_test,
        predictor=trained,
        num_samples=100,
    )
    # Forecasts are drained first, then the matching target series.
    return list(forecast_iter), list(target_iter)



def generate_plot(forecasts, targets, prediction_length):
    """Plot the first forecast against the first target series.

    Args:
        forecasts: Sequence of forecasts; only element 0 is used.
        targets: Sequence of target series; only element 0 is used.
        prediction_length: Forecast horizon forwarded to ``plot``.

    Returns:
        The figure object returned by ``plot``.
    """
    first_target = targets[0]
    first_forecast = forecasts[0]
    return plot(
        target=first_target,
        forecast=first_forecast,
        prediction_length=prediction_length,
    )

def generate_text(forecasts):
    """Dump ``forecasts`` to ``prediction_data.csv`` and return that path.

    The data is wrapped in a ``pandas.DataFrame`` and written without the
    index column into the current working directory.
    """
    file_path = 'prediction_data.csv'
    pd.DataFrame(forecasts).to_csv(file_path, index=False)
    return file_path

def rnn_change(choice):
    """Store the selected RNN cell type on the module-level ``estimator``."""
    estimator.cell_type = choice
def step_change(step):
    """Store the number of diffusion steps on the module-level ``estimator``."""
    estimator.diff_steps = step
def lr_change(lr):
    """Store the learning rate on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.learning_rate = lr

def epoch_change(epoch):
    """Store the epoch count on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.epochs = epoch

def batchsize_change(batchsize):
    """Store the batch size on the trainer of the module-level ``estimator``."""
    trainer = estimator.trainer
    trainer.batch_size = batchsize

def prediction_length_change(prediction_length):
    """Store the forecast horizon, converted with ``int``, on ``estimator``."""
    horizon = int(prediction_length)
    estimator.prediction_length = horizon

def beta_end_change(beta_end):
    """Store the ``beta_end`` value on the module-level ``estimator``."""
    estimator.beta_end = beta_end

def context_length_change(context_length):
    """Store the context length, converted with ``int``, on ``estimator``."""
    window = int(context_length)
    estimator.context_length = window

def freq_change(freq):
    """Store the selected frequency string on the module-level ``estimator``."""
    estimator.freq = freq

def change_target_dim(dataset):
    """Set ``estimator.target_dim`` to the number of columns in ``dataset``.

    Args:
        dataset: Two-dimensional table exposing ``shape``, such as the
            DataFrame returned by ``load_user_data``.

    Returns:
        None. The module-level ``estimator`` is updated in place.
    """
    # A DataFrame has no ``col_count`` attribute; the column count is
    # ``shape[1]``.
    estimator.target_dim = dataset.shape[1]

def load_user_data(file_obj):
    """Read an uploaded CSV, Excel or JSON file into a DataFrame.

    Args:
        file_obj: Either a filesystem path (``str`` or ``os.PathLike``) or an
            object whose ``name`` attribute holds the path of the upload.

    Returns:
        The ``pandas.DataFrame`` parsed from the file.

    Raises:
        ValueError: If ``file_obj`` is ``None`` or the file extension is not
            one of ``.csv``, ``.xls``, ``.xlsx`` or ``.json``.
    """
    if file_obj is None:
        raise ValueError("No file was uploaded")

    # Upload widgets may hand over a plain path instead of a file wrapper.
    if isinstance(file_obj, (str, os.PathLike)):
        file_path = os.fspath(file_obj)
    else:
        file_path = file_obj.name

    # The reader is chosen from the lower-cased extension.
    readers = {
        '.csv': pd.read_csv,
        '.xls': pd.read_excel,
        '.xlsx': pd.read_excel,
        '.json': pd.read_json,
    }
    ext = os.path.splitext(file_path)[1].lower()
    reader = readers.get(ext)
    if reader is None:
        raise ValueError(f"Unsupported file format: {ext}")
    return reader(file_path)

# estimator = TimeGradEstimator(
# target_dim=10, 
# prediction_length=10, 
# context_length=20, 
# cell_type='GRU', 
# input_size=1484, 
# freq='d', 
# loss_type='l2', 
# scaling=True, 
# diff_steps=100, 
# beta_end=0.1, 
# beta_schedule="linear", 
# trainer=Trainer(
# device=device,
# epochs=1, 
# learning_rate=1e-3, 
# num_batches_per_epoch=100, 
# batch_size=64, 
# )
# )
# Load the "electricity_nips" dataset and size the module-level TimeGrad
# estimator from its metadata. The *_change callbacks in this file overwrite
# the estimator's attributes, and ``prediction`` trains it.
dataset = get_dataset("electricity_nips", regenerate=False)
estimator = TimeGradEstimator(
    # Data layout, taken from the dataset metadata.
    freq=dataset.metadata.freq,
    target_dim=int(dataset.metadata.feat_static_cat[0].cardinality),
    input_size=1484,
    prediction_length=dataset.metadata.prediction_length,
    # The context window is set to the same length as the forecast horizon.
    context_length=dataset.metadata.prediction_length,
    # Network and diffusion settings.
    cell_type='GRU',
    loss_type='l2',
    scaling=True,
    diff_steps=100,
    beta_end=0.1,
    beta_schedule="linear",
    # Optimisation settings.
    trainer=Trainer(
        device=device,
        epochs=20,
        learning_rate=1e-3,
        num_batches_per_epoch=100,
        batch_size=64,
    ),
)

def full_pipeline(file_obj, prediction_length):
    """Run the whole flow on "electricity_nips" and return the forecast figure.

    Args:
        file_obj: Uploaded file; currently unused because the dataset is
            always loaded with ``get_dataset``.
        prediction_length: Forecast horizon forwarded to ``generate_plot``.

    Returns:
        The figure produced by ``generate_plot``.
    """
    nips_data = get_dataset("electricity_nips", regenerate=False)
    grouped_train, grouped_test = preprocess_data(nips_data)
    forecasts, targets = prediction(grouped_train, grouped_test)
    return generate_plot(forecasts, targets, prediction_length)

def full_pipeline_text(file_obj, prediction_length):
    """Run the whole flow on "electricity_nips" and return the CSV path.

    Args:
        file_obj: Uploaded file; currently unused, matching ``full_pipeline``.
        prediction_length: Unused; kept so the signature matches
            ``full_pipeline`` and the UI can pass the same inputs.

    Returns:
        The path of the CSV file written by ``generate_text``.
    """
    # ``preprocess_data`` reads ``dataset.metadata``, ``dataset.train`` and
    # ``dataset.test``, which the DataFrame from ``load_user_data`` does not
    # have. Load the same dataset that ``full_pipeline`` uses instead.
    dataset = get_dataset("electricity_nips", regenerate=False)
    dataset_train, dataset_test = preprocess_data(dataset)
    forecasts, _targets = prediction(dataset_train, dataset_test)
    return generate_text(forecasts)

# Gradio UI: upload and control row, parameter panel, then the results row.
# NOTE(review): the source lost its indentation, so the nesting of rows and
# columns below is a reconstruction -- confirm against the intended layout.
# NOTE(review): ``input`` shadows the builtin; the name is kept for compatibility.
with gr.Blocks() as demo:
    with gr.Tab(label='Timegrad'):
        with gr.Row():
            input = gr.File(label='Upload file')
            with gr.Column():
                button1 = gr.Button(value='Electricity')
                button2 = gr.Button(value='Stocks')
                button3 = gr.Button(value='3')
            with gr.Column():
                generate_button = gr.Button(value='Generate')
                with gr.Row():
                    dropdown = gr.Dropdown(choices=['Image only', 'Image and Text'], label='Format')

        with gr.Row():
            with gr.Column():
                # Section title.
                gr.Markdown('diffusion model parameters')
                with gr.Row():
                    dropdown_rnn = gr.Dropdown(choices=['LSTM', 'GRU'], label='RNN')
                    dropdown_rnn.change(fn=rnn_change, inputs=dropdown_rnn, outputs=None)
                    slider_step = gr.Slider(minimum=1, maximum=1000, step=10, label='Steps')
                    slider_step.change(fn=step_change, inputs=slider_step, outputs=None)
                gr.Markdown('train parameters')
                with gr.Row():
                    slider_lr = gr.Slider(minimum=0.00001, maximum=0.01, step=0.00001, label='Learning rate')
                    slider_lr.change(fn=lr_change, inputs=slider_lr, outputs=None)
                    slider_epoch = gr.Slider(minimum=1, maximum=20, step=1, label='training epoch')
                    slider_epoch.change(fn=epoch_change, inputs=slider_epoch, outputs=None)
                with gr.Row():
                    # A batch size of 0 cannot be trained with, so start at 1.
                    slider_batchsize = gr.Slider(minimum=1, maximum=10, step=1, label='batch_size')
                    slider_batchsize.change(fn=batchsize_change, inputs=slider_batchsize, outputs=None)
                    slider_beta_end = gr.Slider(minimum=0.00001, maximum=0.01, step=0.00001, label='beta_end')
                    slider_beta_end.change(fn=beta_end_change, inputs=slider_beta_end, outputs=None)
                with gr.Row():
                    text_prediction_length = gr.Slider(minimum=1, maximum=1000, step=10, label='prediction length')
                    text_prediction_length.change(fn=prediction_length_change, inputs=text_prediction_length, outputs=None)
                    text_context_length = gr.Slider(minimum=1, maximum=1000, step=10, label='context length')
                    text_context_length.change(fn=context_length_change, inputs=text_context_length, outputs=None)
                    dropdown_freq = gr.Dropdown(choices=['D', 'H', 'M', 'S'], label='Frequency')
                    dropdown_freq.change(fn=freq_change, inputs=dropdown_freq, outputs=None)

        with gr.Row():
            with gr.Column():
                prediction_img = gr.Plot()
                # NOTE(review): this handler and the one below each run the
                # complete pipeline, so one click trains the estimator twice.
                # Consider a single handler that returns both outputs.
                generate_button.click(
                    fn=full_pipeline,
                    inputs=[input, text_prediction_length],
                    outputs=prediction_img,
                )
                prediction_text = gr.File(label='prediction data')
                generate_button.click(
                    fn=full_pipeline_text,
                    inputs=[input, text_prediction_length],
                    outputs=prediction_text,
                )


demo.launch(share=True, server_port=6006)
