In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

# Seed NumPy's global RNG so the generate_* functions below, which draw from
# it via np.random.*, produce the same synthetic data on every run.
np.random.seed(42)

# 生成用户数据
def generate_users(n_users=1000):
    """Build a synthetic user table with ``n_users`` rows.

    Columns: ``user_id`` (1..n_users), ``age_range`` (one of five brackets),
    ``gender`` ('M'/'F'), ``province`` (one of ten provinces) and
    ``register_date`` (a random day in 2023). All randomness comes from
    NumPy's global RNG, drawn column by column in that order.
    """
    base_day = datetime(2023, 1, 1)
    province_pool = ['北京', '上海', '广东', '江苏', '浙江', '四川', '湖北', '河南', '山东', '河北']
    age_buckets = ['18-24', '25-34', '35-44', '45-54', '55+']

    # Draw order matters for reproducibility: ages, genders, provinces, then dates.
    ages = np.random.choice(age_buckets, n_users)
    genders = np.random.choice(['M', 'F'], n_users)
    home_provinces = np.random.choice(province_pool, n_users)
    signup_days = []
    for _ in range(n_users):
        signup_days.append(base_day + timedelta(days=np.random.randint(0, 365)))

    return pd.DataFrame({
        'user_id': range(1, n_users + 1),
        'age_range': ages,
        'gender': genders,
        'province': home_provinces,
        'register_date': signup_days,
    })

# 生成商品数据
def generate_products(n_products=100):
    """Build a synthetic product catalogue with ``n_products`` rows.

    Columns: ``product_id`` (1..n_products), ``category`` (one of five),
    ``price`` (uniform in [10, 1000), rounded to cents) and ``stock``
    (integer in [0, 1000)). Draws from NumPy's global RNG in column order.
    """
    category_pool = ['电子产品', '服装', '食品', '家居', '美妆']

    # Same draw order as the columns: category, price, stock.
    drawn_categories = np.random.choice(category_pool, n_products)
    drawn_prices = np.round(np.random.uniform(10, 1000, n_products), 2)
    drawn_stock = np.random.randint(0, 1000, n_products)

    frame = pd.DataFrame({
        'product_id': range(1, n_products + 1),
        'category': drawn_categories,
        'price': drawn_prices,
        'stock': drawn_stock,
    })
    return frame

# 生成订单数据
def generate_orders(users_df, products_df, n_orders=5000):
    """Build a synthetic line-item order table.

    Each of the ``n_orders`` orders belongs to one random user, falls on one
    random day of 2023 and contains 1-4 line items; every line item picks a
    random product and a quantity of 1-4.

    Args:
        users_df: DataFrame with a ``user_id`` column.
        products_df: DataFrame with ``product_id`` and ``price`` columns.
        n_orders: number of orders (not line items) to generate.

    Returns:
        DataFrame with one row per line item and columns ``order_id``,
        ``user_id``, ``product_id``, ``quantity``, ``price``,
        ``total_amount`` (quantity * price) and ``order_date``. All line
        items of one order share ``order_id``, ``user_id`` and ``order_date``.
        With ``n_orders=0`` the result is empty but keeps these columns.
    """
    columns = ['order_id', 'user_id', 'product_id', 'quantity', 'price', 'total_amount', 'order_date']
    # Build the price lookup once instead of filtering products_df for every item.
    price_by_product = dict(zip(products_df['product_id'], products_df['price']))

    rows = []
    for order_id in range(1, n_orders + 1):
        user_id = np.random.choice(users_df['user_id'])
        n_items = np.random.randint(1, 5)
        order_date = datetime(2023, 1, 1) + timedelta(days=np.random.randint(0, 365))

        for _ in range(n_items):
            product_id = np.random.choice(products_df['product_id'])
            product_price = price_by_product[product_id]
            quantity = np.random.randint(1, 5)

            rows.append({
                # Shared by every line item of this order, so order-level
                # metrics (order counts, baskets) are meaningful.
                'order_id': order_id,
                'user_id': user_id,
                'product_id': product_id,
                'quantity': quantity,
                'price': product_price,
                'total_amount': quantity * product_price,
                'order_date': order_date
            })

    return pd.DataFrame(rows, columns=columns)

# Build the three synthetic tables. Order matters: generate_orders samples ids
# from the first two, and all three consume the same global RNG stream.
users_df = generate_users()
products_df = generate_products()
orders_df = generate_orders(users_df, products_df)

# Quick sanity check: print the first rows of each table.
for preview_title, preview_frame in (
    ("用户数据预览：", users_df),
    ("\n商品数据预览：", products_df),
    ("\n订单数据预览：", orders_df),
):
    print(preview_title)
    print(preview_frame.head())


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# Configure matplotlib for Chinese labels. The SimHei font must be installed
# locally; unicode_minus=False renders the minus sign as ASCII '-' so it does
# not depend on the font providing the Unicode minus glyph.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 1. Overall sales. orders_df holds one row per line item, so the order count
# is the number of distinct order_id values rather than the row count.
total_sales = orders_df['total_amount'].sum()
total_orders = len(orders_df['order_id'].unique())
avg_order_value = total_sales / total_orders

print(f"总销售额: ¥{total_sales:,.2f}")
print(f"总订单数: {total_orders:,}")
print(f"平均订单金额: ¥{avg_order_value:.2f}")

# 2. Daily sales trend (order_date has day granularity: one point per day with sales).
daily_sales = orders_df.groupby('order_date')['total_amount'].sum().reset_index()

plt.figure(figsize=(15, 6))
plt.plot(daily_sales['order_date'], daily_sales['total_amount'])
plt.title('日销售趋势')
plt.xlabel('日期')
plt.ylabel('销售额')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 3. Monthly comparison. Adds a `month` column to orders_df in place; later
# cells that merge orders_df carry it along. Month ignores the year (all
# generated orders fall in 2023).
orders_df['month'] = orders_df['order_date'].dt.month
monthly_sales = orders_df.groupby('month')['total_amount'].sum().reset_index()

plt.figure(figsize=(12, 6))
plt.bar(monthly_sales['month'], monthly_sales['total_amount'])
plt.title('月度销售对比')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.grid(True)
plt.show()

# 4. Distribution of amounts. NOTE(review): each value is one line item's
# total_amount, not a whole order's total, despite the '订单金额' label.
plt.figure(figsize=(10, 6))
plt.hist(orders_df['total_amount'], bins=50)
plt.title('订单金额分布')
plt.xlabel('订单金额')
plt.ylabel('频次')
plt.grid(True)
plt.show()

# Summary statistics of line-item total_amount.
print("\n销售额基础统计：")
print(orders_df['total_amount'].describe())


In [None]:
# 1. Per-user activity, indexed by user_id:
#    - 购买次数: number of rows. These are line items, since orders_df has one
#      row per product line.
#    - 总消费额: total spend.
#    - 活跃天数: inclusive span in days between the user's first and last
#      order date.
user_activity = orders_df.groupby('user_id').agg({
    'order_id': 'count',
    'total_amount': 'sum',
    'order_date': lambda x: (x.max() - x.min()).days + 1
}).rename(columns={
    'order_id': '购买次数',
    'total_amount': '总消费额',
    'order_date': '活跃天数'
})

# 2. Histogram of purchase counts per user.
plt.figure(figsize=(10, 6))
plt.hist(user_activity['购买次数'], bins=30)
plt.title('用户购买频率分布')
plt.xlabel('购买次数')
plt.ylabel('用户数')
plt.grid(True)
plt.show()

# 3. Histogram of total spend per user.
plt.figure(figsize=(10, 6))
plt.hist(user_activity['总消费额'], bins=30)
plt.title('用户消费水平分布')
plt.xlabel('总消费额')
plt.ylabel('用户数')
plt.grid(True)
plt.show()

# 4. User profiles: attach demographics. `on='user_id'` matches the index
# level of user_activity against the column of users_df; the default inner
# join drops users who never ordered.
user_profile = pd.merge(user_activity, users_df, on='user_id')

# Mean spend and purchase count per age bracket.
age_analysis = user_profile.groupby('age_range').agg({
    '总消费额': 'mean',
    '购买次数': 'mean'
}).round(2)

print("\n不同年龄段用户行为：")
print(age_analysis)

# Same metrics per gender.
gender_analysis = user_profile.groupby('gender').agg({
    '总消费额': 'mean',
    '购买次数': 'mean'
}).round(2)

print("\n不同性别用户行为：")
print(gender_analysis)

# Geographic view: total spend per province, largest first.
plt.figure(figsize=(12, 6))
province_sales = user_profile.groupby('province')['总消费额'].sum().sort_values(ascending=False)
province_sales.plot(kind='bar')
plt.title('各省份销售额分布')
plt.xlabel('省份')
plt.ylabel('总销售额')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# Join line items with product attributes. Both tables have a `price` column.
# The suffixes keep the order-time price as `price` and rename the catalogue
# price to `price_catalog`. The default suffixes would produce price_x /
# price_y and break the 'price' aggregation in category_stats below.
order_products = pd.merge(orders_df, products_df, on='product_id', suffixes=('', '_catalog'))

# 1. Share of revenue per category.
category_sales = order_products.groupby('category')['total_amount'].sum()

plt.figure(figsize=(10, 6))
plt.pie(category_sales, labels=category_sales.index, autopct='%1.1f%%')
plt.title('各类别销售额占比')
plt.axis('equal')
plt.show()

# 2. Units sold per category.
category_quantity = order_products.groupby('category')['quantity'].sum()

plt.figure(figsize=(10, 6))
plt.bar(category_quantity.index, category_quantity.values)
plt.title('各类别销量分析')
plt.xlabel('商品类别')
plt.ylabel('销量')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()

# 3. Catalogue price distribution per category (products_df: one point per product).
plt.figure(figsize=(12, 6))
sns.boxplot(x='category', y='price', data=products_df)
plt.title('各类别商品价格分布')
plt.xlabel('商品类别')
plt.ylabel('价格')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()

# 4. Top 10 products by revenue.
top_products = order_products.groupby('product_id').agg({
    'quantity': 'sum',
    'total_amount': 'sum',
    'category': 'first'
}).sort_values('total_amount', ascending=False).head(10)

print("\n销售额Top10商品：")
print(top_products)

# Per-category mean line-item price, revenue and units. The price mean is
# taken over line items, so frequently sold products weigh more here than in
# the catalogue boxplot above.
category_stats = order_products.groupby('category').agg({
    'price': 'mean',
    'total_amount': 'sum',
    'quantity': 'sum'
}).round(2)

print("\n类别统计：")
print(category_stats)


In [None]:
from datetime import datetime
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

# RFM metrics per user, relative to the latest order date in the data:
#   recency   - days since the user's last order
#   frequency - number of distinct orders (rows are line items)
#   monetary  - total spend
now = orders_df['order_date'].max()

rfm = orders_df.groupby('user_id').agg(
    recency=('order_date', lambda x: (now - x.max()).days),
    frequency=('order_id', 'nunique'),
    monetary=('total_amount', 'sum'),
)

# Standardise the three metrics so no single scale dominates the distance.
scaler = StandardScaler()
rfm_normalized = scaler.fit_transform(rfm)

# Four customer segments. n_init is pinned because scikit-learn changed its
# default (10 -> 'auto') in 1.4; leaving it implicit makes results version-dependent.
kmeans = KMeans(n_clusters=4, n_init=10, random_state=42)
rfm['cluster'] = kmeans.fit_predict(rfm_normalized)

# Mean RFM values and size of each segment. user_id is rfm's index, not a
# column, so the size is counted from an existing column.
cluster_analysis = rfm.groupby('cluster').agg(
    recency=('recency', 'mean'),
    frequency=('frequency', 'mean'),
    monetary=('monetary', 'mean'),
    user_count=('recency', 'size'),
).round(2)

print("客户群体特征：")
print(cluster_analysis)

# Pairwise scatter plots coloured by segment.
plt.figure(figsize=(15, 5))

# 1. Recency vs Monetary
plt.subplot(131)
plt.scatter(rfm['recency'], rfm['monetary'], c=rfm['cluster'], cmap='viridis')
plt.xlabel('Recency (days)')
plt.ylabel('Monetary')
plt.title('Recency vs Monetary')

# 2. Frequency vs Monetary
plt.subplot(132)
plt.scatter(rfm['frequency'], rfm['monetary'], c=rfm['cluster'], cmap='viridis')
plt.xlabel('Frequency')
plt.ylabel('Monetary')
plt.title('Frequency vs Monetary')

# 3. Recency vs Frequency
plt.subplot(133)
plt.scatter(rfm['recency'], rfm['frequency'], c=rfm['cluster'], cmap='viridis')
plt.xlabel('Recency (days)')
plt.ylabel('Frequency')
plt.title('Recency vs Frequency')

plt.tight_layout()
plt.show()

# Share of users in each segment, in percent.
cluster_proportions = (rfm['cluster'].value_counts() / len(rfm) * 100).round(2)
print("\n客户群体占比：")
print(cluster_proportions)


In [None]:
from mlxtend.frequent_patterns import apriori
from mlxtend.frequent_patterns import association_rules

# Basket matrix: one row per order_id, one column per product_id, True when
# the order contains that product. mlxtend's apriori expects a boolean one-hot
# DataFrame; non-bool input only triggers a performance warning.
transaction_matrix = pd.crosstab(orders_df['order_id'], orders_df['product_id']) > 0

# Frequent itemsets with support >= 1% of orders.
frequent_itemsets = apriori(transaction_matrix, min_support=0.01, use_colnames=True)

# Rules with confidence >= 0.5.
# NOTE(review): newer mlxtend releases changed association_rules' signature
# (num_itemsets argument) — confirm against the installed version.
rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.5)

# Strongest rules (by lift) first.
rules = rules.sort_values('lift', ascending=False)
if rules.empty:
    print("未找到满足阈值的关联规则（min_support=0.01, min_confidence=0.5）。")

print("商品关联规则（Top 10）：")
print(rules.head(10))

# Support vs. confidence of every rule.
plt.figure(figsize=(10, 6))
plt.scatter(rules['support'], rules['confidence'], alpha=0.5)
plt.xlabel('支持度')
plt.ylabel('置信度')
plt.title('关联规则支持度vs置信度')
plt.grid(True)
plt.show()

