# 第1课：Python基础与数据结构
# Lesson 1: Python Basics & Data Structures

本notebook演示Python基础概念和NumPy操作  
This notebook demonstrates basic Python concepts and NumPy operations

---

## 学习目标 (Learning Objectives)

- 掌握Python基础数据结构的使用
- 理解函数定义和参数传递机制
- 掌握NumPy库的基本操作
- 为后续机器学习课程打下坚实基础

In [None]:
# 导入必要的库 (Import necessary libraries)
import numpy as np
from collections import Counter
import random
import matplotlib.pyplot as plt

print("=== 第1课：Python基础与数据结构 ===")
print("=== Lesson 1: Python Basics & Data Structures ===")

## 1. 数据结构演示 (Data Structures Demo)

Python提供了丰富的内置数据结构，每种都有其特定的用途和优势。

In [None]:
# 列表操作 (List operations)
ip_addresses = ["192.168.1.1", "10.0.0.1", "172.16.0.1"]
print(f"原始IP列表 (Original IP list): {ip_addresses}")

# 添加新IP
ip_addresses.append("192.168.1.100")
ip_addresses.extend(["10.0.0.50", "192.168.1.1"])  # 重复IP
print(f"添加后的IP列表 (After adding): {ip_addresses}")

# 列表的常用操作
print(f"列表长度 (List length): {len(ip_addresses)}")
print(f"第一个IP (First IP): {ip_addresses[0]}")
print(f"最后一个IP (Last IP): {ip_addresses[-1]}")

In [None]:
# 字典存储网络设备信息 (Dictionary for network device info)
network_device = {
    "hostname": "router-01",
    "ip": "192.168.1.1",
    "type": "router",
    "ports": [22, 23, 80, 443],
    "status": "active"
}

print("网络设备信息 (Network device info):")
for key, value in network_device.items():
    print(f"  {key}: {value}")

# 字典的安全访问
print(f"\n设备类型 (Device type): {network_device.get('type', 'Unknown')}")
print(f"设备位置 (Device location): {network_device.get('location', 'Not specified')}")

In [None]:
# 集合去重IP地址 (Set for unique IP addresses)
unique_ips = set(ip_addresses)
print(f"去重后的IP (Unique IPs): {unique_ips}")
print(f"原始列表有{len(ip_addresses)}个IP，去重后有{len(unique_ips)}个")

# 集合运算示例
internal_ips = {"192.168.1.1", "10.0.0.1", "172.16.0.1"}
external_ips = {"8.8.8.8", "1.1.1.1", "192.168.1.1"}

print(f"\n内部IP (Internal IPs): {internal_ips}")
print(f"外部IP (External IPs): {external_ips}")
print(f"交集 (Intersection): {internal_ips & external_ips}")
print(f"并集 (Union): {internal_ips | external_ips}")

In [None]:
# 元组存储网络连接信息 (Tuple for connection info)
connections = [
    ("192.168.1.10", "8.8.8.8", 80, "TCP"),
    ("10.0.0.5", "1.1.1.1", 443, "TCP"),
    ("192.168.1.20", "8.8.4.4", 53, "UDP")
]

print("网络连接信息 (Network connections):")
for i, connection in enumerate(connections, 1):
    src_ip, dst_ip, port, protocol = connection  # 元组解包
    print(f"  连接{i}: {src_ip} -> {dst_ip}:{port} ({protocol})")

# 元组的不可变性
print(f"\n第一个连接的源IP: {connections[0][0]}")
# connections[0][0] = "new_ip"  # 这会报错，因为元组不可变

## 2. 函数定义示例 (Function Definition Examples)

函数是代码复用和模块化的基础，我们来看几个与网络安全相关的函数示例。

In [None]:
def is_private_ip(ip):
    """
    Check whether an IPv4 address lies in an RFC 1918 private range.

    The address is parsed rather than prefix-matched, so malformed input
    such as "10.evil.example.com" or "192.168.1.999" is not reported as
    private.

    Args:
        ip (str): IPv4 address in dotted-decimal notation.

    Returns:
        bool: True if ip is a valid IPv4 address inside 10.0.0.0/8,
        172.16.0.0/12 or 192.168.0.0/16; False otherwise, including for
        non-string or unparsable input.
    """
    import ipaddress  # local import: the block stays usable on its own

    # IPv4Address also accepts ints and packed bytes; only text counts as an
    # address literal here.
    if not isinstance(ip, str):
        return False

    try:
        address = ipaddress.IPv4Address(ip)
    except ValueError:  # AddressValueError is a ValueError subclass
        return False

    # The networks are listed explicitly: ipaddress' own is_private flag also
    # covers loopback, link-local and documentation ranges, which would make
    # 203.0.113.1 count as private.
    private_networks = (
        ipaddress.IPv4Network("10.0.0.0/8"),
        ipaddress.IPv4Network("172.16.0.0/12"),
        ipaddress.IPv4Network("192.168.0.0/16"),
    )
    return any(address in network for network in private_networks)

