In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t
from geopy.distance import geodesic
from math import sqrt
import pandas as pd

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

In [None]:
trips = spark.read.format('csv').option('header', 'true').load("/content/trips.csv")
print("Trips", trips)


Trips DataFrame[id: string, duration: string, start_date: string, start_station_name: string, start_station_id: string, end_date: string, end_station_name: string, end_station_id: string, bike_id: string, subscription_type: string, zip_code: string]


In [None]:
stations = spark.read.format('csv').option('header', 'true').load("/content/stations.csv")
print("Stations", stations)

Stations DataFrame[id: string, name: string, lat: string, long: string, dock_count: string, city: string, installation_date: string]


## **№1  Найти велосипед с максимальным временем пробега.**

In [None]:
#чтобы найти велосипед с самым большим временем поездки на нем
max_trip = (
    trips.groupBy('bike_id')  #группируем по идентификатору велика
    .agg(F.max(F.col("duration").cast(t.IntegerType())).alias("duration"))  #ищем масимальное время поездки для каждого велосипеда
    .orderBy(F.col('duration').desc())  #сортировка по убыванию
    .limit(1)  #выводим первую строку, которая содержит самое большое время
)

max_trip.show()

+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|17270400|
+-------+--------+



## **№2  Найти наибольшее геодезическое расстояние между станциями.**

In [None]:

#выбираем нужные столбцы из stations
stations_data = stations.select("id", "lat", "long")
stations_data.show(10)
#получаем все возможные пары станций с фильтром, чтоб не было пар самим с собой
combo = stations_data.alias("A").crossJoin(stations_data.alias("B")).filter("A.id != B.id")

distance_dif = combo.withColumn(#вычисляем расстояние между парами станций
    "distance",
    F.sqrt(
        F.pow(F.col("A.lat") - F.col("B.lat"), 2) +
        F.pow(F.col("A.long") - F.col("B.long"), 2)
    )
)
#сортируем по убыванию и оставляем только первую строку
max_distance = distance_dif.orderBy(F.desc("distance")).select("A.id", "B.id", "distance").first()
print(max_distance)


+---+------------------+-------------------+
| id|               lat|               long|
+---+------------------+-------------------+
|  2|         37.329732|-121.90178200000001|
|  3|         37.330698|        -121.888979|
|  4|         37.333988|        -121.894902|
|  5|         37.331415|          -121.8932|
|  6|37.336721000000004|        -121.894074|
|  7|         37.333798|-121.88694299999999|
|  8|         37.330165|-121.88583100000001|
|  9|         37.348742|-121.89471499999999|
| 10|         37.337391|        -121.886995|
| 11|         37.335885|-121.88566000000002|
+---+------------------+-------------------+
only showing top 10 rows

Row(id='16', id='60', distance=0.7058482821754397)


## **№3  Найти путь велосипеда с максимальным временем пробега через станции.**

In [None]:
bike_longest_trip = max_trip.select("bike_id").first()[0]#используя первое задание, находим индетификатор велика с макс.временем пробега

# Фильтруем поездки для найденного велосипеда
filtered_trips = (
    trips.filter(F.col("bike_id") == bike_longest_trip)#фильтруем по найденному id
    .select("id", "bike_id", "start_station_id", "end_station_id")
    .orderBy(F.col("id").cast(t.IntegerType()))
)

# Выводим данные
filtered_trips.show()

+-----+-------+----------------+--------------+
|   id|bike_id|start_station_id|end_station_id|
+-----+-------+----------------+--------------+
| 4966|    535|              47|            70|
| 5067|    535|              70|            69|
| 5179|    535|              69|            77|
| 5199|    535|              77|            64|
| 7806|    535|              61|            42|
|11422|    535|              58|            72|
|12245|    535|              72|            47|
|12485|    535|              47|            60|
|12558|    535|              60|            46|
|13107|    535|              46|            77|
|13423|    535|              77|            77|
|14380|    535|              77|            62|
|14581|    535|              62|            61|
|15231|    535|              55|            61|
|15242|    535|              61|            60|
|15347|    535|              60|            41|
|15605|    535|              41|            50|
|15611|    535|              50|        

## **№4  Найти количество велосипедов в системе.**

In [None]:
#считаем количество уникальных велосипедов по id
bikes_count = trips.select("bike_id").distinct().count()

print(f"Количество байков в системе: {bikes_count}")


Количество байков в системе: 700


## **№5  Найти пользователей потративших на поездки более 3 часов.**

In [None]:
#фильтруем поездки, продолжительность которых >= 10800 секунд (3 часа)
long_trips = trips.filter(F.col("duration").cast(t.IntegerType()) >= 10800)
print(f"Количество поездок, длительность которых больше 3 часов: {long_trips.count()}")

output_filtered = long_trips.groupBy("zip_code").agg(
    F.max(F.col("duration").cast(t.IntegerType())).alias("max_duration")  #находим максимальную длительность для каждой группы
)

# Выводим результат
output_filtered.show()


Количество поездок, длительность которых больше 3 часов: 8323
+--------+------------+
|zip_code|max_duration|
+--------+------------+
|   94102|      464952|
|   60070|       26540|
|   95134|       82487|
|   91910|       20243|
|   84606|       14575|
|   80305|       74749|
|    2136|       16010|
|   11722|       12173|
|   94610|       76287|
|   94309|       18484|
|   94404|       63504|
|    7650|       20150|
|   11106|       13773|
|   93013|       25116|
|   16303|       13072|
|   94015|      103760|
|   60661|       24042|
|    4665|       16342|
|   94568|      295512|
|   94079|       33057|
+--------+------------+
only showing top 20 rows

