In [116]:
# устанавим pyspark и добавим необходимые библиотеки
!pip install pyspark



In [117]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime
from functools import reduce


In [136]:
# Добавим гугд диск
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [137]:
sc = SparkContext.getOrCreate();
sc

In [155]:
# Загрузим данные для дальнейшего анализа
All_data_trip = sc.textFile("/content/drive/MyDrive/BigData/My_work/LR1/Data/trip.csv")
All_data_station = sc.textFile("/content/drive/MyDrive/BigData/My_work/LR1/Data/station.csv")

In [180]:
# Исключим заголовки из данных
Headers_trips = All_data_trip.first()
trips = All_data_trip.filter(lambda row: row != Headers_trips).map(lambda row: row.split(",", -1))

Headers_stations = All_data_station.first()
stations = All_data_station.filter(lambda row: row != Headers_stations).map(lambda row: row.split(",", -1))

In [160]:
trip_name_pr = list(enumerate(Headers_trips.split(",")))
station_name_pr = list(enumerate(Headers_stations.split(",")))
print(trip_name_pr)
print(station_name_pr)

[(0, 'id'), (1, 'duration'), (2, 'start_date'), (3, 'start_station_name'), (4, 'start_station_id'), (5, 'end_date'), (6, 'end_station_name'), (7, 'end_station_id'), (8, 'bike_id'), (9, 'subscription_type'), (10, 'zip_code')]
[(0, 'id'), (1, 'name'), (2, 'lat'), (3, 'long'), (4, 'dock_count'), (5, 'city'), (6, 'installation_date')]


In [134]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pas

In [135]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [181]:
tripsInter = trips.mapPartitions(initTrip)
stationsInter = stations.mapPartitions(initStation)

In [183]:
tripsInter.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [185]:
stationsInter.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

Задание 1. Найти велосипед с максимальным временем пробега

In [190]:
run_bicycles = tripsInter.map(lambda trip: (trip.bike_id, trip.duration)).reduceByKey(lambda a, b: a + b)
top_bicycle = run_bicycles.top(1, key=lambda x: x[1])[0]
print(f'Номер велосипеда:\t{top_bicycle[0]}\nПробег:\t{top_bicycle[1]}')

Номер велосипеда:	535
Пробег:	18611693


Задание 2. Найти наибольшее геодезическое расстояние между станциями


In [191]:
def dist(x, y):
    return ((x.lat - y.lat)**2 + (x.long - y.long)**2)**0.5

In [196]:
all_distances = stationsInter.cartesian(stationsInter).map(lambda pair: (pair[0].station_id, pair[1].station_id, dist(pair[0], pair[1])))
max_dist = all_distances.top(1, key=lambda x: x[2])[0]
print(f'Максимальное расстояние: {max_dist[2]}\nМежду станциями: {max_dist[0]}-{max_dist[1]}')

Максимальное расстояние: 0.7058482821754397
Между станциями: 16-60


Задание 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [197]:
point_way = tripsInter.filter(lambda trip: trip.bike_id == top_bicycle[0]).sortBy(lambda trip: trip.start_date)

In [198]:
for trip in point_way.take(5):
    print(f'Dates:\t{trip.start_date} ~ {trip.end_date}\tWay:\t{trip.start_station_id}-{trip.end_station_id}')

Dates:	2013-08-29 19:32:00 ~ 2013-08-29 19:53:00	Way:	47-70
Dates:	2013-08-29 21:38:00 ~ 2013-08-29 21:45:00	Way:	70-69
Dates:	2013-08-30 08:40:00 ~ 2013-08-30 08:54:00	Way:	69-77
Dates:	2013-08-30 09:10:00 ~ 2013-08-30 09:19:00	Way:	77-64
Dates:	2013-09-01 12:58:00 ~ 2013-09-01 13:26:00	Way:	61-42


Задание 4. Найти количество велосипедов в системе

In [199]:
Bicy_count = tripsInter.map(lambda x: x.bike_id).distinct().count()
print(Bicy_count)

700


Задание 5. Найти пользователей потративших на поездки более 3 часов

In [207]:
users_three_h_more = tripsInter.filter(lambda x: x.duration > (3 * 60 * 60)).map(lambda x: x.zip_code).filter(lambda x: x != "").distinct()

users_three_h_more.take(5)

['58553', '94301', '94039', '94133', '93726']