In [27]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from math import sin, cos, sqrt, atan2, radians

In [28]:
spark = SparkSession.builder.appName("L1_Pakhomov").getOrCreate()

In [29]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join(data_path, "trips.csv")
stations_path = os.path.join(data_path, "stations.csv")

In [30]:
trip_data = spark.read.option("header", True).option("inferSchema", True).option("timestampFormat", 'M/d/y H:m').csv(trips_path)

print("Trips")
trip_data.printSchema()

stations_data = spark.read.option("header", True).option("inferSchema", True).option("timestampFormat", 'M/d/y H:m').csv(stations_path)

print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



### Найти велосипед с максимальным временем пробега.

In [31]:
#Общее время пробега велосипеда
max_trips_duration_per_bike = trip_data.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))

# велосипед с максимальным пробегом
bike_with_max_trips_duration = max_trips_duration_per_bike.orderBy(col("total_trips_duration").desc()).first()

# id велосипеда
bike_id_with_max_duration = bike_with_max_trips_duration["bike_id"]

# пробег велосипеда
total_duration = bike_with_max_trips_duration["total_trips_duration"]
print(f'Велосипед с id {bike_id_with_max_duration} имеет наибольший пробег равный {total_duration}')

Велосипед с id 535 имеет наибольший пробег равный 18611693


### Найти наибольшее геодезическое расстояние между станциями.

In [32]:
#функция для расчёта геодизеческого расстояния
def calc_geo_distance(lat1,lon1,lat2,lon2):
    earth_r = 6371
    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    
    lat_diff = lat2 - lat1
    lon_diff = lon2 - lon1
    
    tmp1 = sin(lat_diff/2)**2 + cos(lat1)*cos(lat2) * sin(lon_diff/2)**2
    tmp2 = 2* atan2(sqrt(tmp1),sqrt(1-tmp1))
    
    return earth_r * tmp2
calc_geo_spark_func = udf(calc_geo_distance,DoubleType())

stations = stations_data.alias('station1').crossJoin(stations_data.alias('station2'))
stations_distance = stations.withColumn(
'geo_distance',
 calc_geo_spark_func(
 col('station1.lat'),
 col('station1.long'),
 col('station2.lat'),
 col('station2.long'))
)
max_distance = stations_distance.selectExpr("max(geo_distance) as max_distance").collect()[0]["max_distance"]
print(f"Максимальное геодезическое расстояние между станицями равно {max_distance} километров")

Максимальное геодезическое расстояние между станицями равно 69.92087595421542 километров


### Найти путь велосипеда с максимальным временем пробега через станции. 

In [33]:
# наиболее длительная поездка
trip_with_max_duration = trip_data.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()

# начальный и конечный пункт, время поездки 
start_location = trip_with_max_duration["start_station_name"]
end_location = trip_with_max_duration["end_station_name"]
trip_time = trip_with_max_duration["duration"]

print(f"Максимальное время пробега ({trip_time} секунд) из \"{start_location}\" в \"{end_location}\"")

Максимальное время пробега (17270400 секунд) из "South Van Ness at Market" в "2nd at Folsom"


### Найти количество велосипедов в системе.

In [34]:
# количество уникальный id
unique_bikes_count = trip_data.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Общее количество велосипедов в системе {unique_bikes_count}")

Общее количество велосипедов в системе 700


### Найти пользователей потративших на поездки более 3 часов.
 

In [35]:
# подсчёт общего времени на велосипеде, с группировкой по id
users_with_total_trip_time = trip_data.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_time")
users_with_total_trip_time.filter("total_time>10800").show()

+-------+----------+
|bike_id|total_time|
+-------+----------+
|    471|   1718831|
|    496|   1679568|
|    148|    332138|
|    463|   1722796|
|    540|   1752835|
|    392|   1789476|
|    623|   2037219|
|    243|    307458|
|    516|   1896751|
|     31|    407907|
|    580|   1034382|
|    137|   1529200|
|    251|   1282980|
|    451|   1695574|
|     85|   1214769|
|    458|   1647080|
|     65|    216922|
|    588|    266415|
|    255|    396395|
|     53|    226389|
+-------+----------+
only showing top 20 rows

