### 一、加载数据集

In [1]:
import os
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("RetailStatistics")\
    .getOrCreate()

In [3]:
root_path = "D:\\大三下\\Big_Data_Application_Spark\\datasets\\"
retail_path = os.path.join(root_path, "data.json")
# dropna删除缺失值，thresh =1【至少有一个】
# 过滤单笔订单过大的数据【这些数据是测试数据】
df = spark.read.\
    json(retail_path).\
    dropna(thresh=1,subset=["storeProvince"]).\
    filter("storeProvince != 'null'") \
    .filter("receivable < 10000") \
    .select("storeProvince", "storeID", "receivable", "dateTS", "payType")

In [5]:
df.show(5)

+--------------+-------+----------+-------------+-------+
| storeProvince|storeID|receivable|       dateTS|payType|
+--------------+-------+----------+-------------+-------+
|        湖南省|   4064|      22.5|1563758583000| alipay|
|        湖南省|    718|       7.0|1546737450000| alipay|
|        湖南省|   1786|      10.0|1546478081000|   cash|
|        广东省|   3702|      10.5|1559133703000| wechat|
|广西壮族自治区|   1156|      10.0|1548594458000|   cash|
+--------------+-------+----------+-------------+-------+
only showing top 5 rows



### 二、任务要求
#### 2.1 各省销售指标每个省份的总销售额统计

In [7]:
from pyspark.sql import functions as F
# withColumn("money", F.round("money", 2)) 保留两位小数
province_sale_df = df.groupBy("storeProvince") \
        .sum("receivable") \
        .withColumnRenamed("sum(receivable)", "money") \
        .withColumn("money", F.round("money", 2)) \
        .orderBy("money", ascending=False)

In [8]:
province_sale_df.show(4)

+--------------+----------+
| storeProvince|     money|
+--------------+----------+
|        广东省|1713207.92|
|        湖南省|1701303.53|
|广西壮族自治区|  37828.22|
|        北京市|  10926.91|
+--------------+----------+
only showing top 4 rows



In [12]:
# 数据存储MySQL
# java.sql.SQLException: No suitable driver【缺少mysql的jar包，复制jdk1.8.0_261\jre\lib\ext目录】
# useUnicode=true 使用统一编码
# option(k, v)
# k = dbtable, v = 表名
province_sale_df.write.mode("overwrite") \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true") \
    .option("dbtable", "province_sale") \
    .option("user", "root") \
    .option("password", "123456") \
    .option("encode", "utf8") \
    .save()

#### 2.2 TOP5 销售省份中有多少家店铺日销售额 1000+(有过达到的即可)
* 每个省份得出一个满足条件的店铺统计（也可能存在该省份没有满足条件的店铺）

In [11]:
# 2.2.1 先获取TOP5销售省份
top5_province_sale_df = province_sale_df.limit(5).select("storeProvince").withColumnRenamed("storeProvince", "top5_province")

In [17]:
# 2.2.2 与原始的DF进行内连接操作，查出所有的销售数据
top5_province_join_df = df.join(top5_province_sale_df, on=df["storeProvince"]==top5_province_sale_df["top5_province"])

In [18]:
top5_province_join_df.show(3)

+-------------+-------+----------+-------------+-------+-------------+
|storeProvince|storeID|receivable|       dateTS|payType|top5_province|
+-------------+-------+----------+-------------+-------+-------------+
|       湖南省|   4064|      22.5|1563758583000| alipay|       湖南省|
|       湖南省|    718|       7.0|1546737450000| alipay|       湖南省|
|       湖南省|   1786|      10.0|1546478081000|   cash|       湖南省|
+-------------+-------+----------+-------------+-------+-------------+
only showing top 3 rows