# 测试函数
test_ips = ["192.168.1.1", "8.8.8.8", "10.0.0.1", "203.0.113.1"]
for ip in test_ips:
    ip_type = "私有IP (Private)" if is_private_ip(ip) else "公有IP (Public)"
    print(f"{ip}: {ip_type}")

In [None]:
def analyze_port(port, protocol="TCP"):
    """
    Describe a port by its well-known service name and a coarse risk level.

    Args:
        port (int): Port number to look up.
        protocol (str): Protocol label echoed in the output; defaults to "TCP".

    Returns:
        str: One-line summary containing the port/protocol pair, the service
        name ("Unknown" when the port is not in the built-in table) and the
        risk level.
    """
    # Well-known port -> service name.
    service_by_port = {
        21: "FTP",
        22: "SSH",
        23: "Telnet",
        25: "SMTP",
        53: "DNS",
        80: "HTTP",
        443: "HTTPS",
        993: "IMAPS",
        1433: "MSSQL",
        3306: "MySQL",
        3389: "RDP",
    }
    # Ports this lesson treats as high risk.
    high_risk_ports = {22, 23, 3389, 1433}

    service = service_by_port.get(port, "Unknown")

    if port in high_risk_ports:
        risk_level = "高风险"
    else:
        risk_level = "常规"

    return f"端口 {port}/{protocol} - 服务: {service} - 风险级别: {risk_level}"

# 测试不同端口
test_ports = [22, 80, 443, 1433, 8080]
for port in test_ports:
    print(analyze_port(port))

In [None]:
def calculate_network_stats(*traffic_values, **kwargs):
    """
    Summarise a series of traffic measurements.

    Args:
        *traffic_values: Any number of numeric traffic values.
        **kwargs: Optional settings. Only ``unit`` is read (default
            ``'bytes'``); it labels the result keys. Other keywords are
            ignored.

    Returns:
        dict: Total, average (rounded to 2 decimals), maximum, minimum and
        the number of data points, keyed by unit-labelled names.
        str: The message "无数据 (No data)" when no values are passed.
    """
    count = len(traffic_values)
    if count == 0:
        # The empty case yields a message string rather than a dict.
        return "无数据 (No data)"

    unit = kwargs.get('unit', 'bytes')
    total = sum(traffic_values)

    stats = {}
    stats[f"总流量 ({unit})"] = total
    stats[f"平均流量 ({unit})"] = round(total / count, 2)
    stats[f"最大流量 ({unit})"] = max(traffic_values)
    stats[f"最小流量 ({unit})"] = min(traffic_values)
    stats["数据点数量"] = count
    return stats

# 测试可变参数函数
stats1 = calculate_network_stats(100, 250, 180, 90, 300, 150)
stats2 = calculate_network_stats(1024, 2048, 512, unit='KB')

print("流量统计1 (Traffic stats 1):")
for key, value in stats1.items():
    print(f"  {key}: {value}")

print("\n流量统计2 (Traffic stats 2):")
for key, value in stats2.items():
    print(f"  {key}: {value}")

## 3. NumPy操作演示 (NumPy Operations Demo)

NumPy是Python科学计算的核心库，在机器学习中扮演着重要角色。

In [None]:
# 数组创建 (Array Creation)
print("=== 数组创建 (Array Creation) ===")

# 从列表创建
traffic_data = np.array([120, 85, 200, 156, 90, 175, 210, 95, 180, 145])
print(f"流量数据 (Traffic data): {traffic_data}")
print(f"数据类型: {traffic_data.dtype}")

# 创建特殊数组
zeros = np.zeros(5)
ones = np.ones(5)
range_array = np.arange(0, 10, 2)
linspace_array = np.linspace(0, 100, 5)

print(f"零数组: {zeros}")
print(f"全一数组: {ones}")
print(f"范围数组: {range_array}")
print(f"等间距数组: {linspace_array}")

