# Imports

In [1]:
!pip3 install pyspark==3.5.0



In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as t

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from geopy.distance import geodesic
from math import sqrt

In [3]:
spark = SparkSession.builder.getOrCreate()
spark

# Задачи для данных велопарковок Сан-Франциско

## 1. Найти велосипед с максимальным временем пробега.

In [5]:
trips = spark.read.format('csv').option('header', 'true').load("trip.csv")

In [24]:
# Группировка по 'bike_id' и вычисление максимальной длительности одной поездки
max_duration_per_bike = (
    trips
    .filter(F.col("duration").isNotNull())  # Исключаем null в duration
    .groupBy('bike_id')
    .agg(
        F.sum(F.col("duration").cast(t.IntegerType())).alias("duration")
    )
)

# Нахождение велосипеда с самой длинной поездкой
top_longest_trip = max_duration_per_bike.orderBy(F.col('duration').desc())
top_longest_trip.show(1)

+-------+--------+
|bike_id|duration|
+-------+--------+
|    168|  613841|
+-------+--------+
only showing top 1 row



## 2. Найти наибольшее геодезическое расстояние между станциями.

In [13]:
stations = spark.read.format('csv').option('header', 'true').load("station.csv")

In [16]:
# Выбор столбцов 'id', 'lat' и 'long' из DataFrame stations
stations_data = stations.select("id", "lat", "long")
stations_data.show(5)

# Создание комбинации станций с фильтрацией на уровне DataFrame
combo = stations_data.selectExpr('id as A', 'lat as A_lat', 'long as A_long') \
    .crossJoin(stations_data.selectExpr('id as B', 'lat as B_lat', 'long as B_long')) \
    .filter(col('A') != col('B'))

# Функция для вычисления геодезического расстояния
def geodesic_distance(a_lat, a_long, b_lat, b_long):
    try:
        return geodesic((float(a_lat), float(a_long)), (float(b_lat), float(b_long))).km
    except ValueError:
        return None  # Возвращаем None, если координаты некорректны

# Применение функции к каждой паре станций и сохранение результатов в RDD
dists = combo.rdd.map(lambda row: (row.A, row.B, geodesic_distance(row.A_lat, row.A_long, row.B_lat, row.B_long)))

# Нахождение максимального расстояния между станциями
max_distance = dists.max(lambda row: row[2] if row[2] is not None else float('-inf'))

# Вывод результата
print(f"Станции с максимальным расстоянием: {max_distance[0]} и {max_distance[1]}, расстояние: {max_distance[2]} км")

+---+------------------+-------------------+
| id|               lat|               long|
+---+------------------+-------------------+
|  2|         37.329732|-121.90178200000001|
|  3|         37.330698|        -121.888979|
|  4|         37.333988|        -121.894902|
|  5|         37.331415|          -121.8932|
|  6|37.336721000000004|        -121.894074|
+---+------------------+-------------------+
only showing top 5 rows

Станции с максимальным расстоянием: 16 и 60, расстояние: 69.92096757764355 км


## 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [17]:
# Выбор нужных столбцов, включая 'duration', фильтрация по bike_id = 535 и поиск максимальной длительности
max_trip = (
    trips.select("id", "bike_id", "start_station_id", "end_station_id", "duration")
    .filter(F.col("bike_id") == 535)
    .orderBy(F.col("duration").cast(t.IntegerType()).desc())  # Сортировка по длительности
    .limit(1)  # Берем только одну запись с максимальной длительностью
)

# Вывод результата
max_trip.show()
print("Путь велосипеда с максимальным временем пробега:")
print(f"ID поездки: {max_trip.first()['id']}, Начальная станция: {max_trip.first()['start_station_id']}, Конечная станция: {max_trip.first()['end_station_id']}, Длительность: {max_trip.first()['duration']} секунд")

+-----+-------+----------------+--------------+--------+
|   id|bike_id|start_station_id|end_station_id|duration|
+-----+-------+----------------+--------------+--------+
|21344|    535|              62|            75|   25179|
+-----+-------+----------------+--------------+--------+

Путь велосипеда с максимальным временем пробега:
ID поездки: 21344, Начальная станция: 62, Конечная станция: 75, Длительность: 25179 секунд


## 4. Найти количество велосипедов в системе.

In [None]:
count_bikes = max_duration_per_bike.count()

count_bikes

700

## 5. Найти пользователей потративших на поездки более 3 часов.

In [18]:
# Группировка по 'zip_code' и вычисление суммарной продолжительности поездок
output_filtered = (
    trips
    .groupBy('zip_code')
    .agg(
        F.sum(F.col("duration").cast(t.IntegerType())).alias("total_duration")  # Сумма вместо максимума
    )
    .filter(F.col("total_duration") >= 10800)  # Фильтрация по суммарному времени ≥ 3 часов
)

# Подсчёт количества пользователей и вывод результата
num_users = output_filtered.count()
output_filtered.show(truncate=False)

print(f"Найдено {num_users} пользователей, потративших на поездки более 3 часов")

+--------+--------------+
|zip_code|total_duration|
+--------+--------------+
|94102   |1341919       |
|95134   |11779         |
|80305   |17577         |
|95138   |14999         |
|94610   |139809        |
|94404   |78552         |
|80301   |22735         |
|91326   |13038         |
|90742   |10965         |
|94568   |23675         |
|94015   |80531         |
|28034   |13820         |
|95130   |15006         |
|53714   |36460         |
|85251   |14117         |
|94550   |44488         |
|94107   |2381768       |
|76039   |70689         |
|2144    |23143         |
|90802   |22931         |
+--------+--------------+
only showing top 20 rows

Найдено 452 пользователей, потративших на поездки более 3 часов