# Five strongest product combinations (rules is already sorted by lift).
top_combinations = rules.head(5)
print("\n最强关联的商品组合：")
print(top_combinations[['antecedents', 'consequents', 'lift', 'confidence']])

# Distribution of lift across rules.
plt.figure(figsize=(10, 6))
plt.hist(rules['lift'], bins=30)
plt.xlabel('提升度')
plt.ylabel('规则数量')
plt.title('关联规则提升度分布')
plt.grid(True)
plt.show()


In [None]:
# Join line items with buyer demographics and product attributes. The suffixes
# keep the order-time `price` and rename the catalogue price, instead of
# producing ambiguous price_x / price_y columns.
geo_analysis = pd.merge(orders_df, users_df, on='user_id')
geo_analysis = pd.merge(geo_analysis, products_df, on='product_id', suffixes=('', '_catalog'))

# 1. Per province: revenue, distinct orders and distinct buyers.
# Rows are line items, so orders are counted with nunique.
province_sales = geo_analysis.groupby('province').agg({
    'total_amount': 'sum',
    'order_id': 'nunique',
    'user_id': 'nunique'
}).round(2)

print("省份销售情况：")
print(province_sales)

# Revenue per province.
plt.figure(figsize=(12, 6))
province_sales['total_amount'].plot(kind='bar')
plt.title('省份销售额分布')
plt.xlabel('省份')
plt.ylabel('销售额')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()

# 2. Average order value per province = revenue / number of distinct orders.
# The result is kept in a 'total_amount' column for the plot below.
province_orders = geo_analysis.groupby('province').agg(
    revenue=('total_amount', 'sum'),
    n_orders=('order_id', 'nunique'),
)
province_consumption = (
    (province_orders['revenue'] / province_orders['n_orders'])
    .round(2)
    .to_frame('total_amount')
)

plt.figure(figsize=(12, 6))
province_consumption['total_amount'].plot(kind='bar')
plt.title('省份平均订单金额')
plt.xlabel('省份')
plt.ylabel('平均订单金额')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()

# 3. Category mix per province, as a percentage of that province's revenue.
province_category = geo_analysis.groupby(['province', 'category'])['total_amount'].sum().unstack()
province_category_pct = province_category.div(province_category.sum(axis=1), axis=0) * 100

# DataFrame.plot opens its own figure, so size it here. Calling plt.figure()
# first would leave an empty extra figure behind.
province_category_pct.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('各省份商品类别销售占比')
plt.xlabel('省份')
plt.ylabel('销售占比(%)')
plt.legend(title='商品类别', bbox_to_anchor=(1.05, 1))
plt.tight_layout()
plt.show()

# 4. Regional opportunity: growth from the first to the last month column,
# per province. The value is NaN when a province has no sales in the first
# or last month.
province_growth = geo_analysis.copy()
province_growth['month'] = province_growth['order_date'].dt.month
province_monthly = province_growth.groupby(['province', 'month'])['total_amount'].sum().unstack()
province_growth_rate = ((province_monthly.iloc[:, -1] - province_monthly.iloc[:, 0]) / province_monthly.iloc[:, 0] * 100).round(2)

print("\n各省份销售增长率：")
print(province_growth_rate)


In [None]:
# Simulated marketing campaigns. Re-seeds NumPy's global RNG, which also
# affects every later np.random draw in the notebook.
np.random.seed(42)

# Five campaigns, one every two months. freq='2M' anchors each start on a month
# end (2023-01-31, 2023-03-31, ...). Duration is 7-14 days; cost is uniform
# in [10000, 50000).
# NOTE(review): pandas >= 2.2 deprecates the 'M' alias in favour of 'ME'.
n_campaigns = 5
campaign_data = pd.DataFrame({
    'campaign_id': range(1, n_campaigns + 1),
    'campaign_name': [f'活动{i}' for i in range(1, n_campaigns + 1)],
    'start_date': pd.date_range(start='2023-01-01', periods=n_campaigns, freq='2M'),
    'duration_days': np.random.randint(7, 15, n_campaigns),
    'cost': np.random.uniform(10000, 50000, n_campaigns)
})

# Flag each line item whose order_date falls inside any campaign window
# [start, start + duration_days] (both ends inclusive). Adds a column to orders_df.
orders_df['is_campaign'] = orders_df['order_date'].apply(
    lambda x: any((x >= start) & (x <= start + pd.Timedelta(days=dur)) 
                 for start, dur in zip(campaign_data['start_date'], campaign_data['duration_days']))
)

# 1. Campaign vs. non-campaign: revenue sum/mean per line item, line-item
# count and distinct buyers.
campaign_effect = orders_df.groupby('is_campaign').agg({
    'total_amount': ['sum', 'mean'],
    'order_id': 'count',
    'user_id': 'nunique'
}).round(2)

print("促销活动效果对比：")
print(campaign_effect)

# 2. Per-user share of line items bought during campaigns.
# NOTE(review): raises KeyError if either flag value is absent (no True or no
# False column after unstack).
user_response = orders_df.groupby(['user_id', 'is_campaign'])['order_id'].count().unstack(fill_value=0)
user_response['response_rate'] = user_response[True] / (user_response[True] + user_response[False])

plt.figure(figsize=(10, 6))
plt.hist(user_response['response_rate'], bins=20)
plt.title('用户促销响应度分布')
plt.xlabel('响应率')
plt.ylabel('用户数')
plt.grid(True)
plt.show()

# 3. Overall ROI: all revenue inside campaign windows vs. total campaign cost.
# No baseline is subtracted, so organic sales during campaigns count as campaign revenue.
campaign_revenue = orders_df[orders_df['is_campaign']]['total_amount'].sum()
campaign_cost = campaign_data['cost'].sum()
roi = (campaign_revenue - campaign_cost) / campaign_cost * 100

print(f"\n活动总体ROI: {roi:.2f}%")

# 4. Input for targeting: campaign-period revenue (sum/mean) and distinct
# buyers per age bracket and gender.
campaign_user_analysis = pd.merge(orders_df[orders_df['is_campaign']], users_df, on='user_id')
group_performance = campaign_user_analysis.groupby(['age_range', 'gender']).agg({
    'total_amount': ['sum', 'mean'],
    'user_id': 'nunique'
}).round(2)

print("\n不同用户群体活动期间表现：")
print(group_performance)


In [None]:
import time
from datetime import datetime, timedelta

# 模拟实时数据生成器
def generate_real_time_order():
    """Return one random fake order as a dict.

    Keys, in order: order_id (10000-99998), user_id (1-1000),
    product_id (1-100), quantity (1-9), price (float in [10, 1000)) and
    timestamp (``datetime.now()``). Values are drawn from NumPy's global RNG
    in that key order.
    """
    fields = {}
    fields['order_id'] = np.random.randint(10000, 99999)
    fields['user_id'] = np.random.randint(1, 1001)
    fields['product_id'] = np.random.randint(1, 101)
    fields['quantity'] = np.random.randint(1, 10)
    fields['price'] = np.random.uniform(10, 1000)
    fields['timestamp'] = datetime.now()
    return fields

# 1. Simulated real-time sales monitor: one random order per second, 10 ticks.
print("模拟实时销售监控...")
# Holds every amount seen so far. With exactly 10 iterations this equals the
# "last 10 orders" the printed label refers to.
sales_window = []
for _ in range(10):  # 10 simulated time points
    order = generate_real_time_order()
    sales_amount = order['quantity'] * order['price']
    sales_window.append(sales_amount)
    
    print(f"时间: {order['timestamp'].strftime('%H:%M:%S')}")
    print(f"订单金额: ¥{sales_amount:.2f}")
    print(f"最近10笔订单平均金额: ¥{np.mean(sales_window):.2f}")
    print("---")
    time.sleep(1)

# 2. 异常订单检测
def detect_anomaly(order):
    """Flag an order as anomalous using two fixed rules.

    Returns True when the order value (quantity * price) exceeds 5000 (large
    order) or the quantity exceeds 5 (bulk purchase); otherwise False.
    """
    return bool(order['quantity'] * order['price'] > 5000 or order['quantity'] > 5)

# 2. Anomaly check on five fresh random orders.
print("\n异常订单检测...")
for _ in range(5):
    order = generate_real_time_order()
    is_anomaly = detect_anomaly(order)
    print(f"订单ID: {order['order_id']}")
    print(f"订单金额: ¥{order['quantity'] * order['price']:.2f}")
    print(f"是否异常: {'是' if is_anomaly else '否'}")
    print("---")
    time.sleep(1)

# 3. Simulated inventory updates against a copy of the catalogue stock.
inventory = products_df[['product_id', 'stock']].copy()
print("\n实时库存管理...")
for _ in range(5):
    order = generate_real_time_order()
    product_id = order['product_id']
    quantity = order['quantity']

    row_mask = inventory['product_id'] == product_id
    old_stock = inventory.loc[row_mask, 'stock'].iloc[0]

    print(f"商品ID: {product_id}")
    print(f"购买数量: {quantity}")
    if quantity > old_stock:
        # Never let stock go negative: an order larger than what is on hand
        # leaves the inventory unchanged.
        print(f"库存不足，未扣减库存（当前库存: {old_stock}）")
    else:
        new_stock = old_stock - quantity
        inventory.loc[row_mask, 'stock'] = new_stock
        print(f"剩余库存: {new_stock}")
    print("---")
    time.sleep(1)

# 4. Hot products: count how often each product appears in 20 random orders.
print("\n实时热点商品分析...")
hot_products = {}
for _ in range(20):
    order = generate_real_time_order()
    product_id = order['product_id']
    hot_products[product_id] = hot_products.get(product_id, 0) + 1

top_products = sorted(hot_products.items(), key=lambda x: x[1], reverse=True)[:5]
print("最近热门商品:")
for product_id, count in top_products:
    print(f"商品ID: {product_id}, 被购买次数: {count}")


In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# NOTE: this star import rebinds Python builtins such as sum, max, min, round
# and abs to their pyspark.sql.functions counterparts for the rest of the
# notebook. Later cells rely on that (e.g. sum("TotalAmount")), but pure-Python
# code must then call builtins.sum / builtins.round explicitly.
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Create (or reuse, via getOrCreate) the SparkSession with 10g of off-heap memory.
spark = SparkSession.builder \
    .appName("E-commerce Analysis") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "10g") \
    .getOrCreate()

print("Spark版本:", spark.version)
print("环境配置完成!")


In [None]:
# Load the Online Retail CSV: the header row supplies column names and Spark
# infers column types from the data.
# NOTE(review): inferSchema only yields a timestamp for InvoiceDate if it
# matches Spark's default timestamp format; otherwise it stays a string and the
# date functions in later cells return null — check printSchema() below.
df = spark.read.csv("Online_Retail.csv", header=True, inferSchema=True)

# Cleaning: drop rows with any null, keep only positive quantities and unit
# prices (presumably excluding returns/cancellations), then derive the line total.
positive_qty = col("Quantity") > 0
positive_price = col("UnitPrice") > 0
df_cleaned = (
    df.dropna()
    .filter(positive_qty)
    .filter(positive_price)
    .withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))
)

# Basic shape and schema.
print("数据总行数:", df_cleaned.count())
print("\n数据结构:")
df_cleaned.printSchema()

# First rows.
print("\n数据预览:")
df_cleaned.show(5)


In [None]:
# Monthly revenue: label each line with its calendar month ("yyyy-MM"), then
# sum TotalAmount per month in chronological order. `sum` here is the Spark
# aggregate brought in by `from pyspark.sql.functions import *`.
month_label = date_format("InvoiceDate", "yyyy-MM")
monthly_sales = (
    df_cleaned
    .withColumn("YearMonth", month_label)
    .groupBy("YearMonth")
    .agg(sum("TotalAmount").alias("TotalSales"))
    .orderBy("YearMonth")
)

# One row per month, so collecting to pandas on the driver is cheap.
monthly_sales_pd = monthly_sales.toPandas()

# Line chart of the monthly totals.
plt.figure(figsize=(12, 6))
plt.plot(monthly_sales_pd["YearMonth"], monthly_sales_pd["TotalSales"], marker='o')
plt.title("月度销售趋势")
plt.xlabel("年月")
plt.ylabel("销售总额")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

# Month-over-month growth in percent (NaN for the first month).
monthly_sales_pd["GrowthRate"] = monthly_sales_pd["TotalSales"].pct_change().mul(100)
print("\n月度销售额环比增长率:")
print(monthly_sales_pd)


In [None]:
# RFM per customer, relative to the latest invoice date in the cleaned data.
# (`max`, `sum`, `avg` and `count` below are pyspark.sql.functions via the star import.)
max_date = df_cleaned.agg(max("InvoiceDate")).collect()[0][0]

rfm = df_cleaned.groupBy("CustomerID").agg(
    datediff(lit(max_date), max("InvoiceDate")).alias("Recency"),
    countDistinct("InvoiceNo").alias("Frequency"),
    sum("TotalAmount").alias("Monetary")
)

# Assemble the raw RFM vector, then scale each dimension to unit variance so
# K-means is not dominated by Monetary, whose raw range is typically far
# larger. This mirrors the StandardScaler step of the pandas RFM analysis.
# Centering is skipped because it does not change Euclidean distances.
from pyspark.ml.feature import StandardScaler as SparkStandardScaler

assembler = VectorAssembler(
    inputCols=["Recency", "Frequency", "Monetary"],
    outputCol="raw_features"
)
rfm_vectors = assembler.transform(rfm)
rfm_scaler = SparkStandardScaler(inputCol="raw_features", outputCol="features", withMean=False, withStd=True)
rfm_features = rfm_scaler.fit(rfm_vectors).transform(rfm_vectors)

# Four segments; KMeans reads the scaled "features" column by default.
kmeans = KMeans(k=4, seed=1)
model = kmeans.fit(rfm_features)
rfm_clustered = model.transform(rfm_features)

# Mean raw RFM values and size per segment.
cluster_stats = rfm_clustered.groupBy("prediction").agg(
    avg("Recency").alias("Avg_Recency"),
    avg("Frequency").alias("Avg_Frequency"),
    avg("Monetary").alias("Avg_Monetary"),
    count("*").alias("Count")
)

print("客户群体特征分析:")
cluster_stats.show()


In [None]:
from pyspark.ml.fpm import FPGrowth