In [None]:
# 随机数生成 (Random number generation)
print("\n=== 随机数生成 (Random Number Generation) ===")

np.random.seed(42)  # 设置随机种子以保证结果可重现

# 不同分布的随机数
uniform_random = np.random.random(10)  # 0-1均匀分布
normal_random = np.random.normal(100, 20, 10)  # 正态分布
int_random = np.random.randint(1, 100, 10)  # 整数随机数

print(f"均匀分布随机数: {np.round(uniform_random, 3)}")
print(f"正态分布随机数: {np.round(normal_random, 2)}")
print(f"整数随机数: {int_random}")

# 模拟网络响应时间（正态分布）
response_times = np.random.normal(50, 15, 100)  # 平均50ms，标准差15ms
print(f"\n模拟响应时间统计:")
print(f"  平均值: {np.mean(response_times):.2f}ms")
print(f"  标准差: {np.std(response_times):.2f}ms")

In [None]:
# 数组属性和形状操作 (Array properties and shape operations)
print("=== 数组属性 (Array Properties) ===")

# 创建二维数组
matrix = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
print(f"矩阵:\n{matrix}")
print(f"形状 (Shape): {matrix.shape}")
print(f"维度 (Dimensions): {matrix.ndim}")
print(f"元素总数 (Size): {matrix.size}")
print(f"数据类型 (Data type): {matrix.dtype}")

# 形状变换
reshaped = matrix.reshape(4, 3)
flattened = matrix.flatten()

print(f"\n重塑后 (Reshaped 4x3):\n{reshaped}")
print(f"扁平化 (Flattened): {flattened}")

In [None]:
# 数学运算 (Mathematical operations)
print("=== 数学运算 (Mathematical Operations) ===")

# 基础运算
a = np.array([1, 2, 3, 4, 5])
b = np.array([6, 7, 8, 9, 10])

print(f"数组a: {a}")
print(f"数组b: {b}")
print(f"a + b = {a + b}")
print(f"a * b = {a * b}")
print(f"a ** 2 = {a ** 2}")
print(f"sqrt(a) = {np.sqrt(a)}")

# 广播机制
print(f"\na + 10 = {a + 10}")  # 标量广播
print(f"a * 2 = {a * 2}")

In [None]:
# 统计函数 (Statistical functions)
print("=== 统计函数 (Statistical Functions) ===")

# 使用之前的流量数据
print(f"流量数据: {traffic_data}")
print(f"均值 (Mean): {np.mean(traffic_data):.2f}")
print(f"中位数 (Median): {np.median(traffic_data):.2f}")
print(f"标准差 (Std Dev): {np.std(traffic_data):.2f}")
print(f"方差 (Variance): {np.var(traffic_data):.2f}")
print(f"最大值 (Maximum): {np.max(traffic_data)}")
print(f"最小值 (Minimum): {np.min(traffic_data)}")
print(f"总和 (Sum): {np.sum(traffic_data)}")

# 百分位数
percentiles = np.percentile(traffic_data, [25, 50, 75, 95])
print(f"百分位数 [25%, 50%, 75%, 95%]: {percentiles}")

In [None]:
# 数组索引和切片 (Array indexing and slicing)
print("=== 数组索引和切片 (Indexing and Slicing) ===")

print(f"原数组: {traffic_data}")
print(f"第一个元素: {traffic_data[0]}")
print(f"最后一个元素: {traffic_data[-1]}")
print(f"前5个元素: {traffic_data[:5]}")
print(f"后5个元素: {traffic_data[-5:]}")
print(f"每隔2个元素: {traffic_data[::2]}")

# 条件索引
high_traffic_mask = traffic_data > 150
high_traffic_values = traffic_data[high_traffic_mask]

print(f"\n高流量掩码: {high_traffic_mask}")
print(f"高流量值 (>150): {high_traffic_values}")
print(f"高流量值的索引: {np.where(traffic_data > 150)[0]}")

In [None]:
# 矩阵运算 (Matrix operations)
print("=== 矩阵运算 (Matrix Operations) ===")

# 创建两个矩阵
matrix_a = np.array([[1, 2], [3, 4]])
matrix_b = np.array([[5, 6], [7, 8]])

print(f"矩阵A:\n{matrix_a}")
print(f"矩阵B:\n{matrix_b}")

