In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from datetime import datetime
from typing import NamedTuple
import pyspark.sql.functions as f

In [2]:
conf = SparkConf().setAppName("Lab1").setMaster('yarn')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [8]:
trip_data = sc.textFile("trip.csv")
station_data = sc.textFile("station.csv")

In [9]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        try:
            yield Station(
                station_id = int(station[0]),
                name = station[1],
                lat = float(station[2]),
                long = float(station[3]),
                dockcount = int(station[4]),
                landmark = station[5],
                installation = datetime.strptime(station[6], '%m/%d/%Y')
            )
        except:
            pass

        
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
                 trip_id = int(trip[0]),
                 duration = int(trip[1]),
                 start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
                 start_station_name = trip[3],
                 start_station_id = int(trip[4]),
                 end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
                 end_station_name = trip[6],
                 end_station_id = trip[7],
                 bike_id = int(trip[8]),
                 subscription_type = trip[9],
                 zip_code = trip[10]
            )
        except:
            pass

In [10]:
trip_headers = trip_data.first()
station_headers = station_data.first()
trip = trip_data.filter(lambda row: row != trip_headers).map(lambda row: row.split(",", -1))
station = station_data.filter(lambda row: row != station_headers).map(lambda row: row.split(",", -1))
trip_internal = trip.mapPartitions(initTrip)
station_internal = station.mapPartitions(initStation)

In [59]:
# Найти велосипед с максимальным временем пробега.

bike_with_max_duration = trip_internal.map(lambda x: (x.bike_id, x.duration)) \
                    .reduceByKey(lambda a,b: a + b) \
                    .reduce(lambda a, b: a if a[1] > b[1] else b)
bike_with_max_duration[0]

593

In [58]:
# Найти наибольшее геодезическое расстояние между станциями

def dist(x,y):
    return ((x.lat-y.lat)**2+(x.long-y.long)**2)**0.5


max_dist = station_internal.cartesian(station_internal) \
                .filter(lambda x: x[0].station_id != x[1].station_id) \
                .map(lambda x: dist(x[0], x[1])) \
                .max()
max_dist

0.7058482821754397

In [52]:
# Найти путь велосипеда с максимальным временем пробега через станции.

trip_internal.filter(lambda x: x.bike_id == bike_with_max_duration[0]) \
         .sortBy(lambda x: x.start_date) \
         .map(lambda x: x.end_station_name) \
         .distinct() \
         .collect()

['2nd at South Park',
 'Powell Street BART',
 'Grant Avenue at Columbus Avenue',
 'Post at Kearney',
 'Market at 4th',
 'Townsend at 7th',
 'Yerba Buena Center of the Arts (3rd @ Howard)',
 'Steuart at Market',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Embarcadero at Sansome',
 'Market at Sansome',
 'Harry Bridges Plaza (Ferry Building)',
 'Commercial at Montgomery',
 'Embarcadero at Vallejo',
 'Beale at Market',
 'Powell at Post (Union Square)',
 'Howard at 2nd',
 'Davis at Jackson',
 'Clay at Battery',
 'San Francisco City Hall',
 'Post at Kearny',
 'Broadway St at Battery St',
 'Civic Center BART (7th at Market)',
 'South Van Ness at Market',
 'San Francisco Caltrain (Townsend at 4th)',
 'Mechanics Plaza (Market at Battery)',
 'Market at 10th',
 'Spear at Folsom',
 '2nd at Folsom',
 'Embarcadero at Folsom',
 'Embarcadero at Bryant',
 '5th at Howard',
 '2nd at Townsend',
 'Golden Gate at Polk',
 'Temporary Transbay Terminal (Howard at Beale)',
 'Washington at Kearney',
 'Washingt

In [53]:
# Найти количество велосипедов в системе.

bikes_count = trip_internal.map(lambda x: x.bike_id) \
                           .distinct() \
                           .count()
bikes_count

698

In [60]:
# Найти пользователей потративших на поездки более 3 часов. 
    
users = trip_internal.map(lambda x: (x.zip_code, x.duration)) \
                     .reduceByKey(lambda a,b: a + b) \
                     .filter(lambda x: x[1] > 3600 * 3) \
                     .map(lambda x: x[0]) \
                     .collect()
users

['94609',
 '94002',
 '95131',
 '94556',
 '94111',
 '94117',
 '94597',
 '94010',
 '94043',
 '94403',
 '94404',
 '94133',
 '94952',
 '94558',
 '94590',
 '95125',
 '94114',
 '94608',
 '94102',
 '94610',
 '95130',
 '94040',
 '94588',
 '94612',
 '94122',
 '95127',
 '95120',
 '94086',
 '94080',
 '94124',
 '94920',
 '94044',
 '95112',
 '95113',
 '94306',
 '94085',
 '94121',
 '',
 '1778',
 '91024',
 '94703',
 '94120',
 '94945',
 '94108',
 '94041',
 '95124',
 '94502',
 '94618',
 '95136',
 '94401',
 '94587',
 '94104',
 '95138',
 '94544',
 '94704',
 '94536',
 '60614',
 '95020',
 '94303',
 '94706',
 '94070',
 '94960',
 '94534',
 '95035',
 '2139',
 '94510',
 '94565',
 '95118',
 '94708',
 '90291',
 '94305',
 '44122',
 '94547',
 '94957',
 '94530',
 '91205',
 '94503',
 '24060',
 '95050',
 '94930',
 '95060',
 '94939',
 '95032',
 '95129',
 '30900',
 '10009',
 '94598',
 '11714',
 '53819',
 '10018',
 '70119',
 '94619',
 '94546',
 '2116',
 '90039',
 '95472',
 '8540',
 '10021',
 '11217',
 '46168',
 '97816',