In [71]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz

^C


In [72]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [73]:
!pip install -q findspark
import findspark
findspark.init()

Подготовка данных

In [74]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from pyspark.sql import functions as func


In [75]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

In [76]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("LR1")\
        .getOrCreate()

In [77]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trips.csv")

trip_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [78]:
station_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("stations.csv")

station_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



#1. Найти велосипед с максимальным временем пробега

In [79]:
#представляем как временную таблицу, чтобы работать с sql запросами
trip_data.createOrReplaceTempView("trips")

result = spark.sql("""
    SELECT bike_id, SUM(duration) AS total_duration
    FROM trips
    GROUP BY bike_id
    ORDER BY total_duration DESC
    LIMIT 1
""")

# Получаем результат
bike_max_duration = result.collect()[0]
print(f"Велосипед {bike_max_duration['bike_id']}. Максимальное время пробега = {bike_max_duration['total_duration']}")

Велосипед 535. Максимальное время пробега = 18611693


#2.Найти наибольшее геодезическое расстояние между станциями

In [80]:
from math import sin, cos, sqrt, atan2, radians

# Регистрируем DataFrame как временную таблицу
station_data.createOrReplaceTempView("stations")

# Функция для вычисления геодезического расстояния
def get_distance(lat_1, long_1, lat_2, long_2):
    R = 6373.0  # Радиус Земли в километрах
    lat_1 = radians(lat_1)
    lat_2 = radians(lat_2)
    long_1 = radians(long_1)
    long_2 = radians(long_2)

    dlong = long_2 - long_1
    dlat = lat_2 - lat_1

    a = sin(dlat / 2)**2 + cos(lat_1) * cos(lat_2) * sin(dlong / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

# Конвертация функции в pyspark.sql.functions.udf
udf_get_distance = udf(get_distance, DoubleType())

# Выполняем SQL-запрос для объединения станций в пары
station_pairs = spark.sql("""
    SELECT
        station1.id AS station_1_id,
        station1.lat AS lat_1,
        station1.long AS long_1,
        station2.id AS station_2_id,
        station2.lat AS lat_2,
        station2.long AS long_2
    FROM stations station1
    CROSS JOIN stations station2
    WHERE station1.id != station2.id
""")

# Применяем UDF для вычисления расстояния
station_pairs_with_distance = station_pairs.withColumn(
    "distance",
    udf_get_distance(
        col("lat_1"),
        col("long_1"),
        col("lat_2"),
        col("long_2")
    )
)

# Сортируем по убыванию и находим максимальное расстояние
biggest_dist_stations = station_pairs_with_distance.orderBy(col("distance"), ascending=False).first()

print(f"Наибольшее геодезическое расстояние = {biggest_dist_stations['distance']}, станции {biggest_dist_stations['station_1_id']} - {biggest_dist_stations['station_2_id']}")

Наибольшее геодезическое расстояние = 69.9428256877473, станции 16 - 60


# 3. Найти путь велосипеда с максимальным временем пробега через станции

In [81]:
trip_data.createOrReplaceTempView("trips")

bike_max_trip = spark.sql("""
    SELECT bike_id, start_station_name, end_station_name, duration
    FROM trips
    ORDER BY duration DESC
    LIMIT 1
""").first()

# Вывод пути
if bike_max_trip:
    print(f"{bike_max_trip['start_station_name']} - {bike_max_trip['end_station_name']} за {bike_max_trip['duration']}")

South Van Ness at Market - 2nd at Folsom за 17270400


# 4. Найти количество велосипедов в системе

In [82]:
trip_data.createOrReplaceTempView("trips")

bikes_count = spark.sql("""
    SELECT COUNT(DISTINCT bike_id) AS unique_bikes_count
    FROM trips
""").collect()[0]["unique_bikes_count"]

# Вывод количества уникальных велосипедов
print(f"{bikes_count}")

700


# 5. Найти пользователей потративших на поездки более 3 часов

In [83]:
trip_data.createOrReplaceTempView("trips")

long_term_users = spark.sql("""
    SELECT zip_code
    FROM trips
    GROUP BY zip_code
    HAVING SUM(duration) > 3 * 60 * 60
""")

# Выводим результат
long_term_users.show()

+--------+
|zip_code|
+--------+
|   94102|
|   95134|
|   84606|
|   80305|
|   60070|
|   95519|
|   43085|
|   91910|
|   77339|
|   48063|
|   85022|
|    1090|
|    2136|
|   11722|
|   95138|
|   94610|
|   94404|
|   80301|
|   91326|
|   90742|
+--------+
only showing top 20 rows

