In [62]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import col

In [2]:
# Spark configuration for this notebook: application name plus the YARN master.
conf = (
    SparkConf()
    .setMaster("yarn")
    .setAppName("Bikes_analysis")
)

In [3]:
# Create the Spark entry points from `conf`, or reuse them if they already exist.
# Calling SparkContext(conf=conf) directly raises "Cannot run multiple
# SparkContexts at once" when this cell is re-run in a live kernel; getOrCreate
# makes the cell idempotent.
sc = SparkContext.getOrCreate(conf=conf)
# The builder attaches to the already-running context instead of creating a new one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [4]:
# Read the trips CSV: first row is the header, column types are inferred,
# and timestamps are parsed with the 'M/d/y H:m' pattern.
tripData = (
    spark.read
    .options(header=True, inferSchema=True, timestampFormat="M/d/y H:m")
    .csv("trips.csv")
)

tripData

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [14]:
# Read the stations CSV: first row is the header, column types are inferred,
# and dates are parsed with the 'M/d/y' pattern.
stationData = (
    spark.read
    .options(header=True, inferSchema=True, timestampFormat="M/d/y")
    .csv("stations.csv")
)

In [5]:
# Print the inferred column names and types of the trips table.
tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [15]:
# Print the inferred column names and types of the stations table.
stationData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



Задание 1. Найти велосипед с максимальным временем пробега.

In [7]:
# Register the trips DataFrame as the temporary view "trips" so that the
# spark.sql(...) queries below can refer to it by name.
tripData.createOrReplaceTempView("trips")

In [13]:
# Task 1: total ride duration per bike, keeping only the bike with the largest total.
res = spark.sql(
    "SELECT bike_id, sum(duration) as dur from trips "
    "group by bike_id order by dur DESC LIMIT 1"
)
res.show()

+-------+--------+
|bike_id|     dur|
+-------+--------+
|    535|36229902|
+-------+--------+



Задание 2. Найти наибольшее геодезическое расстояние между станциями.

In [17]:
# Pair every station with every station (including itself) via a cross join.
# Both sides keep identical column names, so the later cells address the
# columns by position: 0-6 belong to the left station, 7-13 to the right one.
joined_stations = stationData.crossJoin(stationData)
joined_stations.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



In [27]:
from math import radians, cos, sin, asin, sqrt

def calculate_distance(lat1, lon1, lat2, lon2, radius_km=6371):
    """Return the great-circle (haversine) distance between two points.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.
        radius_km: Radius of the sphere. Defaults to 6371, the Earth radius
            in kilometres; the result is expressed in the same unit.

    Returns:
        The distance along the surface of the sphere, as a float.
    """
    # Convert the coordinates to radians.
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # Apply the haversine formula.
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1]
    # (e.g. for near-antipodal points), which would make sqrt/asin raise
    # "math domain error"; clamp it to the valid range first.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    return c * radius_km

In [52]:
# Task 2: distance for every station pair, then the maximum over all pairs.
# Positions 2/3 are lat/long of the left station, 9/10 of the right station.
res2 = (
    joined_stations.rdd
    .map(lambda pair: calculate_distance(pair[2], pair[3], pair[9], pair[10]))
    .max()
)

In [53]:
# Largest station-to-station distance; calculate_distance uses a radius in
# kilometres, so the value is in kilometres.
print(res2)

69.92087595428183


Задание 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [86]:
# Task 3: the route through stations of the bike with the greatest total ride
# time (the same criterion as Task 1), rather than the distance of the single
# longest trip.
top_bike_id = (
    tripData.groupBy("bike_id")
    .sum("duration")
    .withColumnRenamed("sum(duration)", "dur")
    .orderBy(col("dur").desc())
    .first()["bike_id"]
)

# All trips of that bike in chronological order: each row is one leg of its
# path (start station -> end station). `id` breaks ties between equal timestamps.
res3 = (
    tripData.filter(col("bike_id") == top_bike_id)
    .orderBy("start_date", "id")
    .select("start_date", "start_station_name", "end_date", "end_station_name")
)
res3.show(truncate=False)


Задание 4. Найти количество велосипедов в системе.

In [88]:
# Task 4: number of distinct bike ids that appear in the trips view.
res4 = spark.sql("select count(distinct bike_id) from trips")
res4.show()

+-----------------------+
|count(DISTINCT bike_id)|
+-----------------------+
|                    700|
+-----------------------+



Задание 5. Найти пользователей, потративших на поездки более 3 часов.

In [101]:
# Task 5: ids whose summed duration exceeds 10800 seconds (3 hours).
# NOTE(review): `id` looks like the per-trip identifier, so this grouping
# probably yields single-trip durations rather than per-user totals - confirm
# which column (if any) identifies a user before relying on the result.
temp5 = spark.sql(
    "SELECT id, sum_duration FROM "
    "( SELECT id, sum(duration) as sum_duration FROM trips GROUP BY id) "
    "WHERE sum_duration > 10800"
)
temp5.show()
temp5.count()

+------+------------+
|    id|sum_duration|
+------+------------+
|862602|       27414|
|843086|       30644|
|831518|       34234|
|797846|       11798|
|796682|       55696|
|781053|       15068|
|755329|       13080|
|744184|       13622|
|730089|       15182|
|721109|       11826|
|712173|       27468|
|702265|       18332|
|701901|       23816|
|697283|       43534|
|692091|       27238|
|681960|      156566|
|645660|       19188|
|645212|       19740|
|627911|       15738|
|597804|       33998|
+------+------------+
only showing top 20 rows



11398