Подготовка

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Функция для преобразования колонок в тип double
def convert_columns_to_double(df, column_names):
    for column in column_names:
        df = df.withColumn(column, F.col(column).cast("double"))
    return df

# Чтение CSV-файлов
def load_csv_data(spark, file_path):
    try:
        return spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
    except Exception as e:
        print(f"Ошибка при чтении: {file_path}: {e}")
        return None

# Инициализация Spark-сессии
def init_spark_session(app_name):
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

spark = init_spark_session("BikeAnalysis")

trips_df = load_csv_data(spark, "data/trips.csv")
stations_df = load_csv_data(spark, "data/stations.csv")

if trips_df is None or stations_df is None:
    print("При загрузке данных произошла ошибка!")
    spark.stop()
    exit()

# Приведение координат станций к типу double
stations_df = convert_columns_to_double(stations_df, ["lat", "long"])

stations_df.show(5)
trips_df.show(5)


+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
only showing top 5 rows

+----+--------+---------------+--------

Задание 1. Найти велосипед с максимальным временем пробега.

In [14]:
from pyspark.sql.functions import unix_timestamp

try:
    trips_with_duration = trips_df.withColumn(
        "start_timestamp", unix_timestamp("start_date", "M/d/yyyy H:mm").cast("long")
    ).withColumn(
        "end_timestamp", unix_timestamp("end_date", "M/d/yyyy H:mm").cast("long")
    ).withColumn(
        "duration_minutes", (F.col("end_timestamp") - F.col("start_timestamp")) / 60
    )
except Exception as e:
    print(f"Ошибка при преобразовании дат: {e}. Убедитесь в корректности формата даты.")
    spark.stop()
    exit()

# Нахождение велосипеда с максимальным значением времени
bike_max_duration = trips_with_duration.groupBy("bike_id").agg(
    F.sum("duration_minutes").alias("total_duration_minutes")
).orderBy(F.col("total_duration_minutes").desc()).limit(1)

bike_max_duration.show()

+-------+----------------------+
|bike_id|total_duration_minutes|
+-------+----------------------+
|    535|              310262.0|
+-------+----------------------+



Задание 2. Найти наибольшее геодезическое расстояние между станциями.


In [16]:
from pyspark.sql.functions import max, radians, sin, cos, sqrt, atan2, col, unix_timestamp, sum as spark_sum

# Функция для вычисления геодезического расстояния между двумя точками
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0
    dlat = radians(lat2) - radians(lat1)
    dlon = radians(lon2) - radians(lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# Регистрируем функцию haversine
spark.udf.register("haversine", haversine)

# Поиск всех пар станций, для которых вычисляется расстояние
station_pairs = stations_df.alias("station1").crossJoin(stations_df.alias("station2")).filter(col("station1.id") < col("station2.id"))
# Добавляем колонку с расстоянием между станциями
station_pairs = station_pairs.withColumn("distance", haversine(col("station1.lat"), col("station1.long"), col("station2.lat"), col("station2.long")))

max_distance = station_pairs.agg(max("distance").alias("max_distance")).collect()[0]["max_distance"]

print(f"Максимальное геодезическое расстояние между станциями: {max_distance:.2f} км")

Максимальное геодезическое расстояние между станциями: 69.92 км


Задание 3. Найти путь велосипеда с максимальным временем пробега через станции.


In [17]:
try:
    trips_with_duration = trips_df.withColumn(
        "start_timestamp", unix_timestamp(col("start_date"), "M/d/yyyy H:mm").cast("long")
    ).withColumn(
        "end_timestamp", unix_timestamp(col("end_date"), "M/d/yyyy H:mm").cast("long")
    ).withColumn(
        "duration_minutes", (col("end_timestamp") - col("start_timestamp")) / 60
    )
except Exception as e:
    print(f"Ошибка при преобразовании дат: {e}. Убедитесь в корректности формата даты в файле trips.csv.")
    spark.stop()
    exit()

# Нахождение велосипеда с максимальным временем пробега
bike_max_duration = trips_with_duration.groupBy("bike_id").agg(
    spark_sum("duration_minutes").alias("total_duration_minutes")
).orderBy(col("total_duration_minutes").desc()).limit(1)

# Получаем bike_id с максимальным временем пробега
max_bike_duration = bike_max_duration.collect()
if not max_bike_duration:
    print("Не удалось найти велосипед с максимальным временем пробега.")
    spark.stop()
    exit()

bike_id_max = max_bike_duration[0]["bike_id"]

# Отбираем поездки для этого велосипеда и сортируем по времени начала
bike_trips = trips_with_duration.filter(col("bike_id") == bike_id_max) \
    .select("start_date", "start_station_name", "end_date", "end_station_name") \
    .orderBy("start_date")

bike_trips.show(truncate=False)

+---------------+---------------------------------------------+---------------+---------------------------------------------+
|start_date     |start_station_name                           |end_date       |end_station_name                             |
+---------------+---------------------------------------------+---------------+---------------------------------------------+
|1/1/2014 13:42 |Mechanics Plaza (Market at Battery)          |1/1/2014 14:36 |Embarcadero at Sansome                       |
|1/1/2014 18:51 |Embarcadero at Sansome                       |1/1/2014 19:13 |Market at 4th                                |
|1/1/2014 19:48 |Market at 4th                                |1/1/2014 20:01 |South Van Ness at Market                     |
|1/10/2014 20:13|Market at 10th                               |1/10/2014 20:17|Powell Street BART                           |
|1/10/2014 8:09 |Embarcadero at Folsom                        |1/10/2014 8:19 |San Francisco Caltrain (Townsend at 4th

Задание 4. Найти количество велосипедов в системе.

In [18]:
total_bikes = trips_df.select("bike_id").distinct().count()
print(f"Количество уникальных велосипедов в системе: {total_bikes}")

Количество уникальных велосипедов в системе: 700


Задание 5. Найти пользователей потративших на поездки более 3 часов.


In [19]:
filtered_users = trips_with_duration.filter(F.col("zip_code").isNotNull())
user_trip_time = filtered_users.groupBy("zip_code").agg(
    F.sum("duration_minutes").alias("total_trip_time_minutes")
)
active_users = user_trip_time.filter(F.col("total_trip_time_minutes") > 180)
active_users.select("zip_code", "total_trip_time_minutes").show()

+--------+-----------------------+
|zip_code|total_trip_time_minutes|
+--------+-----------------------+
|   94102|               318746.0|
|   95134|                12114.0|
|   84606|                 1583.0|
|   80305|                 3010.0|
|   60070|                  483.0|
|   95519|                  505.0|
|   43085|                  194.0|
|   91910|                  840.0|
|   77339|                  230.0|
|   48063|                  228.0|
|   85022|                  209.0|
|    1090|                  340.0|
|    2136|                  266.0|
|   11722|                  405.0|
|   95138|                 2583.0|
|   94610|                60490.0|
|   94404|                59813.0|
|   80301|                 2537.0|
|   91326|                 1096.0|
|   90742|                  183.0|
+--------+-----------------------+
only showing top 20 rows



In [20]:
spark.stop()
