# Установка и настройка окружения

In [1]:
# Установка Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Скачивание Spark с выводом статуса (убираем -q для диагностики)
!wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# Проверка, что файл загружен
!ls -lh spark-3.5.1-bin-hadoop3.tgz || echo "Ошибка: файл не загружен!"

# Распаковка файла, если он существует
!test -f spark-3.5.1-bin-hadoop3.tgz && tar xf spark-3.5.1-bin-hadoop3.tgz || echo "Пропускаем распаковку: файл отсутствует"

# Установка PySpark
!pip install -q pyspark

--2025-04-09 17:05:47--  https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400446614 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.1-bin-hadoop3.tgz.1’


2025-04-09 17:11:41 (1.08 MB/s) - ‘spark-3.5.1-bin-hadoop3.tgz.1’ saved [400446614/400446614]

-rw-r--r-- 1 root root 382M Feb 15  2024 spark-3.5.1-bin-hadoop3.tgz


In [16]:
import os
# Устанавливаем переменную окружения JAVA_HOME, указывая путь к Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Импортируем SparkSession для работы с DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as sql_sum
import pyspark.sql.functions as F
# Импортируем math для математических вычислений
import math
# Импортируем udf для создания пользовательских функций в DataFrame
from pyspark.sql.functions import udf
# Импортируем типы данных для задания схемы
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType


# Создаем SparkSession — точку входа для работы с Spark
spark = SparkSession.builder \
    .appName("BikeShareAnalysis") \
    .master("local[*]") \
    .getOrCreate()

# Получаем SparkContext для работы с RDD
sc = spark.sparkContext
# Выводим версию Spark для проверки
print(f"Spark version: {spark.version}")

Spark version: 3.5.5


In [18]:
# Проверяем количество строк в файлах для диагностики
print("Количество строк в файлах:")
!wc -l /content/trip.csv
!wc -l /content/station.csv
print("Размер файла trip.csv:")
!ls -lh /content/trip.csv

# Определяем схему для trip.csv
trip_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("duration", IntegerType(), True),
    StructField("start_date", StringType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", IntegerType(), True),
    StructField("end_date", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", IntegerType(), True),
    StructField("bike_id", IntegerType(), True),
    StructField("subscription_type", StringType(), True),
    StructField("zip_code", StringType(), True)
])

# Определяем схему для station.csv
station_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("dock_count", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("installation_date", StringType(), True)
])

# Загружаем данные из trip.csv в DataFrame без заголовка
trip_df = spark.read.schema(trip_schema) \
    .option("mode", "PERMISSIVE") \
    .csv("/content/trip.csv")

# Удаляем первую строку (заголовок)
trip_df = trip_df.filter(col("id").cast("int").isNotNull())

# Загружаем данные из station.csv в DataFrame без заголовка
station_df = spark.read.schema(station_schema) \
    .option("mode", "PERMISSIVE") \
    .csv("/content/station.csv")

# Удаляем первую строку (заголовок)
station_df = station_df.filter(col("id").cast("int").isNotNull())

# Выводим первые 5 строк trip.csv для проверки данных
print("Первые 5 строк trip.csv:")
trip_df.show(5)

# Выводим первые 5 строк station.csv для проверки данных
print("Первые 5 строк station.csv:")
station_df.show(5)

Количество строк в файлах:
669960 /content/trip.csv
71 /content/station.csv
Размер файла trip.csv:
-rw-r--r-- 1 root root 77M Apr  9 14:22 /content/trip.csv
Первые 5 строк trip.csv:
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mo

# Задание 1 — Найти велосипед с максимальным временем пробега

In [4]:
# RDD подход
trip_rdd = trip_df.rdd
bike_duration_rdd = trip_rdd.map(lambda x: (x["bike_id"], x["duration"]))
total_bike_duration_rdd = bike_duration_rdd.reduceByKey(lambda a, b: a + b)
max_bike_rdd = total_bike_duration_rdd.takeOrdered(1, key=lambda x: -x[1])[0]

max_bike_rdd_df = spark.createDataFrame(
    [(max_bike_rdd[0], max_bike_rdd[1])],
    ["bike_id", "duration"]
)
print("Задание 1: Велосипед с максимальным временем пробега (RDD):")
max_bike_rdd_df.show()

# DataFrame подход
max_bike_df = trip_df.groupBy("bike_id") \
    .agg(sql_sum("duration").alias("duration")) \
    .orderBy(col("duration").desc()) \
    .limit(1)

print("Задание 1: Велосипед с максимальным временем пробега (DataFrame):")
max_bike_df.show()

Задание 1: Велосипед с максимальным временем пробега (RDD):
+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|18611693|
+-------+--------+

