In [74]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when, unix_timestamp, count

spark = SparkSession.builder.appName("Luchen").getOrCreate()

customers_df = spark.read.csv("olist_customers_dataset.csv",header=True, inferSchema=True)
geolocation_df = spark.read.csv("olist_geolocation_dataset.csv",header=True, inferSchema=True)
items_df = spark.read.csv("olist_order_items_dataset.csv",header=True, inferSchema=True)
payments_df = spark.read.csv("olist_order_payments_dataset.csv",header=True, inferSchema=True)
reviews_df = spark.read.csv("olist_order_reviews_dataset.csv",header=True, inferSchema=True)
orders_df = spark.read.csv("olist_orders_dataset.csv",header=True, inferSchema=True)
products_df = spark.read.csv("olist_products_dataset.csv",header=True, inferSchema=True)
sellers_df = spark.read.csv("olist_sellers_dataset.csv",header=True, inferSchema=True)
customers_df = spark.read.csv("olist_customers_dataset.csv",header=True, inferSchema=True)
translation_df = spark.read.csv("product_category_name_translation.csv", header=True, inferSchema=True)

In [75]:

# 按照 review_score 分组并计算每个评分值的出现次数
score_counts = reviews_df.groupBy("review_score").count().orderBy("review_score")

# 重命名 count 列为 pic1_df 的特征 'count'
pic1_df = score_counts.withColumnRenamed("count", "count").withColumnRenamed("review_score", "review_score")

# 如果需要确保只包含评分值 1, 2, 3, 4, 5，可以过滤数据
pic1_df = pic1_df.filter(col("review_score").isin([1, 2, 3, 4, 5]))

from pyspark.sql import functions as F

# 假设你的DataFrame名为df，并且有一个名为"count"的列
# 首先，计算count列的总和
count_sum = pic1_df.select(F.sum("count").alias("count_sum")).collect()[0]["count_sum"]

# 然后，创建一个新列percentage，它是count列的值除以count列的总和
pic1_df = pic1_df.withColumn("percentage", F.col("count") / F.lit(count_sum))

# 查看结果
pic1_df.show()



+------------+-----+-------------------+
|review_score|count|         percentage|
+------------+-----+-------------------+
|           1|11424| 0.1151334354591631|
|           2| 3151|0.03175642989599291|
|           3| 8179| 0.0824296541159397|
|           4|19142|0.19291703620091913|
|           5|57328| 0.5777634443279852|
+------------+-----+-------------------+



In [76]:
# 评分分布饼图

# 将结果转换为Pandas DataFrame
pic1 = pic1_df.toPandas()

# 准备数据用于饼图
score = pic1['review_score'].tolist()
count = pic1['count'].tolist()
percentage = pic1['percentage'].tolist()

# 创建饼图
data_pair = [list(z) for z in zip(score, count)]
rich_data = [
    {"name": score, "value": count, "percent": f"{percentage:.2f}%"}
    for state, value, percentage in zip(score, count, percentage)
]

