In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from typing import NamedTuple
from datetime import datetime

In [3]:
conf = SparkConf().setAppName("lr1").setMaster('yarn')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [None]:
!hadoop fs -put ~/ /user

In [6]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [19]:
trip = sc.textFile("lr1/trip.csv")
station = sc.textFile("lr1/station.csv")

In [20]:
# kinda ls
trip.take(5)

['id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 '4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127',
 '4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138',
 '4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214',
 '4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060']

In [23]:
# filtering for map
trip_header=trip.first()
station_header=station.first()
trip=trip.filter(lambda x: x != trip_header).map(lambda x: x.split(","))
station=station.filter(lambda x: x != station_header).map(lambda x: x.split(","))

In [24]:
trip_mp = trip.mapPartitions(initTrip)
station_mp = station.mapPartitions(initStation)

In [25]:
trip_mp.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [None]:
"""
Найти велосипед с максимальным временем пробега.
"""

In [30]:
bike_max_time = trip_mp \
.map(lambda trip: (trip.bike_id, trip.duration)) \
.reduceByKey(lambda a, b: a + b) \
.top(1, key = lambda x: x[1])[0][0]

bike_max_time

535

In [None]:
"""
Найти наибольшее геодезическое расстояние между станциями.
"""

In [29]:
trips2stations = trip_mp \
.filter(lambda trip: trip.start_station_id != trip.end_station_id) \
.keyBy(lambda trip: (trip.start_station_id, trip.end_station_id)) \
.mapValues(lambda trip: trip.duration)

query = trips2stations \
.aggregateByKey((0.0, 0.0), \
                lambda acc, value: (acc[0] + value, acc[1] + 1), \
                lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),) \
.mapValues(lambda values: values[0] / values[1])

query.map(lambda x: x[::-1]).top(1)

[(229914.0, (26, '16'))]

In [None]:
"""
Найти путь велосипеда с максимальным временем пробега через станции.
"""

In [32]:
result = trip_mp \
.filter(lambda x: x.bike_id == bike_max_time) \
.sortBy(lambda x: x.start_date) \
.map(lambda x: (x.start_station_name, x.end_station_name)) \
.first() 

result

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

In [None]:
"""
Найти количество велосипедов в системе.
"""

In [34]:
amount = trip_mp \
.map(lambda x: x.bike_id) \
.distinct() \
.count()

amount

700

In [None]:
"""
Найти пользователей потративших на поездки более 3 часов.
"""

In [38]:
users3h = trip_mp \
.filter(lambda x: x.duration > (3 * 60 * 60)) \
.map(lambda x: x.zip_code) \
.filter(lambda x: x != "") \
.distinct() 

users3h.take(3)

['94306', '49423', '49721']