Задание 1: Велосипед с максимальным временем пробега (DataFrame):
+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|18611693|
+-------+--------+



# Задание 2 — Найти наибольшее геодезическое расстояние между станциями

In [5]:
# Определяем функцию для вычисления геодезического расстояния (формула Хаверсина)
def haversine(lat1, lon1, lat2, lon2):
    R = 6371
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    return R * c

# RDD подход
stations_rdd = station_df.rdd
station_pairs = stations_rdd.cartesian(stations_rdd)
station_pairs = station_pairs.filter(lambda x: x[0]["id"] < x[1]["id"])
distances_rdd = station_pairs.map(lambda x: (
    (x[0]["id"], x[1]["id"]),
    haversine(x[0]["lat"], x[0]["long"], x[1]["lat"], x[1]["long"])
))
max_distance_rdd = distances_rdd.takeOrdered(1, key=lambda x: -x[1])[0]

max_distance_rdd_df = spark.createDataFrame(
    [(max_distance_rdd[0][0], max_distance_rdd[0][1], max_distance_rdd[1])],
    ["station_id_1", "station_id_2", "distance_km"]
)
print("Задание 2: Наибольшее геодезическое расстояние между станциями (RDD):")
max_distance_rdd_df.show()

# DataFrame подход
haversine_udf = udf(haversine, DoubleType())
station_pairs_df = station_df.alias("s1").crossJoin(station_df.alias("s2"))
station_pairs_df = station_pairs_df.filter(col("s1.id") < col("s2.id"))
station_pairs_df = station_pairs_df.select(
    col("s1.id").alias("station_id_1"),
    col("s2.id").alias("station_id_2"),
    haversine_udf(col("s1.lat"), col("s1.long"), col("s2.lat"), col("s2.long")).alias("distance_km")
)
max_distance_df = station_pairs_df.orderBy(col("distance_km").desc()).limit(1)

print("Задание 2: Наибольшее геодезическое расстояние между станциями (DataFrame):")
max_distance_df.show()


Задание 2: Наибольшее геодезическое расстояние между станциями (RDD):
+------------+------------+-----------------+
|station_id_1|station_id_2|      distance_km|
+------------+------------+-----------------+
|          16|          60|69.92087595428183|
+------------+------------+-----------------+

Задание 2: Наибольшее геодезическое расстояние между станциями (DataFrame):
+------------+------------+-----------------+
|station_id_1|station_id_2|      distance_km|
+------------+------------+-----------------+
|          16|          60|69.92087595428183|
+------------+------------+-----------------+



# Задание 3 — Найти путь велосипеда с максимальным временем пробега через станции

In [6]:
max_bike_id = max_bike_df.first()["bike_id"]
bike_path_df = trip_df.filter(col("bike_id") == max_bike_id)
bike_path_df = bike_path_df.select(
    "start_station_id",
    "end_station_id",
    "start_date",
    "duration"
).orderBy("start_date")

print(f"Задание 3: Путь велосипеда с максимальным временем пробега (ID {max_bike_id}):")
bike_path_df.show(10)
print(f"Количество поездок для велосипеда ID {max_bike_id}: {bike_path_df.count()}")

Задание 3: Путь велосипеда с максимальным временем пробега (ID 535):
+----------------+--------------+---------------+--------+
|start_station_id|end_station_id|     start_date|duration|
+----------------+--------------+---------------+--------+
|              75|            60| 1/1/2014 13:42|    3289|
|              60|            76| 1/1/2014 18:51|    1286|
|              76|            66| 1/1/2014 19:48|     795|
|              67|            39|1/10/2014 20:13|     235|
|              51|            70| 1/10/2014 8:09|     596|
|              70|            55| 1/10/2014 8:21|     600|
|              55|            67| 1/10/2014 9:19|     802|
|              39|            67|1/11/2014 19:06|     336|
|              67|            76|1/12/2014 12:21|     480|
|              76|            70|1/12/2014 17:36|    1309|
+----------------+--------------+---------------+--------+
only showing top 10 rows

Количество поездок для велосипеда ID 535: 1328


# Задание 4 — Найти количество велосипедов в системе

In [7]:
# RDD подход
bike_ids_rdd = trip_df.rdd.map(lambda x: x["bike_id"])
unique_bikes_rdd = bike_ids_rdd.distinct()
bike_count_rdd = unique_bikes_rdd.count()

bike_count_rdd_df = spark.createDataFrame(
    [(bike_count_rdd,)],
    ["total_bikes"]
)
print("Задание 4: Количество велосипедов в системе (RDD):")
bike_count_rdd_df.show()