In [63]:
# 查看storeID==1546这个店铺每天的交易次数
top5_province_join_df.filter("storeID==1546").groupBy(F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day")).count().show()

+----------+-----+
|       day|count|
+----------+-----+
|2019-06-19|    1|
|2019-08-04|    1|
|2018-12-13|    1|
|2019-06-07|    2|
|2019-08-07|    2|
|2019-01-13|    1|
|2019-01-19|    1|
|2019-04-12|    1|
|2019-04-13|    1|
|2018-12-08|    3|
|2019-01-08|    2|
|2018-12-16|    1|
|2019-03-03|    1|
|2019-06-17|    1|
|2018-12-24|    1|
|2019-01-18|    1|
|2019-01-05|    1|
|2019-06-02|    1|
|2019-07-23|    1|
|2019-02-27|    1|
+----------+-----+
only showing top 20 rows



In [69]:
top5_province_join_df.groupBy("storeID", F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day"))\
    .sum("receivable").withColumnRenamed("sum(receivable)", "money")\
    .filter("storeID==1546").show(4)



top5_province_join_df.groupBy("storeID", F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day"))\
    .sum("receivable").withColumnRenamed("sum(receivable)", "money")\
    .filter("storeID==1546").dropDuplicates(subset=["storeID"]).show()
    # 存在多个相同的storeID时，只会选择第一个

+-------+----------+-----+
|storeID|       day|money|
+-------+----------+-----+
|   1546|2019-06-07|713.0|
|   1546|2018-12-13| 50.0|
|   1546|2019-01-13|145.0|
|   1546|2019-08-07| 24.0|
+-------+----------+-----+
only showing top 4 rows

+-------+----------+-----+
|storeID|       day|money|
+-------+----------+-----+
|   1546|2019-06-07|713.0|
+-------+----------+-----+



In [72]:
# 2.2.3 根据店铺分组后再根据天进行分组统计销售额
# from_unixtime 的精度是秒级
top5_province_join_df.groupBy("storeID", F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day"))\
    .sum("receivable").withColumnRenamed("sum(receivable)", "money")\
    .groupBy("storeID")\
    .avg("money").withColumnRenamed("avg(money)", "avg_money")\
    .filter("avg_money > 1000")\
    .show()

+-------+------------------+
|storeID|         avg_money|
+-------+------------------+
|   3243|            2105.0|
|   1460|            1903.0|
|   3737|1600.6666666666667|
|   2653|            1178.5|
|    915|            1850.0|
|   2555|           3294.98|
|   2177|            1242.0|
|    407|          1298.125|
|   1595|            1696.0|
|    769|1499.3333333333333|
|   3247|            1650.5|
|   1475|            1027.0|
|   1926|1763.7142857142858|
|   3288|            3000.0|
|   3454|            2258.0|
|   2118| 1209.321739130435|
|   3654|            4409.0|
|   3604|            1288.0|
|   1865|          2701.098|
|   3975|            1050.0|
+-------+------------------+
only showing top 20 rows



In [73]:
top5_province_join_df.groupBy("storeProvince", "storeID", F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day"))\
    .sum("receivable").withColumnRenamed("sum(receivable)", "money")\
    .groupBy("storeID", "storeProvince")\
    .avg("money").withColumnRenamed("avg(money)", "avg_money")\
    .filter("avg_money > 1000")\
    .groupBy("storeProvince")\
    .count()\
    .show()

+-------------+-----+
|storeProvince|count|
+-------------+-----+
|       湖南省|    9|
|       广东省|   17|
|       上海市|    1|
+-------------+-----+



In [32]:
top5_province_join_df.groupBy("storeProvince", "storeID", F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd").alias("day"))\
    .sum("receivable").withColumnRenamed("sum(receivable)", "money")\
    .filter("money > 1000")\
    .dropDuplicates(subset=["storeID"])\
    .groupBy("storeProvince")\
    .count()\
    .show()

+--------------+-----+
| storeProvince|count|
+--------------+-----+
|广西壮族自治区|    3|
|        湖南省|   97|
|        广东省|  105|
|        上海市|    2|
+--------------+-----+



* 另一种求解方法

In [158]:
from pyspark.sql.functions import mean, min, max, to_date
day = to_date(F.from_unixtime(df["dateTS"].substr(0,10), "yyyy-MM-dd")).alias("day")
top5_province_join_df1 = top5_province_join_df.select("storeProvince", "storeID", "receivable", day, "payType")
top5_province_join_df1.agg(min("day")).show()

+----------+
|  min(day)|
+----------+
|1970-01-02|
+----------+



In [None]:
store_date_df = top5_province_join_df1\
    .groupBy("storeID")\
    .agg(
    min("day").name("start_date"),
    max("day").name("end_date")
)

In [126]:
store_date_df.show()

+-------+----------+----------+
|storeID|start_date|  end_date|
+-------+----------+----------+
|    121|2019-03-18|2019-04-12|
|    145|2018-11-20|2018-12-07|
|    154|2018-12-01|2019-08-01|
|    200|2018-11-17|2269-12-30|
|    209|2018-11-20|2018-12-28|
|    214|2019-02-18|2019-02-18|
|    219|2019-01-27|2019-05-28|
|    234|2018-12-04|2018-12-20|
|    248|2018-11-18|2019-06-08|
|    277|2018-11-17|2019-08-24|
|    283|2019-03-06|2019-07-01|
|    284|2018-11-23|2019-07-28|
|    285|2019-01-08|2019-02-12|
|    287|2018-12-17|2019-07-21|
|    292|2019-02-12|2019-07-11|
|    299|2018-12-01|2019-08-15|
|    300|2018-11-23|2019-07-28|
|    301|2018-12-01|2032-02-16|
|    304|2018-11-19|2019-08-13|
|    313|2018-11-27|2019-08-16|
+-------+----------+----------+
only showing top 20 rows



In [144]:
from pyspark.sql.functions import to_date, to_timestamp, date_format

store_date_df.withColumn("day_nums", col=(to_date(store_date_df["end_date"])- to_date(store_date_df["start_date"]))).show()

+-------+----------+----------+--------------------+
|storeID|start_date|  end_date|            day_nums|
+-------+----------+----------+--------------------+
|    121|2019-03-18|2019-04-12|   INTERVAL '25' DAY|
|    145|2018-11-20|2018-12-07|   INTERVAL '17' DAY|
|    154|2018-12-01|2019-08-01|  INTERVAL '243' DAY|
|    200|2018-11-17|2269-12-30|INTERVAL '91719' DAY|
|    209|2018-11-20|2018-12-28|   INTERVAL '38' DAY|
|    214|2019-02-18|2019-02-18|    INTERVAL '0' DAY|
|    219|2019-01-27|2019-05-28|  INTERVAL '121' DAY|
|    234|2018-12-04|2018-12-20|   INTERVAL '16' DAY|
|    248|2018-11-18|2019-06-08|  INTERVAL '202' DAY|
|    277|2018-11-17|2019-08-24|  INTERVAL '280' DAY|
|    283|2019-03-06|2019-07-01|  INTERVAL '117' DAY|
|    284|2018-11-23|2019-07-28|  INTERVAL '247' DAY|
|    285|2019-01-08|2019-02-12|   INTERVAL '35' DAY|
|    287|2018-12-17|2019-07-21|  INTERVAL '216' DAY|
|    292|2019-02-12|2019-07-11|  INTERVAL '149' DAY|
|    299|2018-12-01|2019-08-15|  INTERVAL '257

#### 2.3 TOP5 省份中各个省份的平均单单价

In [80]:
top5_province_join_df.groupBy("storeProvince")\
        .avg("receivable")\
        .withColumnRenamed("avg(receivable)", "avg_price") \
        .withColumn("avg_price", F.round("avg_price", 2)) \
        .orderBy("avg_price", ascending=False)\
        .show()

+--------------+---------+
| storeProvince|avg_price|
+--------------+---------+
|        上海市|   613.21|
|广西壮族自治区|    40.03|
|        湖南省|    36.86|
|        广东省|    32.81|
|        北京市|    25.95|
+--------------+---------+



#### 2.4 TOP5 省份中各个省份的支付类型比例

In [81]:
# 2.4.1 创建一个临时视图
top5_province_join_df.createTempView("province_pay")

In [82]:
def udf_percent(percent):
    return str(round(percent*100, 2)) + "%"

In [91]:
spark.sql("""
select storeProvince, payType, count(1) over(partition by storeProvince) as total from province_pay
""").show(3)

+-------------+-------+-----+
|storeProvince|payType|total|
+-------------+-------+-----+
|       上海市|   cash|   12|
|       上海市| alipay|   12|
|       上海市| wechat|   12|
+-------------+-------+-----+
only showing top 3 rows



In [87]:
from pyspark.sql.types import StringType

spark.udf.register("udf1", udf_percent, StringType())
spark.sql("""
select storeProvince, payType, udf1((count(payType)/ total)) as percent from
(select storeProvince, payType, count(1) over(partition by storeProvince) as total from province_pay) subtable
group by storeProvince, payType, total
""").show()

+--------------+--------+-------+
| storeProvince| payType|percent|
+--------------+--------+-------+
|        上海市|    cash| 33.33%|
|        上海市|  alipay|  8.33%|
|        上海市|  wechat| 58.33%|
|        北京市|    cash| 40.14%|
|        北京市|  wechat| 46.08%|
|        北京市|  alipay| 13.78%|
|        广东省|  wechat| 39.48%|
|        广东省|    cash| 52.91%|
|        广东省|bankcard|  0.72%|
|        广东省|  alipay|  6.88%|
|广西壮族自治区|    cash| 73.23%|
|广西壮族自治区|  wechat| 21.59%|
|广西壮族自治区|  alipay|  4.23%|
|广西壮族自治区|bankcard|  0.95%|
|        湖南省|  alipay|  4.24%|
|        湖南省|    cash| 70.81%|
|        湖南省|  wechat| 24.88%|
|        湖南省|bankcard|  0.06%|
+--------------+--------+-------+

