In [79]:
!pip install pyspark
from pyspark import SparkContext, SparkConf
app_name = "Lab"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)



In [82]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

**Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):**


In [83]:
tripData = sc.textFile("trips.csv")
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [84]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [88]:
stationsInternal = stations.mapPartitions(initStation)

In [89]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [90]:
tripsInternal = trips.mapPartitions(initTrip)

### Найти велосипед с максимальным временем пробега.

In [91]:
bike_max_way = tripsInternal.keyBy(lambda x: x.bike_id)
bike_duration = bike_max_way.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)
bike_max_duration = bike_duration.top(1, key=lambda x: x[1])[0][0]
bike_max_duration

535

### Найти наибольшее геодезическое расстояние между станциями.

In [92]:
trips_stations = tripsInternal.filter(lambda trip: trip.start_station_id != trip.end_station_id).keyBy(lambda trip: (trip.start_station_id, trip.end_station_id)).mapValues(lambda trip: trip.duration)
query = trips_stations.aggregateByKey(
    (0.0, 0.0), lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).mapValues(lambda values: values[0] / values[1])

In [93]:
query.map(lambda x: x[::-1]).top(1)

[(229914.0, (26, '16'))]

### Найти путь велосипеда с максимальным временем пробега через станции.

In [94]:
bike_way = tripsInternal.filter(lambda x: x.bike_id == bike_max_duration).sortBy(lambda x: x.start_date).map(lambda x: (x.start_station_name, x.end_station_name))
bike_way.first()

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

### Найти количество велосипедов в системе.

In [95]:
tripsInternal.map(lambda x: x.bike_id).distinct().count()

700

### Найти пользователей потративших на поездки более 3 часов.

In [96]:
users = tripsInternal.filter(lambda x: x.duration > 10800).map(lambda x: x.zip_code).filter(lambda x: x != "").distinct()
users.count()

2100

In [97]:
users.take(15)

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080',
 '92808',
 '5024',
 '89138',
 '11515',
 '28277']