In [71]:
from pyspark.sql import SparkSession


In [72]:
spark = SparkSession \
    .builder \
    .appName("Taxi Data App") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [73]:
# 读取 CSV.BZ2 文件
df = spark.read.csv("./taxi-data-sorted-small.csv.bz2", header=False, inferSchema=True)

# 显示前几行
df.show()

# 打印数据的 schema
df.printSchema()

+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|                 _c0|                 _c1|                _c2|                _c3|_c4| _c5|       _c6|      _c7|       _c8|      _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|
+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|120|0.44|-73.956528|40.716976| -73.96244|40.715008| CSH| 3.5| 0.5| 0.5| 0.0| 0.0| 4.5|
|22D70BF00EEB0ADC8...|3FF2709163DE7036F...|2013-01-01 00:02:00|2013-01-01 00:02:00|  0| 0.0|       0.0|      0.0|       0.0|      0.0| CSH|27.0| 0.0| 0.5| 0.0| 0.0|27.5|
|0EC22AAF491A8BD91...|778C92B26AE78A9EB...|2013-01-01 00:01:00|2013-01-01 00:03:00|120|0.71|-73.973145|40.752827|-73.965897|40.760445| CSH| 4.0| 0.5| 

In [74]:
from pyspark.sql.functions import col

In [75]:

filtered_df = df.filter(
    (col("_c5").cast("float").isNotNull()) & # 第 6 列可以转换为 float
    (col("_c11").cast("float").isNotNull()) &  # 第 12 列可以转换为 float
    (col("_c5") != 0) & # 第 6 和 12 列不为 0
    (col("_c11") != 0)
)

if len(df.columns) != 17:
    raise ValueError("The number of columns is not 17!")
# 显示过滤后的结果
filtered_df.show()

+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|                 _c0|                 _c1|                _c2|                _c3|_c4| _c5|       _c6|      _c7|       _c8|      _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|
+--------------------+--------------------+-------------------+-------------------+---+----+----------+---------+----------+---------+----+----+----+----+----+----+----+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|120|0.44|-73.956528|40.716976| -73.96244|40.715008| CSH| 3.5| 0.5| 0.5| 0.0| 0.0| 4.5|
|0EC22AAF491A8BD91...|778C92B26AE78A9EB...|2013-01-01 00:01:00|2013-01-01 00:03:00|120|0.71|-73.973145|40.752827|-73.965897|40.760445| CSH| 4.0| 0.5| 0.5| 0.0| 0.0| 5.0|
|1390FB380189DF6BB...|BE317B986700F63C4...|2013-01-01 00:01:00|2013-01-01 00:03:00|120|0.48|-74.004173|40.720947|-74.003838|40.726189| CSH| 4.0| 0.5| 

In [76]:
taxi = filtered_df.withColumnRenamed("_c0", "medallion") \
           .withColumnRenamed("_c1", "hack_license")\
            .withColumnRenamed("_c2", "pickup datetime")



In [77]:
print(taxi.columns)

