In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql

In [2]:
conf = SparkConf().setAppName("6132_Khoroshev_Lab1").setMaster('yarn')

In [3]:
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [6]:
tripData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trips.csv")

In [10]:
tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [8]:
stationData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("stations.csv")

In [9]:
stationData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



### 1. Найти велосипед с максимальным временем пробега

In [78]:
from pyspark.sql.functions import col

tripData.orderBy(col("duration").desc()).limit(1).show()

+------+--------+-------------------+--------------------+----------------+-------------------+----------------+--------------+-------+-----------------+--------+
|    id|duration|         start_date|  start_station_name|start_station_id|           end_date|end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+------+--------+-------------------+--------------------+----------------+-------------------+----------------+--------------+-------+-----------------+--------+
|568474|17270400|2014-12-06 21:59:00|South Van Ness at...|              66|2015-06-24 20:18:00|   2nd at Folsom|            62|    535|         Customer|   95531|
+------+--------+-------------------+--------------------+----------------+-------------------+----------------+--------------+-------+-----------------+--------+



### 2. Найти наибольшее геодезическое расстояние между станциями

In [79]:
print("Before crossJoin: ", stationData.count())
joined_stations = stationData.crossJoin(stationData)
print("After crossJoin: ", joined_stations.count())

Before crossJoin:  70
After crossJoin:  4900


In [80]:
joined_stations.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



Дальше вычисляем расстояние между всеми станциями и выбираем максимальное

In [103]:
from math import sin, cos, sqrt, atan2, radians

# радиус земли типо
R = 6373.0

def computeDistance(lat1, lon1, lat2, lon2):
    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c

# x[2] - lat, x[3] - long
# x[9] - lat, x[10] - long
res = joined_stations.rdd.map(lambda x: computeDistance(x[2], x[3], x[9], x[10])).max()

# то же самое получается
# res = joined_stations.rdd.map(lambda x: computeDistance(x[2], x[3], x[9], x[10])).reduce(max)
print(f"Max distance between 2 stations: {res}")

Max distance between 2 stations: 69.9428256877473


### 3. Найти путь велосипеда с максимальным временем пробега через станции

In [99]:
second_stationData = stationData.toDF("id2", "name2", "lat2", "long2", "dock_count2", "city2", "installation_date2")
trips_join_stations = tripData.crossJoin(stationData.crossJoin(second_stationData)).where("start_station_name = name and end_station_name = name2")
trips_join_stations.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- id2: integer (nullable = true)
 |-- name2: string (nullable = true)
 |-- lat2: double (nullable = true)
 |-- long2: double (nullable = true)
 |-- dock_count2: integer (nullable = true)
 |-- city2: string (nullable = true)
 |-- ins

In [104]:
res = trips_join_stations.rdd.map(lambda x: computeDistance(x[13], x[14], x[20], x[21])).max()
print(res) 

68.07378146810332


### 4. Найти количество велосипедов в системе

In [108]:
tripData.select("bike_id").distinct().count()

700

### 5. Найти пользователей потративших на поездки более 3 часов

In [123]:
# duration - в секундах, поэтому мы 3 часа (180 минут) умножаем на 60
res = tripData.groupBy("bike_id").agg({"duration": "sum"}).where("sum(duration) > 180 * 60")
res.show()
res.count()

+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|    471|      2504669|
|    496|      2492338|
|    148|       383620|
|    463|      2631994|
|    540|      2499857|
|    392|      2664633|
|    623|      2988761|
|    243|       412598|
|    516|      2760799|
|     31|       587904|
|    580|      1072279|
|    137|      2238350|
|    251|      1660026|
|    451|      2461768|
|     85|      2287621|
|    458|      2286523|
|     65|       327963|
|    588|       266415|
|    255|       477850|
|     53|       338860|
+-------+-------------+
only showing top 20 rows



699