In [0]:
# Read every FUEL_RANKING_R_TRIP_V4 partition whose report_date matches the
# glob 2024060*, keep only rows where duration = 0, and project the five
# columns the rest of the notebook works with.
df = (
    spark.read
    .parquet("wasbs://telematics@dln2blobdatalanding.blob.core.chinacloudapi.cn/FUEL_RANKING_R_TRIP_V4/report_date=2024060*")
    .where("duration = 0")
    .select("ESN", "ReportDate", "X_AvgVehicleWeight", "Trip", "Delta_Dist")
)
# Number of rows that survive the filter.
df.count()

25432236

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, expr

# Within each (ESN, ReportDate) group, order rows by Trip so that lag() reads
# the values of the immediately preceding row.
window_spec = Window.partitionBy("ESN", "ReportDate").orderBy("Trip")

# Previous row's weight and trip number (null on the first row of a group).
combined_df = (
    df
    .withColumn("PrevWeight", lag("X_AvgVehicleWeight").over(window_spec))
    .withColumn("Prevtrip", lag("Trip").over(window_spec))
)

# Row-to-row deltas: change in average weight and gap between trip numbers.
combined_df_1 = (
    combined_df
    .withColumn("WeightDiff", col("X_AvgVehicleWeight") - col("PrevWeight"))
    .withColumn("Trip_difference", col("Trip") - col("Prevtrip"))
)
combined_df_1.display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,PrevWeight,Prevtrip,WeightDiff,Trip_difference
4,20240602,,,,,,,
100,20240608,,,,,,,
123,20240601,,,,,,,
123,20240603,,,,,,,
123,20240608,,,,,,,
1068,20240606,,,,,,,
1068,20240609,,,,,,,
1069,20240606,,,,,,,
1853,20240603,,,,,,,
2774,20240601,,,,,,,


In [0]:
# Discard rows without a computed weight change.
combined_df_2 = combined_df_1.na.drop(subset=["WeightDiff"])

# Keep rows whose weight actually changed and whose predecessor is the
# directly preceding trip number (Trip_difference == 1).
combined_df_3 = (
    combined_df_2
    .where(col("WeightDiff") != 0)
    .where(col("Trip_difference") == 1)
)
combined_df_3.count()

3631947

In [0]:
from pyspark.sql.functions import col, avg, stddev, lit
# Global mean and standard deviation of WeightDiff, collected to the driver
# as a single Row.
stats_df = combined_df_3.agg(
    avg(col("WeightDiff")).alias("mean"),
    stddev(col("WeightDiff")).alias("stddev"),
).first()

# NOTE(review): from here on `stddev` is a plain number and shadows the
# pyspark function imported at the top of this cell.
mean, stddev = stats_df["mean"], stats_df["stddev"]

# Rows at least two standard deviations away from the mean are kept below.
threshold = 2 * stddev
lower_bound, upper_bound = mean - threshold, mean + threshold

print(f"Mean: {mean}, Std Dev: {stddev}, Threshold: {threshold}")
print(f"Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")

# Keep only the tails: WeightDiff on or outside [lower_bound, upper_bound].
combined_df_4 = combined_df_3.where(
    (col("WeightDiff") >= upper_bound) | (col("WeightDiff") <= lower_bound)
)
combined_df_4.drop("Trip_difference").count()

Mean: -0.37608772760175313, Std Dev: 9.298896348906608, Threshold: 18.597792697813215
Lower Bound: -18.97388042541497, Upper Bound: 18.221704970211462


169117

