# テストデータ作成

# テストデータ：パターン１
## 目的
- グラフデータの元となるCSVを作成

## 作成CSVデータ
- CSVのヘッダー情報は以下
  - タイムスタンプ
  - 回線番号
  - 下りのトラヒック量(Byte)
  - 上りのトラヒック量(Byte)
  - 下りの廃棄量(Packets)
  - 下りの廃棄量(Byte)
- タイムスタンプは5分間隔で1日(24時間)分　※日付は指定できる
- タイムスタンプは `"%Y-%m-%d %H:%M:%S"` の形式
- 回線番号は `AA00-00-1001`～`ZZ00-00-1001`
- 下りのトラヒック量(Byte)と上りのトラヒック量(Byte)は100Mbps～10Gbps分の範囲でランダム

In [1]:
import csv
import string
import random
import os
from datetime import datetime, timedelta

# --- 設定 (Constants) ---
CONFIG = {
    "FILENAME": "data/raw/sample-traffic-01-01.csv",
    "START_DATE": "2025-02-08", # 任意の日付を指定
    "INTERVAL_SEC": 300,        # 5分
    "ITERATIONS": 288,          # 288回 = 24時間 (最後は23:55)
    "MIN_BPS": 100 * 10**6,     # 100Mbps
    "MAX_BPS": 10 * 10**9,      # 10Gbps
    "AVG_PACKET_SIZE": 1500,    # MTU想定
    "DROP_PROBABILITY": 0.2,    # 廃棄が発生する確率
}

def calculate_drops(down_traffic):
    """下りトラフィックに基づき廃棄量を計算する"""
    if random.random() < CONFIG["DROP_PROBABILITY"]:
        drop_rate = random.uniform(0.1, 0.2)
        drop_bytes = int(down_traffic * drop_rate)
        drop_pkts = drop_bytes // CONFIG["AVG_PACKET_SIZE"]
        return drop_pkts, drop_bytes
    return 0, 0

