In [1]:
"""Part 1:Data generation"""

import os
import random
import csv
import time  # 用于记录时间
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor

# 生成随机时间函数
def random_time(start, end):
    delta = end - start
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return start + timedelta(seconds=random_seconds)

# 生成部分数据集的函数，每个线程执行这个函数
def generate_partial_data(num_clients, num_calls, thread_id, used_pairs, output_dir):
    clients = list(range(1, num_clients + 1))  # 客户ID从1开始
    start_date = datetime(year=2024, month=1, day=1)
    end_date = datetime(year=2024, month=12, day=31)

    # 临时文件名使用线程ID来区分，保存到指定目录
    partial_filename = os.path.join(output_dir, f'temp_data_part_{thread_id}.csv')
    
    with open(partial_filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for _ in range(num_calls):
            while True:
                client1 = random.choice(clients)
                client2 = random.choice(clients)
                if client1 != client2:
                    # 使用frozenset确保客户顺序无关
                    pair = frozenset([client1, client2])
                    if pair not in used_pairs:
                        used_pairs.add(pair)  # 记录该对已经使用
                        break

            # 随机生成开始时间和持续时间（1分钟到300分钟）
            start_time = random_time(start_date, end_date)
            duration_minutes = random.randint(1, 300)  # 通话持续时间随机为1到300分钟
            end_time = start_time + timedelta(minutes=duration_minutes)

            # 转换为所需的时间格式 YYMMDDHHMM
            start_str = start_time.strftime("%y%m%d%H%M")
            end_str = end_time.strftime("%y%m%d%H%M")

            # 逐条写入CSV文件
            writer.writerow([client1, client2, start_str, end_str])

    return partial_filename

# 主函数：使用多线程生成完整数据集
def generate_synthetic_calls_multithreaded(num_clients, total_num_calls, num_threads, output_dir, output_filename):
    calls_per_thread = total_num_calls // num_threads
    used_pairs = set()  # 存储已经生成过的客户对（无序）

    # 检查输出目录是否存在，如果不存在则创建
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 记录开始时间
    start_time = time.time()

    # 创建一个线程池，并行生成数据
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for i in range(num_threads):
            futures.append(executor.submit(generate_partial_data, num_clients, calls_per_thread, i, used_pairs, output_dir))
        
        # 收集所有生成的临时文件
        partial_files = [future.result() for future in futures]

    # 使用用户指定的文件名保存最终输出文件
    output_file = os.path.join(output_dir, output_filename)
    with open(output_file, 'w', newline='') as outfile:
        writer = csv.writer(outfile)
        #writer.writerow(['Client1', 'Client2', 'Start_Time', 'End_Time'])  # 添加表头
        for partial_file in partial_files:
            with open(partial_file, 'r') as infile:
                reader = csv.reader(infile)
                for row in reader:
                    writer.writerow(row)

    # 删除临时文件
    for partial_file in partial_files:
        os.remove(partial_file)

    # 记录结束时间
    end_time = time.time()

    # 计算并输出运行时间
    total_time = end_time - start_time
    print(f"数据生成完毕，已保存到 {output_file}")
    print(f"程序运行时间为 {total_time:.2f} 秒")

# 调用函数生成数据集，指定输出路径和文件名
generate_synthetic_calls_multithreaded(
    num_clients=800000, 
    total_num_calls=100000000, 
    num_threads=24, 
    output_dir="D:/Data_intensive_system_project/Data", 
    output_filename="5.csv"
)



数据生成完毕，已保存到 D:/Data_intensive_system_project/Data\5.csv
程序运行时间为 1834.08 秒


In [5]:
import os
import random
import time  # 用于记录时间
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

# 生成随机时间函数
def random_time(start, end):
    delta = end - start
    random_seconds = random.randint(0, int(delta.total_seconds()))
    return start + timedelta(seconds=random_seconds)

# 生成部分数据集的函数，每个线程执行这个函数
def generate_partial_data(num_clients, num_calls, thread_id, used_pairs, output_dir):
    clients = list(range(1, num_clients + 1))  # 客户ID从1开始
    start_date = datetime(year=2024, month=1, day=1)
    end_date = datetime(year=2024, month=12, day=31)

    # 存储数据的列表
    data = []

    for _ in range(num_calls):
        while True:
            client1 = random.choice(clients)
            client2 = random.choice(clients)
            if client1 != client2:
                # 使用frozenset确保客户顺序无关
                pair = frozenset([client1, client2])
                if pair not in used_pairs:
                    used_pairs.add(pair)  # 记录该对已经使用
                    break

        # 随机生成开始时间和持续时间（1分钟到300分钟）
        start_time = random_time(start_date, end_date)
        duration_minutes = random.randint(1, 300)  # 通话持续时间随机为1到300分钟
        end_time = start_time + timedelta(minutes=duration_minutes)

        # 转换为所需的时间格式 YYMMDDHHMM
        start_str = start_time.strftime("%y%m%d%H%M")
        end_str = end_time.strftime("%y%m%d%H%M")

        # 将数据加入列表
        data.append([client1, client2, start_str, end_str])

    # 将数据转换为 pandas DataFrame
    df = pd.DataFrame(data, columns=["client1", "client2", "start_time", "end_time"])

    # 将 DataFrame 保存为 Parquet 文件
    partial_filename = os.path.join(output_dir, f'temp_data_part_{thread_id}.parquet')
    df.to_parquet(partial_filename, index=False)

    return partial_filename

# 主函数：使用多线程生成完整数据集
def generate_synthetic_calls_multithreaded(num_clients, total_num_calls, num_threads, output_dir, output_filename):
    calls_per_thread = total_num_calls // num_threads
    used_pairs = set()  # 存储已经生成过的客户对（无序）

    # 检查输出目录是否存在，如果不存在则创建
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 记录开始时间
    start_time = time.time()

    # 创建一个线程池，并行生成数据
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for i in range(num_threads):
            futures.append(executor.submit(generate_partial_data, num_clients, calls_per_thread, i, used_pairs, output_dir))
        
        # 收集所有生成的临时文件
        partial_files = [future.result() for future in futures]

    # 合并所有临时文件为一个完整的Parquet文件
    dataframes = []
    for partial_file in partial_files:
        df = pd.read_parquet(partial_file)
        dataframes.append(df)

    # 合并所有的 DataFrame
    final_df = pd.concat(dataframes)

    # 将合并的数据保存为最终的 Parquet 文件
    output_file = os.path.join(output_dir, output_filename)
    final_df.to_parquet(output_file, index=False)

    # 删除临时文件
    for partial_file in partial_files:
        os.remove(partial_file)

    # 记录结束时间
    end_time = time.time()

    # 计算并输出运行时间
    total_time = end_time - start_time
    print(f"数据生成完毕，已保存到 {output_file}")
    print(f"程序运行时间为 {total_time:.2f} 秒")

# 调用函数生成数据集，指定输出路径和文件名
generate_synthetic_calls_multithreaded(
    num_clients=1000000, 
    total_num_calls=100000000, 
    num_threads=32, 
    output_dir="D:/Data_intensive_system_project/Data", 
    output_filename="final_data.parquet"
)


数据生成完毕，已保存到 D:/Data_intensive_system_project/Data\final_data.parquet
程序运行时间为 833.39 秒


D:\Anaconda3\envs\Data_intensive_system\python.exe
