## Импортирование необходимых модулей

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6e8894405a108ba2622bbf192e8e7845f0111be7345e3ac049d9d84830f362cb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

## Инициализация сессии

In [4]:
spark = SparkSession \
    .builder \
    .appName('LR1') \
    .getOrCreate()

In [5]:
spark.version

'3.5.1'

## Загрузка данных

In [14]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv('trips.csv')

print("Trips")
trip_data.printSchema()

stations_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv('stations.csv')

print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



## Задание 1. Найти велосипед с максимальным временем пробега

In [17]:
# Группировка по id велосипеда и применение функции sum для подсчета времени пробега каждого велосипеда
max_trips_duration_per_bike = trip_data.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))

# Выбор велосипеда с максимальным пробегом
bike_with_max_trips_duration = max_trips_duration_per_bike.orderBy(col("total_trips_duration").desc()).first()

# Получение id велосипеда
bike_id_with_max_duration = bike_with_max_trips_duration["bike_id"]

# Получение значения пробега
total_duration = bike_with_max_trips_duration["total_trips_duration"]

print(f"Bike #{bike_id_with_max_duration} has total trips duration = {total_duration}")

Bike #535 has total trips duration = 18611693


## Задание 2. Найти наибольшее геодезическое расстояние между станциями

In [20]:
from math import sin, cos, sqrt, atan2, radians

def geodesic_distance(lat1, lon1, lat2, lon2):
    # Радиус Земли в километрах
    R = 6373.0

    # Конвертация в радианы
    lat1 = radians(lat1)
    lat2 = radians(lat2)
    lon1 = radians(lon1)
    lon2 = radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Вычисление геодезического расстояния по формуле Хаверсина
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c

    return distance

# Конвертация функции в pyspark.sql.functions.udf (user-defined function)
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

In [21]:
# Объединение датасета станций с самим собой для получения всех возможных пар
station_pairs = stations_data.alias("station1").crossJoin(stations_data.alias("station2"))

# Вычисление расстояния для каждой пары станций с помощью объявленной ранее функции
station_pairs_with_distance = station_pairs.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long")
    )
)

# Поиск максимального геодезического расстояния среди всех расстояний для каждой пары станций
max_distance = station_pairs_with_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]

print(f"The maximum geodesic distance between stations equals to {max_distance} kilometers")

The maximum geodesic distance between stations equals to 69.9428256877473 kilometers


## Задание 3. Найти путь велосипеда с максимальным временем пробега через станции

In [22]:
# Сортировка по столбцу duration и выбор наиболее длительной поездки
trip_with_max_duration = trip_data.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()

# Получение стартовой и конечной станций, а также времени поездки
start_location = trip_with_max_duration["start_station_name"]
end_location = trip_with_max_duration["end_station_name"]
trip_time = trip_with_max_duration["duration"]

print(f"The longest trip ({trip_time} seconds) is from \"{start_location}\" to \"{end_location}\"")

The longest trip (17270400 seconds) is from "South Van Ness at Market" to "2nd at Folsom"


## Задание 4. Найти количество велосипедов в системе

In [23]:
# Группировка по id велосипеда и подсчет уникальных значений id
unique_bikes_count = trip_data.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Total number of bikes is {unique_bikes_count}")

Total number of bikes is 700


## Задание 5. Найти пользователей потративших на поездки более 3 часов

In [24]:
# Группировка по id велосипеда и подсчет общего времени, проведенного в поездке
users_with_total_trip_time = trip_data.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_time")
users_with_total_trip_time.filter("total_time>10800").show()

+-------+----------+
|bike_id|total_time|
+-------+----------+
|    471|   1718831|
|    496|   1679568|
|    148|    332138|
|    463|   1722796|
|    540|   1752835|
|    392|   1789476|
|    623|   2037219|
|    243|    307458|
|    516|   1896751|
|     31|    407907|
|    580|   1034382|
|    137|   1529200|
|    251|   1282980|
|    451|   1695574|
|     85|   1214769|
|    458|   1647080|
|     65|    216922|
|    588|    266415|
|    255|    396395|
|     53|    226389|
+-------+----------+
only showing top 20 rows