def generate_full_data():
    # 単位変換: bps -> Bytes
    min_byte = CONFIG["MIN_BPS"] // 8
    max_byte = CONFIG["MAX_BPS"] // 8

    # 保存先ディレクトリの作成
    os.makedirs(os.path.dirname(CONFIG["FILENAME"]), exist_ok=True)

    # 各パイプの初期状態 (A-Z)
    pipes = [f"{l}{l}00-00-1001" for l in string.ascii_uppercase]
    pipe_states = {name: {
        "down": random.randint(min_byte, max_byte // 2),
        "up": random.randint(min_byte, max_byte // 2)
    } for name in pipes}

    # ★【変更箇所】開始時間を当日の 00:00:00 に固定
    base_time = datetime.strptime(CONFIG["START_DATE"], "%Y-%m-%d")

    header = ["タイムスタンプ", "回線番号", "下りのトラヒック量(Byte)", 
              "上りのトラヒック量(Byte)", "下りの廃棄量(Packets)", "下りの廃棄量(Byte)"]

    try:
        with open(CONFIG["FILENAME"], mode='w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            
            for i in range(CONFIG["ITERATIONS"]):
                # i=0 のとき 00:00:00, i=287 のとき 23:55:00
                current_time = base_time + timedelta(seconds=CONFIG["INTERVAL_SEC"] * i)
                timestamp_str = current_time.strftime("%Y-%m-%d %H:%M:%S")
                
                for name in pipes:
                    state = pipe_states[name]
                    
                    # トラフィック変動
                    for key in ["down", "up"]:
                        variation = random.uniform(0.9, 1.1)
                        state[key] = int(state[key] * variation)
                        state[key] = max(min_byte, min(max_byte, state[key]))
                    
                    # 廃棄量の計算
                    drop_pkts, drop_bytes = calculate_drops(state["down"])
                    
                    writer.writerow([
                        timestamp_str, name, state["down"], 
                        state["up"], drop_pkts, drop_bytes
                    ])
        
        print(f"✅ CSV作成完了: {CONFIG['FILENAME']}")

    except IOError as e:
        print(f"❌ ファイル書き込みエラー: {e}")

if __name__ == "__main__":
    generate_full_data()

✅ CSV作成完了: data/raw/sample-traffic-01-01.csv


In [2]:
import csv
import string
import random
import os
from datetime import datetime, timedelta

# --- 設定 (Constants) ---
CONFIG = {
    "FILENAME": "data/raw/sample-traffic-01-02.csv",
    "START_DATE": "2025-02-07", # 任意の日付を指定
    "INTERVAL_SEC": 300,        # 5分
    "ITERATIONS": 288,          # 288回 = 24時間 (最後は23:55)
    "MIN_BPS": 100 * 10**6,     # 100Mbps
    "MAX_BPS": 10 * 10**9,      # 10Gbps
    "AVG_PACKET_SIZE": 1500,    # MTU想定
    "DROP_PROBABILITY": 0.2,    # 廃棄が発生する確率
}

def calculate_drops(down_traffic):
    """下りトラフィックに基づき廃棄量を計算する"""
    if random.random() < CONFIG["DROP_PROBABILITY"]:
        drop_rate = random.uniform(0.1, 0.2)
        drop_bytes = int(down_traffic * drop_rate)
        drop_pkts = drop_bytes // CONFIG["AVG_PACKET_SIZE"]
        return drop_pkts, drop_bytes
    return 0, 0

def generate_full_data():
    # 単位変換: bps -> Bytes
    min_byte = CONFIG["MIN_BPS"] // 8
    max_byte = CONFIG["MAX_BPS"] // 8

    # 保存先ディレクトリの作成
    os.makedirs(os.path.dirname(CONFIG["FILENAME"]), exist_ok=True)

    # 各パイプの初期状態 (A-Z)
    pipes = [f"{l}{l}00-00-1001" for l in string.ascii_uppercase]
    pipe_states = {name: {
        "down": random.randint(min_byte, max_byte // 2),
        "up": random.randint(min_byte, max_byte // 2)
    } for name in pipes}

    # ★【変更箇所】開始時間を当日の 00:00:00 に固定
    base_time = datetime.strptime(CONFIG["START_DATE"], "%Y-%m-%d")

    header = ["タイムスタンプ", "回線番号", "下りのトラヒック量(Byte)", 
              "上りのトラヒック量(Byte)", "下りの廃棄量(Packets)", "下りの廃棄量(Byte)"]

    try:
        with open(CONFIG["FILENAME"], mode='w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            
            for i in range(CONFIG["ITERATIONS"]):
                # i=0 のとき 00:00:00, i=287 のとき 23:55:00
                current_time = base_time + timedelta(seconds=CONFIG["INTERVAL_SEC"] * i)
                timestamp_str = current_time.strftime("%Y-%m-%d %H:%M:%S")
                
                for name in pipes:
                    state = pipe_states[name]
                    
                    # トラフィック変動
                    for key in ["down", "up"]:
                        variation = random.uniform(0.9, 1.1)
                        state[key] = int(state[key] * variation)
                        state[key] = max(min_byte, min(max_byte, state[key]))
                    
                    # 廃棄量の計算
                    drop_pkts, drop_bytes = calculate_drops(state["down"])
                    
                    writer.writerow([
                        timestamp_str, name, state["down"], 
                        state["up"], drop_pkts, drop_bytes
                    ])
        
        print(f"✅ CSV作成完了: {CONFIG['FILENAME']}")

    except IOError as e:
        print(f"❌ ファイル書き込みエラー: {e}")

if __name__ == "__main__":
    generate_full_data()

✅ CSV作成完了: data/raw/sample-traffic-01-02.csv


# テストデータ：パターン２
## 目的
- グラフデータの元となるCSVを作成

## 作成CSVデータ
- CSVのヘッダー情報は以下
  - timestamp    # テストデータ（パターン１）のタイムスタンプ
  - code         # テストデータ（パターン１）の回線番号の頭から7文字
  - identifer 　 # テストデータ（パターン１）の回線番号の末から4文字
  - volume_in    # テストデータ（パターン１）の下りのトラヒック量(Byte)
  - volume_out   # テストデータ（パターン１）の上りのトラヒック量(Byte)
- 値はダブルクォーテーション`"` で囲う
- timestamp は5分間隔で1日(24時間)分　※日付は指定できる
- timestamp は `"%Y%m%d%H%M%S"` の形式
- code は `AA00-00`～`ZZ00-00`
- volume_in と volume_out は100Mbps～10Gbps分の範囲でランダム

In [3]:
import csv
import string
import random
import os
from datetime import datetime, timedelta

# --- 設定 (Constants) ---
CONFIG = {
    "FILENAME": "data/raw/sample-traffic-02.csv",
    "START_DATE": "2025-02-07",
    "INTERVAL_SEC": 300,
    "ITERATIONS": 288,
    "MIN_BPS": 100 * 10**6,
    "MAX_BPS": 10 * 10**9,
    "AVG_PACKET_SIZE": 1500,
    "DROP_PROBABILITY": 0.2,
}

def generate_full_data():
    min_byte = CONFIG["MIN_BPS"] // 8
    max_byte = CONFIG["MAX_BPS"] // 8

    os.makedirs(os.path.dirname(CONFIG["FILENAME"]), exist_ok=True)

    pipes = [f"{l}{l}00-00" for l in string.ascii_uppercase]
    pipe_states = {name: {
        "down": random.randint(min_byte, max_byte // 2),
        "up": random.randint(min_byte, max_byte // 2)
    } for name in pipes}

    base_time = datetime.strptime(CONFIG["START_DATE"], "%Y-%m-%d")
    header = ["timestamp", "code", "identifer","volume_in", "volume_out"]

    try:
        with open(CONFIG["FILENAME"], mode='w', encoding='utf-8', newline='') as f:
            # 冒頭の説明文
            f.write(f"# Traffic Data Report\n")
            f.write(f"# Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"# --------------------------------------------------\n")
            
            # ★ quoting=csv.QUOTE_ALL を追加することで全項目を "" で囲む
            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
            writer.writerow(header)
            
            for i in range(CONFIG["ITERATIONS"]):
                current_time = base_time + timedelta(seconds=CONFIG["INTERVAL_SEC"] * i)
                timestamp_str = current_time.strftime("%Y%m%d%H%M%S")
                
                for name in pipes:
                    state = pipe_states[name]
                    for key in ["down", "up"]:
                        variation = random.uniform(0.9, 1.1)
                        state[key] = int(state[key] * variation)
                        state[key] = max(min_byte, min(max_byte, state[key]))
                    
                    identifer = "1001"
                    writer.writerow([
                        timestamp_str, name, identifer, state["down"], state["up"]
                    ])
        
        print(f"✅ CSV作成完了: {CONFIG['FILENAME']}")

    except IOError as e:
        print(f"❌ ファイル書き込みエラー: {e}")

if __name__ == "__main__":
    generate_full_data()

✅ CSV作成完了: data/raw/sample-traffic-02.csv
