# 构建电商数据仓库-营销活动主题分析

## 一、简介

### 1、项目介绍

#### 1.1 项目描述
- **背景：** 一家零售公司希望通过促销活动方式提升交易量，提高用户满意度。需要针对该业务过程创建营销主题，以获取活动期间的统计信息。
- **建模方法：** 使用星型模式（Star Schema）建模，包括维度表（如营销活动维度）和事实表（包括订单信息等）。
- **实验过程：** 加载数据、数据建模、指标计算。


#### 1.2 适学人群
大二年级及以上，数据仓库初学者，并且有志于从事数据开发工程师。


#### 1.3 课程基础
掌握python基础、掌握Spark基础。


### 2、任务介绍
使用星型模式（Star Schema）建模，包括维度表（如营销活动维度）和事实表（包括订单信息等），针对该业务过程创建营销主题，以获取活动期间的统计信息。

### 3、数据集介绍

整个分析会涉及以下来自业务系统的表
- order_detail_sample_data： 订单的主要信息
- order_info_sample_data: 订单的补充信息
- promotion_full_data：营销活动信息

order_detail_sample_data 包含字段如下：
- "id":StringType,订单的ID
- "amount":FloatType, 订单总价
- "item_num":IntegerType,订单中包含的商品数量
- "user_id": StringType,下单用户ID
- "order_info_id": StringType,订单其他信息ID
- "product_category_id": StringType,产品类别ID
- "product_sku_id": StringType,产品的SKU ID 
- "shop_id": StringType,商店ID
- "create_time": StringType,订单创建时间
- "update_time": StringType,订单更新时间

order_info_sample_data 包含字段如下：
- "id"： StringType，订单的补充信息ID
- "actual_amount": FloatType，实际支付总价
- "comment"： StringType，评价
- "original_amount"： FloatType, 折扣前总价
- "rating"： IntegerType，评分
- "reduce_amount"： FloatType，折扣金额
- "delevery_amount"： FloatType，运费
- "status"： StringType，订单状态
- "promotion_id"： StringType()，促销活动ID
- "user_addr"： StringType()，用户地址
- "create_time"： StringType()，订单创建时间
- "update_time"： StringType()，订单更新时间

promotion_full_data 包含字段如下：
- "id"： StringType(),营销活动ID
- "company_name"： StringType(),公司名称
- "owner_name"： StringType(), 公司所有者
- "promotion_name"： StringType(), 营销活动名称
- "is_active"： IntegerType(), 活动是否在进行中，0表示未开始，1表示进行中
- "create_time"： StringType(), 活动创建时间
- "update_time"： StringType()，活动更新时间

### 4、学习目标
- 通过本项目能够实现星型模式（Star Schema）建模
- 了解增量表的概念，以及命名方式、建模方式

## 二、导入工具包

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, date_format,md5, concat, substring, lit, sum, countDistinct, when
import os
import sys

## 三、准备工作

创建 SparkSession

In [0]:
spark = SparkSession.builder \
    .appName("exp1_promotion_analysis") \
    .getOrCreate()


（可选）解析输入参数

In [0]:
def parse_args(args):
    result = dict()
    size = len(args)
    for i in range(0, size - 1):
        if (args[i].startswith("--")) and i + 1 < size and not args[i + 1].startswith("--"):
            print('SystemLog:', args[i].strip().lower(), '=', args[i+1].strip())
            result[args[i].strip().lower()] = args[i + 1].strip()
    return result

args_map = parse_args(sys.argv)
input_path = args_map["--input_path".lower()].rstrip("*")

确定输入文件的路径

In [0]:
current_directory = input_path
ods_order_path = os.path.join(current_directory, 'exp1_ods_order_1di_extend.csv')
ods_order_detail_path = os.path.join(current_directory, 'exp1_ods_order_detail_info_1di_extend.csv')
promo_info_path = os.path.join(current_directory, 'exp1_ods_promo_info_full_extend.csv')