In [0]:
from pyspark.sql.functions import when
# Bucket every row by Delta_Dist:
#   0 -> Delta_Dist <= 10000
#   1 -> 10000 < Delta_Dist <= 50000
#   2 -> Delta_Dist > 50000
# The original note describes the buckets as 0-10 km, 10-50 km and > 50 km,
# so Delta_Dist is presumably in metres -- TODO confirm.
# The top bucket is an explicit condition instead of .otherwise(2): with
# .otherwise(2) a null Delta_Dist fails both comparisons and would be
# labelled as a > 50 km trip. Now a null distance yields a null label.
combined_df_5 = combined_df_4.withColumn(
    "Delta_Dist_Label",
    when(col("Delta_Dist") <= 10000, 0)
    .when(col("Delta_Dist") <= 50000, 1)
    .when(col("Delta_Dist") > 50000, 2),
)
combined_df_5.display(100)


ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,PrevWeight,Prevtrip,WeightDiff,Trip_difference,Delta_Dist_Label
69808536,20240602,48.12399999999999,3.0,119870.0,23.392,2.0,24.73199999999999,1.0,2
71118573,20240604,28.023999999999997,5.0,177205.0,50.094000000000015,4.0,-22.07000000000002,1.0,2
71118574,20240602,57.82400000000008,4.0,265720.0,27.712,3.0,30.11200000000008,1.0,2
71119580,20240601,23.14,7.0,17955.0,43.06000000000003,6.0,-19.920000000000037,1.0,1
71122897,20240603,24.132000000000005,7.0,238210.0,65.7239999999999,6.0,-41.5919999999999,1.0,2
71122897,20240607,34.714000000000006,2.0,147670.0,67.11599999999999,1.0,-32.40199999999998,1.0,2
71123079,20240607,41.12999999999989,2.0,600195.0,22.239999999999984,1.0,18.889999999999905,1.0,2
71124568,20240607,32.346000000000025,4.0,188495.0,60.22999999999992,3.0,-27.883999999999894,1.0,2
71124941,20240603,20.06,6.0,244325.0,39.59799999999996,5.0,-19.537999999999965,1.0,2
71126045,20240603,15.082,7.0,41070.0,36.93399999999995,6.0,-21.851999999999947,1.0,1


In [0]:
# Label every WeightDiff by sign and magnitude:
#   [-30, -18] -> -1        (18, 30] -> 1
#   [-60, -30) -> -2        (30, 60] -> 2
#   below -60  -> -3        above 60 -> 3
# The original note ties these bands to loading and to deliveries of
# increasing size (1: loading, 2: a heavier delivery, 3: a large delivery).
# Values in (-18, 18] match no branch and receive a null label.
# The branches are mutually exclusive, so their order does not matter.
combined_df_6 = combined_df_5.withColumn(
    "WeightDiff_Label",
    when(col("WeightDiff") < -60, -3)
    .when((col("WeightDiff") < -30) & (col("WeightDiff") >= -60), -2)
    .when((col("WeightDiff") <= -18) & (col("WeightDiff") >= -30), -1)
    .when((col("WeightDiff") <= 30) & (col("WeightDiff") > 18), 1)
    .when((col("WeightDiff") <= 60) & (col("WeightDiff") > 30), 2)
    .when(col("WeightDiff") > 60, 3),
)

# Number of rows per label.
result = combined_df_6.groupBy("WeightDiff_Label").count()
result.display()

WeightDiff_Label,count
-1,48318
1,43435
3,6055
-2,36337
2,27376
-3,7596


In [0]:
# Attach WeightDiff_Label / Delta_Dist_Label back onto the full trip table.
# Each label belongs to one specific row of combined_df_6, so the join key
# has to identify that row: ESN + ReportDate + Trip. Joining on ESN alone
# pairs every df row of an ESN with every labelled row of the same ESN, which
# multiplies rows and attaches labels to unrelated trips.
# NOTE(review): assumes (ESN, ReportDate, Trip) is unique in df -- confirm.
# The trailing select restores the original column order, because joining on
# a list of keys moves the key columns to the front.
result_df = (
    df.join(
        combined_df_6.select(
            "ESN", "ReportDate", "Trip", "WeightDiff_Label", "Delta_Dist_Label"
        ),
        on=["ESN", "ReportDate", "Trip"],
        how="left",
    )
    .select(*df.columns, "WeightDiff_Label", "Delta_Dist_Label")
)
result_df.display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,WeightDiff_Label,Delta_Dist_Label
1069,20240601,,,,,
37018224,20240601,,,,,
41331192,20240601,,,,,
42341502,20240601,,,,,
42509287,20240601,,,,,
69831627,20240601,,,,,
71123737,20240601,,4.0,16210.0,,
71123737,20240601,16.253999999999998,5.0,45655.0,,
71123737,20240601,14.89399999999999,3.0,39425.0,,
71123737,20240601,15.004000000000003,2.0,46100.0,,


In [0]:
# Rows in the top distance bucket (Delta_Dist_Label == 2) whose weight change
# falls in the two outer bands on either side (labels -3, -2, 2, 3).
oneside = (
    combined_df_6
    .where(
        (col("Delta_Dist_Label") == 2)
        & col("WeightDiff_Label").isin(-3, -2, 2, 3)
    )
    .drop("Trip_difference")
)
oneside.count()

