In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:03[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 KB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488513 sha256=70bfa96b2d141dcc0b0d1c0ebddf918228b4646545d9d9dc93e9d58e0a06bf1b
  Stored in directory: /home/r_usttt/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Instal

In [3]:
!pip install geopy

Defaulting to user installation because normal site-packages is not writeable
Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.4/125.4 KB[0m [31m936.5 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting geographiclib<3,>=1.52
  Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.3/40.3 KB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: geographiclib, geopy
Successfully installed geographiclib-2.0 geopy-2.4.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, desc
from geopy.distance import geodesic
from itertools import combinations

# create spark session
conf = SparkSession.builder.appName("lab1").getOrCreate()

In [3]:
trips_df = conf.read.csv("trips.csv", header=True, inferSchema=True)
stations_df = conf.read.csv("stations.csv", header=True, inferSchema=True)

                                                                                

In [4]:
trips_df

DataFrame[id: int, duration: int, start_date: string, start_station_name: string, start_station_id: int, end_date: string, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [5]:
stations_df

DataFrame[id: int, name: string, lat: double, long: double, dock_count: int, city: string, installation_date: string]

In [13]:
print(trips_df.count())
print(stations_df.count())

print(f'data scheme of the trips_df: { trips_df.printSchema() }')
print(f'data scheme of the stations_df: { stations_df.printSchema() }')

trips_df.show(5)
stations_df.show(5)

print("count unique 'start_station_name':", trips_df.select("start_station_name").distinct().count())
print("count unique 'end_station_name':", trips_df.select("end_station_name").distinct().count())
print("count unique 'bike_id':", trips_df.select("bike_id").distinct().count())

669959
70
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

data scheme of the trips_df: None
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)

data scheme of the stations_df: None
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+

Задание 1. Найти велосипед с максимальным временем пробега.

In [6]:
max_duration_bike = trips_df.groupBy("bike_id").max("duration").orderBy(desc("max(duration)")).limit(1)
max_duration_bike.show()

[Stage 4:>                                                          (0 + 8) / 8]

+-------+-------------+
|bike_id|max(duration)|
+-------+-------------+
|    535|     17270400|
+-------+-------------+



                                                                                

Задание 2. Найти наибольшее геодезическое расстояние между станциями.

In [16]:
stations_coords = stations_df.select("name", "lat", "long").collect()

# create all possible station combinations
station_combinations = combinations(stations_coords, 2)

max_distance = 0
station_names = None

for comb in station_combinations:
    station1 = comb[0]
    station2 = comb[1]

    dist = geodesic((station1.lat, station1.long), (station2.lat, station2.long)).kilometers

    if dist > max_distance:
        max_distance = dist
        station_names = (station1.name, station2.name)

print(f'max dist "{station_names[0]}" and "{station_names[1]}" = {max_distance:.2f} km')

max dist "SJSU - San Salvador at 9th" and "Embarcadero at Sansome" = 69.92 km


Задание 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [17]:
max_duration_trip = trips_df.orderBy(desc("duration")).first()

print("start station:", max_duration_trip.start_station_name)
print("end station:", max_duration_trip.end_station_name)
print("duration (sec):", max_duration_trip.duration)



start station: South Van Ness at Market
end station: 2nd at Folsom
duration (sec): 17270400


                                                                                

Задание 4. Найти количество велосипедов в системе.

In [18]:
print(trips_df.select("bike_id").distinct().count())



700


                                                                                

Задание 5. Найти пользователей потративших на поездки более 3 часов.

In [19]:
# convert seconds into hours
trips_df_with_hours = trips_df.withColumn("duration_hours", col("duration") / 3600)

# group by id and summ the trips duration for each user
user_total_time = trips_df_with_hours.groupBy("bike_id").agg(sum("duration_hours").alias("total_time"))

# get ones, which lasted longer than 3 hours
long_trips = user_total_time.filter(col("total_time") > 3)

long_trips.show()

+-------+------------------+
|bike_id|        total_time|
+-------+------------------+
|    471|477.45305555555564|
|    496| 466.5466666666667|
|    148| 92.26055555555556|
|    463|478.55444444444447|
|    540|486.89861111111117|
|    392|497.07666666666677|
|    623| 565.8941666666667|
|    243| 85.40499999999999|
|    516| 526.8752777777777|
|     31|113.30749999999999|
|    580| 287.3283333333334|
|    137| 424.7777777777777|
|    251| 356.3833333333334|
|    451|470.99277777777786|
|     85| 337.4358333333334|
|    458|457.52222222222235|
|     65| 60.25611111111111|
|    588|  74.0041666666667|
|    255|110.10972222222222|
|     53|62.885833333333345|
+-------+------------------+
only showing top 20 rows



In [13]:
conf.stop()