In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from datetime import datetime
from typing import NamedTuple
import pyspark.sql.functions as f

In [2]:
conf = SparkConf().setAppName("Lab1").setMaster('yarn')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [35]:
trip_data = sc.textFile("trip.csv")
station_data = sc.textFile("station.csv")
trip_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [36]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        try:
            yield Station(
                station_id = int(station[0]),
                name = station[1],
                lat = float(station[2]),
                long = float(station[3]),
                dockcount = int(station[4]),
                landmark = station[5],
                installation = datetime.strptime(station[6], '%m/%d/%Y')
            )
        except:
            pass

In [37]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
                 trip_id = int(trip[0]),
                 duration = int(trip[1]),
                 start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
                 start_station_name = trip[3],
                 start_station_id = int(trip[4]),
                 end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
                 end_station_name = trip[6],
                 end_station_id = trip[7],
                 bike_id = int(trip[8]),
                 subscription_type = trip[9],
                 zip_code = trip[10]
            )
        except:
            pass

In [38]:
trip_headers = trip_data.first()
station_headers = station_data.first()

In [39]:
trip = trip_data.filter(lambda row: row != trip_headers).map(lambda row: row.split(",", -1))
station = station_data.filter(lambda row: row != station_headers).map(lambda row: row.split(",", -1))
trip.first()

['4576',
 '63',
 '8/29/2013 14:13',
 'South Van Ness at Market',
 '66',
 '8/29/2013 14:14',
 'South Van Ness at Market',
 '66',
 '520',
 'Subscriber',
 '94127']

In [40]:
trip_internal = trip.mapPartitions(initTrip)
station_internal = station.mapPartitions(initStation)
trip_internal.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

Найти велосипед с максимальным временем пробега.

In [60]:
# складываем все продолжительности одного bike_id и берем bike_id с максимальной суммой
bike_max_duration_id = trip_internal.map(lambda x: (x.bike_id, x.duration)) \
                                    .reduceByKey(lambda a,b:a+b).reduce(lambda a,b: a if a[1]>b[1] else b)[0]

In [61]:
bike_max_duration_id

535

Найти наибольшее геодезическое расстояние между станциями.

In [62]:
def distance(x,y):
    return ((x.lat-y.lat)**2+(x.long-y.long)**2)**0.5

max_distance = station_internal.cartesian(station_internal).map(lambda x:  distance(x[0], x[1])).max()
max_distance

0.7058482821754397

Найти путь велосипеда с максимальным временем пробега через станции.

In [63]:
# берем все поездки велосипеда с максимальным пробегом и выделяем имена стартовой и конечной станциями
path_max_duration = trip_internal.filter(lambda x:x.bike_id==bike_max_duration_id).sortBy(lambda x: x.start_date).map(lambda x: x.end_station_name).distinct().collect()
print(path_max_duration)

['San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome', '2nd at South Park', 'Davis at Jackson', 'Post at Kearney', 'Embarcadero at Sansome', 'Clay at Battery', 'Harry Bridges Plaza (Ferry Building)', 'Steuart at Market', 'Townsend at 7th', 'Powell at Post (Union Square)', 'Market at 4th', 'Beale at Market', 'Powell Street BART', 'San Francisco City Hall', 'Embarcadero at Vallejo', 'Yerba Buena Center of the Arts (3rd @ Howard)', 'Howard at 2nd', 'Commercial at Montgomery', 'Grant Avenue at Columbus Avenue', 'Broadway St at Battery St', 'Post at Kearny', 'San Francisco Caltrain (Townsend at 4th)', 'Spear at Folsom', 'Temporary Transbay Terminal (Howard at Beale)', '5th at Howard', 'Civic Center BART (7th at Market)', 'Market at 10th', '2nd at Folsom', 'South Van Ness at Market', 'Mechanics Plaza (Market at Battery)', 'Embarcadero at Folsom', '2nd at Townsend', 'Embarcadero at Bryant', 'Golden Gate at Polk', 'Washington at Kearny', 'Washington at Kearney']


Найти количество велосипедов в системе.

In [45]:
bike_count = trip_internal.map(lambda x: x.bike_id).distinct().count()
print(bike_count)

700


Найти пользователей потративших на поездки более 3 часов.

In [58]:
clients = trip_internal.map(lambda x: (x.zip_code, x.duration)).reduceByKey(lambda a,b: a+b).filter(lambda x: x[1]>3*60*60).map(lambda x:x[0])
clients.take(10)

['95138',
 '95060',
 '95112',
 '94041',
 '94122',
 '94117',
 '95819',
 '94114',
 '94102',
 '94612']

In [59]:
clients.count()

3661