In [44]:
!pip3 install pyspark==3.0.0



In [None]:
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime
from functools import reduce
app_name = "Lab1"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [61]:
def initStation(stations):
    ID = 0
    NAME = 1
    LAT = 2
    LONG = 3
    DOCK_COUNT = 4
    CITY = 5
    INSTALLATION_DATE = 6
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[ID]),
            name = station[NAME],
            lat = float(station[LAT]),
            long = float(station[LONG]),
            dockcount = int(station[DOCK_COUNT]),
            landmark = station[CITY],
            installation = datetime.strptime(station[INSTALLATION_DATE], '%m/%d/%Y')
        )


def initTrip(trips):
    ID = 0
    DURATION = 1
    START_DATE = 2
    START_STATION_NAME = 3
    START_STATION_ID = 4
    END_DATE = 5
    END_STATION_NAME = 6
    END_STATION_ID = 7
    BIKE_ID = 8
    SUBSCRIPTION_TYPE = 9
    ZIP_CODE = 10
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        yield Trip(
             trip_id = int(trip[ID]),
             duration = int(trip[DURATION]),
             start_date = datetime.strptime(trip[START_DATE], '%m/%d/%Y %H:%M'),
             start_station_name = trip[START_STATION_NAME],
             start_station_id = int(trip[START_STATION_ID]),
             end_date = datetime.strptime(trip[END_DATE], '%m/%d/%Y %H:%M'),
             end_station_name = trip[END_STATION_NAME],
             end_station_id = trip[END_STATION_ID],
             bike_id = int(trip[BIKE_ID]),
             subscription_type = trip[SUBSCRIPTION_TYPE],
             zip_code = trip[ZIP_CODE]
            )

In [62]:
trip_data = sc.textFile("trip.csv")
station_data = sc.textFile("station.csv")

In [63]:
def get_data_without_headers(data):
    head = data.first()
    table_data = data.filter(lambda row: row != head).map(lambda row: row.split(","))
    return table_data

trips = get_data_without_headers(trip_data)
stations = get_data_without_headers(station_data)

In [64]:
stations_mapped = stations.mapPartitions(initStation)

In [65]:
trips_mapped= trips.mapPartitions(initTrip)

# 1. Найти велосипед с максимальным временем пробега.

In [66]:
group_bike_id = trips_mapped.keyBy(lambda row: row.bike_id)
bike_duration = group_bike_id.mapValues(lambda row: row.duration).reduceByKey(lambda a, b: a + b)
bike_duration_max = bike_duration.top(1, key=lambda x: x[1])
bike_duration_max

[(535, 18611693)]

# 2. Найти наибольшее геодезическое расстояние между станциями.

In [67]:
trips_stations = trips_mapped.filter(lambda trip: str(trip.start_station_id) != str(trip.end_station_id))\
                             .keyBy(lambda trip: (str(trip.start_station_id), str(trip.end_station_id)))\
                             .mapValues(lambda trip: trip.duration)

query = trips_stations\
    .aggregateByKey(
        (0, 0),
        lambda a, b: (a[0] + b, a[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]),)\
    .mapValues(lambda values: values[0] / values[1])

query.map(lambda x: x[::-1]).top(1)[0][0]

229914.0

# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [68]:
bike_path = trips_mapped.filter(lambda x: x.bike_id == bike_duration_max[0][0])\
                        .sortBy(lambda x: x.start_date)\
                        .map(lambda x: (x.start_station_name, x.end_station_name))

bike_path.first()

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

# 4. Найти количество велосипедов в системе.

In [69]:
count = trips_mapped.map(lambda x: x.bike_id).distinct().count()
count

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [70]:
users = trips_mapped.filter(lambda x: x.duration > (10800))\
                    .map(lambda x: x.zip_code)\
                    .filter(lambda x: x != "")\
                    .distinct()
users

PythonRDD[153] at RDD at PythonRDD.scala:53