## Импортирование необходимых модулей

In [1]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

## Инициализация сессии

Для начала работы с PySpark необходима инициализация сессии

In [2]:
spark = SparkSession \
    .builder \
    .appName("L1_interactive_bike_analysis") \
    .getOrCreate()

In [15]:
spark.version

'3.1.2.0-eep-800'

## Загрузка данных

In [45]:
import os
# загрузка датасетов trips, stations
data_path = os.path.join(os.curdir, "data")
# data_path = os.path.join(os.getcwd(), "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

trips.csv


In [48]:
# Копируем файлы из local file system в HDFS

# !hadoop fs -put trips.csv /user/glebilin6
# !hadoop fs -put stations.csv /user/glebilin6

put: `/user/glebilin6/stations.csv': File exists


In [49]:
# Проверяем наличие файла в нужной директории

# !hadoop fs -ls /user/glebilin6/trips.csv
# !hadoop fs -ls /user/glebilin6/stations.csv

-rwxr-xr-x   3 glebilin6 glebilin6       5647 2023-11-20 13:54 /user/glebilin6/stations.csv


In [50]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(trips_path)

print("Trips")
trip_data.printSchema()

stations_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(stations_path)

print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



## Задание 1. Найти велосипед с максимальным временем пробега

In [51]:
# Группировка по id велосипеда и применение функции sum для подсчета времени пробега каждого велосипеда
max_trips_duration_per_bike = trip_data.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))

# Выбор велосипеда с максимальным пробегом
bike_with_max_trips_duration = max_trips_duration_per_bike.orderBy(col("total_trips_duration").desc()).first()

# Получение id велосипеда
bike_id_with_max_duration = bike_with_max_trips_duration["bike_id"]

# Получение значения пробега
total_duration = bike_with_max_trips_duration["total_trips_duration"]

print(f"Велосипед #{bike_id_with_max_duration} с суммарным временем пробега  = {total_duration}")

Велосипед #535 с суммарным временем пробега  = 36229902


## Задание 2. Найти наибольшее геодезическое расстояние между станциями

In [54]:
# Для вычисления 
from math import sin, cos, sqrt, atan2, radians

def geodesic_distance(lat1, lon1, lat2, lon2):
    # Радиус Земли в километрах
    R = 6373.0
    
    # Конвертация в радианы
    lat1 = radians(lat1)
    lat2 = radians(lat2)
    lon1 = radians(lon1)
    lon2 = radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    
    # Вычисление геодезического расстояния по формуле Хаверсина
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    
    return distance

# Конвертация функции в pyspark.sql.functions.udf (user-defined function)
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

In [55]:
# Объединение датасета станций с самим собой для получения всех возможных пар
station_pairs = stations_data.alias("station1").crossJoin(stations_data.alias("station2"))

# Вычисление расстояния для каждой пары станций с помощью объявленной ранее функции
station_pairs_with_distance = station_pairs.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long")
    )
)

# Поиск максимального геодезического расстояния среди всех расстояний для каждой пары станций
max_distance = station_pairs_with_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]

print(f"Максимальное геодезическое расстояние между станциями равно {max_distance} километрам")

Максимальное геодезическое расстояние между станциями равно 69.9428256877473 километрам


## Задание 3. Найти путь велосипеда с максимальным временем пробега через станции

In [11]:
# Сортировка по столбцу duration и выбор наиболее длительной поездки
trip_with_max_duration = trip_data.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()

# Получение стартовой и конечной станций, а также времени поездки
start_location = trip_with_max_duration["start_station_name"]
end_location = trip_with_max_duration["end_station_name"]
trip_time = trip_with_max_duration["duration"]

print(f"Самая длинная поездка ({trip_time} секунд)  from \"{start_location}\" to \"{end_location}\"")

The longest trip (17270400 seconds) is from "South Van Ness at Market" to "2nd at Folsom"


## Задание 4. Найти количество велосипедов в системе

In [12]:
# Группировка по id велосипеда и подсчет уникальных значений id
unique_bikes_count = trip_data.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Total number of bikes is {unique_bikes_count}")

Total number of bikes is 700


## Задание 5. Найти пользователей потративших на поездки более 3 часов

In [13]:
# Группировка по id велосипеда и подсчет общего времени, проведенного в поездке
users_with_total_trip_time = trip_data.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_time")
users_with_total_trip_time.filter("total_time>10800").show()

+-------+----------+
|bike_id|total_time|
+-------+----------+
|    471|   2504669|
|    496|   2492338|
|    148|    383620|
|    463|   2631994|
|    540|   2499857|
|    392|   2664633|
|    623|   2988761|
|    243|    412598|
|    516|   2760799|
|     31|    587904|
|    580|   1072279|
|    137|   2238350|
|    251|   1660026|
|    451|   2461768|
|     85|   2287621|
|    458|   2286523|
|     65|    327963|
|    588|    266415|
|    255|    477850|
|     53|    338860|
+-------+----------+
only showing top 20 rows