# DataFrame подход
bike_count_df = trip_df.select("bike_id").distinct().count()

bike_count_df_df = spark.createDataFrame(
    [(bike_count_df,)],
    ["total_bikes"]
)
print("Задание 4: Количество велосипедов в системе (DataFrame):")
bike_count_df_df.show()



Задание 4: Количество велосипедов в системе (RDD):
+-----------+
|total_bikes|
+-----------+
|        700|
+-----------+

Задание 4: Количество велосипедов в системе (DataFrame):
+-----------+
|total_bikes|
+-----------+
|        700|
+-----------+



# Задание 5 — Найти пользователей, потративших на поездки более 3 часов

In [17]:
three_hours = 3 * 3600  # 3 часа = 10,800 секунд

# Загружаем station.csv, чтобы получить список корректных station_id
station_df = spark.read.schema(station_schema).option("mode", "PERMISSIVE").csv("/content/station.csv")
station_df = station_df.filter(col("id").cast("int").isNotNull())
valid_station_ids = station_df.select("id").distinct().rdd.map(lambda x: x["id"]).collect()

# Фильтрация данных
trip_df_filtered = trip_df.filter((col("duration") >= 60) & (col("duration") <= 24*3600)) \
    .filter(col("zip_code").isNotNull()) \
    .filter(F.length(col("zip_code")) == 5) \
    .filter(col("zip_code").cast("int").isNotNull()) \
    .filter(col("zip_code").startswith("9")) \
    .filter(col("start_station_id").isin(valid_station_ids)) \
    .filter(col("end_station_id").isin(valid_station_ids)) \
    .filter(col("subscription_type").isin(["Subscriber", "Customer"])) \
    .filter(F.year(F.to_timestamp(col("start_date"), "M/d/yyyy H:mm")).between(2013, 2015)) \
    .dropDuplicates(["id"])

# DataFrame подход
user_duration_df = trip_df_filtered.groupBy("zip_code") \
    .agg(sql_sum("duration").alias("total_duration"))
user_duration_df = user_duration_df.filter(col("total_duration") > three_hours) \
    .orderBy(col("total_duration").desc())  # Сортировка по total_duration в убывающем порядке
user_count_df = user_duration_df.count()

# Проверяем zip_code, которые не достигли порога в 3 часа
under_three_hours_df = trip_df_filtered.groupBy("zip_code") \
    .agg(sql_sum("duration").alias("total_duration")) \
    .filter(col("total_duration") <= three_hours)

user_count_df_df = spark.createDataFrame(
    [(user_count_df,)],
    ["users_over_3_hours"]
)

print("Задание 5: Первые 15 zip-кодов с временем более 3 часов (DataFrame):")
user_duration_df.show(15)


# RDD подход
user_duration_rdd = trip_df_filtered.rdd.map(lambda x: (x["zip_code"], x["duration"]))
user_duration_rdd = user_duration_rdd.reduceByKey(lambda a, b: a + b)
user_duration_rdd = user_duration_rdd.filter(lambda x: x[1] > three_hours) \
    .sortBy(lambda x: x[1], ascending=False)  # Сортировка по total_duration в убывающем порядке
user_count_rdd = user_duration_rdd.count()

user_count_rdd_df = spark.createDataFrame(
    [(user_count_rdd,)],
    ["users_over_3_hours"]
)

user_duration_rdd_list = user_duration_rdd.take(15)
user_duration_rdd_df = spark.createDataFrame(
    user_duration_rdd_list,
    ["zip_code", "total_duration_seconds"]
)
print("Задание 5: Первые 15 zip-кодов с временем более 3 часов (RDD):")
user_duration_rdd_df.show()

Задание 5: Первые 15 zip-кодов с временем более 3 часов (DataFrame):
+--------+--------------+
|zip_code|total_duration|
+--------+--------------+
|   94107|      48582326|
|   94105|      24834030|
|   94133|      21010654|
|   94103|      18197645|
|   94102|      16543128|
|   94111|      13870729|
|   94109|      11917431|
|   95112|      10150013|
|   94110|       6579292|
|   94117|       6392830|
|   94040|       6248265|
|   94158|       6111822|
|   94025|       5178237|
|   94108|       4996850|
|   94041|       4989816|
+--------+--------------+
only showing top 15 rows

Задание 5: Первые 15 zip-кодов с временем более 3 часов (RDD):
+--------+----------------------+
|zip_code|total_duration_seconds|
+--------+----------------------+
|   94107|              48582326|
|   94105|              24834030|
|   94133|              21010654|
|   94103|              18197645|
|   94102|              16543128|
|   94111|              13870729|
|   94109|              11917431|
|   9511