## 四、构建ODS层

- ODS层的作用是将业务系统的数据导入到数据仓库中，创建ODS层数据表是构建数据仓库的起点
- 在下列表的命名中，会对应数据的业务过程、数据的时间范围
- 以下Schema是介绍表ODS读取的源数据的结构，这个结构是和源数据的csv文件的header对应的

### 1、定义ODS表的表结构

定义 exp1_ods_order_1di 的 Schema


In [0]:
order_detail_schema = StructType([
    StructField("id", StringType()),
    StructField("amount", FloatType()),
    StructField("item_num", IntegerType()),
    StructField("user_id", StringType()),
    StructField("order_info_id", StringType()),
    StructField("product_category_id", StringType()),
    StructField("product_sku_id", StringType()),
    StructField("shop_id", StringType()),
    StructField("create_time", StringType()),
    StructField("update_time", StringType())
])

定义 exp1_ods_order_detail_info_1di 的 Schema

In [0]:
order_info_schema = StructType([
    StructField("id", StringType(), True),
    StructField("actual_amount", FloatType(), True),
    StructField("comment", StringType(), True),
    StructField("original_amount", FloatType(), True),
    StructField("rating", IntegerType(), True),
    StructField("reduce_amount", FloatType(), True),
    StructField("delevery_amount", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("promotion_id", StringType(), True),
    StructField("user_addr", StringType(), True),
    StructField("create_time", StringType(), True),
    StructField("update_time", StringType(), True)
])

定义 exp1_ods_promo_info_full 表的 schema

In [0]:
promotion_full_schema = StructType([
    StructField("id", StringType(), True),
    StructField("company_name", StringType(), True),
    StructField("owner_name", StringType(), True),
    StructField("promotion_name", StringType(), True),
    StructField("is_active", IntegerType(), True),
    StructField("create_time", StringType(), True),
    StructField("update_time", StringType(), True)
])

### 2、加载ODS数据、保存为ODS表

构建 exp1_ods_order_1di 表
- ods 表示该表为 ods 层级
- order 表示对应创建订单业务过程
- 1di 中 “1d”表示最近1天数据，i表示是增量数据，1di就是最近1天的增量数据

In [0]:
exp1_ods_order_1di = spark.read.csv(ods_order_path, header=True)
exp1_ods_order_1di.show()

+----------+------------+---------+------+------------+----------+-------------+-----------+------------------------+
|Product ID|Product Name| Category| Brand| Description|Cost Price|Selling Price|Supplier ID|Stock Keeping Unit (SKU)|
+----------+------------+---------+------+------------+----------+-------------+-----------+------------------------+
|        P1|    Product1|Category1|Brand1|Description1|      10.5|         20.5|  Supplier1|                    SKU1|
|        P2|    Product2|Category2|Brand2|Description2|      15.0|         25.0|  Supplier2|                    SKU2|
|        P3|    Product3|Category3|Brand3|Description3|      12.0|         22.0|  Supplier3|                    SKU3|
+----------+------------+---------+------+------------+----------+-------------+-----------+------------------------+



构建 exp1_ods_order_detail_info_1di
- ods 表示该表为 ods 层级
- order_detail 表示对应创建订单业务过程，是订单的详细信息
- 1di 中 “1d”表示最近1天数据，i表示是增量数据，1di就是最近1天的增量数据

In [0]:
exp1_ods_order_detail_info_1di = spark.read.csv(ods_order_detail_path, header=True)
exp1_ods_order_detail_info_1di.show()

构建 exp1_ods_promo_info_full
- ods 表示该表为 ods 层级
- promo_info 表示对应营销活动数据，是营销活动的详细信息
- full 表示全量数据，既包含当天新增数据，也包含历史数据

In [0]:
exp1_ods_promo_info_full = spark.read.csv(promo_info_path, header=True)
exp1_ods_promo_info_full.show()

## 五、构建DIM层（维度表）

- DIM层的作用是保存业务过程中，各个参与实体的信息，例如在营销活动中，每个营销活动本身就是一个实体

构建 exp1_dim_promo_info_full 表
- dim 表示该表为 dim 层级
- promo_info 表示对应营销活动数据，是营销活动的详细信息
- full 表示全量数据，既包含当天新增数据，也包含历史数据

In [0]:
# 通过 is_active 筛选有效的记录
exp1_dim_promo_info_full = exp1_ods_promo_info_full \
    .where(col("is_active") == 1)\
    .select("id", "company_name", "owner_name", "promotion_name")
exp1_dim_promo_info_full.show()

## 六、构建DWD层（明细事实表）

- DWD层的作用是保存业务过程中的详细数据，主要用来描述业务过程的细节

构建 exp1_dwd_order_detail_1di 表
- dwd 表示该表为 dwd 层级
- order_detail 表示对应营销活动数据的详细，每个记录都代表参与营销活动的下单记录
- 1di 中 “1d”表示最近1天数据，i表示是增量数据，1di就是最近1天的增量数据

In [0]:
# 对 detail_df 转换，格式化时间信息
detail_filtered_df = exp1_ods_order_1di \
    .select(
        col("id"),
        col("create_time").alias("order_time"),
        date_format(col("create_time"), "yyyy-MM-dd").alias("order_date"),
        col("amount").alias("split_orginal_amount"),
        col("item_num"),
        col("user_id"),
        col("order_info_id").alias("order_info_id"),
        col("product_category_id"),
        col("product_sku_id"),
        col("shop_id")
    )

+---------+-----------+--------+----------+-----------+---------+------+------------+-------------+-----------------+
|ProductID|WarehouseID|Quantity| StockDate|ProductName| Category| Brand|StockQuarter|WarehouseName|WarehouseLocation|
+---------+-----------+--------+----------+-----------+---------+------+------------+-------------+-----------------+
|       P2|         W2|     150|2023-01-02|   Product2|Category2|Brand2|          Q1|  Warehouse B|      Los Angeles|
|       P1|         W1|     100|2023-01-01|   Product1|Category1|Brand1|          Q1|  Warehouse A|         New York|
|       P3|         W1|     200|2023-01-03|   Product3|Category3|Brand3|          Q1|  Warehouse A|         New York|
+---------+-----------+--------+----------+-----------+---------+------+------------+-------------+-----------------+



In [0]:
# 对info_df进行筛选和转换
info_filtered_df = exp1_ods_order_detail_info_1di \
    .select(
        col("id"),
        col("promotion_id"),
        col("original_amount"),
        col("reduce_amount"),
        col("actual_amount"),
        col("rating")
    )

In [0]:
# 将订单表和订单详细信息表连接，构成一张完整描述订单信息的 dwd 表
exp1_dwd_order_detail_1di = detail_filtered_df \
    .join(info_filtered_df, detail_filtered_df["order_info_id"] == info_filtered_df["id"], "left") \
    .select(
        detail_filtered_df["id"],
        detail_filtered_df["order_time"],
        detail_filtered_df["order_date"],
        detail_filtered_df["split_orginal_amount"],
        info_filtered_df["reduce_amount"].alias("split_reduce_amount"),
        info_filtered_df["actual_amount"].alias("split_actual_amount"),
        detail_filtered_df["item_num"],
        detail_filtered_df["user_id"],
        detail_filtered_df["order_info_id"],
        detail_filtered_df["product_category_id"],
        detail_filtered_df["product_sku_id"],
        detail_filtered_df["shop_id"],
        info_filtered_df["promotion_id"],
        info_filtered_df["rating"],
        detail_filtered_df["order_date"]
    )

In [0]:
# 显示DWD表的结果
exp1_dwd_order_detail_1di.show()

## 七、构建DWS层

- DWS层的作用是保存业务实体参与业务过程的聚合数据，在本实验中就是统计营销活动的一些指标

对 exp1_dwd_order_detail_1di 表执行聚合操作
- dws 表示该表为 dws 层级
- order_detail 表示对应营销活动数据，每个记录都代表营销活动的聚合统计信息
- 1di 中 “1d”表示最近1天数据，i表示是增量数据，1di就是最近1天的增量数据

In [0]:
# 按照 promotion_id 聚合，得到各种指标
order_detail_agg_df = exp1_dwd_order_detail_1di \
    # 只筛选促销活动相关的订单记录
    .where(col("promotion_id").isNotNull()) \
    .groupby("promotion_id") \
    .agg(
        # 活动总折扣金额
        sum("split_reduce_amount").alias("total_reduce_amount"),
        # 活动订单数
        countDistinct(when(col("promotion_id").isNotNull(), col("order_info_id"))).alias("total_activity_order_count"),
        # 参与活动的店铺数量
        countDistinct(when(col("promotion_id").isNotNull(), col("shop_id"))).alias("total_activity_shop_count"),
        # 参与活动的用户数量
        countDistinct(when(col("promotion_id").isNotNull(), col("user_id"))).alias("total_activity_user_count"),
        # 参与活动的商品SKU数量
        countDistinct(when(col("promotion_id").isNotNull(), col("product_sku_id"))).alias("total_activity_sku_count"),
        # 活动评分总和
        sum(when(col("promotion_id").isNotNull(), col("rating"))).alias("total_activity_rating_amount"),
        # 活动售卖商品数量
        sum(when(col("promotion_id").isNotNull(), col("item_num"))).alias("total_item_num")
    )

In [0]:
# 从维度表中获取活动相关的维度信息
promotion_df = exp1_dim_promo_info_full \
    .select(
        col("id"),
        col("company_name"),
        col("owner_name"),
        col("promotion_name")
    )

In [0]:
# 连接order_detail_agg_df和promotion_df
exp1_dws_trade_promo_order_1di = order_detail_agg_df.join(promotion_df, order_detail_agg_df["promotion_id"] == promotion_df["id"], "inner") \
    .select(
        order_detail_agg_df["promotion_id"],
        promotion_df["company_name"],
        promotion_df["promotion_name"],
        order_detail_agg_df["total_reduce_amount"],
        order_detail_agg_df["total_activity_order_count"],
        order_detail_agg_df["total_activity_user_count"],
        order_detail_agg_df["total_activity_shop_count"],
        order_detail_agg_df["total_activity_sku_count"],
        # 计算活动的平均得分
        (order_detail_agg_df["total_activity_rating_amount"] / order_detail_agg_df["total_activity_order_count"]).alias("avg_activity_order_rating"),
        # 计算活动期间每个订单的平均商品数量
        (order_detail_agg_df["total_item_num"] / order_detail_agg_df["total_activity_order_count"]).alias("avg_activity_order_item_num")
    )

In [0]:
# 展示结果
exp1_dws_trade_promo_order_1di.show()

## 八、构建ADS层

- ADS 层的作用是保存业务实体参与业务过程的所有聚合数据
- 如果有多张 DWS 表，那么需要将这些ADS表按照分析主题（或者分析目标）聚合，在本实验中只有一张DWS表，所以不需要和其他表聚合

In [0]:
# 将 DWS 中的数据加载到 ADS 表中，在本实验中只有一张DWS表，所以不需要和其他表聚合
exp1_ads_trade_promo_stats_1di = exp1_dws_trade_promo_order_1di.select(
                      lit("2023-04-01").alias("dt"),
                      "promotion_id",
                      "company_name",
                      "promotion_name",
                      "total_reduce_amount",
                      "total_activity_order_count",
                      "total_activity_user_count",
                      "total_activity_shop_count",
                      "total_activity_sku_count",
                      "avg_activity_order_rating",
                      "avg_activity_order_item_num")

In [0]:
# 显示结果
exp1_ads_trade_promo_stats_1di.show()

## 九、停止 SparkSession

In [0]:
spark.stop()