In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from typing import NamedTuple
from datetime import datetime

In [2]:
conf = SparkConf().setAppName("Lab1").setMaster('yarn')

In [3]:
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
tripData = sc.textFile("trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [5]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [6]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [7]:
stationsInternal = stations.mapPartitions(initStation)
tripsInternal = trips.mapPartitions(initTrip)

In [8]:
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [9]:
tripsInternal.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

1. Найти велосипед с максимальным временем пробега

In [10]:
run_bicycles = tripsInternal.map(lambda trip: (trip.bike_id, trip.duration)).reduceByKey(lambda a, b: a + b)

In [11]:
top_bicycle = run_bicycles.top(1, key=lambda x: x[1])[0]

In [12]:
print(f'bike_id:\t{top_bicycle[0]}\nduration:\t{top_bicycle[1]}')

bike_id:	535
duration:	18611693


2. Найти наибольшее геодезическое расстояние между станциями

In [13]:
def dist(x, y):
    return ((x.lat - y.lat)**2 + (x.long - y.long)**2)**0.5

In [14]:
all_distances = stationsInternal.cartesian(stationsInternal).map(lambda pair: (pair[0].station_id, pair[1].station_id, dist(pair[0], pair[1])))

In [15]:
max_dist = all_distances.top(1, key=lambda x: x[2])[0]

In [16]:
print(f'Max distance:\t{max_dist[2]}\nStations:\t{max_dist[0]}-{max_dist[1]}')

Max distance:	0.7058482821754397
Stations:	16-60


3. Найти путь велосипеда с максимальным временем пробега через станции

In [17]:
points_of_way = tripsInternal.filter(lambda trip: trip.bike_id == top_bicycle[0]).sortBy(lambda trip: trip.start_date)

In [18]:
for trip in points_of_way.take(5):
    print(f'Dates:\t{trip.start_date} ~ {trip.end_date}\tWay:\t{trip.start_station_id}-{trip.end_station_id}')

Dates:	2013-08-29 19:32:00 ~ 2013-08-29 19:53:00	Way:	47-70
Dates:	2013-08-29 21:38:00 ~ 2013-08-29 21:45:00	Way:	70-69
Dates:	2013-08-30 08:40:00 ~ 2013-08-30 08:54:00	Way:	69-77
Dates:	2013-08-30 09:10:00 ~ 2013-08-30 09:19:00	Way:	77-64
Dates:	2013-09-01 12:58:00 ~ 2013-09-01 13:26:00	Way:	61-42


4. Найти количество велосипедов в системе

In [19]:
bikes_count = tripsInternal.map(lambda trip: trip.bike_id).distinct().count()

In [20]:
print(f'Bikes count:\t {bikes_count}')

Bikes count:	 700


5. Найти пользователей потративших на поездки более 3 часов

In [21]:
all_users = tripsInternal.map(lambda trip: (trip.zip_code, trip.duration)).reduceByKey(lambda a, b: a + b).filter(lambda trip: trip[1] > 10800)

In [22]:
for trip in all_users.take(10):
    print(f'User:\t{trip[0]}\tTime:\t{round(trip[1]/3600, 1)}h')

User:	94102	Time:	5313.3h
User:	95113	Time:	828.2h
User:	94124	Time:	500.1h
User:	94111	Time:	3956.9h
User:	94114	Time:	1190.5h
User:	94403	Time:	1301.8h
User:	30324	Time:	18.5h
User:	94133	Time:	6010.5h
User:	94108	Time:	1424.3h
User:	94306	Time:	1541.8h