35182

In [0]:
# `count` is not imported by any earlier cell, so it is imported here to keep
# the cell runnable on a fresh kernel.
from pyspark.sql.functions import count

# Rows in the top distance bucket (Delta_Dist_Label == 2) whose weight change
# falls in the outermost band on either side (labels -3 and 3).
oneside_2 = (
    combined_df_6
    .filter(col("Delta_Dist_Label") == 2)
    .filter(col("WeightDiff_Label").isin(3, -3))
    .drop("Trip_difference")
)

# Number of such rows per ESN, most frequent first.
(
    oneside_2
    .groupBy("ESN")
    .agg(count("ESN").alias("count_ESN"))
    .orderBy("count_ESN", ascending=False)
    .display()
)

ESN,count_ESN
77529357,13
77541571,10
77527420,10
77537822,9
76942228,8
76689372,8
76658515,8
77406064,8
77429284,7
76679922,7


In [0]:
# Drill into ESN 77529357, the ESN with the highest count_ESN in the
# per-ESN ranking of oneside_2.
oneside_2.where(col("ESN") == "77529357").display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,PrevWeight,Prevtrip,WeightDiff,Delta_Dist_Label,WeightDiff_Label
77529357,20240601,49.04300000000005,7.0,160225.0,151.05799999999994,6.0,-102.01499999999989,2,-3
77529357,20240601,35.45600000000001,11.0,87680.0,134.80999999999997,10.0,-99.35399999999996,2,-3
77529357,20240606,135.07600000000008,5.0,100925.0,29.297999999999988,4.0,105.77800000000008,2,3
77529357,20240606,50.335999999999984,6.0,91640.0,135.07600000000008,5.0,-84.7400000000001,2,-3
77529357,20240606,46.186000000000014,10.0,91615.0,128.254,9.0,-82.06799999999998,2,-3
77529357,20240606,144.05600000000007,11.0,185490.0,46.186000000000014,10.0,97.87000000000006,2,3
77529357,20240607,36.386,2.0,91085.0,119.29000000000003,1.0,-82.90400000000005,2,-3
77529357,20240607,166.306,3.0,188070.0,36.386,2.0,129.92000000000002,2,3
77529357,20240608,55.408,14.0,91440.0,118.53899999999994,13.0,-63.13099999999994,2,-3
77529357,20240608,40.356000000000016,18.0,58090.0,126.12800000000011,17.0,-85.7720000000001,2,-3


In [0]:
# Show the oneside_2 rows (labels -3 / 3, top distance bucket) for ESN 76738076.
oneside_2.where(col("ESN") == "76738076").display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,PrevWeight,Prevtrip,WeightDiff,Delta_Dist_Label,WeightDiff_Label
76738076,20240604,142.33599999999996,12.0,73150.0,54.83799999999997,11.0,87.49799999999999,2,3
76738076,20240604,73.44600000000003,13.0,69555.0,142.33599999999996,12.0,-68.88999999999993,2,-3
76738076,20240604,140.35200000000003,14.0,103725.0,73.44600000000003,13.0,66.906,2,3
76738076,20240605,64.6539999999999,3.0,190585.0,128.42599999999993,2.0,-63.772000000000034,2,-3
76738076,20240605,65.25599999999997,6.0,246245.0,127.30799999999996,5.0,-62.05199999999998,2,-3


In [0]:
# Same ESN as inspected in oneside_2, but from the wider `oneside` selection,
# which also includes labels -2 / 2. The drop is a no-op when Trip_difference
# has already been removed.
oneside.where(col("ESN") == "77529357").drop("Trip_difference").display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist,PrevWeight,Prevtrip,WeightDiff,Delta_Dist_Label,WeightDiff_Label
77529357,20240601,49.04300000000005,7.0,160225.0,151.05799999999994,6.0,-102.01499999999989,2,-3
77529357,20240601,35.45600000000001,11.0,87680.0,134.80999999999997,10.0,-99.35399999999996,2,-3
77529357,20240602,168.4,4.0,52160.0,126.07999999999996,3.0,42.32000000000004,2,2
77529357,20240602,40.207999999999984,17.0,91760.0,91.65400000000002,16.0,-51.44600000000004,2,-2
77529357,20240603,33.800000000000004,2.0,91115.0,91.58400000000007,1.0,-57.78400000000008,2,-2
77529357,20240603,29.88800000000001,6.0,91430.0,64.16200000000002,5.0,-34.27400000000001,2,-2
77529357,20240603,139.13999999999993,8.0,83990.0,105.75399999999996,7.0,33.38599999999997,2,2
77529357,20240604,73.70400000000001,4.0,96405.0,36.082,3.0,37.62200000000001,2,2
77529357,20240604,29.35599999999998,5.0,91570.0,73.70400000000001,4.0,-44.34800000000003,2,-2
77529357,20240604,53.53999999999998,7.0,173970.0,88.42699999999999,6.0,-34.887000000000015,2,-2