# 矩阵运算
print(f"\nA + B:\n{matrix_a + matrix_b}")
print(f"A * B (元素级乘法):\n{matrix_a * matrix_b}")
print(f"A @ B (矩阵乘法):\n{matrix_a @ matrix_b}")
print(f"A的转置:\n{matrix_a.T}")

# 矩阵的其他操作
try:
    inverse_a = np.linalg.inv(matrix_a)
    print(f"A的逆矩阵:\n{inverse_a}")
    print(f"验证 A @ A^(-1):\n{np.round(matrix_a @ inverse_a, 10)}")
except np.linalg.LinAlgError:
    print("矩阵A不可逆")

# 行列式
det_a = np.linalg.det(matrix_a)
print(f"A的行列式: {det_a}")

## 4. 网络安全场景应用 (Security Data Application)

将所学知识应用到实际的网络安全数据处理场景中。

In [None]:
# 模拟网络日志数据 (Simulate network log data)
print("=== 网络安全数据分析 (Security Data Analysis) ===")

# 创建模拟日志数据
log_entries = [
    {"ip": "192.168.1.10", "action": "login", "status": "success", "timestamp": "10:00:01", "bytes": 1024},
    {"ip": "192.168.1.10", "action": "view", "status": "success", "timestamp": "10:00:15", "bytes": 2048},
    {"ip": "203.0.113.1", "action": "login", "status": "failed", "timestamp": "10:01:02", "bytes": 512},
    {"ip": "203.0.113.1", "action": "login", "status": "failed", "timestamp": "10:01:05", "bytes": 512},
    {"ip": "10.0.0.5", "action": "download", "status": "success", "timestamp": "10:02:30", "bytes": 5120},
    {"ip": "203.0.113.1", "action": "login", "status": "failed", "timestamp": "10:01:08", "bytes": 512},
    {"ip": "192.168.1.20", "action": "upload", "status": "success", "timestamp": "10:03:00", "bytes": 3072},
]

print(f"总日志条目数: {len(log_entries)}")
print("\n前3条日志:")
for i, entry in enumerate(log_entries[:3]):
    print(f"  {i+1}. {entry}")

In [None]:
# 分析登录失败模式 (Analyze failed login patterns)
failed_logins = [entry for entry in log_entries 
                if entry["action"] == "login" and entry["status"] == "failed"]

print("登录失败记录分析:")
print(f"总失败次数: {len(failed_logins)}")

for i, entry in enumerate(failed_logins, 1):
    print(f"  失败{i}: IP={entry['ip']}, 时间={entry['timestamp']}")

# 统计每个IP的失败次数
failed_by_ip = Counter(entry["ip"] for entry in failed_logins)
print(f"\n按IP统计失败次数: {dict(failed_by_ip)}")

In [None]:
# Analyze the log data with NumPy.
print("=== 使用NumPy分析网络数据 ===")

# Extract the numeric feature: the "bytes" field of every entry in log_entries
# (log_entries is defined in an earlier cell).
bytes_data = np.array([entry["bytes"] for entry in log_entries])
print(f"字节数数据: {bytes_data}")

# Descriptive statistics of the transferred byte counts.
print(f"\n字节传输统计:")
print(f"  平均字节数: {np.mean(bytes_data):.2f}")
print(f"  中位数: {np.median(bytes_data):.2f}")
print(f"  标准差: {np.std(bytes_data):.2f}")
print(f"  总传输量: {np.sum(bytes_data)} bytes")

# Anomaly detection with a simple statistical rule: only unusually LARGE
# transfers are flagged (values above mean + 2 standard deviations).
mean_bytes = np.mean(bytes_data)
std_bytes = np.std(bytes_data)
threshold = mean_bytes + 2 * std_bytes  # cut-off: two standard deviations above the mean

# np.where returns a tuple of index arrays; [0] selects the indices along the
# first axis, which line up with positions in log_entries.
anomalous_indices = np.where(bytes_data > threshold)[0]
print(f"\n异常传输检测 (阈值: {threshold:.2f}):")
if len(anomalous_indices) > 0:
    for idx in anomalous_indices:
        entry = log_entries[idx]
        print(f"  异常: IP={entry['ip']}, 字节数={entry['bytes']}, 动作={entry['action']}")
else:
    print("  未发现异常传输")

In [None]:
# 创建特征矩阵进行多维分析
print("=== 特征矩阵分析 ===")

