In [None]:
# установка
!pip3 install pyspark geopy



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType
import math

In [None]:
# 1) Создаём SparkSession с включённым crossJoin (нужен для всех‑на‑всех комбинаций)
spark = (
    SparkSession.builder
    .appName("BikeAnalysis")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# 2) Читаем CSV с автоматическим инференсом типов
trips = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/drive/MyDrive/data/BD_labs/LR1/trips.csv")
)
stations = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/drive/MyDrive/data/BD_labs/LR1/stations.csv")
)

# 1. Велосипед с максимальным временем пробега

In [None]:
max_duration_per_bike = (
    trips
    .groupBy("bike_id")
    .agg(F.max("duration").alias("max_duration"))
    .orderBy(F.col("max_duration").desc())
)
print("Bike с наибольшим max duration:")
max_duration_per_bike.show(1)

Bike с наибольшим max duration:
+-------+------------+
|bike_id|max_duration|
+-------+------------+
|    535|    17270400|
+-------+------------+
only showing top 1 row



# 2. Наибольшее геодезическое/евклидово расстояние между станциями

In [None]:
# готовим UDF для евклида (если нужна геодезика — заменить внутри на geopy.distance.geodesic)
@F.udf(returnType=DoubleType())
def euclid(ax, ay, bx, by):
    return float(math.sqrt((ax - bx)**2 + (ay - by)**2))

a = stations.select(
    F.col("id").alias("A_id"),
    F.col("lat").alias("A_lat"),
    F.col("long").alias("A_long")
)
b = stations.select(
    F.col("id").alias("B_id"),
    F.col("lat").alias("B_lat"),
    F.col("long").alias("B_long")
)

# явно — crossJoin + фильтр A_id != B_id
distances = (
    a.crossJoin(b)
     .filter(F.col("A_id") != F.col("B_id"))
     .withColumn(
         "dist",
         euclid(
             F.col("A_lat"),
             F.col("A_long"),
             F.col("B_lat"),
             F.col("B_long")
         )
     )
)

print("Пара станций с макс. евклидовым расстоянием:")
distances.orderBy(F.col("dist").desc()).select("A_id","B_id","dist").limit(1).show()

Пара станций с макс. евклидовым расстоянием:
+----+----+------------------+
|A_id|B_id|              dist|
+----+----+------------------+
|  16|  60|0.7058482821754397|
+----+----+------------------+



# 3. Путь велосипеда с максимальным временем пробега через станции

In [None]:
# сначала находим bike_id с макс. duration (из пункта 1)
best_bike = max_duration_per_bike.first()["bike_id"]

# затем фильтруем и сортируем поездки этого велосипеда
route = (
    trips
    .filter(F.col("bike_id") == best_bike)
    .select("id", "start_station_id", "end_station_id")
    .orderBy(F.col("id").cast(IntegerType()))
)
print(f"Маршрут bike_id={best_bike}:")
route.show(route.count(), truncate=False)

Маршрут bike_id=535:
+------+----------------+--------------+
|id    |start_station_id|end_station_id|
+------+----------------+--------------+
|4966  |47              |70            |
|5067  |70              |69            |
|5179  |69              |77            |
|5199  |77              |64            |
|7806  |61              |42            |
|11422 |58              |72            |
|12245 |72              |47            |
|12485 |47              |60            |
|12558 |60              |46            |
|13107 |46              |77            |
|13423 |77              |77            |
|14380 |77              |62            |
|14581 |62              |61            |
|15231 |55              |61            |
|15242 |61              |60            |
|15347 |60              |41            |
|15605 |41              |50            |
|15611 |50              |41            |
|15770 |41              |70            |
|16294 |70              |74            |
|16409 |74              |61         

# 4. Количество велосипедов в системе

In [None]:
num_bikes = max_duration_per_bike.count()
print(f"Всего разных bike_id: {num_bikes}")

Всего разных bike_id: 700


# 5. Пользователи, потратившие на поездки ≥ 3 часов (10800 с)

In [None]:
long_riders = (
    trips
    .groupBy("zip_code")
    .agg(F.max("duration").alias("max_duration"))
    .filter(F.col("max_duration") >= 10800)
)
print("zip_code пользователей с поездками ≥3ч:")
long_riders.show()

zip_code пользователей с поездками ≥3ч:
+--------+------------+
|zip_code|max_duration|
+--------+------------+
|   94102|      464952|
|   95134|       82487|
|   84606|       14575|
|   80305|       74749|
|   60070|       26540|
|   91910|       20243|
|    2136|       16010|
|   11722|       12173|
|   94610|       76287|
|   94404|       63504|
|   80301|       36931|
|   94309|       18484|
|   97239|      193241|
|   94592|       26999|
|    7650|       20150|
|   92374|       17156|
|   11106|       13773|
|   93013|       25116|
|   30324|       17117|
|   16303|       13072|
+--------+------------+
only showing top 20 rows

