In [19]:
!pip3 install pyspark==3.0.0



In [20]:
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime
from functools import reduce

In [None]:
conf = SparkConf().setAppName("lr1").setMaster('local[1]')
sc = SparkContext(conf=conf)

In [22]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )

def initStat(stat):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stat:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [23]:
def get_header(data):
    head = data.first()
    table_data = data.filter(lambda row: row != head).map(lambda row: row.split(","))
    return table_data

In [24]:
trips = get_header(sc.textFile("trip.csv"))
stations = get_header(sc.textFile("station.csv"))

In [25]:
stations_mapped = stations.mapPartitions(initStat)
trips_mapped= trips.mapPartitions(initTrip)

# 1. Найти велосипед с максимальным временем пробега.

In [26]:
bike_duration_max = trips_mapped.keyBy(lambda row: row.bike_id)\
                                .mapValues(lambda row: row.duration).reduceByKey(lambda a, b: a + b)\
                                .top(1, key=lambda x: x[1])
bike_duration_max

[(535, 18611693)]

# 2. Найти наибольшее геодезическое расстояние между станциями.

In [27]:
trips_stations = trips_mapped.filter(lambda trip: str(trip.start_station_id) != str(trip.end_station_id))\
                             .keyBy(lambda trip: (str(trip.start_station_id), str(trip.end_station_id)))\
                             .mapValues(lambda trip: trip.duration)

query = trips_stations.aggregateByKey((0, 0),
                                      lambda a, b: (a[0] + b, a[1] + 1),
                                      lambda a, b: (a[0] + b[0], a[1] + b[1]),)\
                      .mapValues(lambda values: values[0] / values[1])

query.map(lambda x: x[::-1]).top(1)[0][0]

229914.0

# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [28]:
bike_path = trips_mapped.filter(lambda x: x.bike_id == bike_duration_max[0][0])\
                        .sortBy(lambda x: x.start_date)\
                        .map(lambda x: (x.start_station_name, x.end_station_name))

bike_path.first()

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

# 4. Найти количество велосипедов в системе.

In [29]:
count = trips_mapped.map(lambda x: x.bike_id).distinct().count()
count

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [30]:
users = trips_mapped.filter(lambda x: x.duration > (10800))\
                    .map(lambda x: x.zip_code)\
                    .filter(lambda x: x != "")\
                    .distinct()
users

PythonRDD[72] at RDD at PythonRDD.scala:53