# 为每个日志条目创建特征向量
def create_feature_vector(entry):
    """Build the numeric feature vector for one log entry.

    The entry is a dict with the keys "ip", "bytes", "status" and "action".
    Returns a list [is_private, size_kb, is_success, action_code].
    """
    # Action name -> integer code; actions not listed map to 0.
    action_codes = {"login": 1, "view": 2, "download": 3, "upload": 4}

    private_flag = int(bool(is_private_ip(entry["ip"])))
    size_kb = entry["bytes"] / 1024  # unit conversion: bytes -> KB
    success_flag = int(bool(entry["status"] == "success"))
    action_code = action_codes.get(entry["action"], 0)

    return [private_flag, size_kb, success_flag, action_code]

# 创建特征矩阵
feature_matrix = np.array([create_feature_vector(entry) for entry in log_entries])
feature_names = ["是否私有IP", "字节数(KB)", "是否成功", "动作编码"]

print(f"特征矩阵形状: {feature_matrix.shape}")
print(f"特征名称: {feature_names}")
print(f"\n特征矩阵:")
print(feature_matrix)

# 按特征维度分析
print(f"\n特征统计:")
for i, name in enumerate(feature_names):
    feature_col = feature_matrix[:, i]
    print(f"  {name}: 均值={np.mean(feature_col):.2f}, 标准差={np.std(feature_col):.2f}")

In [None]:
# Data visualization: a 2x2 grid of plots built from bytes_data and
# feature_matrix (both created in earlier cells).
# NOTE(review): matplotlib is imported at the top of the notebook, so a missing
# installation would already fail there; the try/except below catches any
# error raised while plotting, whatever its cause.
# NOTE(review): the Chinese titles and labels presumably need a CJK-capable
# font configured in matplotlib to render correctly — confirm on the target
# environment.
print("=== 数据可视化 ===")

try:
    plt.figure(figsize=(12, 8))
    
    # Subplot 1: histogram of the byte counts
    plt.subplot(2, 2, 1)
    plt.hist(bytes_data, bins=5, alpha=0.7, color='skyblue', edgecolor='black')
    plt.title('字节数分布 (Bytes Distribution)')
    plt.xlabel('字节数 (Bytes)')
    plt.ylabel('频次 (Frequency)')
    
    # Subplot 2: private vs. public IP share (column 0 is the private-IP flag)
    plt.subplot(2, 2, 2)
    private_counts = np.sum(feature_matrix[:, 0])
    public_counts = len(feature_matrix) - private_counts
    plt.pie([private_counts, public_counts], labels=['私有IP', '公有IP'], autopct='%1.1f%%')
    plt.title('IP类型分布 (IP Type Distribution)')
    
    # Subplot 3: success vs. failure counts (column 2 is the success flag)
    plt.subplot(2, 2, 3)
    success_counts = np.sum(feature_matrix[:, 2])
    failed_counts = len(feature_matrix) - success_counts
    plt.bar(['成功', '失败'], [success_counts, failed_counts], color=['green', 'red'], alpha=0.7)
    plt.title('操作成功率 (Success Rate)')
    plt.ylabel('次数 (Count)')
    
    # Subplot 4: heat map of the feature correlation matrix; the transpose
    # makes each feature a row, which is what np.corrcoef correlates.
    plt.subplot(2, 2, 4)
    correlation_matrix = np.corrcoef(feature_matrix.T)
    plt.imshow(correlation_matrix, cmap='coolwarm', vmin=-1, vmax=1)
    plt.colorbar()
    plt.title('特征相关性 (Feature Correlation)')
    # Tick labels are the feature names truncated to 6 characters.
    plt.xticks(range(len(feature_names)), [name[:6] for name in feature_names], rotation=45)
    plt.yticks(range(len(feature_names)), [name[:6] for name in feature_names])
    
    plt.tight_layout()
    plt.show()
    
except Exception as e:
    print(f"可视化出现问题: {e}")
    print("请确保matplotlib已正确安装")

## 5. 实用工具函数 (Utility Functions)

创建一些在网络安全分析中常用的工具函数。