# One basket per invoice. collect_set (not collect_list) because FPGrowth
# rejects transactions that contain the same item twice, and an invoice can
# list the same Description on several lines.
basket_data = df_cleaned.select("InvoiceNo", "Description") \
    .groupBy("InvoiceNo") \
    .agg(collect_set("Description").alias("items"))

# FP-Growth: itemsets in >= 1% of invoices, rules with confidence >= 0.5.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.5)
model = fpGrowth.fit(basket_data)

# Frequent itemsets.
print("频繁项集:")
model.freqItemsets.show(10)

# Association rules.
print("\n关联规则:")
model.associationRules.show(10)


In [None]:
from pyspark.streaming import StreamingContext

# DStream StreamingContext on the existing SparkContext, with a 1-second batch interval.
ssc = StreamingContext(spark.sparkContext, 1)

# 模拟实时数据流
def generate_stream_data():
    """Return up to 100 cleaned rows as ONE JSON string (a JSON array of
    record objects), built on the driver via pandas."""
    sample_pd = df_cleaned.limit(100).toPandas()
    return sample_pd.to_json(orient="records")

# Queue-backed DStream holding a single RDD whose only element is the JSON
# string returned by generate_stream_data().
stream_data = ssc.queueStream([spark.sparkContext.parallelize([generate_stream_data()])])

# 实时计算
def process_batch(time, rdd):
    """Print the total sales contained in one micro-batch.

    Each RDD element is expected to be a JSON array of order records (as
    produced by generate_stream_data). The batch total is the sum of the
    records' ``TotalAmount`` fields. Empty batches print nothing.

    Args:
        time: batch time supplied by DStream.foreachRDD.
        rdd: the batch's RDD of JSON strings.
    """
    import json

    if rdd.isEmpty():
        return
    # Parse the JSON instead of splitting on commas: each element is a whole
    # JSON document, so comma positions do not correspond to record fields.
    sales = rdd.flatMap(json.loads).map(lambda record: float(record["TotalAmount"])).sum()
    print(f"Time: {time}, Sales: {sales}")

# Register the per-batch handler.
stream_data.foreachRDD(process_batch)

# Run the stream, wait up to 10 seconds, then shut it down.
ssc.start()
ssc.awaitTerminationOrTimeout(10)  # stop after 10 seconds
# NOTE(review): stop() defaults to stopSparkContext=True, so this also stops
# the SparkContext behind `spark`. DataFrames created above (e.g. df_cleaned)
# become unusable, and the next cell's getOrCreate presumably builds a fresh
# session. Pass stopSparkContext=False if the context must survive.
ssc.stop()


In [None]:
# 电商订单数据分析项目 - 第三部分：批处理分析(Hadoop)

在这部分中，我们将使用Hadoop生态系统进行大规模数据处理和分析。主要内容包括：

1. 数据迁移：MySQL到HDFS
2. Spark SQL分析：基于Hive表的订单数据处理
3. Hive分析：复杂查询实现
4. 数据导出：结果保存

本部分将展示如何使用Hadoop处理大规模数据集，为后续的实时处理打下基础。


In [None]:
# 导入所需的库
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import os

# Create the Hive-enabled SparkSession used by the batch-analysis cells below.
# The warehouse is on the local HDFS (localhost:9000); executor and driver get 2g.
# NOTE(review): getOrCreate() returns an already-running session if one exists.
# In that case enableHiveSupport() and these static settings may not take
# effect — restart the kernel or stop the previous session first.
spark = SparkSession.builder \
    .appName("EcommerceAnalysis") \
    .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .enableHiveSupport() \
    .getOrCreate()

# 从MySQL读取数据
def read_from_mysql(url="jdbc:mysql://localhost:3306/ecommerce_analysis",
                    table="orders", user=None, password=None):
    """Load a MySQL table into a Spark DataFrame over JDBC.

    Args:
        url: JDBC URL of the MySQL database.
        table: value passed as the JDBC ``dbtable`` option.
        user: MySQL user; defaults to $MYSQL_USER, else "root".
        password: MySQL password; defaults to $MYSQL_PASSWORD, else the
            "your_password" placeholder.

    Returns:
        A Spark DataFrame backed by the table.
    """
    import os

    # Credentials come from arguments or the environment instead of being
    # edited into the notebook source.
    mysql_properties = {
        "user": user if user is not None else os.environ.get("MYSQL_USER", "root"),
        "password": password if password is not None else os.environ.get("MYSQL_PASSWORD", "your_password"),
        "driver": "com.mysql.cj.jdbc.Driver"
    }

    df = spark.read.format("jdbc") \
        .option("url", url) \
        .option("dbtable", table) \
        .options(**mysql_properties) \
        .load()

    return df

# Load the orders table. On failure, report and re-raise: the following cells
# use `orders_df`, and continuing would silently fall back to the earlier
# pandas orders_df of the same name.
try:
    orders_df = read_from_mysql()
    print("数据读取成功！")
    print(f"总记录数: {orders_df.count()}")
    print("\n数据Schema:")
    orders_df.printSchema()
except Exception as e:
    print(f"错误: {e}")
    raise


In [None]:
# Persist the orders DataFrame to HDFS and expose it as a Hive table.
def save_to_hdfs(df):
    """Write ``df`` as Parquet to HDFS and register the orders_hive table on it.

    Overwrites any data at hdfs://localhost:9000/user/hadoop/ecommerce/orders,
    then runs CREATE TABLE IF NOT EXISTS orders_hive over that location. Any
    exception is caught and printed, not raised.

    NOTE(review): the DDL hard-codes column names and types and assumes the
    Parquet columns match them — verify against the MySQL schema. Because of
    IF NOT EXISTS, an existing orders_hive keeps its old definition.
    """
    try:
        # Parquet output; "overwrite" replaces whatever is already at the path.
        df.write \
            .mode("overwrite") \
            .parquet("hdfs://localhost:9000/user/hadoop/ecommerce/orders")
        
        print("数据已成功保存到HDFS")
        
        # Register a table over the Parquet files written above.
        spark.sql("""
        CREATE TABLE IF NOT EXISTS orders_hive (
            订单编号 INT,
            总金额 DECIMAL(10,2),
            买家实际支付金额 DECIMAL(10,2),
            收货地址 STRING,
            订单创建时间 TIMESTAMP,
            订单付款时间 TIMESTAMP,
            退款金额 DECIMAL(10,2),
            支付时长 FLOAT,
            下单年月 STRING,
            下单时间 INT,
            下单星期 INT,
            订单状态 STRING,
            实际收入 DECIMAL(10,2)
        )
        STORED AS PARQUET
        LOCATION 'hdfs://localhost:9000/user/hadoop/ecommerce/orders'
        """)
        
        print("Hive表创建成功")
        
    except Exception as e:
        print(f"错误: {e}")

# Save the data.
save_to_hdfs(orders_df)


In [None]:
# Descriptive analyses over orders_hive with Spark SQL.
def analyze_orders():
    """Run four descriptive queries on orders_hive and print their results.

    Covers the daily trend, shipping-address breakdown, hour-of-day
    distribution and order-status breakdown. Returns nothing.
    """
    # 1. Daily trend: order count, gross amount, actual revenue and mean order
    # amount per day (earliest 5 days shown).
    print("1. 每日销售趋势分析：")
    daily_sales = spark.sql("""
        SELECT 
            DATE(订单创建时间) as 日期,
            COUNT(*) as 订单数,
            SUM(总金额) as 总销售额,
            SUM(实际收入) as 实际收入,
            AVG(总金额) as 平均订单金额
        FROM orders_hive
        GROUP BY DATE(订单创建时间)
        ORDER BY 日期
    """)
    daily_sales.show(5)
    
    # 2. Per shipping address: volume, revenue and refunded-order count,
    # busiest first (top 5 shown).
    # NOTE(review): groups by the raw 收货地址 value — only meaningful if it
    # holds a region rather than a full street address.
    print("\n2. 地区销售分析：")
    region_sales = spark.sql("""
        SELECT 
            收货地址,
            COUNT(*) as 订单数,
            SUM(总金额) as 总销售额,
            SUM(实际收入) as 实际收入,
            AVG(总金额) as 平均订单金额,
            COUNT(CASE WHEN 订单状态 = '已退款' THEN 1 END) as 退款订单数
        FROM orders_hive
        GROUP BY 收货地址
        ORDER BY 订单数 DESC
    """)
    region_sales.show(5)
    
    # 3. Orders per value of 下单时间, aliased as 小时 (hour of day).
    print("\n3. 各时段订单分布：")
    hourly_orders = spark.sql("""
        SELECT 
            下单时间 as 小时,
            COUNT(*) as 订单数,
            SUM(总金额) as 总销售额,
            AVG(总金额) as 平均订单金额
        FROM orders_hive
        GROUP BY 下单时间
        ORDER BY 小时
    """)
    hourly_orders.show(24)
    
    # 4. Volume, amount and mean payment duration per order status.
    print("\n4. 订单状态分析：")
    status_analysis = spark.sql("""
        SELECT 
            订单状态,
            COUNT(*) as 订单数,
            SUM(总金额) as 总金额,
            AVG(支付时长) as 平均支付时长
        FROM orders_hive
        GROUP BY 订单状态
    """)
    status_analysis.show()

# Run the analyses.
analyze_orders()


In [None]:
# Advanced analysis: RFM-style tiering of paid orders.
def rfm_analysis():
    """Print row counts per (recency, frequency, monetary) tier over paid orders.

    Tiers are labelled 高/中/低:
    - recency: <= 30 / <= 90 / more days before the latest paid order;
    - frequency: >= 3 / >= 2 / fewer;
    - monetary: >= 1000 / >= 500 / less.

    NOTE(review): despite the RFM naming, the window functions partition by
    订单编号 (order id), not by customer, and orders_hive has no customer
    column. Frequency is therefore the row count per order id, monetary is
    that order's 实际收入, and 客户数量 counts rows, not customers. The
    ORDER BY on the 高/中/低 labels sorts by string value, not by tier rank.
    """
    # max_date: latest creation time among paid orders (the window runs after WHERE).
    rfm_query = """
    WITH rfm_base AS (
        SELECT 
            订单编号,
            订单创建时间,
            实际收入,
            MAX(订单创建时间) OVER () as max_date
        FROM orders_hive
        WHERE 订单状态 = '已付款'
    ),
    rfm_calc AS (
        SELECT 
            订单编号,
            DATEDIFF(max_date, 订单创建时间) as recency,
            COUNT(*) OVER (PARTITION BY 订单编号) as frequency,
            SUM(实际收入) OVER (PARTITION BY 订单编号) as monetary
        FROM rfm_base
    )
    SELECT 
        CASE 
            WHEN recency <= 30 THEN '高'
            WHEN recency <= 90 THEN '中'
            ELSE '低'
        END as 最近购买,
        CASE 
            WHEN frequency >= 3 THEN '高'
            WHEN frequency >= 2 THEN '中'
            ELSE '低'
        END as 购买频率,
        CASE 
            WHEN monetary >= 1000 THEN '高'
            WHEN monetary >= 500 THEN '中'
            ELSE '低'
        END as 消费金额,
        COUNT(*) as 客户数量
    FROM rfm_calc
    GROUP BY 
        CASE 
            WHEN recency <= 30 THEN '高'
            WHEN recency <= 90 THEN '中'
            ELSE '低'
        END,
        CASE 
            WHEN frequency >= 3 THEN '高'
            WHEN frequency >= 2 THEN '中'
            ELSE '低'
        END,
        CASE 
            WHEN monetary >= 1000 THEN '高'
            WHEN monetary >= 500 THEN '中'
            ELSE '低'
        END
    ORDER BY 最近购买, 购买频率, 消费金额
    """
    
    rfm_results = spark.sql(rfm_query)
    print("RFM客户价值分析结果：")
    rfm_results.show()

# Run the RFM analysis.
rfm_analysis()


In [None]:
# Persist analysis results to HDFS as Parquet.
def save_analysis_results():
    """Recompute three analyses over orders_hive and overwrite their Parquet outputs.

    Writes under hdfs://localhost:9000/user/hadoop/ecommerce/analysis/:
    daily_sales, region_sales and rfm_analysis (per-row RFM metrics of paid
    orders). Exceptions propagate to the caller.

    NOTE(review): as in rfm_analysis, frequency/monetary are partitioned by
    订单编号 (order id), not by customer.
    """
    # 1. Daily trend.
    daily_sales = spark.sql("""
        SELECT 
            DATE(订单创建时间) as 日期,
            COUNT(*) as 订单数,
            SUM(总金额) as 总销售额,
            SUM(实际收入) as 实际收入
        FROM orders_hive
        GROUP BY DATE(订单创建时间)
        ORDER BY 日期
    """)
    
    daily_sales.write \
        .mode("overwrite") \
        .parquet("hdfs://localhost:9000/user/hadoop/ecommerce/analysis/daily_sales")
    
    # 2. Per shipping address.
    region_sales = spark.sql("""
        SELECT 
            收货地址,
            COUNT(*) as 订单数,
            SUM(总金额) as 总销售额,
            SUM(实际收入) as 实际收入
        FROM orders_hive
        GROUP BY 收货地址
    """)
    
    region_sales.write \
        .mode("overwrite") \
        .parquet("hdfs://localhost:9000/user/hadoop/ecommerce/analysis/region_sales")
    
    # 3. Per-row RFM metrics (before tiering).
    rfm_results = spark.sql("""
        WITH rfm_base AS (
            SELECT 
                订单编号,
                订单创建时间,
                实际收入,
                MAX(订单创建时间) OVER () as max_date
            FROM orders_hive
            WHERE 订单状态 = '已付款'
        )
        SELECT 
            订单编号,
            DATEDIFF(max_date, 订单创建时间) as recency,
            COUNT(*) OVER (PARTITION BY 订单编号) as frequency,
            SUM(实际收入) OVER (PARTITION BY 订单编号) as monetary
        FROM rfm_base
    """)
    
    rfm_results.write \
        .mode("overwrite") \
        .parquet("hdfs://localhost:9000/user/hadoop/ecommerce/analysis/rfm_analysis")
    
    print("分析结果已保存到HDFS")

# Save the analysis results.
save_analysis_results()


In [None]:
# 第三部分总结

在这部分中，我们完成了以下工作：

1. 数据迁移
   - 从MySQL读取数据
   - 将数据保存到HDFS
   - 创建Hive表结构

