In [81]:
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz

In [1]:
import os
os.environ["SPARK_HOME"] = "spark-3.5.1-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr"

In [2]:
!pip install findspark



In [3]:
import findspark
findspark.init()

In [4]:
!pip3 install pyspark==3.5.1



In [None]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

spark = SparkSession \
    .builder \
    .appName("Lab1_6407_Fomin") \
    .getOrCreate()

In [6]:
import os
trips_path = os.path.join("../data/trips.csv")
stations_path = os.path.join("../data/stations.csv")

trips = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(trips_path)

stations = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(stations_path)

_______________________________________

### 1. Найти велосипед с максимальным временем пробега

In [27]:
trips.createOrReplaceTempView("trips")

total_duration_per_bike = trips.groupBy("bike_id").agg(sum("duration").alias("total_duration"))
most_used_bike = total_duration_per_bike.orderBy(col("total_duration").desc()).first()

most_used_bike

Row(bike_id=535, total_duration=18611693)

### 2. Найти наибольшее геодезическое расстояние между станциями

In [42]:
from pyspark.sql.functions import col

print("Кол-во станций: ", stations.count())
stations_to_stations = stations.crossJoin(stations.select(
    col("lat").alias("lat2"),
    col("long").alias("long2"),
))
print("Станции к станциям: ", stations_to_stations.count())

stations_to_stations.printSchema()

Кол-во станций:  70
Станции к станциям:  4900
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)
 |-- lat2: double (nullable = true)
 |-- long2: double (nullable = true)



In [20]:
import math
from pyspark.sql.functions import col
from pyspark.sql.functions import max

def geodesic_distance(lat1, lon1, lat2, lon2):
    lat1 = math.radians(lat1)
    lat2 = math.radians(lat2)
    lon1 = math.radians(lon1)
    lon2 = math.radians(lon2)

    try:
        distance = 6371 * math.acos(math.cos(lat1) * math.cos(lat2) * math.cos(lon1 - lon2) + math.sin(lat1) * math.sin(lat2))
    except:
        distance = 0.0

    return distance



haversine_udf = udf(geodesic_distance, DoubleType())
distances = stations_to_stations.withColumn(
    "distance",
    haversine_udf(stations_to_stations['lat'], stations_to_stations['long'], stations_to_stations['lat2'], stations_to_stations['long2']),
)

max_distance = distances.agg(max("distance").alias("max_distance")).collect()[0]["max_distance"]
print(f"Максимальное расстояние: {max_distance}")

Максимальное расстояние: 69.92087595421542


### 3. Найти путь велосипеда с максимальным временем пробега через станции

In [36]:
from pyspark.sql.functions import to_timestamp

(trips.select(col("start_date"), col("start_station_name"), col("end_station_name"), col("duration"))
    .where(col("bike_id") == most_used_bike["bike_id"])
    .withColumn("start_date", to_timestamp(col("start_date"), 'M/d/yyyy H:mm'))
    .orderBy("start_date")
    .show())

+-------------------+--------------------+--------------------+--------+
|         start_date|  start_station_name|    end_station_name|duration|
+-------------------+--------------------+--------------------+--------+
|2013-08-29 19:32:00|     Post at Kearney|San Francisco Cal...|    1245|
|2013-08-29 21:38:00|San Francisco Cal...|San Francisco Cal...|     423|
|2013-08-30 08:40:00|San Francisco Cal...|   Market at Sansome|     842|
|2013-08-30 09:10:00|   Market at Sansome|   2nd at South Park|     498|
|2013-09-01 12:58:00|     2nd at Townsend|    Davis at Jackson|    1671|
|2013-09-05 11:59:00|San Francisco Cit...|Civic Center BART...|     260|
|2013-09-06 10:55:00|Civic Center BART...|     Post at Kearney|    1192|
|2013-09-06 13:58:00|     Post at Kearney|Embarcadero at Sa...|    1248|
|2013-09-06 15:07:00|Embarcadero at Sa...|Washington at Kea...|    1272|
|2013-09-06 23:22:00|Washington at Kea...|   Market at Sansome|     398|
|2013-09-07 12:08:00|   Market at Sansome|   Market

### 4. Найти количество велосипедов в системе

In [37]:
trips.select("bike_id").distinct().count()

700

### 5. Найти пользователей потративших на поездки более 3 часов

In [41]:
seconds_3_hours = 3 * 60 * 60
answer = trips.groupBy("bike_id").sum("duration").filter(f"sum(duration)>{seconds_3_hours}")
answer.show()

+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|    471|      1718831|
|    496|      1679568|
|    148|       332138|
|    463|      1722796|
|    540|      1752835|
|    392|      1789476|
|    623|      2037219|
|    243|       307458|
|    516|      1896751|
|     31|       407907|
|    580|      1034382|
|    137|      1529200|
|    251|      1282980|
|    451|      1695574|
|     85|      1214769|
|    458|      1647080|
|     65|       216922|
|    588|       266415|
|    255|       396395|
|     53|       226389|
+-------+-------------+
only showing top 20 rows



699