In [None]:
def analyze_ip_frequency(ip_list):
    """
    Summarise how often each IP address occurs.

    Args:
        ip_list (iterable): IP addresses, one item per request. Any iterable
            of hashable items works, including one-shot generators.

    Returns:
        dict: with the keys
            "unique_ips" (list): distinct IPs in first-seen order,
            "total_unique" (int): number of distinct IPs,
            "frequency_dict" (dict): IP -> number of occurrences,
            "most_frequent" (tuple): (ip, count) of the most common IP, or
                (None, 0) when the input is empty,
            "total_requests" (int): total number of items in the input.
    """
    # One pass over the input: Counter keeps first-seen order, so the unique
    # list is the same on every run (a set's order varies between runs), and
    # the input is consumed only once.
    frequency = Counter(ip_list)
    unique_ips = list(frequency)

    top = frequency.most_common(1)
    most_frequent_ip = top[0] if top else (None, 0)

    return {
        "unique_ips": unique_ips,
        "total_unique": len(unique_ips),
        "frequency_dict": dict(frequency),
        "most_frequent": most_frequent_ip,
        # Derived from the counts so that unsized iterables work too.
        "total_requests": sum(frequency.values()),
    }

# 测试IP频率分析
sample_ips = ["192.168.1.1", "10.0.0.1", "192.168.1.1", "203.0.113.1", 
              "10.0.0.1", "192.168.1.1", "8.8.8.8"]

result = analyze_ip_frequency(sample_ips)
print("IP频率分析结果:")
for key, value in result.items():
    print(f"  {key}: {value}")

In [None]:
def detect_anomalies(data, method='zscore', threshold=2):
    """
    Detect outliers in a sequence of numbers.

    Args:
        data (array-like): Input values.
        method (str): 'zscore' flags values whose absolute z-score exceeds
            ``threshold``; 'iqr' flags values further than
            ``threshold * IQR`` below the first or above the third quartile.
        threshold (float): Cut-off for the chosen method.

    Returns:
        dict: "anomaly_indices" and "anomaly_values" (lists),
        "anomaly_count" (int), "anomaly_rate" (float, 0.0 for empty input),
        plus the "method" and "threshold" that were used.

    Raises:
        ValueError: If ``method`` is neither 'zscore' nor 'iqr'.
    """
    if method not in ('zscore', 'iqr'):
        raise ValueError("方法必须是 'zscore' 或 'iqr'")

    data = np.asarray(data)

    if data.size == 0:
        # Nothing to analyse: avoids mean/percentile of an empty array and a
        # division by zero in the rate below.
        anomaly_mask = np.zeros(data.shape, dtype=bool)

    elif method == 'zscore':
        spread = np.std(data)
        if spread == 0:
            # All values identical: no value deviates from the mean, and
            # dividing by a zero standard deviation would only yield NaN.
            anomaly_mask = np.zeros(data.shape, dtype=bool)
        else:
            z_scores = np.abs((data - np.mean(data)) / spread)
            anomaly_mask = z_scores > threshold

    else:  # method == 'iqr'
        q1 = np.percentile(data, 25)
        q3 = np.percentile(data, 75)
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr
        anomaly_mask = (data < lower_bound) | (data > upper_bound)

    anomaly_indices = np.where(anomaly_mask)[0]
    anomaly_values = data[anomaly_mask]
    total = len(data)

    return {
        "anomaly_indices": anomaly_indices.tolist(),
        "anomaly_values": anomaly_values.tolist(),
        "anomaly_count": len(anomaly_indices),
        "anomaly_rate": len(anomaly_indices) / total if total else 0.0,
        "method": method,
        "threshold": threshold
    }

# 测试异常检测
test_data = [10, 12, 11, 13, 12, 45, 11, 10, 9, 13, 12, 100, 11]  # 包含异常值45和100

print("异常检测测试:")
print(f"原始数据: {test_data}")

# 使用Z-score方法
zscore_result = detect_anomalies(test_data, method='zscore', threshold=2)
print(f"\nZ-score方法结果:")
for key, value in zscore_result.items():
    print(f"  {key}: {value}")

# 使用IQR方法
iqr_result = detect_anomalies(test_data, method='iqr', threshold=1.5)
print(f"\nIQR方法结果:")
for key, value in iqr_result.items():
    print(f"  {key}: {value}")

## 总结 (Summary)

通过本课的学习，我们掌握了：

1. **Python基础数据结构**：列表、字典、集合、元组的使用和应用场景
2. **函数设计**：如何编写清晰、可重用的函数
3. **NumPy核心操作**：数组创建、运算、统计分析等
4. **实际应用**：将所学知识应用到网络安全数据分析中

这些技能为后续的机器学习课程打下了坚实的基础。在下一课中，我们将学习线性代数的基础知识。

---

**练习建议**：
1. 尝试修改本notebook中的代码，观察结果变化
2. 使用不同的数据集测试工具函数
3. 完成exercise.ipynb中的练习题