In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=62373ac5ef4bb2d4d3d064c07ffea4bf94ca98abbd71370f26e837e507ef3f45
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from typing import NamedTuple
from datetime import datetime
from functools import reduce

In [4]:
app_name = "LR1_Apache_Spark"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [5]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [8]:
trip_data = sc.textFile("trips.csv")
tripsHeader = trip_data.first()
trips = trip_data.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))
stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [9]:
stationsIndexed = stations.keyBy(lambda station: station[0])

In [10]:
stationsIndexed.take(2)

[('2',
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 ('3',
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013'])]

In [11]:
tripsByStartTerminals = trips.keyBy(lambda trip: trip[4])
tripsByEndTerminals = trips.keyBy(lambda trip: trip[7])

In [12]:
tripsByStartTerminals.take(2)

[('66',
  ['4576',
   '63',
   '',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']),
 ('10',
  ['4607',
   '',
   '8/29/2013 14:42',
   'San Jose City Hall',
   '10',
   '8/29/2013 14:43',
   'San Jose City Hall',
   '10',
   '661',
   'Subscriber',
   '95138'])]

In [13]:
tripsByEndTerminals.take(2)

[('66',
  ['4576',
   '63',
   '',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']),
 ('10',
  ['4607',
   '',
   '8/29/2013 14:42',
   'San Jose City Hall',
   '10',
   '8/29/2013 14:43',
   'San Jose City Hall',
   '10',
   '661',
   'Subscriber',
   '95138'])]

In [14]:
stations_mapped = stations.mapPartitions(initStation)

In [15]:
trips_mapped= trips.mapPartitions(initTrip)

##Задание 1. Найти велосипед с максимальным временем пробега

In [19]:
# Отображаем каждый велосипед по его идентификатору и продолжительности поездки
bikes_with_mileage = trips_mapped.keyBy(lambda x: x.bike_id)

# Суммируем продолжительность поездок для каждого велосипеда
bike_durations = bikes_with_mileage.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)

# Находим велосипед с максимальной продолжительностью поездок
bike_with_max_mileage = bike_durations.top(1, key=lambda x: x[1])[0][0]

# Print the result
print(f"Велосипед #{bike_with_max_mileage} имеет максимальную продолжительность поездок")


Велосипед #501 имеет максимальную продолжительность поездок


## Задание 2. Найти наибольшее геодезическое расстояние между станциями


In [20]:
# Отфильтруем поездки, где станции начала и конца отличаются, затем сопоставим каждую поездку с идентификаторами станций начала и конца
trips_between_stations = trips_mapped.filter(lambda trip: str(trip.start_station_id) != str(trip.end_station_id))\
                                     .keyBy(lambda trip: (trip.start_station_id, trip.end_station_id))\
                                     .mapValues(lambda trip: trip.duration)

# Сагрегируем продолжительности поездок между каждой парой станций
station_distances = trips_between_stations\
    .aggregateByKey(
        (0.0, 0.0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),)\
    .mapValues(lambda values: values[0] / values[1])

# Найдем пять самых длинных расстояний между станциями
station_distances.map(lambda x: x[::-1]).top(5)


[(83116.66666666667, (60, '65')),
 (52698.0, (27, '38')),
 (50771.5, (68, '72')),
 (50626.0, (72, '59')),
 (38225.0, (54, '67'))]

##Задание 3. Найти путь велосипеда с максимальным временем пробега через станции

In [22]:
# Фильтруем поездки по идентификатору велосипеда с максимальной продолжительностью поездок, затем сортируем их по дате начала и отображаем каждую поездку в имена станций начала и конца
bike_path = trips_mapped.filter(lambda x: x.bike_id == bike_duration_top)\
                        .sortBy(lambda x: x.start_date)\
                        .map(lambda x: (x.start_station_name, x.end_station_name))

# Получаем первый элемент в RDD
bike_path.first()


('Embarcadero at Vallejo', 'Market at Sansome')

## Задание 4. Найти количество велосипедов в системе.

In [23]:
count_bikes = trips_mapped.map(lambda x: x.bike_id).distinct().count()
count_bikes

561

## Задание 5. Найти пользователей потративших на поездки более 3 часов.

In [24]:
# Фильтруем поездки, продолжительность которых превышает 3 часа (в секундах),
# затем извлекаем почтовые индексы пользователей, исключаем пустые значения
# и находим уникальных пользователей
users = trips_mapped.filter(lambda x: x.duration > (3 * 60 * 60))\
                           .map(lambda x: x.zip_code)\
                           .filter(lambda x: x != "")\
                           .distinct()

# Получаем первые 10 пользователей
users.take(10)


['94536',
 '72150',
 '58553',
 '94301',
 '94118',
 '94111',
 '94039',
 '94133',
 '94538',
 '95112']

In [25]:
sc.stop()