2. 基础分析
   - 销售趋势分析
   - 地区销售分析
   - 时间段分析
   - 订单状态分析

3. 高级分析
   - RFM客户价值分析
   - 客户分层

4. 结果保存
   - 分析结果保存到HDFS
   - 生成多个数据集供后续使用

下一步，我们将进入第四部分：实时处理(Kafka + Flink)，实现实时数据分析和监控。


In [None]:
# 第四部分：实时处理(Kafka + Flink)

在这部分中，我们将构建实时数据处理流水线，包括：

1. Kafka数据流
   - 订单数据实时采集
   - 消息队列管理
   - 数据流监控

2. Flink实时处理
   - 实时指标计算
   - 窗口统计
   - 实时告警

3. 实时监控
   - 销售额实时统计
   - 订单量监控
   - 异常检测

本部分将展示如何处理实时数据流并进行实时分析。


In [None]:
# 导入所需的库
from kafka import KafkaProducer, KafkaConsumer
import json
import time
from datetime import datetime
import random
import threading
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import numpy as np

# Kafka connection settings used by the producer and the monitor
# (the Flink DDL repeats the same literals).
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'orders_stream'


def create_producer():
    """Return a KafkaProducer that JSON-encodes each message value to UTF-8 bytes."""
    def encode_value(payload):
        # json.dumps keeps its default ensure_ascii=True, so Chinese keys are \u-escaped.
        return json.dumps(payload).encode('utf-8')

    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=encode_value,
    )

# Simulated order generator.
def generate_order():
    """Return one random fake order as a JSON-serialisable dict.

    Keys match the Flink source schema:
    - 订单编号: order id, 10000-99999.
    - 总金额 / 买家实际支付金额: independent uniform draws in [100, 2000],
      rounded to cents.
    - 收货地址: one of eight cities.
    - 订单创建时间: local time, second precision.
    - 订单状态: 已付款 / 未付款 / 已退款.
    - timestamp: epoch milliseconds.

    Uses the stdlib ``random`` module.
    """
    import builtins

    # Earlier cells run `from pyspark.sql.functions import *`, which rebinds the
    # global name `round` to Spark's column function; use the builtin explicitly.
    py_round = builtins.round
    cities = ['北京', '上海', '广州', '深圳', '杭州', '成都', '武汉', '西安']
    # NOTE(review): the paid amount is drawn independently and can exceed 总金额.
    return {
        '订单编号': random.randint(10000, 99999),
        '总金额': py_round(random.uniform(100, 2000), 2),
        '买家实际支付金额': py_round(random.uniform(100, 2000), 2),
        '收货地址': random.choice(cities),
        '订单创建时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        '订单状态': random.choice(['已付款', '未付款', '已退款']),
        'timestamp': int(time.time() * 1000)
    }

# Producer loop, intended to run in a background thread.
def produce_messages(stop_event):
    """Send one generated order per second to KAFKA_TOPIC until ``stop_event`` is set.

    Args:
        stop_event: threading.Event. Setting it ends the loop without waiting
            out the rest of the current one-second pause.

    The producer is closed on exit, including when sending or generation raises.
    """
    producer = create_producer()
    try:
        while not stop_event.is_set():
            order = generate_order()
            producer.send(KAFKA_TOPIC, order)
            print(f"已发送订单: {order['订单编号']}")
            # One order per second; wait() returns early once stop_event is set.
            stop_event.wait(1)
    finally:
        producer.close()

# Event used to stop the producer thread (set at the end of the monitoring cell).
stop_event = threading.Event()

# Run the producer in a background thread so this cell returns immediately.
producer_thread = threading.Thread(target=produce_messages, args=(stop_event,))
producer_thread.start()

print("Kafka生产者已启动，开始生成订单数据...")


In [None]:
# Flink streaming environment plus Table API environment.
# NOTE(review): the 'kafka' connector needs the Flink Kafka SQL connector JAR
# on the classpath (e.g. via pipeline.jars); nothing here adds it — confirm.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Source table over the orders_stream topic: JSON values, reading only new
# messages. proctime is a processing-time attribute used for windowing.
source_ddl = """
    CREATE TABLE orders_source (
        订单编号 INT,
        总金额 DECIMAL(10, 2),
        买家实际支付金额 DECIMAL(10, 2),
        收货地址 STRING,
        订单创建时间 STRING,
        订单状态 STRING,
        `timestamp` BIGINT,
        proctime AS PROCTIME()
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_stream',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
"""

# Sink that prints each window's aggregates to standard output.
sink_ddl = """
    CREATE TABLE orders_stats (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        订单数 BIGINT,
        总金额 DECIMAL(10, 2),
        平均金额 DECIMAL(10, 2)
    ) WITH (
        'connector' = 'print'
    )
"""

# Register both tables.
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

# One-minute tumbling processing-time windows: order count, total and mean 总金额.
# NOTE(review): SUM/AVG over DECIMAL(10, 2) produce wider DECIMAL types than the
# sink's DECIMAL(10, 2) — confirm Flink accepts the narrowing.
stats_query = """
    INSERT INTO orders_stats
    SELECT
        window_start,
        window_end,
        COUNT(*) as 订单数,
        SUM(总金额) as 总金额,
        AVG(总金额) as 平均金额
    FROM TABLE(
        TUMBLE(TABLE orders_source, DESCRIPTOR(proctime), INTERVAL '1' MINUTES)
    )
    GROUP BY window_start, window_end
"""

# Submit the INSERT as a streaming job; execute_sql does not wait for the
# unbounded job to finish.
t_env.execute_sql(stats_query)

print("Flink实时处理任务已启动...")


