In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
conf = SparkConf().setAppName("6131_Belov").setMaster('yarn')
context = SparkContext(conf=conf)

In [3]:
from pyspark.sql import SparkSession

session = SparkSession(context)

In [4]:
session

In [5]:
!hadoop fs -ls /

Found 6 items
drwxr-xr-x   - mapr mapr          4 2021-11-15 15:59 /apps
drwxr-xr-x   - mapr mapr          0 2021-11-15 16:02 /hbase
drwxr-xr-x   - mapr mapr          0 2021-11-15 15:59 /opt
drwxrwxrwx   - mapr mapr         14 2023-12-21 06:18 /tmp
drwxr-xr-x   - mapr mapr         72 2023-12-15 22:06 /user
drwxr-xr-x   - mapr mapr          1 2021-11-15 15:57 /var


In [6]:
trips = session.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trip.csv")

In [7]:
trips.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [8]:
trips.head()

Row(id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [10]:
# Task 1 Найти велосипед с максимальным временем пробега.

from pyspark.sql.functions import sum, desc

trips.groupBy('bike_id')\
.agg(sum('duration').alias('total_duration'))\
.sort(desc('total_duration'))\
.limit(1).show()

+-------+--------------+
|bike_id|total_duration|
+-------+--------------+
|    535|      36229902|
+-------+--------------+



In [12]:
station_data = session.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("station.csv")

In [13]:
station_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



In [15]:
# Task 2 Найти наибольшее геодезическое расстояние между станциями.
from math import sin, cos, sqrt, atan2, radians

def geo_distance(lat1, lon1, lat2, lon2):
    # Approximate radius of earth
    R = 6373
    
    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return R * c

station_data.crossJoin(station_data)\
.rdd.map(lambda entry: geo_distance(entry[2], entry[3], entry[9], entry[10])).max()

69.9428256877473

In [19]:
# Task 3 путь велосипеда с максимальным временем пробега через станции
# использован bike_id найденный в первом задании

route = trips.where('bike_id=535').select('start_station_id', 'end_station_id')
route.show()

+----------------+--------------+
|start_station_id|end_station_id|
+----------------+--------------+
|              70|            69|
|              47|            70|
|              77|            64|
|              69|            77|
|              61|            42|
|              58|            72|
|              72|            47|
|              47|            60|
|              60|            46|
|              46|            77|
|              77|            77|
|              77|            62|
|              62|            61|
|              55|            61|
|              61|            60|
|              60|            41|
|              41|            50|
|              50|            41|
|              41|            70|
|              70|            74|
+----------------+--------------+
only showing top 20 rows



In [21]:
station_data_2 = station_data.toDF("id2", "name2", "lat2", "long2", "dock_cnt2", "city2", "inst_date2")
station_data_2.show()

+---+--------------------+------------------+-------------------+---------+------------+-------------------+
|id2|               name2|              lat2|              long2|dock_cnt2|       city2|         inst_date2|
+---+--------------------+------------------+-------------------+---------+------------+-------------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|       27|    San Jose|2013-08-06 00:00:00|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|       15|    San Jose|2013-08-05 00:00:00|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|       11|    San Jose|2013-08-06 00:00:00|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|       19|    San Jose|2013-08-05 00:00:00|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|       15|    San Jose|2013-08-07 00:00:00|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|       15|    San Jose|2013-08-07 00:00:00|
|  8| San Salvador 

In [23]:
route_with_station_data = route.crossJoin(station_data).crossJoin(station_data_2).where('start_station_id=id and end_station_id=id2')
route_with_station_data.show()
route_with_station_data.rdd.map(lambda entry: geo_distance(entry[4], entry[5], entry[11], entry[12])).sum()

+----------------+--------------+---+--------------------+---------+-------------------+----------+-------------+-------------------+---+--------------------+---------+-------------------+---------+-------------+-------------------+
|start_station_id|end_station_id| id|                name|      lat|               long|dock_count|         city|  installation_date|id2|               name2|     lat2|              long2|dock_cnt2|        city2|         inst_date2|
+----------------+--------------+---+--------------------+---------+-------------------+----------+-------------+-------------------+---+--------------------+---------+-------------------+---------+-------------+-------------------+
|              70|            69| 70|San Francisco Cal...|37.776617|-122.39526000000001|        19|San Francisco|2013-08-23 00:00:00| 69|San Francisco Cal...|  37.7766|-122.39546999999999|       23|San Francisco|2013-08-23 00:00:00|
|              47|            70| 47|     Post at Kearney|37.788975|

2214.7809104001412

In [24]:
# task4 количество велосипедов

trips.select('bike_id').distinct().count()

700

In [18]:
# task5 Найти пользователей потративших на поездки более 3 часов.
# в качестве идентификатора пользователя использован атрибут zip_code, полученное значение очевидно некорректно, но какой-либо другой персональной информации в схеме нет

trips.groupBy('zip_code')\
.agg(sum('duration').alias('total_duration'))\
.where(f'user_run > {3*60*60}')\
.sort(desc('total_duration')).show()

+--------+--------+
|zip_code|user_run|
+--------+--------+
|   94107|75957358|
|     nil|70865483|
|   94105|35662786|
|   95531|34540800|
|   94133|31520319|
|    null|28944159|
|   94103|28175558|
|   94102|27269935|
|   94111|20320436|
|   95112|18257823|
|   94109|16330251|
|   94110|10902662|
|   94040|10328795|
|   94117| 9237419|
|   94301| 9156218|
|   94041| 8650800|
|   94306| 8579026|
|   94158| 8526623|
|   94010| 7667734|
|   94025| 7377243|
+--------+--------+
only showing top 20 rows



In [31]:
session.stop()