# Установка JDK и spark, задание переменных окружения, подключение модулей

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz


gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [1]:
!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

In [5]:
spark = SparkSession \
    .builder \
    .appName("L1_BD") \
    .getOrCreate()

In [7]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trip.csv")
stations_path = os.path.join("station.csv")

In [9]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(trips_path)

print("Trips")
trip_data.printSchema()

stations_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(stations_path)

print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



# Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):
1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.

# Задание №1

In [69]:
# Группировка по id велосипеда и сумммирование пробега. после выстраивание по убыванию длительности
bike_with_max_duration = trip_data.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))\
.orderBy(col("total_trips_duration").desc()).first()

print(f"Велосипед #{ bike_with_max_duration['bike_id'] } с суммарным временем пробега  = {bike_with_max_duration['total_trips_duration']}.")

Велосипед #535 с суммарным временем пробега  = 18611693.


# Задание №2

In [48]:
import math

def geodesic_distance(lat1, lon1, lat2, lon2):
    # Радиус Земли в километрах
    R = 6371.0
    lat1, lat2 = math.radians(lat1),  math.radians(lat2)

    delta_lon = math.radians(lon2 - lon1)
    delta_lat = lat2 - lat1

    # Вычисление геодезического расстояния по формуле Хаверсина
    a = math.sin(delta_lat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(delta_lon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c
    return distance

# Конвертация функции в pyspark.sql.functions.udf (user-defined function)
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

# Объединение датасета станций с самим собой для получения всех возможных пар
station_pairs = stations_data.alias("station1").crossJoin(stations_data.alias("station2"))

# Вычисление расстояния для каждой пары станций с помощью объявленной ранее функции
station_pairs_with_distance = station_pairs.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long")
    )
)

# Поиск максимального геодезического расстояния среди всех расстояний для каждой пары станций
max_distance_stations = station_pairs_with_distance.sort("geodesic_distance", ascending=False).first()
# Задаём шаг, чтобы корректно вывести названия станций
pair_step = len(stations_data.columns)
first_station = max_distance_stations[max_distance_stations.index(max_distance_stations['name'])]
second_station = max_distance_stations[max_distance_stations.index(max_distance_stations['name']) + pair_step]

print(f"Максимальное геодезическое расстояние между станциями равно {max_distance_stations['geodesic_distance']} километрам \
между станциями {first_station} и {second_station}. ")

Максимальное геодезическое расстояние между станциями равно 69.92087595428244 километрам между станциями SJSU - San Salvador at 9th и Embarcadero at Sansome. 


# Задание №3

In [67]:
# ищем максимальный путь
longest_trip = trip_data.sort("duration", ascending=False).first()

print(f"Максимальный путь ({longest_trip['duration']}) велосипеда id={longest_trip['bike_id']} \
из станции {longest_trip['start_station_name']} на станцию {longest_trip['end_station_name']}.")

Максимальный путь (17270400) велосипеда id=535 из станции South Van Ness at Market на станцию 2nd at Folsom.


# Задание №4

In [68]:
# с помощью countDistinct считаем уникальные значения
total_number_of_bikes = trip_data.select(countDistinct("bike_id")).first()[0]
print(f"Общее число велосипедов в системе {total_number_of_bikes}.")

Общее число велосипедов в системе 700.


# Задание №5

In [84]:
# выведем данные отсортированные по айди, умножаем на 3600 для правильного сравнения
strong_users = trip_data.groupBy("bike_id").agg({"duration": "sum"}).filter(col("sum(duration)") > 3* 3600)\
.orderBy(col("bike_id"))
strong_users.show()

+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|      9|       913730|
|     10|       551314|
|     11|       315011|
|     12|       757912|
|     13|       949523|
|     14|       399114|
|     15|       831149|
|     16|      1334601|
|     17|       509406|
|     18|       500113|
|     19|       543930|
|     20|       263431|
|     21|       282836|
|     22|       936581|
|     23|       420393|
|     24|       293533|
|     25|       453322|
|     26|       236064|
|     27|       404937|
|     28|       342587|
+-------+-------------+
only showing top 20 rows

