In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf
from pyspark.sql import functions as func
from pyspark.sql.types import DoubleType

from typing import NamedTuple
from datetime import datetime
import numpy as np

# Инициализируем сессию

In [None]:
spark = SparkSession.builder.master("local[*]").appName("LR1").getOrCreate()

trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trips.csv")

trip_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [None]:
station_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("stations.csv")

station_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



In [None]:
trip_data.show(n=10)

+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|         start_date|  start_station_name|start_station_id|           end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|               NULL|South Van Ness at...|              66|2013-08-29 14:14:00|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    NULL|2013-08-29 14:42:00|  San Jose City Hall|              10|2013-08-29 14:43:00|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|2013-08-29 10:16:00|Mountain View Cit...|              27|2013-08-29 10:17:00|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|    

In [None]:
station_data.show(n=10)

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|San Jose|         8/7/2013|
|  8| San Salvador at 1st|         37.330165|-121.88583100000001

1. Найти велосипед с максимальным временем пробега.

In [None]:
# группируем по id, считаем сумму времени через функцию sum и сортируем по убывынию
top_bike_stats = trip_data.groupBy("bike_id") \
                         .agg({"duration": "sum"}) \
                         .orderBy("sum(duration)", ascending=False) \
                         .limit(1).collect()[0]

# Форматируем вывод результатов анализа
result = (
    f"Велосипед {top_bike_stats['bike_id']} "
    f"имеет наибольший пробег - {top_bike_stats['sum(duration)']} времени "
)
result

Велосипед с идентификатором 535 использовался дольше всего - общее время поездок составило 18611693 единиц времени


2. Найти наибольшее геодезическое расстояние между станциями

In [None]:
!pip install haversine
from haversine import haversine



In [None]:
# Вычисление максимального расстояния между станциями с использованием формулы гаверсинуса

def calculate_geodesic_distance(lat1: float, lon1: float,
                              lat2: float, lon2: float) -> float:
    """Вычисляет расстояние между двумя точками на сфере
    по формуле гаверсинуса"""
    return haversine((lat1, lon1), (lat2, lon2))

distance_udf = func.udf(calculate_geodesic_distance)
stations = station_data.select(
    func.col("id").alias("station_id"),
    "lat",
    "long"
)

# все возможные пары станиций
all_station_pairs = stations.crossJoin(
    stations.select(
        func.col("station_id").alias("paired_station_id"),
        func.col("lat").alias("paired_lat"),
        func.col("long").alias("paired_long")
    )
)
unique_pairs = all_station_pairs.filter("station_id != paired_station_id")

# Вычисляем расстояния
station_distances = unique_pairs.withColumn(
    "distance_km",
    distance_udf("lat", "long", "paired_lat", "paired_long")
)

# Максимальное расстояние
max_distance_pair = station_distances.orderBy(
    func.col("distance_km").desc()
).limit(1).collect()[0]

result = (
    f"Максимальное расстояние между станциями: {float(max_distance_pair['distance_km']):.3f} км\n"
    f"Между станциями {max_distance_pair['station_id']}) и {max_distance_pair['paired_station_id']})"
)

result

Максимальное расстояние между станциями: 9.663183643537442 км
Станция A (ID: 24) и Станция B (ID: 36)


3. Найти путь велосипеда с максимальным временем пробега через станции

In [None]:
bike_max_trip = trip_data.sort("duration", ascending=False).limit(1).collect()[0]
result = (
    f"Максимальный путь у велосипеда {bike_max_trip['bike_id']} и составляет {bike_max_trip['duration']}"
    f"Из станции {bike_max_trip['start_station_name']} до станции {bike_max_trip['end_station_name']}"
)
result

Максимальный путь велосипеда id=535 из станции South Van Ness at Market на станцию 2nd at Folsom (17270400)


4. Найти количество велосипедов в системе.

In [None]:
# счёт уникальных значений по bike_id
bikes_count = trip_data.select("bike_id").distinct().count()
print(f"Количество велосипедов в системе: {bikes_count}")

Количество велосипедов в системе: 700


5. Найти пользователей потративших на поездки более 3 часов

In [None]:
# группировка по zip_code, применение агрегатной функции sum, переименование полученного столбца.
users = trip_data.groupBy("zip_code").agg({"duration": "sum"}).withColumnRenamed("sum(duration)", "total_duration")
# оставляем строки с total_duration > 3 часов (3ч * 60 мин * 60 сек)
long_term_users = users.filter(users.total_duration > (3 * 60 * 60))
print("Пользователи, которые потратили на поездки более 3 часов:")
long_term_users.show()

Пользователи, которые потратили на поездки более 3 часов:
+--------+--------------+
|zip_code|total_duration|
+--------+--------------+
|   94102|      19128021|
|   95134|        728023|
|   84606|         95145|
|   80305|        180906|
|   60070|         28919|
|   95519|         30303|
|   43085|         11670|
|   91910|         50488|
|   77339|         13713|
|   48063|         13755|
|   85022|         12682|
|    1090|         20391|
|    2136|         16010|
|   11722|         24331|
|   95138|        155295|
|   94610|       3630628|
|   94404|       3589350|
|   80301|        152189|
|   91326|         65885|
|   90742|         10965|
+--------+--------------+
only showing top 20 rows



In [None]:
spark.stop()