['medallion', 'hack_license', 'pickup datetime', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13', '_c14', '_c15', '_c16']


The top ten taxis that have had the largest number of drivers

In [78]:
from pyspark.sql import functions as f
medallion_driver_sum = taxi.groupBy("medallion").agg(f.countDistinct("hack_license").alias("DriverNum"))
result = medallion_driver_sum.orderBy("DriverNum",ascending = False).limit(10).show()

+--------------------+---------+
|           medallion|DriverNum|
+--------------------+---------+
|3C08296D0EB7ABE24...|       20|
|65EFB7D02BAD12D5D...|       20|
|55D311AD2752BC278...|       19|
|F36564AB9C6EA3B63...|       19|
|7DEB25123AE57111F...|       19|
|CD7B02776E6948339...|       19|
|3B6AE3CF05F34ADC9...|       19|
|9FB7A7C1D7B960D8B...|       18|
|F2A08960199BCDB7E...|       18|
|799153A138F4E8334...|       18|
+--------------------+---------+



Who the top 10 best drivers are in terms of their average earned money per
minute spent carrying a customer.

The total amount field is the total money earned on a trip. In the end, we
are interested in computing a set of (driver, money per minute) pairs.

In [79]:
from pyspark.sql.types import StringType,DoubleType

In [80]:
taxi.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup datetime: timestamp (nullable = true)
 |-- _c3: timestamp (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)



In [81]:
# 确保列 _c5 和 _c16 是 DoubleType
taxi = taxi.withColumn("_c5", f.col("_c5").cast(DoubleType())) \
           .withColumn("_c16", f.col("_c16").cast(DoubleType()))

In [100]:
# 避免除以零，先过滤掉 _c5 为零的行
taxi_filtered = taxi.filter(f.col("_c5") > 0)

In [101]:
taxi_new = taxi.withColumn('money_per_minute', (taxi['_c16'] / taxi['_c5'] / 60) )


In [66]:
# result = taxi_new.groupby("hack_license").mean("money_per_minute").orderBy("avg(money_per_minute)",ascending = False).limit(10)
# result = result.withColumnRenamed("avg(money_per_minute)","money_per_minute").show()

In [84]:

result = (
    taxi_filtered.groupBy("hack_license")
    .agg(
        f.sum("_c16").alias("total_money"),     # 计算总收入
        f.sum("_c5").alias("total_time")        # 计算总时间（以秒为单位）
    )
    .withColumn(
        "money_per_minute",
        f.col("total_money") / (f.col("total_time") / 60)  # 计算每分钟平均收入
    )
    .orderBy(f.col("money_per_minute").desc())  # 按每分钟收入降序排序
    .limit(10)                                  # 取前 10 名
)

In [68]:
result.show()

+--------------------+-----------+----------+-----------------+
|        hack_license|total_money|total_time| money_per_minute|
+--------------------+-----------+----------+-----------------+
|5FC9AEBF0871275E8...|       18.5|      0.01|         111000.0|
|559EDA2B1D3A352F1...|      240.6|       0.3|          48120.0|
|32187D24B8C6D9DC9...|      68.76|       0.1|          41256.0|
|6789302E98F439768...|        3.5|      0.01|          21000.0|
|CD9D0B4429613F1B6...|      99.81|       0.5|          11977.2|
|EE643DC164319505E...|       26.0|       0.2|7799.999999999999|
|7BD4876222813A92A...|        3.0|      0.03|           6000.0|
|90E3B38745F6A542B...|        3.0|      0.03|           6000.0|
|011AE79C7E6093780...|       53.5|       0.6|           5350.0|
|52C99F4F8CD2560F8...|       62.5|       0.9|4166.666666666666|
+--------------------+-----------+----------+-----------------+



In [69]:
# spark.stop()

有两种口径。
1. 单次行程平均值
- 想了解司机在单次行程中的赚钱效率，不关注行程长短。
- 数据集中的行程时间差异较小，短行程不会对结果造成显著偏差。
2. 整体平均值
- 想衡量司机的整体赚钱效率。
- 数据集中行程时间差异较大，需要考虑时间权重对结果的影响。

We would like to know which hour of the day is the best time for drivers that has the highest profit per mile.

Consider the surcharge amount in dollar for each taxi ride (without tip amount) and the distance in miles,
and sum up the rides for each hour of the day (24 hours) – consider the pickup time for your calculation.

The profit ratio is the ration surcharge in dollars divided by the travel distance in miles for each specific time
of the day.

Profit Ratio = (Surcharge Amount in US Dollar) / (Travel Distance in miles)

We are interested to know the time of the day that has the highest profit ratio.

In [96]:
taxi_filtered.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup datetime: timestamp (nullable = true)
 |-- _c3: timestamp (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)



In [102]:
from pyspark.sql.functions import col, hour
taxi_filtered.withColumn("pickup hour", hour(col("pickup datetime")))\
    .withColumn("profit ratio", f.col("_c15") / f.col("_c4"))\
    .groupBy("pickup hour")\
    .agg(f.mean("profit ratio").alias("avg profit ratio"))\
    .orderBy("avg profit ratio", ascending=False)\
    .show()

+-----------+--------------------+
|pickup hour|    avg profit ratio|
+-----------+--------------------+
|          5|4.795428164858122...|
|          4|2.903061317980278...|
|          6|2.794922816611673E-4|
|         14|2.111450421139239...|
|         15|2.084174989212556E-4|
|         16|2.026969124673807E-4|
|         13|1.970843982333202...|
|         10|1.727910557850571E-4|
|         11|1.727903025158066...|
|          7|1.702610234007025...|
|         12|1.699506381213275E-4|
|         23|1.637022002363445E-4|
|         22|1.598107100871362...|
|         17|1.576998398104434...|
|          9|1.503583860894163...|
|         21|1.493756205712496E-4|
|         20|1.369666333453539...|
|          8|1.286189144887038E-4|
|          0|1.258692585771544...|
|         18|1.205037174676841...|
+-----------+--------------------+
only showing top 20 rows



How many percent of taxi customers pay with cash and how many percent using electronic cards?
Analyze these payment methods for different time of the day and provide a list of percents for each
day time? As a result provide two numbers for total percentages and a list like (hour of day, percent
paid card)
- We would like to measure the efficiency of taxis drivers by finding out their average earned money per
mile. (Consider the total amount which includes tips, as their earned money) Implement a Spark job
that can find out the top-10 efficient taxi divers.
- What are mean, median, first and third quantiles of tip amount? How do find the median?
- Using the IQR outlier detection method find out the top-10 outliers.

In [129]:
taxi_filtered = taxi_filtered.withColumn("pickup_hour", f.hour(f.col("pickup datetime")))
# 统计每小时现金和卡支付的数量
payment_count = taxi_filtered.groupBy("pickup_hour", "_c10").count() # _c10 payment type

# 计算每小时的支付百分比
# 每小时所有的支付数量
total_count_per_hour = payment_count.groupBy("pickup_hour").agg(f.sum("count").alias("total_rides"))

# 百分比：每小时现金和卡支付的数量/每小时所有的支付数量
# 都是group by pickup hour，两个表join用于计算百分比.此处的count列来自与payment_count计算，见下schema
payment_percentage = payment_count.join(total_count_per_hour, "pickup_hour") \
     .withColumn("percent", (f.col("count") / f.col("total_rides")) * 100) \
     .select("pickup_hour", "_c10", "percent")\
    .orderBy(f.col("pickup_hour"), f.col("percent"), ascending=[True, False]).show()

+-----------+----+--------------------+
|pickup_hour|_c10|             percent|
+-----------+----+--------------------+
|          0| CSH|   50.33522434244456|
|          0| CRD|   49.63595546521858|
|          0| UNK| 0.02882019233686254|
|          1| CSH|   51.61522655538373|
|          1| CRD|   48.34028851653356|
|          1| UNK|  0.0444849280826996|
|          2| CSH|   52.43464352203896|
|          2| CRD|  47.547001599496554|
|          2| UNK| 0.01835487846448331|
|          3| CSH|   53.52174352410146|
|          3| CRD|   46.44457169804965|
|          3| UNK| 0.03368477784889009|
|          4| CSH|  57.435653002859866|
|          4| CRD|   42.55134760377849|
|          4| UNK|0.012999393361643121|
|          5| CSH|   52.41218481270952|
|          5| CRD|    47.5295146480105|
|          5| UNK|0.058300539279988337|
|          6| CRD|   51.16050538137576|
|          6| CSH|   48.78334113242864|
+-----------+----+--------------------+
only showing top 20 rows



In [125]:
payment_count.printSchema()

root
 |-- pickup_hour: integer (nullable = true)
 |-- _c10: string (nullable = true)
 |-- count: long (nullable = false)



1. orderBy() 中的列名
在 PySpark 中，orderBy() 可以接受 列名字符串 作为参数，尤其在你对 DataFrame 中的原始列进行排序时，直接使用列名字符串是有效的。也就是说，在 orderBy() 中，如果你要根据原始的列名进行排序，PySpark 会自动将这些字符串列名转换为列对象。

`payment_percentage.orderBy("pickup_hour", "_c10", "percent").show()`

2. 什么时候需要 f.col()？
f.col() 是一个显式的列引用方法，通常用于以下几种情况：

处理计算生成的列（如 withColumn() 中生成的列）：当你在 DataFrame 中动态创建或计算新列时，可能会使用 f.col() 来显式引用这些列。

使用表达式：比如需要进行更复杂的操作时，或者引用不直接存在于原始 DataFrame 中的列时，f.col() 会显得非常有用。

例如，你可能会在 withColumn() 中生成一个新的列，如：

`payment_percentage.withColumn("new_col", f.col("pickup_hour") * 2)`

3. 为什么直接使用列名有效？

在 groupBy() 和 agg() 等操作后，PySpark 会将你定义的聚合列（例如通过 alias() 创建的列）视作新的 DataFrame 列，这些列名可以直接在 orderBy() 中作为字符串使用。

```python
taxi_filtered.withColumn("pickup hour", hour(col("pickup datetime")))\
    .withColumn("profit ratio", f.col("_c15") / f.col("_c4"))\
    .groupBy("pickup hour")\
    .agg(f.mean("profit ratio").alias("avg profit ratio"))\
    .orderBy("avg profit ratio", ascending=False)\
    .show()
```
在这段代码中，avg profit ratio 是通过 alias() 创建的新的列名，它直接作为排序依据使用。

这是因为 agg() 操作后，avg profit ratio 作为新的列名已经在 DataFrame 中存在，因此你可以直接使用它作为列名来排序。

总结：

直接使用列名字符串（如 orderBy("pickup_hour")）在原始列或聚合操作后创建的列中是有效的，因为 PySpark 会自动处理列名转换。

f.col() 更常用于动态计算的列（如在 withColumn() 中生成的列），或者当你需要在表达式中引用复杂的列时。


In [130]:
# http://localhost:4042/jobs/
# taxi_filtered = taxi_filtered.cache()  # 缓存 同一个 DataFrame 或 RDD 会被多次重复使用时
spark.stop()