In [0]:
# Unlabelled input rows for ESN 77529357, to compare against the flagged rows.
df.where(col("ESN") == "77529357").display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist
77529357,20240601,32.35399999999997,4.0,51405.0
77529357,20240601,50.876000000000005,8.0,11195.0
77529357,20240601,151.05799999999994,6.0,15730.0
77529357,20240601,45.59199999999999,3.0,6135.0
77529357,20240601,35.45600000000001,11.0,87680.0
77529357,20240601,49.04300000000005,7.0,160225.0
77529357,20240601,64.43400000000001,5.0,33165.0
77529357,20240601,134.80999999999997,10.0,96195.0
77529357,20240602,84.65200000000003,8.0,94200.0
77529357,20240602,40.207999999999984,17.0,91760.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Unlabelled input rows for ESN 76733718.
df.where(col("ESN") == "76733718").display()

ESN,ReportDate,X_AvgVehicleWeight,Trip,Delta_Dist
76733718,20240602,19.583999999999996,2.0,6375.0
76733718,20240603,174.22200000000004,4.0,267265.0
76733718,20240603,31.94800000000004,5.0,273985.0
76733718,20240603,30.39199999999998,6.0,237885.0
76733718,20240603,33.963999999999984,7.0,90250.0
76733718,20240603,33.964000000000034,9.0,99585.0
76733718,20240603,41.4340000000001,3.0,270985.0
76733718,20240603,33.964000000000006,8.0,12460.0
76733718,20240604,63.55199999999995,2.0,153820.0
76733718,20240606,53.14999999999998,8.0,177930.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Expose both DataFrames to Spark SQL under fixed view names; an existing
# view with the same name is replaced.
oneside_2.createOrReplaceTempView("oneside2_table")
df.createOrReplaceTempView("df_table")

In [0]:
# ESN of every df_table row whose ESN does not occur in oneside2_table.
# NOT IN evaluates to NULL for every row as soon as the subquery returns a
# single NULL, which would silently empty the result, so NULL ESNs are
# filtered out of the subquery.
# One output row is produced per qualifying df_table row, so an ESN can appear
# several times; add DISTINCT if a unique list of ESNs is wanted.
diff_esn = spark.sql("""
SELECT ESN
FROM df_table
WHERE ESN NOT IN (
    SELECT ESN FROM oneside2_table WHERE ESN IS NOT NULL
)
""")

In [0]:
# Print the first 100 rows of diff_esn as a plain-text table.
diff_esn.show(n=100)

+--------+
|     ESN|
+--------+
|    1069|
|37018224|
|41331192|
|42341502|
|42509287|
|69831627|
|71123737|
|71123737|
|71123737|
|71123737|
|71124935|
|71126972|
|71126972|
|71126972|
|71127012|
|76021004|
|76021075|
|76021237|
|76021696|
|76023194|
|76023927|
|76024679|
|76024692|
|76024915|
|76025113|
|76025399|
|76025404|
|76025582|
|76025603|
|76026102|
|76026163|
|76026503|
|76027237|
|76027430|
|76028275|
|76028633|
|76028635|
|76029402|
|76029513|
|76029554|
|76029659|
|76029809|
|76030480|
|76030646|
|76030719|
|76031045|
|76031410|
|76032804|
|76032850|
|76033235|
|76033308|
|76033415|
|76033553|
|76033922|
|76033934|
|76034102|
|76034932|
|76035012|
|76035244|
|76035606|
|76035866|
|76035873|
|76037200|
|76037349|
|76038486|
|76038545|
|76038633|
|76038783|
|76039181|
|76039880|
|76040507|
|76040599|
|76041653|
|76041676|
|76041699|
|76041769|
|76042295|
|76042295|
|76042295|
|76042295|
|76042492|
|76042554|
|76042747|
|76042765|
|76043132|
|76043566|
|76043579|
|76043829|