In [None]:
# Real-time monitoring and alerting (consumer side).
def monitor_orders(stop_event=None, window_size=60, alert_threshold=1000):
    """Consume KAFKA_TOPIC and print sliding-window order statistics.

    Keeps the orders received in the last ``window_size`` seconds (local
    receive time). After each poll that returned messages, prints the count,
    total and mean 总金额 in the window, plus an alert when the window total
    exceeds ``alert_threshold``.

    Args:
        stop_event: optional threading.Event. The loop exits within about one
            poll interval (1 s) after it is set. If None, runs until interrupted.
        window_size: sliding-window length in seconds.
        alert_threshold: window total (¥) above which an alert is printed.

    The consumer is always closed on exit.
    """
    # Builtins are referenced explicitly because an earlier
    # `from pyspark.sql.functions import *` rebinds the global name `sum`.
    import builtins
    from collections import deque

    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=True
    )

    # (receive_time, amount) pairs inside the current window, oldest first.
    window = deque()

    print("开始实时监控订单数据...")

    try:
        while stop_event is None or not stop_event.is_set():
            # poll() returns {TopicPartition: [records]}, or an empty dict on
            # timeout, so stop_event is re-checked at least once per second.
            batches = consumer.poll(timeout_ms=1000)
            current_time = time.time()
            for records in batches.values():
                for message in records:
                    window.append((current_time, message.value['总金额']))

            # Evict entries that have fallen out of the window.
            cutoff_time = current_time - window_size
            while window and window[0][0] < cutoff_time:
                window.popleft()

            if not batches:
                continue

            total_orders = len(window)
            total_amount = builtins.sum(amount for _, amount in window)
            avg_amount = total_amount / total_orders if total_orders > 0 else 0

            print(f"\n实时监控 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"最近{window_size}秒内:")
            print(f"订单数量: {total_orders}")
            print(f"总金额: ¥{total_amount:.2f}")
            print(f"平均订单金额: ¥{avg_amount:.2f}")

            if total_amount > alert_threshold:
                print(f"⚠️ 告警：订单金额超过阈值！(¥{total_amount:.2f} > ¥{alert_threshold:.2f})")
    except KeyboardInterrupt:
        print("监控停止")
    finally:
        consumer.close()


# Start the monitor in the background and let producer and monitor run for
# five minutes. Then stop both through the shared event and wait for them to exit.
monitor_thread = threading.Thread(target=monitor_orders, args=(stop_event,))
monitor_thread.start()

time.sleep(300)  # run for 5 minutes
stop_event.set()  # stops both the producer and the monitor
producer_thread.join()
monitor_thread.join()
print("已停止数据生成和监控")


In [None]:
# 第五部分：复杂分析(Spark)

在这部分中，我们将使用Spark进行更深入的数据分析，包括：

1. 用户行为分析
   - 用户购买路径分析
   - 用户活跃度分析
   - 用户生命周期分析

2. 商品关联分析
   - 商品共现分析
   - 购物篮分析
   - 商品推荐

3. 销售趋势预测
   - 时间序列分析
   - 销售预测模型
   - 季节性分析

4. 用户分群分析
   - 用户价值分群
   - 用户行为分群
   - 精准营销建议

本部分将展示如何使用Spark的高级特性来实现复杂的数据分析任务。


In [None]:
# 导入所需的库
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Create (or reuse) the SparkSession; Arrow speeds up Spark <-> pandas conversion.
# NOTE(review): getOrCreate() returns the session already running from earlier
# cells, in which case appName is not applied.
spark = SparkSession.builder \
    .appName("E-commerce Complex Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Load the orders table from MySQL over JDBC.
# - The driver class is set explicitly, as in the batch-analysis cell, so the
#   read does not depend on JDBC driver auto-discovery.
# - Credentials come from the environment, with the original values as fallback.
# NOTE(review): this reads database `ecommerce`, while the batch cell reads
# `ecommerce_analysis`. The queries below use 用户ID / 商品类别 / 商品ID,
# which orders_hive lacks — confirm which table is intended.
import os

orders_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/ecommerce") \
    .option("dbtable", "orders") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("user", os.environ.get("MYSQL_USER", "root")) \
    .option("password", os.environ.get("MYSQL_PASSWORD", "your_password")) \
    .load()

# Expose the DataFrame to spark.sql() as the `orders` view.
orders_df.createOrReplaceTempView("orders")

print("Spark环境已准备就绪，数据已加载完成。")


In [None]:
# 1. User behaviour analysis

# 1.1 Category-to-category transitions in each user's purchase sequence.
def analyze_user_purchase_path():
    """Return the 10 most frequent (previous category -> current category) transitions.

    For each row, LAG takes the 商品类别 of the same user's preceding row by
    订单创建时间. Rows without a predecessor are dropped.

    NOTE(review): rows sharing a 订单创建时间 (e.g. several items of one order,
    if rows are line items) have no defined order. Their transitions are
    arbitrary, and items of the same order count as "paths".
    """
    path_sql = """
    WITH user_path AS (
        SELECT 
            用户ID,
            订单创建时间,
            商品类别,
            LAG(商品类别) OVER (PARTITION BY 用户ID ORDER BY 订单创建时间) as prev_category
        FROM orders
    )
    SELECT 
        prev_category as 前一个类别,
        商品类别 as 当前类别,
        COUNT(*) as 转换次数
    FROM user_path
    WHERE prev_category IS NOT NULL
    GROUP BY prev_category, 商品类别
    ORDER BY 转换次数 DESC
    LIMIT 10
    """
    return spark.sql(path_sql)

# 1.2 用户活跃度分析
def analyze_user_activity():
    """Return per-user activity: active days, row count and purchase span.

    Columns: 用户ID, 活跃天数 (distinct order dates), 总订单数 (rows in
    orders), 购买时间跨度 (days between first and last order).
    """
    return spark.sql("""
    SELECT 
        用户ID,
        COUNT(DISTINCT DATE(订单创建时间)) as 活跃天数,
        COUNT(*) as 总订单数,
        DATEDIFF(MAX(订单创建时间), MIN(订单创建时间)) as 购买时间跨度
    FROM orders
    GROUP BY 用户ID
    """)

# 1.3 用户生命周期分析
def analyze_user_lifecycle():
    """Bucket users into lifecycle stages and summarise each stage.

    Users with a single order row are 新用户. Otherwise the stage depends on
    days since the last order, measured against CURRENT_DATE: <=30 活跃用户,
    <=90 沉睡用户, else 流失用户. Returns 用户状态, 用户数量 and
    平均消费金额 per stage.
    """
    return spark.sql("""
    WITH user_stats AS (
        SELECT 
            用户ID,
            MIN(订单创建时间) as 首次购买时间,
            MAX(订单创建时间) as 最近购买时间,
            COUNT(*) as 购买次数,
            SUM(总金额) as 总消费金额
        FROM orders
        GROUP BY 用户ID
    )
    SELECT 
        CASE 
            WHEN 购买次数 = 1 THEN '新用户'
            WHEN DATEDIFF(CURRENT_DATE, 最近购买时间) <= 30 THEN '活跃用户'
            WHEN DATEDIFF(CURRENT_DATE, 最近购买时间) <= 90 THEN '沉睡用户'
            ELSE '流失用户'
        END as 用户状态,
        COUNT(*) as 用户数量,
        AVG(总消费金额) as 平均消费金额
    FROM user_stats
    GROUP BY 
        CASE 
            WHEN 购买次数 = 1 THEN '新用户'
            WHEN DATEDIFF(CURRENT_DATE, 最近购买时间) <= 30 THEN '活跃用户'
            WHEN DATEDIFF(CURRENT_DATE, 最近购买时间) <= 90 THEN '沉睡用户'
            ELSE '流失用户'
        END
    """)

# Run each user-behaviour analysis and print its result table.
for _title, _analysis in (
    ("用户购买路径分析结果：", analyze_user_purchase_path),
    ("\n用户活跃度分析结果：", analyze_user_activity),
    ("\n用户生命周期分析结果：", analyze_user_lifecycle),
):
    print(_title)
    _analysis().show()


In [None]:
# 2. 商品关联分析

# 2.1 商品共现分析
def analyze_product_cooccurrence():
    """Return the 10 product pairs that most often appear in the same order.

    The distinct product IDs of each order are collected, then exploded twice
    with LATERAL VIEW EXPLODE to enumerate the pairs inside that order. The
    original query used ``CROSS JOIN UNNEST(...)`` and referenced
    ``p1.商品ID`` on a scalar element, which is not valid Spark SQL.
    COLLECT_SET ensures a pair is counted at most once per order, so
    共现次数 is the number of orders containing both products.

    Returns:
        pyspark.sql.DataFrame with columns 商品1, 商品2, 共现次数.
    """
    cooccurrence_sql = """
    WITH order_products AS (
        SELECT 
            订单编号,
            COLLECT_SET(商品ID) as products
        FROM orders
        GROUP BY 订单编号
    )
    SELECT 
        p1 as 商品1,
        p2 as 商品2,
        COUNT(*) as 共现次数
    FROM order_products
    LATERAL VIEW EXPLODE(products) t1 AS p1
    LATERAL VIEW EXPLODE(products) t2 AS p2
    WHERE p1 < p2
    GROUP BY p1, p2
    ORDER BY 共现次数 DESC
    LIMIT 10
    """
    return spark.sql(cooccurrence_sql)

# 2.2 购物篮分析
def analyze_shopping_basket():
    """Summarise purchases per product category, most-purchased first.

    Columns: 商品类别, 购买次数 (rows), 购买用户数 (distinct users),
    平均购买数量 and 平均购买金额.
    """
    return spark.sql("""
    SELECT 
        商品类别,
        COUNT(*) as 购买次数,
        COUNT(DISTINCT 用户ID) as 购买用户数,
        AVG(商品数量) as 平均购买数量,
        AVG(总金额) as 平均购买金额
    FROM orders
    GROUP BY 商品类别
    ORDER BY 购买次数 DESC
    """)

# 2.3 商品推荐
def generate_product_recommendations():
    """Return up to 10 product pairs whose purchase counts correlate above 0.5.

    A simple item-item collaborative filter: the per-user purchase counts
    of two products are correlated over the users who bought both.

    When those counts have zero variance (for example every co-buyer bought
    each product exactly once), CORR evaluates to NaN. Spark SQL orders NaN
    above every other number, so NaN passed ``> 0.5`` and sorted to the top.
    Such pairs are now excluded explicitly.

    Returns:
        pyspark.sql.DataFrame with columns 商品1, 商品2, 相似度.
    """
    recommendations_sql = """
    WITH user_product_matrix AS (
        SELECT 
            用户ID,
            商品ID,
            COUNT(*) as 购买次数
        FROM orders
        GROUP BY 用户ID, 商品ID
    ),
    product_similarity AS (
        SELECT 
            a.商品ID as 商品1,
            b.商品ID as 商品2,
            CORR(a.购买次数, b.购买次数) as 相似度
        FROM user_product_matrix a
        JOIN user_product_matrix b ON a.用户ID = b.用户ID
        WHERE a.商品ID < b.商品ID
        GROUP BY a.商品ID, b.商品ID
    )
    SELECT 
        商品1,
        商品2,
        相似度
    FROM product_similarity
    WHERE 相似度 > 0.5
      AND NOT ISNAN(相似度)
    ORDER BY 相似度 DESC
    LIMIT 10
    """
    return spark.sql(recommendations_sql)

# Run each product analysis and print its result table.
for _title, _analysis in (
    ("商品共现分析结果：", analyze_product_cooccurrence),
    ("\n购物篮分析结果：", analyze_shopping_basket),
    ("\n商品推荐结果：", generate_product_recommendations),
):
    print(_title)
    _analysis().show()


In [None]:
# 3. 销售趋势预测

# 3.1 时间序列分析
def analyze_time_series():
    """Return daily order count, revenue and average order amount, by date."""
    return spark.sql("""
    SELECT 
        DATE(订单创建时间) as 日期,
        COUNT(*) as 订单数,
        SUM(总金额) as 总销售额,
        AVG(总金额) as 平均订单金额
    FROM orders
    GROUP BY DATE(订单创建时间)
    ORDER BY 日期
    """)

# 3.2 销售预测模型
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

def build_sales_prediction_model():
    """Fit a linear regression of revenue on (day of week, hour of day).

    Returns:
        (model, training_data): the fitted LinearRegressionModel and the
        aggregated DataFrame carrying the assembled ``features`` column.
    """
    # One row per (weekday, hour) bucket with its order count and revenue.
    hourly_sales = spark.sql("""
        SELECT 
            DAYOFWEEK(订单创建时间) as 星期,
            HOUR(订单创建时间) as 小时,
            COUNT(*) as 订单数,
            SUM(总金额) as 销售额
        FROM orders
        GROUP BY DAYOFWEEK(订单创建时间), HOUR(订单创建时间)
    """)

    # Pack the two calendar columns into the single vector column ML expects.
    with_features = VectorAssembler(
        inputCols=["星期", "小时"],
        outputCol="features"
    ).transform(hourly_sales)

    regressor = LinearRegression(featuresCol="features", labelCol="销售额")
    return regressor.fit(with_features), with_features

# 3.3 季节性分析
def analyze_seasonality():
    """Return average order amount and order count per (month, weekday, hour)."""
    return spark.sql("""
    SELECT 
        MONTH(订单创建时间) as 月份,
        DAYOFWEEK(订单创建时间) as 星期,
        HOUR(订单创建时间) as 小时,
        AVG(总金额) as 平均销售额,
        COUNT(*) as 订单数
    FROM orders
    GROUP BY 
        MONTH(订单创建时间),
        DAYOFWEEK(订单创建时间),
        HOUR(订单创建时间)
    ORDER BY 
        月份, 星期, 小时
    """)

# Daily time series.
print("时间序列分析结果：")
analyze_time_series().show()

# Fit the regression and show in-sample predictions next to actual revenue.
print("\n销售预测模型结果：")
model, training_data = build_sales_prediction_model()
predictions = model.transform(training_data)
predictions.select(*["星期", "小时", "销售额", "prediction"]).show()

# Month / weekday / hour breakdown.
print("\n季节性分析结果：")
analyze_seasonality().show()


In [None]:
# 4. 用户分群分析

# 4.1 用户价值分群
def analyze_user_value_segments():
    """Segment users by RFM thresholds and summarise each segment.

    Recency is days since the last order (relative to CURRENT_DATE),
    Frequency is the number of distinct order dates, and Monetary is total
    spend. 高价值 needs R<=30, F>=10, M>=5000; 中价值 needs R<=90, F>=5,
    M>=2000; everything else is 低价值.
    """
    return spark.sql("""
    WITH user_metrics AS (
        SELECT 
            用户ID,
            DATEDIFF(CURRENT_DATE, MAX(订单创建时间)) as Recency,
            COUNT(DISTINCT DATE(订单创建时间)) as Frequency,
            SUM(总金额) as Monetary
        FROM orders
        GROUP BY 用户ID
    )
    SELECT 
        CASE 
            WHEN Recency <= 30 AND Frequency >= 10 AND Monetary >= 5000 THEN '高价值'
            WHEN Recency <= 90 AND Frequency >= 5 AND Monetary >= 2000 THEN '中价值'
            ELSE '低价值'
        END as 价值分群,
        COUNT(*) as 用户数量,
        AVG(Monetary) as 平均消费金额,
        AVG(Frequency) as 平均购买频率
    FROM user_metrics
    GROUP BY 
        CASE 
            WHEN Recency <= 30 AND Frequency >= 10 AND Monetary >= 5000 THEN '高价值'
            WHEN Recency <= 90 AND Frequency >= 5 AND Monetary >= 2000 THEN '中价值'
            ELSE '低价值'
        END
    """)

# 4.2 用户行为分群
def analyze_user_behavior_clusters(k=3, seed=42):
    """Cluster users by purchasing behaviour with K-means and summarise clusters.

    The four per-user features are on very different scales (e.g. average
    order amount versus category count). The original comment promised
    standardization, but none was applied, so the clustering was dominated
    by the monetary feature. Features are now standardized (zero mean, unit
    variance) before K-means, and the seed is fixed so repeated runs give
    the same clusters. Users with a null feature are skipped instead of
    failing the assembler.

    Args:
        k: number of clusters (default 3, as before).
        seed: K-means random seed.

    Returns:
        pyspark.sql.DataFrame with one row per cluster ``prediction`` holding
        the user count and the mean of each raw feature, ordered by cluster.
    """
    from pyspark.ml.feature import StandardScaler

    # One row of behaviour features per user.
    behavior_data = spark.sql("""
        SELECT 
            用户ID,
            COUNT(*) as 购买次数,
            AVG(总金额) as 平均订单金额,
            COUNT(DISTINCT 商品类别) as 购买类别数,
            SUM(商品数量) as 总购买数量
        FROM orders
        GROUP BY 用户ID
    """)

    assembler = VectorAssembler(
        inputCols=["购买次数", "平均订单金额", "购买类别数", "总购买数量"],
        outputCol="raw_features",
        handleInvalid="skip",
    )
    assembled = assembler.transform(behavior_data)

    # Standardize so every feature contributes comparably to the distances.
    scaler = StandardScaler(
        inputCol="raw_features", outputCol="features", withMean=True, withStd=True
    )
    training_data = scaler.fit(assembled).transform(assembled)

    kmeans = KMeans(k=k, seed=seed, featuresCol="features")
    model = kmeans.fit(training_data)
    clustered_data = model.transform(training_data)

    # Describe each cluster in the original (unscaled) units.
    return clustered_data.groupBy("prediction") \
        .agg(
            count("*").alias("用户数量"),
            avg("购买次数").alias("平均购买次数"),
            avg("平均订单金额").alias("平均订单金额"),
            avg("购买类别数").alias("平均购买类别数"),
            avg("总购买数量").alias("平均购买数量")
        ) \
        .orderBy("prediction")

# 4.3 精准营销建议
def generate_marketing_recommendations():
    """Attach a marketing suggestion to each spend/frequency user segment.

    Users are segmented by order-row count and total spend (>=10 rows and
    >=5000 → 高价值; >=5 and >=2000 → 中价值; else 低价值). Each segment is
    returned with its suggestion text, user count, average category count
    and average order amount.
    """
    return spark.sql("""
    WITH user_segments AS (
        SELECT 
            用户ID,
            CASE 
                WHEN COUNT(*) >= 10 AND SUM(总金额) >= 5000 THEN '高价值'
                WHEN COUNT(*) >= 5 AND SUM(总金额) >= 2000 THEN '中价值'
                ELSE '低价值'
            END as 用户分群,
            MAX(订单创建时间) as 最近购买时间,
            COUNT(DISTINCT 商品类别) as 购买类别数,
            AVG(总金额) as 平均订单金额
        FROM orders
        GROUP BY 用户ID
    )
    SELECT 
        用户分群,
        CASE 
            WHEN 用户分群 = '高价值' THEN '提供VIP服务和专属优惠'
            WHEN 用户分群 = '中价值' THEN '推送相关商品和促销活动'
            ELSE '提供新用户优惠和基础服务'
        END as 营销建议,
        COUNT(*) as 用户数量,
        AVG(购买类别数) as 平均购买类别数,
        AVG(平均订单金额) as 平均订单金额
    FROM user_segments
    GROUP BY 用户分群
    """)

# 执行分析
print("用户价值分群分析结果：")
analyze_user_value_segments().show()

print("\n用户行为分群分析结果：")
analyze_user_behavior_clusters().show()

print("\n精准营销建议：")
generate_marketing_recommendations().show()

# Keep the SparkSession running. The visualization, optimization, API and
# machine-learning cells below still query the `orders` temp view and use
# `orders_df`, and both belong to this session. Temp views are session-scoped,
# so stopping the session here made every later `FROM orders` query fail.
# Call spark.stop() once, at the very end of the whole workflow.


In [None]:
# 第六部分：数据可视化

在这部分中，我们将使用多种可视化工具来展示分析结果，包括：

1. 销售趋势可视化
   - 销售额时间序列图
   - 订单量趋势图
   - 季节性分析图

2. 用户分析可视化
   - 用户分群分布图
   - 用户价值金字塔
   - 用户行为雷达图

3. 商品分析可视化
   - 商品关联网络图
   - 品类销售占比图
   - 热门商品排行榜

4. 地理分布可视化
   - 销售地理热力图
   - 区域订单分布图
   - 物流路线图


In [None]:
# 导入可视化所需的库
import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import seaborn as sns
import pyecharts.options as opts
from pyecharts.charts import Bar, Line, Pie, HeatMap, Graph
from pyecharts.globals import ThemeType

# Create (or reuse) the SparkSession used by the visualization helpers.
spark = (
    SparkSession.builder
    .appName("E-commerce Visualization")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# 1. 销售趋势可视化
def visualize_sales_trends():
    """Plot daily revenue (top) and daily order count (bottom) as line charts."""
    daily = spark.sql("""
        SELECT 
            DATE(订单创建时间) as 日期,
            COUNT(*) as 订单数,
            SUM(总金额) as 销售额
        FROM orders
        GROUP BY DATE(订单创建时间)
        ORDER BY 日期
    """).toPandas()

    fig = make_subplots(rows=2, cols=1,
                        subplot_titles=('日销售额趋势', '日订单量趋势'))

    # Row 1: revenue; row 2: order count. Each trace is named after its column.
    for row, metric in enumerate(('销售额', '订单数'), start=1):
        fig.add_trace(
            go.Scatter(x=daily['日期'], y=daily[metric], mode='lines', name=metric),
            row=row, col=1,
        )

    fig.update_layout(height=800, title_text="销售趋势分析")
    fig.show()

# 2. 用户分析可视化
def visualize_user_analysis():
    """Draw a user-value pyramid: number of users per total-spend segment.

    Segments: total spend >= 5000 → 高价值, >= 2000 → 中价值, else 低价值.
    GROUP BY returns rows in arbitrary order, which made the funnel shape and
    its "percent initial" labels change between runs. Rows are now sorted
    high → low value so the chart is a stable pyramid. Percentages are
    relative to the total user count ("percent total") rather than to
    whichever segment happened to be first.
    """
    user_segments = spark.sql("""
        WITH user_metrics AS (
            SELECT 
                用户ID,
                COUNT(*) as 购买次数,
                SUM(总金额) as 总消费额,
                AVG(总金额) as 平均订单额
            FROM orders
            GROUP BY 用户ID
        )
        SELECT 
            CASE 
                WHEN 总消费额 >= 5000 THEN '高价值'
                WHEN 总消费额 >= 2000 THEN '中价值'
                ELSE '低价值'
            END as 用户分群,
            COUNT(*) as 用户数量,
            SUM(总消费额) as 分群总消费额
        FROM user_metrics
        GROUP BY 
            CASE 
                WHEN 总消费额 >= 5000 THEN '高价值'
                WHEN 总消费额 >= 2000 THEN '中价值'
                ELSE '低价值'
            END
    """).toPandas()

    # Funnel draws the first row at the top: put the (smallest) 高价值 tier there.
    segment_rank = {'高价值': 0, '中价值': 1, '低价值': 2}
    user_segments = user_segments.sort_values(
        '用户分群', key=lambda s: s.map(segment_rank)
    )

    fig = go.Figure(data=[go.Funnel(
        y=user_segments['用户分群'],
        x=user_segments['用户数量'],
        textinfo="value+percent total"
    )])

    fig.update_layout(title_text="用户价值金字塔")
    fig.show()

# 3. 商品分析可视化
def visualize_product_analysis():
    """Show each product category's share of total revenue as a pie chart."""
    by_category = spark.sql("""
        SELECT 
            商品类别,
            COUNT(*) as 销售量,
            SUM(总金额) as 销售额
        FROM orders
        GROUP BY 商品类别
        ORDER BY 销售额 DESC
    """).toPandas()

    pie = px.pie(
        by_category,
        values='销售额',
        names='商品类别',
        title='商品类别销售额占比',
    )
    pie.show()

# 4. 地理分布可视化
def visualize_geographic_distribution():
    """Render revenue per shipping region as a single-row pyecharts heat map.

    ``HeatMap.add_yaxis`` takes ``(series_name, yaxis_data, value)``, where
    ``value`` is a list of ``[x_index, y_index, value]`` triples. The
    original passed the sales list as the y-axis and omitted ``value``,
    which raises TypeError. The visual-map range now comes from the data
    instead of the widget default. Sales are converted to plain floats so
    database Decimal sums serialise to JSON, and null sums become 0.
    """
    geo_distribution = spark.sql("""
        SELECT 
            收货地址,
            COUNT(*) as 订单数,
            SUM(总金额) as 销售额
        FROM orders
        GROUP BY 收货地址
        ORDER BY 销售额 DESC
    """).toPandas()

    regions = [str(region) for region in geo_distribution['收货地址']]
    sales = [
        0.0 if pd.isna(value) else round(float(value), 2)
        for value in geo_distribution['销售额']
    ]
    # One cell per region, all on the single y row (index 0).
    cells = [[x_index, 0, value] for x_index, value in enumerate(sales)]

    c = (
        HeatMap()
        .add_xaxis(regions)
        .add_yaxis(
            "销售额",
            ["销售额"],
            cells,
            label_opts=opts.LabelOpts(is_show=True, position="inside")
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="销售地理分布热力图"),
            visualmap_opts=opts.VisualMapOpts(
                min_=min(sales, default=0),
                max_=max(sales, default=0),
            )
        )
    )
    c.render_notebook()

# Render every chart in turn, announcing each one first.
for _message, _render in (
    ("生成销售趋势可视化...", visualize_sales_trends),
    ("\n生成用户分析可视化...", visualize_user_analysis),
    ("\n生成商品分析可视化...", visualize_product_analysis),
    ("\n生成地理分布可视化...", visualize_geographic_distribution),
):
    print(_message)
    _render()


In [None]:
# 第七部分：性能优化

在这部分中，我们将对系统进行性能优化，包括：

1. Spark性能优化
   - 内存管理优化
   - 分区优化
   - 缓存策略优化
   - SQL查询优化

2. 数据存储优化
   - 数据压缩
   - 索引优化
   - 分区策略
   - 存储格式选择

3. 实时处理优化
   - Kafka配置优化
   - Flink并行度调整
   - 状态后端优化
   - Checkpoint配置

4. 系统资源优化
   - CPU利用率优化
   - 内存分配优化
   - 网络传输优化
   - 磁盘I/O优化


In [None]:
# 1. Spark性能优化

# 优化Spark配置
def optimize_spark_config():
    """Build (or fetch) a SparkSession configured with tuning settings.

    NOTE(review): getOrCreate() hands back an already-running session when
    one exists. Core settings fixed at SparkContext start-up (spark.memory.*,
    spark.serializer, spark.speculation) may therefore not take effect —
    confirm.
    """
    tuned_settings = {
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        # Adaptive query execution and automatic shuffle-partition coalescing.
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.shuffle.partitions": "200",
        # Unified memory split between execution and storage.
        "spark.memory.fraction": "0.8",
        "spark.memory.storageFraction": "0.3",
        "spark.speculation": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    }

    builder = SparkSession.builder.appName("Optimized E-commerce Analysis")
    for key, value in tuned_settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# 优化数据缓存策略
def optimize_caching():
    """Cache the orders data and a daily-sales aggregate for repeated use.

    ``cache()`` is lazy: it only marks a DataFrame for caching. The original
    marked the aggregate and then discarded it without running any action,
    so nothing was ever materialized. An action now fills the cache, and the
    cached DataFrame is returned so callers can reuse it directly (existing
    callers that ignore the return value are unaffected).

    Returns:
        The cached daily-sales DataFrame (日期, 订单数, 销售额).
    """
    # The raw orders are read by almost every analysis, so keep them cached.
    orders_df.cache()

    # Expose the cached data under its own view name.
    orders_df.createOrReplaceTempView("optimized_orders")

    daily_sales = spark.sql("""
        SELECT 
            DATE(订单创建时间) as 日期,
            COUNT(*) as 订单数,
            SUM(总金额) as 销售额
        FROM optimized_orders
        GROUP BY DATE(订单创建时间)
    """)
    daily_sales.cache()
    # Trigger an action so the aggregate is actually computed and stored.
    daily_sales.count()
    return daily_sales

# 2. 数据存储优化

# 优化MySQL配置
def optimize_mysql(connection_params=None):
    """Add indexes and year-based range partitioning to the MySQL ``orders`` table.

    These are MySQL DDL statements, so they now run over a MySQL connection.
    The original passed them to ``spark.sql``, which cannot parse
    ``CREATE INDEX``, several ``;``-separated statements, or MySQL partition
    clauses; there ``orders`` is also a Spark temp view, not the MySQL table.

    Each statement runs on its own. A failure (an index that already exists,
    or a column the table lacks) is printed and the remaining statements
    still run. A catch-all ``pmax`` partition is added so rows from 2025
    onward can still be inserted.

    NOTE(review): MySQL rejects partitioning when a PRIMARY/UNIQUE key does not
    include the partitioning column (create_tables makes 订单编号 the primary
    key), so the ALTER may be reported as failed — confirm the schema.

    Args:
        connection_params: keyword arguments for ``mysql.connector.connect``.
            Defaults to localhost/root and the ``ecommerce_analysis`` database.
    """
    import mysql.connector

    params = connection_params or {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'ecommerce_analysis',
    }
    statements = (
        "CREATE INDEX idx_order_time ON orders(订单创建时间)",
        "CREATE INDEX idx_user_id ON orders(用户ID)",
        "CREATE INDEX idx_product_id ON orders(商品ID)",
        """
        ALTER TABLE orders
        PARTITION BY RANGE (YEAR(订单创建时间)) (
            PARTITION p2023 VALUES LESS THAN (2024),
            PARTITION p2024 VALUES LESS THAN (2025),
            PARTITION pmax VALUES LESS THAN MAXVALUE
        )
        """,
    )

    conn = mysql.connector.connect(**params)
    try:
        cursor = conn.cursor()
        try:
            for statement in statements:
                try:
                    cursor.execute(statement)
                except mysql.connector.Error as err:
                    print(f"MySQL优化语句执行失败: {err}")
        finally:
            cursor.close()
    finally:
        conn.close()

# 3. 实时处理优化

# 优化Kafka配置
# Kafka producer settings aimed at throughput. This dict is not referenced in
# this section of the notebook.
# NOTE(review): key names follow the Java producer configuration; for example
# 'buffer.memory' is not a librdkafka property — confirm which client uses it.
kafka_optimized_config = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'lz4',
    'batch.size': '65536',  # 64 KiB per batch
    'linger.ms': '5',  # wait up to 5 ms so batches can fill
    'buffer.memory': '67108864',  # 64 MiB
    'acks': '1'  # leader acknowledgement only
}

# 优化Flink配置
def optimize_flink(stream_env=None):
    """Apply Flink tuning: job-level options on the environment, return the rest.

    The original looped over ``env.get_config().set_string(key, value)``, but
    PyFlink's ExecutionConfig has no ``set_string``. In addition, TaskManager
    memory/slot counts, the state backend and the checkpoint directory are
    cluster-level options that belong in flink-conf.yaml. Parallelism and the
    checkpoint interval are now applied through the environment API, and the
    full option dict is returned so it can be written to the cluster config.

    Args:
        stream_env: environment to configure. Defaults to the module-level
            ``env``. NOTE(review): assumed to be a pyflink
            StreamExecutionEnvironment — confirm against the real-time section.

    Returns:
        dict[str, str] of all tuning options.
    """
    env_config = {
        'parallelism.default': '4',
        'taskmanager.memory.process.size': '4096m',
        'taskmanager.numberOfTaskSlots': '8',
        'state.backend': 'rocksdb',
        'state.checkpoints.dir': 'hdfs://namenode:8020/flink-checkpoints',
        'execution.checkpointing.interval': '10000',
        'execution.checkpointing.mode': 'EXACTLY_ONCE'
    }

    target = env if stream_env is None else stream_env
    target.set_parallelism(int(env_config['parallelism.default']))
    # Interval in milliseconds; Flink's default checkpointing mode is EXACTLY_ONCE.
    target.enable_checkpointing(int(env_config['execution.checkpointing.interval']))

    return env_config

# 4. 系统资源优化

# 监控系统资源使用
def monitor_system_resources():
    """Print a one-off snapshot of CPU, memory, disk and network usage.

    Disk and network values are psutil's cumulative byte counters, shown in
    MB with two decimals. psutil returns None for these counters when it
    finds no disk/NIC (common in some containers); that line is then
    reported as unavailable instead of raising AttributeError.
    """
    import psutil

    mb = 1024 * 1024
    cpu_percent = psutil.cpu_percent(interval=1)  # samples over ~1 second
    memory_info = psutil.virtual_memory()
    disk_io = psutil.disk_io_counters()
    network_io = psutil.net_io_counters()

    print("系统资源监控:")
    print(f"CPU使用率: {cpu_percent}%")
    print(f"内存使用率: {memory_info.percent}%")
    if disk_io is None:
        print("磁盘读写: 不可用")
    else:
        print(f"磁盘读写: 读取={disk_io.read_bytes / mb:.2f}MB, 写入={disk_io.write_bytes / mb:.2f}MB")
    if network_io is None:
        print("网络IO: 不可用")
    else:
        print(f"网络IO: 发送={network_io.bytes_sent / mb:.2f}MB, 接收={network_io.bytes_recv / mb:.2f}MB")

# Run each optimization step in order, announcing it first.
print("开始执行性能优化...")

# Step 1: Spark session tuning and caching.
print("\n1. 优化Spark配置...")
optimized_spark = optimize_spark_config()
optimize_caching()

# Step 2: storage (MySQL indexes / partitions).
print("\n2. 优化数据存储...")
optimize_mysql()

# Step 3: stream processing (Flink).
print("\n3. 优化实时处理...")
optimize_flink()

# Step 4: host resource snapshot.
print("\n4. 监控系统资源...")
monitor_system_resources()

print("\n性能优化完成！")


In [None]:
# 第八部分：系统集成

在这部分中，我们将整合所有组件，构建完整的数据处理流水线：

1. 数据流集成
   - 数据采集层
   - 存储层
   - 计算层
   - 服务层

2. 接口集成
   - REST API设计
   - 数据服务接口
   - 监控接口
   - 管理接口

3. 监控告警
   - 系统监控
   - 业务监控
   - 告警规则
   - 通知机制

4. 部署方案
   - 容器化部署
   - 服务编排
   - 负载均衡
   - 高可用设计


In [None]:
# 1. 数据流集成

from flask import Flask, jsonify, request
import json
import logging
from prometheus_client import start_http_server, Counter, Gauge
import docker
import yaml

# Flask application serving the analysis API.
app = Flask(__name__)

# Root logging: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# 2. 接口集成

# REST API路由
@app.route('/api/v1/sales/daily', methods=['GET'])
def get_daily_sales():
    """Return per-day order count and revenue between two dates (inclusive).

    Query parameters ``start_date`` and ``end_date`` must be ISO dates
    (YYYY-MM-DD). They are parsed into ``date`` objects before going into
    the SQL text. Previously the raw query-string values were interpolated
    directly, which allowed SQL injection, and missing parameters produced
    ``BETWEEN 'None' AND 'None'``. Missing or malformed values now get HTTP
    400.
    """
    from datetime import date

    try:
        start_iso = date.fromisoformat(request.args.get('start_date')).isoformat()
        end_iso = date.fromisoformat(request.args.get('end_date')).isoformat()
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not an ISO date.
        return jsonify({
            'status': 'error',
            'message': 'start_date和end_date必须为YYYY-MM-DD格式的日期'
        }), 400

    try:
        # start_iso/end_iso are normalised ISO date strings, safe to embed.
        sales_data = spark.sql(f"""
            SELECT 
                DATE(订单创建时间) as 日期,
                COUNT(*) as 订单数,
                SUM(总金额) as 销售额
            FROM orders
            WHERE DATE(订单创建时间) BETWEEN '{start_iso}' AND '{end_iso}'
            GROUP BY DATE(订单创建时间)
            ORDER BY 日期
        """).toPandas().to_dict('records')

        return jsonify({
            'status': 'success',
            'data': sales_data
        })
    except Exception as e:
        logger.exception(f"获取日销售数据失败: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/v1/users/segments', methods=['GET'])
def get_user_segments():
    """Return the number of users in each total-spend segment as JSON.

    Segments: total spend >= 5000 高价值, >= 2000 中价值, else 低价值.
    On any failure the error is logged and a 500 JSON response is returned.
    """
    try:
        segment_rows = spark.sql("""
            WITH user_metrics AS (
                SELECT 
                    用户ID,
                    COUNT(*) as 购买次数,
                    SUM(总金额) as 总消费额
                FROM orders
                GROUP BY 用户ID
            )
            SELECT 
                CASE 
                    WHEN 总消费额 >= 5000 THEN '高价值'
                    WHEN 总消费额 >= 2000 THEN '中价值'
                    ELSE '低价值'
                END as 用户分群,
                COUNT(*) as 用户数量
            FROM user_metrics
            GROUP BY 
                CASE 
                    WHEN 总消费额 >= 5000 THEN '高价值'
                    WHEN 总消费额 >= 2000 THEN '中价值'
                    ELSE '低价值'
                END
        """).toPandas().to_dict('records')

        payload = {'status': 'success', 'data': segment_rows}
        return jsonify(payload)
    except Exception as e:
        logger.error(f"获取用户分群数据失败: {str(e)}")
        error_payload = {'status': 'error', 'message': str(e)}
        return jsonify(error_payload), 500

# 3. 监控告警

# Prometheus指标
# Prometheus metrics exported by the API process.
# NOTE(review): prometheus_client exposes Counters with a "_total" suffix.
SALES_COUNTER = Counter(name='total_sales', documentation='Total sales amount')
ORDER_COUNTER = Counter(name='total_orders', documentation='Total number of orders')
ACTIVE_USERS = Gauge(name='active_users', documentation='Number of active users')

def update_metrics():
    """Refresh the Prometheus metrics from today's orders.

    The query returns today's running totals. Counters must only grow by
    new activity, so only the increase since the previous call is added.
    The original added the whole day's total on every call, inflating
    total_sales and total_orders each time it ran. If the totals shrink,
    the day has rolled over and the new totals are counted from zero.

    NULL aggregates (no orders today) count as 0. The sum may arrive as a
    Decimal, so it is converted to float before being added to the counter.
    Errors are logged rather than raised.
    """
    try:
        row = spark.sql("""
            SELECT 
                SUM(总金额) as total_sales,
                COUNT(*) as total_orders,
                COUNT(DISTINCT 用户ID) as active_users
            FROM orders
            WHERE DATE(订单创建时间) = CURRENT_DATE
        """).collect()[0]

        sales = float(row['total_sales'] or 0)
        orders = int(row['total_orders'] or 0)
        users = int(row['active_users'] or 0)

        # Totals seen on the previous call (0 on the first call).
        prev_sales, prev_orders = getattr(update_metrics, '_last_totals', (0.0, 0))
        sales_delta = sales - prev_sales if sales >= prev_sales else sales
        order_delta = orders - prev_orders if orders >= prev_orders else orders
        update_metrics._last_totals = (sales, orders)

        SALES_COUNTER.inc(sales_delta)
        ORDER_COUNTER.inc(order_delta)
        ACTIVE_USERS.set(users)

    except Exception as e:
        logger.error(f"更新监控指标失败: {str(e)}")

# 4. 部署方案

# Docker配置
def generate_docker_compose():
    """Write docker-compose.yml for the Spark, MySQL, Kafka and Flink stack.

    The file is written to the current working directory. yaml.dump sorts
    mapping keys by default, so the order in which services are declared
    below does not change the output.
    """
    spark_image = 'bitnami/spark:latest'
    flink_image = 'flink:latest'
    flink_rpc_env = 'JOB_MANAGER_RPC_ADDRESS=flink-jobmanager'

    services = {}

    # Spark standalone cluster: a master (ports 8080, 7077) and one worker.
    services['spark-master'] = {
        'image': spark_image,
        'ports': ['8080:8080', '7077:7077'],
        'environment': ['SPARK_MODE=master'],
    }
    services['spark-worker'] = {
        'image': spark_image,
        'environment': [
            'SPARK_MODE=worker',
            'SPARK_MASTER_URL=spark://spark-master:7077',
        ],
        'depends_on': ['spark-master'],
    }

    # MySQL, with its data directory on the named volume mysql_data.
    services['mysql'] = {
        'image': 'mysql:8.0',
        'environment': [
            'MYSQL_ROOT_PASSWORD=your_password',
            'MYSQL_DATABASE=ecommerce',
        ],
        'volumes': ['mysql_data:/var/lib/mysql'],
    }

    # Kafka broker pointed at the ZooKeeper service.
    services['zookeeper'] = {
        'image': 'bitnami/zookeeper:latest',
        'environment': ['ALLOW_ANONYMOUS_LOGIN=yes'],
    }
    services['kafka'] = {
        'image': 'bitnami/kafka:latest',
        'environment': [
            'KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181',
            'ALLOW_PLAINTEXT_LISTENER=yes',
        ],
        'depends_on': ['zookeeper'],
    }

    # Flink: JobManager (port 8081) plus one TaskManager. Separate list objects
    # are used so yaml.dump does not emit anchors/aliases for shared values.
    services['flink-jobmanager'] = {
        'image': flink_image,
        'ports': ['8081:8081'],
        'command': 'jobmanager',
        'environment': [flink_rpc_env],
    }
    services['flink-taskmanager'] = {
        'image': flink_image,
        'command': 'taskmanager',
        'environment': [flink_rpc_env],
        'depends_on': ['flink-jobmanager'],
    }

    compose = {
        'version': '3',
        'services': services,
        'volumes': {'mysql_data': None},
    }

    with open('docker-compose.yml', 'w') as compose_file:
        yaml.dump(compose, compose_file)

# 启动系统
def start_system():
    """Start the metrics endpoint, write docker-compose.yml, then serve the API.

    ``app.run`` blocks until the server shuts down. The original logged
    "系统已成功启动！" after it, so the message only appeared on shutdown. The
    readiness message is now logged before the blocking call. Failures are
    logged and re-raised.
    """
    try:
        # Prometheus metrics HTTP endpoint on port 8000.
        start_http_server(8000)

        generate_docker_compose()

        logger.info("监控与部署配置已就绪，正在启动API服务（端口5000）...")
        # Blocks for the lifetime of the server.
        app.run(host='0.0.0.0', port=5000)

    except Exception as e:
        logger.error(f"系统启动失败: {str(e)}")
        raise


# NOTE: in a Jupyter kernel __name__ is also '__main__', so running this cell
# starts the blocking server.
if __name__ == '__main__':
    start_system()


In [None]:
# 第九部分：文档和报告

## 1. 项目概述

本项目是一个基于大数据技术栈的电商数据分析系统，主要功能包括：

- 数据采集和预处理
- 数据存储和管理
- 批处理分析
- 实时处理
- 复杂分析
- 数据可视化
- 系统集成

## 2. 系统架构

系统采用分布式架构，主要组件包括：

1. 数据采集层
   - Kafka用于实时数据采集
   - Flume用于日志收集

2. 存储层
   - MySQL用于结构化数据存储
   - HDFS用于大规模数据存储
   - Redis用于缓存

3. 计算层
   - Spark用于批处理分析
   - Flink用于实时处理
   - Hadoop用于资源调度

4. 服务层
   - Flask提供REST API
   - Prometheus进行监控
   - Docker进行容器化部署

## 3. 主要功能

1. 用户分析
   - 用户行为分析
   - 用户价值分析
   - 用户生命周期管理

2. 商品分析
   - 商品关联分析
   - 销售趋势分析
   - 商品推荐

3. 实时监控
   - 销售额监控
   - 订单量监控
   - 用户活跃度监控

4. 运营分析
   - ROI分析
   - 营销效果分析
   - 库存优化建议

## 4. 部署说明

1. 环境要求
   - Python 3.8+
   - Java 8+
   - Docker 20.10+
   - 16GB+ RAM
   - 4+ CPU cores

2. 安装步骤
   ```bash
   # 1. 克隆代码
   git clone https://github.com/your-repo/ecommerce-analysis.git
   
   # 2. 安装依赖
   pip install -r requirements.txt
   
   # 3. 启动服务
   docker-compose up -d
   
   # 4. 初始化数据库
   python init_db.py
   
   # 5. 启动应用
   python app.py
   ```

3. 配置说明
   - 修改config.yaml进行系统配置
   - 修改.env设置环境变量
   - 修改docker-compose.yml配置容器

## 5. 使用说明

1. 访问地址
   - Web界面：http://localhost:5000
   - API文档：http://localhost:5000/api/docs
   - Prometheus指标端点：http://localhost:8000/metrics

2. API使用
   ```http
   # 获取销售数据
   GET /api/v1/sales/daily?start_date=2024-01-01&end_date=2024-01-31
   
   # 获取用户分群
   GET /api/v1/users/segments
   ```

3. 监控指标
   - total_sales：总销售额
   - total_orders：总订单数
   - active_users：活跃用户数

## 6. 性能指标

1. 系统性能
   - 支持每秒1000+订单处理
   - 毫秒级实时分析响应
   - 支持TB级数据存储

2. 可用性指标
   - 系统可用性99.9%
   - 数据一致性99.99%
   - 故障恢复时间<5分钟

## 7. 维护建议

1. 日常维护
   - 定期备份数据
   - 监控系统资源
   - 清理历史数据

2. 故障处理
   - 检查日志文件
   - 重启相关服务
   - 回滚配置

3. 升级建议
   - 先测试后升级
   - 保持版本兼容
   - 做好备份


In [None]:
# 第十部分：机器学习与智能分析

在这部分中，我们将使用机器学习算法进行更深入的数据分析，包括：

1. 销售预测
   - 时间序列预测
   - 多因素回归分析
   - 预测模型评估

2. 用户行为预测
   - 购买倾向预测
   - 流失风险预测
   - 生命周期预测

3. 智能推荐
   - 协同过滤推荐
   - 基于内容推荐
   - 混合推荐算法

4. 异常检测
   - 交易异常检测
   - 用户行为异常
   - 系统异常监控


In [None]:
# 导入机器学习相关库
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col, datediff, current_date
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
import xgboost as xgb
from prophet import Prophet

# 1. 销售预测
def sales_prediction():
    """Forecast daily revenue with Prophet and fit a multi-factor regression.

    Returns:
        (forecast, model): Prophet's forecast frame covering the history plus
        30 future days, and a fitted Spark PipelineModel (assembler → scaler →
        LinearRegression) predicting daily revenue ``y`` from order count,
        distinct users and average order amount.
    """
    daily = spark.sql("""
        SELECT 
            DATE(订单创建时间) as ds,
            SUM(总金额) as y,
            COUNT(*) as order_count,
            COUNT(DISTINCT 用户ID) as user_count,
            AVG(总金额) as avg_amount
        FROM orders
        GROUP BY DATE(订单创建时间)
        ORDER BY ds
    """).toPandas()

    # Prophet expects exactly the (ds, y) columns.
    ts_model = Prophet(yearly_seasonality=True, weekly_seasonality=True)
    ts_model.fit(daily[['ds', 'y']])
    forecast = ts_model.predict(ts_model.make_future_dataframe(periods=30))

    # Regression of revenue on the other daily aggregates.
    stages = [
        VectorAssembler(inputCols=['order_count', 'user_count', 'avg_amount'],
                        outputCol='features'),
        StandardScaler(inputCol='features', outputCol='scaled_features'),
        LinearRegression(featuresCol='scaled_features', labelCol='y'),
    ]
    regression_model = Pipeline(stages=stages).fit(spark.createDataFrame(daily))

    return forecast, regression_model

# 2. 用户行为预测
def user_behavior_prediction():
    """Train a random-forest churn classifier on per-user purchase features.

    A user is labelled churned (1.0) when the last purchase is more than 90
    days before CURRENT_DATE. Changes from the original:

    * the label is cast to double. Spark ML classifiers require a numeric
      label column, and the comparison produced a boolean column.
    * 最近购买天数 is no longer a feature. The label is a direct threshold
      on it, so the model would simply learn ">90" (target leakage).

    Returns:
        Fitted PipelineModel (VectorAssembler + RandomForestClassifier).
    """
    user_features = spark.sql("""
        SELECT 
            用户ID,
            COUNT(*) as 购买次数,
            SUM(总金额) as 总消费额,
            AVG(总金额) as 平均订单金额,
            DATEDIFF(CURRENT_DATE, MAX(订单创建时间)) as 最近购买天数,
            COUNT(DISTINCT 商品类别) as 购买类别数
        FROM orders
        GROUP BY 用户ID
    """)

    # Churn label: no purchase in the last 90 days, as a numeric 0.0/1.0 value.
    user_features = user_features.withColumn(
        'is_churn',
        (col('最近购买天数') > 90).cast('double')
    )

    # Features exclude 最近购买天数 because the label is derived from it.
    feature_cols = ['购买次数', '总消费额', '平均订单金额', '购买类别数']
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

    rf = RandomForestClassifier(
        featuresCol='features',
        labelCol='is_churn',
        numTrees=100
    )

    pipeline = Pipeline(stages=[assembler, rf])
    return pipeline.fit(user_features)

# 3. 智能推荐
def build_recommendation_system():
    """Train an ALS model on (user, item, order amount) and precompute top-10 lists.

    Every order row becomes one (user, item, rating) observation, with the
    order amount used as the rating. Users/items unseen during training are
    dropped at prediction time (coldStartStrategy="drop").

    Returns:
        (model, user_recs, item_recs): the fitted ALSModel, the top-10 items
        for every user and the top-10 users for every item.
    """
    interactions = spark.sql("""
        SELECT 
            用户ID as user,
            商品ID as item,
            总金额 as rating
        FROM orders
    """)

    model = ALS(
        maxIter=10,
        regParam=0.01,
        userCol="user",
        itemCol="item",
        ratingCol="rating",
        coldStartStrategy="drop",
    ).fit(interactions)

    top_n = 10
    return model, model.recommendForAllUsers(top_n), model.recommendForAllItems(top_n)

# 4. 异常检测
def anomaly_detection():
    """Flag orders and user-days whose z-score against the user's history exceeds 3.

    Returns:
        (transaction_anomalies, behavior_anomalies):
        * order rows whose amount is more than 3 standard deviations from
          that user's mean order amount (with a ``z_score`` column);
        * (user, date) rows whose daily order count or daily amount is more
          than 3 standard deviations from the user's daily mean.

    Standard deviations of 0 are turned into NULL with NULLIF. A user whose
    values are all identical would otherwise divide by zero, which raises
    DIVIDE_BY_ZERO under Spark's ANSI mode. In non-ANSI mode the division
    already gave NULL, so results are unchanged there. Note that a sample
    z-score can only exceed 3 once a user has at least 11 observations.
    """
    transaction_anomalies = spark.sql("""
        WITH user_stats AS (
            SELECT 
                用户ID,
                AVG(总金额) as avg_amount,
                NULLIF(STDDEV(总金额), 0) as std_amount
            FROM orders
            GROUP BY 用户ID
        )
        SELECT 
            o.*,
            (o.总金额 - us.avg_amount) / us.std_amount as z_score
        FROM orders o
        JOIN user_stats us ON o.用户ID = us.用户ID
        WHERE ABS((o.总金额 - us.avg_amount) / us.std_amount) > 3
    """)

    behavior_anomalies = spark.sql("""
        WITH user_daily_stats AS (
            SELECT 
                用户ID,
                DATE(订单创建时间) as date,
                COUNT(*) as daily_orders,
                SUM(总金额) as daily_amount
            FROM orders
            GROUP BY 用户ID, DATE(订单创建时间)
        ),
        user_averages AS (
            SELECT 
                用户ID,
                AVG(daily_orders) as avg_daily_orders,
                NULLIF(STDDEV(daily_orders), 0) as std_daily_orders,
                AVG(daily_amount) as avg_daily_amount,
                NULLIF(STDDEV(daily_amount), 0) as std_daily_amount
            FROM user_daily_stats
            GROUP BY 用户ID
        )
        SELECT 
            uds.*,
            (uds.daily_orders - ua.avg_daily_orders) / ua.std_daily_orders as order_z_score,
            (uds.daily_amount - ua.avg_daily_amount) / ua.std_daily_amount as amount_z_score
        FROM user_daily_stats uds
        JOIN user_averages ua ON uds.用户ID = ua.用户ID
        WHERE 
            ABS((uds.daily_orders - ua.avg_daily_orders) / ua.std_daily_orders) > 3
            OR ABS((uds.daily_amount - ua.avg_daily_amount) / ua.std_daily_amount) > 3
    """)

    return transaction_anomalies, behavior_anomalies

# Run the four machine-learning analyses; results stay in module globals.
print("开始执行机器学习分析...")

print("\n1. 执行销售预测...")
forecast, sales_model = sales_prediction()

print("\n2. 执行用户行为预测...")
user_model = user_behavior_prediction()

print("\n3. 构建推荐系统...")
(rec_model,
 user_recommendations,
 item_recommendations) = build_recommendation_system()

print("\n4. 执行异常检测...")
trans_anomalies, behav_anomalies = anomaly_detection()

print("\n机器学习分析完成！")


In [None]:
# 电商订单数据分析项目

## 第一部分：数据采集和预处理

本项目将分析电商订单数据，包括：
1. 数据理解与检查
2. 数据清洗
3. 特征工程
4. 数据转换
5. 数据质量验证


In [None]:
# 导入所需的库
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# Use a CJK-capable font and keep minus signs rendering correctly with it.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

# Load the raw Tmall order report and show a first overview.
df = pd.read_csv('tmall_order_report.csv')
print("数据集基本信息：")
print(f"数据集大小：{df.shape}")
print("\n数据集前5行：")
df.head()


In [None]:
# Column dtypes and non-null counts.
print("数据集信息：")
df.info()

# Missing values per column.
print("\n缺失值统计：")
print(df.isna().sum())

# Fully duplicated rows.
print("\n重复值统计：")
duplicate_count = df.duplicated().sum()
print(f"重复行数：{duplicate_count}")

# Summary statistics of numeric columns (displayed as the cell result).
print("\n数据基本统计：")
df.describe()


In [None]:
# 数据清洗
def clean_data(df):
    """Return a cleaned copy of the raw order report with derived features.

    Steps: parse the two timestamps, strip administrative suffixes from
    收货地址, add payment delay in minutes (支付时长) and calendar features,
    derive 订单状态 (已付款 / 未付款 / 已退款), and compute
    实际收入 = 买家实际支付金额 - 退款金额. The input frame is not modified.

    Args:
        df: raw frame with columns 订单创建时间, 订单付款时间, 收货地址,
            买家实际支付金额 and 退款金额.

    Returns:
        A new DataFrame with the cleaned and added columns.
    """
    df_clean = df.copy()

    # 1. Parse timestamps; missing payment times become NaT (unpaid orders).
    df_clean['订单创建时间'] = pd.to_datetime(df_clean['订单创建时间'])
    df_clean['订单付款时间'] = pd.to_datetime(df_clean['订单付款时间'])

    # 2. Strip suffixes such as 省 / 自治区 (广东省 -> 广东). The pattern is a
    #    regex alternation, so regex=True is required: since pandas 2.0
    #    str.replace is literal by default, and the step silently did nothing.
    df_clean['收货地址'] = df_clean['收货地址'].str.replace(
        '自治区|维吾尔|回族|壮族|省', '', regex=True
    )

    # 3. Derived features.
    # Minutes between order creation and payment (NaN when unpaid).
    df_clean['支付时长'] = (df_clean['订单付款时间'] - df_clean['订单创建时间']).dt.total_seconds() / 60

    # Calendar features of the creation time.
    df_clean['下单年月'] = df_clean['订单创建时间'].dt.strftime('%Y-%m')
    df_clean['下单时间'] = df_clean['订单创建时间'].dt.hour
    df_clean['下单星期'] = df_clean['订单创建时间'].dt.dayofweek

    # 4. Order status; a refund overrides the paid/unpaid status.
    df_clean['订单状态'] = '已付款'
    df_clean.loc[df_clean['订单付款时间'].isnull(), '订单状态'] = '未付款'
    df_clean.loc[df_clean['退款金额'] > 0, '订单状态'] = '已退款'

    # 5. Net revenue after refunds.
    df_clean['实际收入'] = df_clean['买家实际支付金额'] - df_clean['退款金额']

    return df_clean

# Produce the cleaned frame used by every later step.
df_clean = clean_data(df)

# Structure of the cleaned data.
print("清洗后的数据集信息：")
df_clean.info()

# First rows, displayed as the cell result.
print("\n清洗后的数据样例：")
df_clean.head()


In [None]:
# 数据质量验证
def validate_data_quality(df):
    """Print a data-quality report for a cleaned order frame.

    Sections: min/max of the amount columns, counts of rows that break
    amount relationships (paid > total, refund > paid), count of rows paid
    before they were created, and the distinct shipping addresses. Nothing
    is returned.
    """
    amount_columns = ('总金额', '买家实际支付金额', '退款金额', '实际收入')
    paid = df['买家实际支付金额']
    created = df['订单创建时间']
    paid_at = df['订单付款时间']

    print("1. 数值范围检查")
    print("\n金额字段的范围：")
    for name in amount_columns:
        series = df[name]
        print(f"{name}范围: {series.min():.2f} - {series.max():.2f}")

    print("\n2. 逻辑关系验证")
    # A buyer should never pay more than the order total.
    overpaid = int((paid > df['总金额']).sum())
    print(f"实际支付金额大于总金额的订单数: {overpaid}")

    # A refund should never exceed what was actually paid.
    over_refunded = int((df['退款金额'] > paid).sum())
    print(f"退款金额大于实际支付金额的订单数: {over_refunded}")

    print("\n3. 时间合理性检查")
    # Only paid orders (non-null payment time) can be checked.
    paid_too_early = int((paid_at.notna() & (paid_at < created)).sum())
    print(f"付款时间早于创建时间的订单数: {paid_too_early}")

    print("\n4. 地址完整性检查")
    print("收货地址唯一值：")
    print(df['收货地址'].unique())

# Run the data-quality report on the cleaned frame; it only prints results.
validate_data_quality(df_clean)


In [None]:
# 第二部分：数据存储和管理(MySQL)

在这部分中，我们将：
1. 创建数据库连接
2. 设计并创建数据表
3. 导入处理后的数据
4. 编写基础查询
5. 验证数据完整性


In [None]:
# 安装并导入必要的库
import mysql.connector
from sqlalchemy import create_engine
import pymysql
import pandas as pd

import os

# MySQL connection settings shared by the database helpers in this section.
# The password can be supplied via the MYSQL_PASSWORD environment variable so
# it does not have to be hard-coded; the placeholder is used otherwise.
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': os.environ.get('MYSQL_PASSWORD', 'your_password'),  # 请替换为你的实际密码
    'database': 'ecommerce_analysis'
}

# 创建数据库连接
def create_database():
    """Create the database named in ``db_config`` if it does not exist yet.

    Connects to the MySQL server without selecting a database. MySQL errors
    are printed, not raised. The cursor and connection are now closed in
    ``finally`` blocks; previously a failing statement skipped the close
    calls and leaked the connection.
    """
    conn = None
    try:
        conn = mysql.connector.connect(
            host=db_config['host'],
            user=db_config['user'],
            password=db_config['password']
        )
        cursor = conn.cursor()
        try:
            # Backticks quote the identifier taken from configuration.
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{db_config['database']}`")
            print(f"数据库 {db_config['database']} 创建成功")
        finally:
            cursor.close()

    except mysql.connector.Error as err:
        print(f"错误: {err}")
    finally:
        if conn is not None:
            conn.close()

# Create the orders table inside the analysis database.
def create_tables():
    """Create the ``orders`` table in ``db_config['database']`` if missing.

    The column names are presumably meant to match the cleaned DataFrame
    (verify against clean_data). ``mysql.connector.Error`` is printed rather
    than raised.

    NOTE(review): the later ``to_sql(..., if_exists='replace')`` import in this
    notebook drops and recreates ``orders`` with pandas-inferred types, so this
    schema (PRIMARY KEY, DECIMAL columns) does not survive that import.
    """
    from contextlib import closing

    create_orders_table = """
    CREATE TABLE IF NOT EXISTS orders (
        订单编号 INT PRIMARY KEY,
        总金额 DECIMAL(10,2),
        买家实际支付金额 DECIMAL(10,2),
        收货地址 VARCHAR(50),
        订单创建时间 DATETIME,
        订单付款时间 DATETIME,
        退款金额 DECIMAL(10,2),
        支付时长 FLOAT,
        下单年月 VARCHAR(7),
        下单时间 INT,
        下单星期 INT,
        订单状态 VARCHAR(10),
        实际收入 DECIMAL(10,2)
    ) DEFAULT CHARSET=utf8mb4
    """
    try:
        # closing() guarantees cleanup even if execute() fails; the original
        # leaked the cursor and connection on the error path.
        with closing(mysql.connector.connect(**db_config)) as conn:
            with closing(conn.cursor()) as cursor:
                cursor.execute(create_orders_table)
                print("订单表创建成功")
    except mysql.connector.Error as err:
        print(f"错误: {err}")

# Create the database first, then the orders table inside it (order matters:
# create_tables connects with the database selected).
create_database()
create_tables()


In [None]:
# Import the DataFrame into MySQL.
def import_data_to_mysql(df, if_exists='replace'):
    """Write ``df`` to the MySQL ``orders`` table and print the row count.

    Args:
        df: DataFrame to write; its columns become the table's columns.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default ``'replace'``
            (the original behaviour) drops and recreates ``orders`` with
            pandas-inferred column types. Pass ``'append'`` to keep an
            existing table definition; its columns must then match ``df``,
            and existing rows are kept.

    Any exception is printed rather than raised.
    """
    from urllib.parse import quote_plus
    from sqlalchemy import text

    # Escape the credentials so characters like '@', ':' or '/' in the password
    # cannot corrupt the URL. charset=utf8mb4 keeps Chinese text intact.
    url = (
        f"mysql+pymysql://{quote_plus(db_config['user'])}:"
        f"{quote_plus(db_config['password'])}@{db_config['host']}/"
        f"{db_config['database']}?charset=utf8mb4"
    )
    engine = None
    try:
        engine = create_engine(url)

        df.to_sql('orders', engine, if_exists=if_exists, index=False)
        print("数据成功导入MySQL")

        # Verify the import. SQLAlchemy 2.x refuses plain SQL strings in
        # Connection.execute, so the statement is wrapped in text().
        with engine.connect() as conn:
            count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar()
            print(f"导入的记录数: {count}")

    except Exception as e:
        print(f"错误: {e}")
    finally:
        # Release pooled connections held by the engine.
        if engine is not None:
            engine.dispose()

# Load the cleaned DataFrame into the MySQL orders table.
import_data_to_mysql(df_clean)


In [None]:
# Example SQL summary queries.
def run_sql_queries():
    """Run three summary queries against ``orders`` and print each result.

    1. order count, sum and average of 总金额 per 订单状态;
    2. the ten 收货地址 values with the most orders, with summed 实际收入;
    3. order count per 下单时间 value (labelled 小时 in the output).

    Each result is printed as a DataFrame under its heading.
    ``mysql.connector.Error`` is printed rather than raised.
    """
    from contextlib import closing

    # (heading, SQL, DataFrame column labels) for each report, run in order.
    reports = [
        (
            "1. 订单状态统计：",
            """
            SELECT
                订单状态,
                COUNT(*) as 订单数量,
                SUM(总金额) as 总金额,
                AVG(总金额) as 平均金额
            FROM orders
            GROUP BY 订单状态
            """,
            ['订单状态', '订单数量', '总金额', '平均金额'],
        ),
        (
            "\n2. 地区订单统计：",
            """
            SELECT
                收货地址,
                COUNT(*) as 订单数量,
                SUM(实际收入) as 总收入
            FROM orders
            GROUP BY 收货地址
            ORDER BY 订单数量 DESC
            LIMIT 10
            """,
            ['收货地址', '订单数量', '总收入'],
        ),
        (
            "\n3. 时间段订单统计：",
            """
            SELECT
                下单时间,
                COUNT(*) as 订单数量
            FROM orders
            GROUP BY 下单时间
            ORDER BY 下单时间
            """,
            ['小时', '订单数量'],
        ),
    ]

    try:
        # closing() releases the cursor and connection even if a query fails;
        # the original only closed them after all three queries succeeded.
        with closing(mysql.connector.connect(**db_config)) as conn:
            with closing(conn.cursor()) as cursor:
                for heading, sql, columns in reports:
                    print(heading)
                    cursor.execute(sql)
                    print(pd.DataFrame(cursor.fetchall(), columns=columns))
    except mysql.connector.Error as err:
        print(f"错误: {err}")

# Run the example summary queries and print their results.
run_sql_queries()


In [None]:
# 第二部分总结

在这部分中，我们完成了：

1. 数据库环境搭建
   - 创建了ecommerce_analysis数据库
   - 设计并创建了orders表结构

2. 数据导入
   - 将预处理后的数据导入MySQL
   - 通过查询导入后的记录数核对了导入结果

3. 基础分析
   - 实现了订单状态统计
   - 完成了地区订单分析
   - 进行了时间维度分析

下一步，我们将进入第三部分：批处理分析(Hadoop)，在那里我们将处理更大规模的数据。
