**Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):**

In [1]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
import os
from math import sin, cos, sqrt, atan2, radians

In [2]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!rm spark-3.5.5-bin-hadoop3.tgz
!wget -q http://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

!pip install findspark # для нахождения spark в системе
import findspark
findspark.init()



In [3]:
# создадим объект sparksession для входа в apache spark
spark = SparkSession \
  .builder \
  .appName("San_Francisco_bike_parks") \
  .getOrCreate()

print("version spark:", spark.version)

version spark: 3.5.5


In [4]:
# загрузка данных
# первая строка - заголовки
# определение типов
# формат времени
trips_data = spark.read \
.option("header", True) \
.option("inferSchema", True) \
.option("timestampFormat", 'M/d/y H:m') \
.csv(os.path.join("trips.csv"))

stations_data = spark.read \
.option("header", True) \
.option("inferSchema", True) \
.option("timestampFormat", 'M/d/y H:m') \
.csv(os.path.join("stations.csv"))

In [5]:
# просмотр данных
print("trips")
trips_data.printSchema()

print("stations")
stations_data.printSchema()

trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



**1. Найти велосипед с максимальным временем пробега.**

In [6]:
# группировка данных по идентификатору
# сумма всех значений в "duration" для каждого идент.
# сортировка по убыванию
# возврат первой строки
max_duration = trips_data.groupBy("bike_id") \
  .agg(F.sum("duration").alias("total_trips_duration")) \
  .orderBy(F.desc("total_trips_duration")) \
  .first()

print(f"bike_id = {max_duration['bike_id']}")
print(f"duration = {max_duration['total_trips_duration']}")

bike_id = 535
duration = 18611693


**2. Найти наибольшее геодезическое расстояние между станциями.**

In [7]:
# функция для вычисления геодезического расстояния
# принимает широту и долготу двух точек
def geodesic_dist(lat_st1, long_st1, lat_st2, long_st2):
  # перевод широты и долготы из градусов в радианы
  lat_st1, long_st1, lat_st2, long_st2 = map(radians, [lat_st1, long_st1, lat_st2, long_st2])
  d_lat, d_long = lat_st2 - lat_st1, long_st2 - long_st1 # вычисление разницы
  # формула Хаверсина
  a = sin(d_lat / 2) ** 2 + cos(lat_st1) * cos(lat_st2) * sin(d_long / 2) ** 2
  c = 2 * atan2(sqrt(a), sqrt(1 - a)) # вычисление углового расстояния между точками
  return 6373 * c # домножение на радиус земли

# преобразование функции в пользовательскую spark
geodesic_dist_udf = F.udf(geodesic_dist, DoubleType())

# создаем копию stations_data
# декартово произведение, каждая станция со всеми другими
# вычисление расстояния между каждой парой станций
# макс. расстояние между всеми станциями
# извлечение результата, берем макс. расстояние
max_dist = stations_data.alias("station1") \
  .crossJoin(stations_data.alias("station2")) \
  .withColumn(
    "geodesic_dist",
    geodesic_dist_udf(
      F.col("station1.lat"),
      F.col("station1.long"),
      F.col("station2.lat"),
      F.col("station2.long")
    )
  ) \
  .agg(F.max("geodesic_dist").alias("max_distance")) \
  .collect()[0]["max_distance"]

print(f"max distance = {max_dist}")

max distance = 69.9428256877473


**3. Найти путь велосипеда с максимальным временем пробега через станции.**

In [8]:
# сортировка trips_data по "duration" в порядке убывания, отбор нужных столбцов, берем первую строку
trip_max_duration = trips_data.orderBy(col("duration").desc()).select("start_station_name", "end_station_name", "duration").first()

print(f"the longest trip = {trip_max_duration['duration']}")
print(f"from = {trip_max_duration['start_station_name']}")
print(f"to = {trip_max_duration['end_station_name']}")

the longest trip = 17270400
from = South Van Ness at Market
to = 2nd at Folsom


**4. Найти количество велосипедов в системе.**

In [9]:
# выбираем столбец "bike_id", убираем дубликаты и подсчитываем кол-во строк
bikes_count = trips_data.select("bike_id").distinct().count()

print(f"number of bikes = {bikes_count}")

number of bikes = 700


**5. Найти пользователей потративших на поездки более 3 часов.**

In [10]:
# группировка данных по id, для каждой группы подсчет суммарного времени
# обзываем столбец
# фильтрация данных, оставляем строки, где > 3 часов
trips_data.groupBy("bike_id").sum("duration") \
  .withColumnRenamed("sum(duration)", "sum_time") \
  .filter("sum_time > 10800") \
  .show()

+-------+--------+
|bike_id|sum_time|
+-------+--------+
|    471| 1718831|
|    496| 1679568|
|    148|  332138|
|    463| 1722796|
|    540| 1752835|
|    392| 1789476|
|    623| 2037219|
|    243|  307458|
|    516| 1896751|
|     31|  407907|
|    580| 1034382|
|    137| 1529200|
|    251| 1282980|
|    451| 1695574|
|     85| 1214769|
|    458| 1647080|
|     65|  216922|
|    588|  266415|
|    255|  396395|
|     53|  226389|
+-------+--------+
only showing top 20 rows

