In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [33]:
import findspark

findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from math import sin, cos, sqrt, atan2, radians

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [44]:
trips_df = spark.read.csv('trips.csv', header=True, inferSchema=True, sep=",")
trips_df.show(5)
trips_df.printSchema()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|           null|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    null|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [45]:
stations_df = spark.read.csv('stations.csv', header=True, inferSchema=True, sep=",")
stations_df.show(5)
stations_df.printSchema()

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)

*   Найти велосипед с максимальным временем пробега.
*   Найти наибольшее геодезическое расстояние между станциями.
*   Найти путь велосипеда с максимальным временем пробега через станции.
*   Найти количество велосипедов в системе.
*   Найти пользователей потративших на поездки более 3 часов.

In [47]:
# Найти велосипед с максимальным временем пробега.
from pyspark.sql import functions as F


total_trips_bike = trips_df.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))

bike_max_trips = total_trips_bike.orderBy(col("total_trips_duration").desc()).first()

total_id = bike_max_trips["bike_id"]
total_duration = bike_max_trips["total_trips_duration"]

print("Bike", total_id, "has total duration ", total_duration)

Bike 535 has total duration  18611693


In [48]:
# Найти путь велосипеда с максимальным временем пробега через станции.

def geodesic_distance(lat1, lon1, lat2, lon2):
    R = 6373.0
    lat1, lat2 = radians(float(lat1)), radians(float(lat2))
    lon1, lon2 = radians(float(lon1)), radians(float(lon2))
    dlon, dlat = lon2 - lon1, lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

distance = udf(geodesic_distance, DoubleType())

station = stations_df.alias("station1").crossJoin(stations_df.alias("station2"))

# Вычисление геодезических расстояний между всеми парами станций
station_distance = station.withColumn("geodesic_distance", distance(col("station1.lat"), col("station1.long"), col("station2.lat"), col("station2.long")))

# Нахождение максимального геодезического расстояния
dist = station_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]
print(f"Наибольшее геодезическое расстояние между станциями {dist}")

Наибольшее геодезическое расстояние между станциями 69.9428256877473


In [49]:
# Найти путь велосипеда с максимальным временем пробега через станции.

# Сортируем поездки по длительности и выбираем наиболее длительную
longest_trip = trips_df.select("start_station_name", "end_station_name", "duration") \
                        .orderBy(col('duration').desc()).first()

start_station = longest_trip["start_station_name"]
end_station = longest_trip["end_station_name"]
trip_duration = longest_trip["duration"]

print(f"Путь из \"{start_station}\" в \"{end_station}\" занял максимальное время пробега {trip_duration} секунд")

Путь из "South Van Ness at Market" в "2nd at Folsom" занял максимальное время пробега 17270400 секунд


In [50]:
# Найти количество велосипедов в системе.

bikes = trips_df.select('bike_id').distinct().count()

print(f'Количество велосипедов в системе: {bikes}')

Количество велосипедов в системе: 700


In [51]:
# Найти пользователей потративших на поездки более 3 часов.

time_of_bike_trips= trips_df.groupBy('bike_id').sum('duration').withColumnRenamed("sum(duration)", "total_duration")

time_of_bike_trips.filter('total_duration > 10800').show()

+-------+--------------+
|bike_id|total_duration|
+-------+--------------+
|    471|       1718831|
|    496|       1679568|
|    148|        332138|
|    463|       1722796|
|    540|       1752835|
|    392|       1789476|
|    623|       2037219|
|    243|        307458|
|    516|       1896751|
|     31|        407907|
|    580|       1034382|
|    137|       1529200|
|    251|       1282980|
|    451|       1695574|
|     85|       1214769|
|    458|       1647080|
|     65|        216922|
|    588|        266415|
|    255|        396395|
|     53|        226389|
+-------+--------------+
only showing top 20 rows