c = (
    Pie()
    .add(
        "",
        data_pair,
        radius=["30%", "60%"],
        label_opts=opts.LabelOpts(
            position="outside",
          formatter="{a|{b}}{abg|}\n{hr|}\n {c}{per|{d}%}  ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
)

# 渲染图表
c.render_notebook()


In [24]:
# 一周内的销量分布图
from pyspark.sql import functions as F

# 假设你的DataFrame名为order_df，并且有一个名为"order_purchase_timestamp"的列
# 将"order_purchase_timestamp"列转换为周几
orders_df = orders_df.withColumn("day_of_week", F.date_format("order_purchase_timestamp", "EEEE"))

# 按照 review_score 分组并计算每个评分值的出现次数
seller_counts = orders_df.groupBy("day_of_week").count().orderBy("day_of_week")

# 重命名 count 列为 pic1_df 的特征 'count'
pic2_df = seller_counts.withColumnRenamed("count", "count").withColumnRenamed("day_of_week", "day_of_week")

# 如果需要确保只包含评分值 1, 2, 3, 4, 5，可以过滤数据
pic2_df = pic2_df.filter(col("day_of_week").isin(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']))

# 创建一个辅助列用于排序，按照周一到周日的顺序
pic2_df = pic2_df.withColumn("day_order", F.when(F.col("day_of_week") == "Monday", 1)
                                      .when(F.col("day_of_week") == "Tuesday", 2)
                                      .when(F.col("day_of_week") == "Wednesday", 3)
                                      .when(F.col("day_of_week") == "Thursday", 4)
                                      .when(F.col("day_of_week") == "Friday", 5)
                                      .when(F.col("day_of_week") == "Saturday", 6)
                                      .when(F.col("day_of_week") == "Sunday", 7))

# 按照辅助列排序
pic2_df = pic2_df.orderBy("day_order")

pic2_df.show(5)

+-----------+-----+---------+
|day_of_week|count|day_order|
+-----------+-----+---------+
|     Monday|16196|        1|
|    Tuesday|15963|        2|
|  Wednesday|15552|        3|
|   Thursday|14761|        4|
|     Friday|14122|        5|
+-----------+-----+---------+
only showing top 5 rows



In [25]:
from pyecharts.charts import Bar
from pyecharts import options as opts

# 将结果转换为Pandas DataFrame
pic2 = pic2_df.toPandas()

# 创建Bar实例
bar_chart = (
    Bar()
    .add_xaxis(pic2["day_of_week"].tolist())
    .add_yaxis("销量", pic2["count"].tolist())
    .set_global_opts(
        title_opts=opts.TitleOpts(title="周内销量统计"),
        xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-45)),
        yaxis_opts=opts.AxisOpts(name="销量"),
    )
)

# 渲染图表
bar_chart.render_notebook()

In [60]:
from pyspark.sql.functions import to_date, month, count
from pyecharts.charts import Line

# 提取月份
orders_df = orders_df.withColumn("purchase_month", month(to_date("order_purchase_timestamp", "yyyy-MM-dd HH:mm:ss")))

# 统计每个月的订单数量
pic3_df = orders_df.groupBy("purchase_month").agg(count("order_id").alias("count"))

pic3_df = pic3_df.orderBy(pic3_df["purchase_month"].asc())


# 将结果转换为Pandas DataFrame
pic3 = pic3_df.toPandas()

x_data = ["一月", "二月", "三月", "四月", "五月", "六月", "七月", "八月", "九月","十月","十一月","十二月",]

# 创建Line实例
line_chart = (
    Line()
    .add_xaxis(x_data)
    .add_yaxis("", pic3["count"].tolist())
    .set_global_opts(
        title_opts=opts.TitleOpts(title="每月销量统计"),
        xaxis_opts=opts.AxisOpts(name="月份"),
        yaxis_opts=opts.AxisOpts(name="订单数")
    )
)

# 渲染图表
line_chart.render_notebook()

In [51]:

# 按照 review_score 分组并计算每个评分值的出现次数
status_counts = orders_df.groupBy("order_status").count().orderBy("order_status")

# 重命名 count 列为 pic1_df 的特征 'count'
pic4_df = status_counts.withColumnRenamed("count", "count").withColumnRenamed("order_status", "order_status")

# 如果需要确保只包含评分值 1, 2, 3, 4, 5，可以过滤数据
pic4_df = pic4_df.filter(col("order_status").isin(["approved", "canceled", "created", "delivered", "invoiced", "processing", "shipped", "unavailable"]))


# 查看结果
pic4_df.show()


+------------+-----+
|order_status|count|
+------------+-----+
|    approved|    2|
|    canceled|  625|
|     created|    5|
|   delivered|96478|
|    invoiced|  314|
|  processing|  301|
|     shipped| 1107|
| unavailable|  609|
+------------+-----+

+-------+------------+---------------+
|summary|order_status|          count|
+-------+------------+---------------+
|  count|           8|              8|
|   mean|        NULL|      12430.125|
| stddev|        NULL|33962.409061606|
|    min|    approved|              2|
|    max| unavailable|          96478|
+-------+------------+---------------+



In [52]:
# 评分分布饼图

# 将结果转换为Pandas DataFrame
pic4 = pic4_df.toPandas()

# 准备数据用于饼图
status = pic4['order_status'].tolist()
count = pic4['count'].tolist()

# 创建饼图
data_pair = [list(z) for z in zip(status, count)]
rich_data = [
    {"name": status, "value": count, "percent": f"{percentage:.2f}%"}
    for state, value, percentage in zip(score, count, percentage)
]

c = (
    Pie()
    .add(
        "",
        data_pair,
        radius=["30%", "60%"],
        label_opts=opts.LabelOpts(
            position="outside",
          formatter="{a|{b}}{abg|}\n{hr|}\n {c}{per|{d}%}  ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
)

# 渲染图表
c.render_notebook()


In [56]:
joined_df = reviews_df.join(orders_df, on="order_id")

# 筛选出order_status为"delivered"的数据
filtered_df1 = joined_df.filter(col("order_status") == "delivered")
filtered_df2 = joined_df.filter(col("order_status") != "delivered")


# 按照 review_score 分组并计算每个评分值的出现次数
score_counts = filtered_df1.groupBy("review_score").count().orderBy("review_score")

# 重命名 count 列为 pic1_df 的特征 'count'
pic5_df = score_counts.withColumnRenamed("count", "count").withColumnRenamed("review_score", "review_score")

score_counts = filtered_df2.groupBy("review_score").count().orderBy("review_score")
pic6_df = score_counts.withColumnRenamed("count", "count").withColumnRenamed("review_score", "review_score")


# 如果需要确保只包含评分值 1, 2, 3, 4, 5，可以过滤数据
pic5_df = pic5_df.filter(col("review_score").isin([1, 2, 3, 4, 5]))
pic6_df = pic6_df.filter(col("review_score").isin([1, 2, 3, 4, 5]))


# 查看结果
pic5_df.show(5)
pic6_df.show(5)

+------------+-----+
|review_score|count|
+------------+-----+
|           1| 9406|
|           2| 2941|
|           3| 7961|
|           4|18987|
|           5|57066|
+------------+-----+

+------------+-----+
|review_score|count|
+------------+-----+
|           1| 2018|
|           2|  210|
|           3|  218|
|           4|  155|
|           5|  262|
+------------+-----+



In [58]:
# 评分分布饼图

# 将结果转换为Pandas DataFrame
pic5 = pic5_df.toPandas()

# 准备数据用于饼图
score = pic5['review_score'].tolist()
count = pic5['count'].tolist()

# 创建饼图
data_pair = [list(z) for z in zip(score, count)]
rich_data = [
    {"name": score, "value": count, "percent": f"{percentage:.2f}%"}
    for state, value, percentage in zip(score, count, percentage)
]

c = (
    Pie()
    .add(
        "",
        data_pair,
        radius=["30%", "60%"],
        label_opts=opts.LabelOpts(
            position="outside",
          formatter="{a|{b}}{abg|}\n{hr|}\n {c}{per|{d}%}  ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
)

# 渲染图表
c.render_notebook()


In [59]:
# 评分分布饼图

# 将结果转换为Pandas DataFrame
pic6 = pic6_df.toPandas()

# 准备数据用于饼图
score = pic6['review_score'].tolist()
count = pic6['count'].tolist()

# 创建饼图
data_pair = [list(z) for z in zip(score, count)]
rich_data = [
    {"name": score, "value": count, "percent": f"{percentage:.2f}%"}
    for state, value, percentage in zip(score, count, percentage)
]

c = (
    Pie()
    .add(
        "",
        data_pair,
        radius=["30%", "60%"],
        label_opts=opts.LabelOpts(
            position="outside",
          formatter="{a|{b}}{abg|}\n{hr|}\n {c}{per|{d}%}  ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
)

# 渲染图表
c.render_notebook()


In [99]:
from pyspark.sql.functions import col, avg, round, max, count


filter_df = reviews_df.join(orders_df, on="order_id")

filter_df = filter_df.select("order_id", "customer_id", "review_score")

filter_df = filter_df.join(customers_df, on="customer_id").withColumnRenamed("customer_city", "geolocation_city")


# 计算每个城市的客户数量和平均经纬度
city_df = geolocation_df.groupBy("geolocation_city").agg(
    round(avg(col("geolocation_lat")), 7).alias("geolocation_lat"),
   round(avg(col("geolocation_lng")), 7).alias("geolocation_lng")
 )

filter_df = filter_df.select("review_score", "geolocation_city", "customer_state")

# pic7_df = filter_df.groupBy("geolocation_city").agg(sum(col("review_score")).alias("review_score_sum"))
filter_df.show()

pic7_df = filter_df.groupBy("geolocation_city").agg(
    round(avg(col("review_score")), 2).alias("review_score"),

)

pic7_df = pic7_df.join(city_df, on="geolocation_city")
pic7_df.show()

pic8_df = filter_df.groupBy("customer_state").agg(
    round(avg(col("review_score")), 2).alias("review_score"),
)
pic8_df.show()

+------------+--------------------+--------------+
|review_score|    geolocation_city|customer_state|
+------------+--------------------+--------------+
|           4|           sao paulo|            SP|
|           4|           barreiras|            BA|
|           5|          vianopolis|            GO|
|           5|sao goncalo do am...|            RN|
|           5|         santo andre|            SP|
|           4|        congonhinhas|            PR|
|           2|          santa rosa|            RS|
|           5|           nilopolis|            RJ|
|           1|        faxinalzinho|            RS|
|           5|            sorocaba|            SP|
|           1|      rio de janeiro|            RJ|
|           4|           sao paulo|            SP|
|           5|          ouro preto|            MG|
|           5|           sao paulo|            SP|
|           4|           sao paulo|            SP|
|           5|             goiania|            GO|
|           5|            imbit

In [91]:
from pyecharts.charts import Geo
import math

# 将结果转换为Pandas DataFrame
pic7 = pic7_df.toPandas()

# 创建Geo实例
geo_chart = (
    Geo()
    .add_schema(maptype="巴西",is_roam=False,zoom=1.2)
    .set_global_opts(
        title_opts=opts.TitleOpts(title="评分城市分布图"),
       visualmap_opts=opts.VisualMapOpts(max_=5),
    )
)

# 添加数据点
for index, row in pic7.iterrows():
    geo_chart.add_coordinate(row['geolocation_city'], row['geolocation_lng'], row['geolocation_lat'])
    geo_chart.add(
        "",
        [(row['geolocation_city'], row['review_score'])],
        type_="scatter",
        symbol_size=math.log(row['review_score']*row['review_score']),  # 根据客户数量调整符号大小
    )

# 渲染地图
geo_chart.render_notebook()

In [100]:
# 将结果转换为Pandas DataFrame
pic8 = pic8_df.toPandas()
pic8 = pic8.sort_values(by="review_score", ascending=False)

# 创建Bar实例
bar_chart = (
    Bar()
    .add_xaxis(pic8["customer_state"].tolist())
    .add_yaxis("评分", pic8["review_score"].tolist())
    .set_global_opts(
        title_opts=opts.TitleOpts(title="州平均评分"),
        xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-45)),
        yaxis_opts=opts.AxisOpts(name="评分"),
    )
)

# 渲染图表
bar_chart.render_notebook()

In [115]:
from pyspark.sql import functions as F

# 假设你的DataFrame名为reviews_df，并且有两个名为"review_answer_timestamp"和"review_creation_date"的列
# 将日期特征转换为Unix时间戳
df = orders_df.withColumn("carrier_unix", F.unix_timestamp("order_delivered_carrier_date", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("customer_unix", F.unix_timestamp("order_delivered_customer_date", "yyyy-MM-dd HH:mm:ss"))

# 计算两个时间戳之间的差值，单位为天
df = df.withColumn("time_diff", (F.col("customer_unix") - F.col("carrier_unix"))/86400.0)

df = df.join(reviews_df, on="order_id").select("review_score", "customer_id", "time_diff")
df = df.join(customers_df, on="customer_id").select("review_score", "time_diff", "customer_state")

df = df.filter(col("time_diff") > 0)

df.describe().show()

pic9_df = df.groupBy("customer_state").agg(
    round(avg(col("review_score")), 2).alias("review_score"),
    round(avg(col("time_diff")), 2).alias("deliver_days")
)
pic9_df.show(5)

+-------+------------------+--------------------+--------------+
|summary|      review_score|           time_diff|customer_state|
+-------+------------------+--------------------+--------------+
|  count|             96326|               96326|         96326|
|   mean|4.1558665365529555|   9.303464670825583|          NULL|
| stddev| 1.284860919257161|   8.676122073344892|          NULL|
|    min|                 1|1.157407407407407...|            AC|
|    max|                 5|  205.19097222222223|            TO|
+-------+------------------+--------------------+--------------+

+--------------+------------+------------+
|customer_state|review_score|deliver_days|
+--------------+------------+------------+
|            SC|        4.13|        11.6|
|            RO|        4.17|       16.52|
|            PI|         4.0|       16.31|
|            AM|        4.22|        23.4|
|            RR|         3.9|       25.64|
+--------------+------------+------------+
only showing top 5 rows



In [120]:
# 将结果转换为Pandas DataFrame
pic9 = pic9_df.toPandas()
pic9 = pic9.sort_values(by="review_score", ascending=False)

# 创建Bar实例
bar_chart = (
    Bar()
    .add_xaxis(pic9["customer_state"].tolist())
    .add_yaxis("评分", pic9["review_score"].tolist(), yaxis_index=0)
    .add_yaxis("运送时长", pic9["deliver_days"].tolist(), yaxis_index=1)
    .extend_axis(
        yaxis=opts.AxisOpts(
            name="运送天数",
            type_="value",
            position="right",
            min_=0,
            max_=30,  # 设置Y轴最大值为30
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(color="#675bba")
            )
        )
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="州平均评分及运送时长"),
        yaxis_opts=opts.AxisOpts(
            name="评分",
            type_="value",
            position="left",
            min_=0,
            max_=10,  # 设置Y轴最大值为10
        ),
        xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-45)),
    )
)

# 渲染图表
bar_chart.render_notebook()