In [20]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

^C


In [21]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [22]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [23]:
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql import functions as func
from pyspark.sql.types import DoubleType

# Решение задач для данных велопарковок Сан-Франциско

1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.

In [44]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("LR1")\
        .getOrCreate()

In [45]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trips.csv")

In [46]:
trip_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [47]:
trip_data.show(n=5)

+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|         start_date|  start_station_name|start_station_id|           end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|               NULL|South Van Ness at...|              66|2013-08-29 14:14:00|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    NULL|2013-08-29 14:42:00|  San Jose City Hall|              10|2013-08-29 14:43:00|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|2013-08-29 10:16:00|Mountain View Cit...|              27|2013-08-29 10:17:00|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|    

In [48]:
station_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("stations.csv")

In [49]:
station_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



In [50]:
station_data.show(n=5)

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
only showing top 5 rows



#1. Найти велосипед с максимальным временем пробега.

In [51]:
# группировка по bike_id, применение агрегатной функции sum и сортировка по убыванию полученной суммы.
# берём первое значение из списка => c максимальным временем пробега
bike_max_duration = trip_data.groupBy("bike_id").agg({"duration": "sum"}).sort("sum(duration)", ascending=False).first()
print(f"Велосипед с id = {bike_max_duration['bike_id']} имеет наибольший пробег ({bike_max_duration['sum(duration)']})")

Велосипед с id = 535 имеет наибольший пробег (18611693)


#2.Найти наибольшее геодезическое расстояние между станциями.

In [52]:
!pip install haversine



In [53]:
from haversine import haversine

In [34]:
# функция вычисления расстояния с помощью формулы гаверсинуса
def get_distance(lat_1, long_1, lat_2, long_2):
  return haversine((lat_1, long_1), (lat_2, long_2))

# создание udf функции
udf_get_distance = func.udf(get_distance)
# выборка необходимых столцов
station_coords = station_data.select("id", "lat", "long")
# создание пар всех станций со всеми
station_pairs = station_coords.crossJoin(station_coords)\
                .toDF("station_1_id", "lat_1", "long_1", "station_2_id", "lat_2", "long_2")
# удаление строк с одинаковым id станций
station_pairs = station_pairs.filter(station_pairs.station_1_id != station_pairs.station_2_id)
# применение функции расчёта расстояния ко всем строкам
station_pairs_distance = station_pairs.withColumn("distance",\
                                        udf_get_distance(station_pairs.lat_1, station_pairs.long_1,\
                                        station_pairs.lat_2, station_pairs.long_2))
# сортировка по убыванию, получение наибольшего значения
biggest_dist_stations = station_pairs_distance.sort("distance", ascending=False).first()

print(f"Наибольшее геодезическое расстояние ({biggest_dist_stations['distance']} км) \
между станциями {biggest_dist_stations['station_1_id']} и {biggest_dist_stations['station_2_id']}")

Наибольшее геодезическое расстояние (9.663183643537442 км) между станциями 24 и 36


#3. Найти путь велосипеда с максимальным временем пробега через станции.

In [54]:
# сортировка по убыванию, получение наибольшего пробега
bike_max_trip = trip_data.sort("duration", ascending=False).first()
# вывод пути
print(f"Максимальный путь велосипеда id={bike_max_trip['bike_id']} \
из станции {bike_max_trip['start_station_name']} на станцию {bike_max_trip['end_station_name']} \
({bike_max_trip['duration']})")

Максимальный путь велосипеда id=535 из станции South Van Ness at Market на станцию 2nd at Folsom (17270400)


# 4. Найти количество велосипедов в системе.

In [55]:
# счёт уникальных значений bike_id
bikes_count = trip_data.select("bike_id").distinct().count()
print(f"Количество велосипедов в системе: {bikes_count}")

Количество велосипедов в системе: 700


# 5. Найти пользователей потративших на поездки более 3 часов.

In [56]:
# группировка по zip_code, применение агрегатной функции sum, переименование полученного столбца.
users = trip_data.groupBy("zip_code").agg({"duration": "sum"}).withColumnRenamed("sum(duration)", "total_duration")
# оставляем строки с total_duration > 3 часов (3ч * 60 мин * 60 сек)
long_term_users = users.filter(users.total_duration > (3 * 60 * 60))
print("Пользователи, которые потратили на поездки более 3 часов:")
long_term_users.show()

Пользователи, которые потратили на поездки более 3 часов:
+--------+--------------+
|zip_code|total_duration|
+--------+--------------+
|   94102|      19128021|
|   95134|        728023|
|   84606|         95145|
|   80305|        180906|
|   60070|         28919|
|   95519|         30303|
|   43085|         11670|
|   91910|         50488|
|   77339|         13713|
|   48063|         13755|
|   85022|         12682|
|    1090|         20391|
|    2136|         16010|
|   11722|         24331|
|   95138|        155295|
|   94610|       3630628|
|   94404|       3589350|
|   80301|        152189|
|   91326|         65885|
|   90742|         10965|
+--------+--------------+
only showing top 20 rows



In [40]:
spark.stop()