In [1]:
# Инициализация контекста

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("L1_Apache_Spark").setMaster("local[4]") \
    .set("spark.executor.memory", "2g") \
    .set("spark.driver.memory", "2g") \
	.set("spark.python.worker.timeout", "12000")

sc = SparkContext(conf=conf)


In [2]:
# Загружаем данные

tripData = sc.textFile("trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

In [3]:
stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

1.	Найти велосипед с максимальным временем пробега.

In [4]:
from typing import NamedTuple
from datetime import datetime

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

tripsInternal = trips.mapPartitions(initTrip)
tripsByBike = tripsInternal.keyBy(lambda trip: trip.bike_id)

maxDurationBike = tripsByBike \
	.mapValues(lambda trip: trip.duration) \
  .aggregateByKey(
    zeroValue=0,
    seqFunc=lambda acc, duration : acc + duration,
    combFunc=lambda lhs, rhs: lhs + rhs) \
  .reduce(lambda lhs, rhs: lhs if lhs[1] > rhs[1] else rhs)

maxDurationBike

(535, 18611693)

2.	Найти наибольшее геодезическое расстояние между станциями.

In [5]:
from geopy.distance import geodesic
from typing import NamedTuple
from datetime import datetime

def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def distance(t):
  p1, p2 = t
  return geodesic(p1, p2).km

stationsInternal = stations.mapPartitions(initStation)

# https://stackoverflow.com/questions/38828139/spark-cartesian-product?rq=3
stationsCoordsIndexed = stationsInternal \
  .map(lambda station: (station.lat, station.long)) \
  .zipWithIndex() \
  .map(lambda t: (t[1], t[0]))

idxs = range(stationsCoordsIndexed.count())
indices = sc.parallelize([(i,j) for i in idxs for j in idxs if i < j])

maxDistance = indices \
  .join(stationsCoordsIndexed) \
  .map(lambda t: (t[1][0], (t[0], t[1][1]))) \
  .join(stationsCoordsIndexed) \
  .map(lambda t: (t[1][0][1], t[1][1])) \
  .map(distance) \
  .max()

maxDistance

69.92096757764355

3.	Найти путь велосипеда с максимальным временем пробега через станции.

In [8]:
pathOfMaxDurationBike = tripsByBike \
  .mapValues(lambda trip: (trip.start_date, trip.start_station_name, trip.end_station_name)) \
  .sortBy(lambda t: t[1][0]) \
  .lookup(maxDurationBike[0])

lastStation = pathOfMaxDurationBike[-1][-1]
pathOfMaxDurationBike = [checkpoint[1] for checkpoint in pathOfMaxDurationBike]
pathOfMaxDurationBike.append(lastStation)

pathOfMaxDurationBike

['Post at Kearney',
 'San Francisco Caltrain (Townsend at 4th)',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Market at Sansome',
 '2nd at Townsend',
 'San Francisco City Hall',
 'Civic Center BART (7th at Market)',
 'Post at Kearney',
 'Embarcadero at Sansome',
 'Washington at Kearney',
 'Market at Sansome',
 'Market at Sansome',
 '2nd at Folsom',
 'Temporary Transbay Terminal (Howard at Beale)',
 '2nd at Townsend',
 'Embarcadero at Sansome',
 'Clay at Battery',
 'Harry Bridges Plaza (Ferry Building)',
 'Clay at Battery',
 'San Francisco Caltrain (Townsend at 4th)',
 'Steuart at Market',
 '2nd at Townsend',
 'Harry Bridges Plaza (Ferry Building)',
 'Townsend at 7th',
 'San Francisco Caltrain 2 (330 Townsend)',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Steuart at Market',
 'San Francisco Caltrain (Townsend at 4th)',
 '2nd at South Park',
 'Post at Kearney',
 '2nd at Folsom',
 'Mechanics Plaza (Market at Battery)',
 'Powell at Post (Union Square)',
 'Powell at Post (Union Square)',

4.	Найти количество велосипедов в системе.

In [10]:
bikesCount = tripsByBike.keys().distinct().count()

bikesCount

700

5.	Найти пользователей потративших на поездки более 3 часов.

In [12]:
tripsByUsers = tripsInternal.keyBy(lambda trip: trip.zip_code)

usersWith3HourLongTrips = tripsByUsers \
  .aggregateByKey(
      0,
      lambda acc, trip: acc + trip.duration,
      lambda lhs, rhs: rhs + lhs
  ) \
  .filter(lambda t: t[1] > 10800) \
  .keys() \
  .collect()

usersWith3HourLongTrips

['95060',
 '95112',
 '94041',
 '94117',
 '94402',
 '94102',
 '94612',
 '94609',
 '94158',
 '94133',
 '94597',
 '',
 '94121',
 '95118',
 '94610',
 '95136',
 '2142',
 '94703',
 '95070',
 '94404',
 '94518',
 '94549',
 '94556',
 '94805',
 '95014',
 '97330',
 '94005',
 '92178',
 '85008',
 '94606',
 '94941',
 '94901',
 '94577',
 '94523',
 '92111',
 '95618',
 '89052',
 '94014',
 '10025',
 '78230',
 '10022',
 '95111',
 '75201',
 '94141',
 '90046',
 '34110',
 '1945',
 '75225',
 '90032',
 '4517',
 '94080',
 '95148',
 '92808',
 '63130',
 '89448',
 '94539',
 '90024',
 '20008',
 '19803',
 '91605',
 '10036',
 '90049',
 '91214',
 '5024',
 '90291',
 '34990',
 '91801',
 '94928',
 '92037',
 '16801',
 '95003',
 '95472',
 '92109',
 '90025',
 '94952',
 '11530',
 '91748',
 '95351',
 '98122',
 '10044',
 '84604',
 '93041',
 '94568',
 '1742',
 '95121',
 '92805',
 '21202',
 '91206',
 '95120',
 '94304',
 '93109',
 '94130',
 '11570',
 '91343',
 '95814',
 '91711',
 '90278',
 '10065',
 '95128',
 '94042',
 '93405',
