In [1]:
#установка PySpark
!pip install pyspark
from pyspark import SparkContext, SparkConf

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=3816a074cc2d41c8f4cf75fdc02d0342f6679b41209c58d99e35654be95ff89a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
#SparkConf - для конфигурации приложения Spark.
#setAppName - установка имени приложения
#setMaster-установка главного URL-адреса для подключения
app_name = "LR1"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

In [4]:
from google.colab import drive
#подключаем гугл диск
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

# Создание модели для Trip и Station

In [6]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [7]:
def GetDataFromTable(data): # метод, вовзрающий только данные, без названия столбцов
    columns = data.first()
    table = data.filter(lambda row: row != columns)\
                .map(lambda row: row.split(","))
    return columns, table

In [8]:
#считываем данные
#считывает текстовый файл из HDFS, и возвращает его как RDD строк
trip_data = sc.textFile("/content/drive/MyDrive/BigData_LR/LR1/trip.csv")
station_data = sc.textFile("/content/drive/MyDrive/BigData_LR/LR1/station.csv")

In [9]:
#первые 10 элементов в этом RDD
!head "/content/drive/MyDrive/BigData_LR/LR1/trip.csv"

id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127
4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138
4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214
4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060
4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103
4927,103,8/29/2013 18:54,Golden Gate at Polk,59,8/29/2013 18:56,Golden Gate at Polk,59,527,Subscriber,94109
4500,109,8/29/2013 13:25,Santa Clara at Almaden,4,8/29/2013 13:27,Adobe on Almaden,5,679,Subscriber,95112
4563,111,8/29/2013 14:02,San Salvador at 1st,8,8/29/2013 14:04,San Salvador at 1st,8,687,Subscri

In [10]:
#первый элемент в этом RDD
trip_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [12]:
#первый элемент в этом RDD
trip_header=trip_data.first()
station_header=station_data.first()

In [13]:
#filter - возвращает RDD без первого элемента
#map - возвращает RDD разделённый запятыми
trips=trip_data.filter(lambda x: x != trip_header).map(lambda x: x.split(","))
stations=station_data.filter(lambda x: x != station_header).map(lambda x: x.split(","))

In [14]:
#считать первые 3 строки trip.csv
trips.take(3)

[['4576',
  '63',
  '8/29/2013 14:13',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '70',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138'],
 ['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214']]

In [17]:
#считать первые 3 строки station.csv
stations.take(3)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013'],
 ['4',
  'Santa Clara at Almaden',
  '37.333988',
  '-121.894902',
  '11',
  'San Jose',
  '8/6/2013']]

In [20]:
# Индексирование по нулевому элементу
station_idx = stations.keyBy(lambda x: int(x[0]))
#Создает кортежи элементов в этом RDD, применяя x.
station_idx.take(3)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 (3,
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013']),
 (4,
  ['4',
   'Santa Clara at Almaden',
   '37.333988',
   '-121.894902',
   '11',
   'San Jose',
   '8/6/2013'])]

In [19]:
# Индексирование по начальной и конечной станции
trips_start = trips.keyBy(lambda x: x[3])
trips_end = trips.keyBy(lambda x: x[6])

trips_start.take(3), trips_end.take(3)

([('South Van Ness at Market',
   ['4576',
    '63',
    '8/29/2013 14:13',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127']),
  ('San Jose City Hall',
   ['4607',
    '70',
    '8/29/2013 14:42',
    'San Jose City Hall',
    '10',
    '8/29/2013 14:43',
    'San Jose City Hall',
    '10',
    '661',
    'Subscriber',
    '95138']),
  ('Mountain View City Hall',
   ['4130',
    '71',
    '8/29/2013 10:16',
    'Mountain View City Hall',
    '27',
    '8/29/2013 10:17',
    'Mountain View City Hall',
    '27',
    '48',
    'Subscriber',
    '97214'])],
 [('South Van Ness at Market',
   ['4576',
    '63',
    '8/29/2013 14:13',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127']),
  ('San Jose City Hall',
   ['4607',
    '70',
    '8/29/2013 14:42',
    'San Jose City Hall',
    '10',
    '8/

In [22]:
# Применяем модель данных для Stations
stations_ = stations.mapPartitions(initStation)

In [23]:
# Первый элемент
stations_.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [24]:
# Применяем модель данных для Trip
trips_= trips.mapPartitions(initTrip)

In [25]:
trips_.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

## 1. Найти велосипед с максимальным временем пробега.

In [32]:
bike_max_mileage = trips_.keyBy(lambda x: x.bike_id)
bike_duration = bike_max_mileage.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)
bike_duration_top = bike_duration.top(1, key=lambda x: x[1])[0][0]
bike_duration_top

535

## 2. Найти наибольшее геодезическое расстояние между станциями.

In [35]:
trips_stations = trips_.filter(
    lambda trip: trip.start_station_id != trip.end_station_id
).keyBy(
    lambda trip: (trip.start_station_id, trip.end_station_id)
).mapValues(lambda trip: trip.duration)

In [36]:
query = trips_stations.aggregateByKey(
    (0.0, 0.0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
).mapValues(lambda values: values[0] / values[1])

In [37]:
query.map(lambda x: x[::-1]).top(5)

[(229914.0, (26, '16')),
 (179212.5, (32, '63')),
 (169308.0, (80, '36')),
 (156461.03603603604, (66, '62')),
 (101207.5, (28, '2'))]

## 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [38]:
bike_path = trips_.filter(lambda x: x.bike_id == bike_duration_top)\
.sortBy(lambda x: x.start_date).map(lambda x: (x.start_station_name, x.end_station_name))

bike_path.first()

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

## 4. Найти количество велосипедов в системе.

In [39]:
count_bikes = trips_.map(lambda x: x.bike_id).distinct().count()
count_bikes

700

## 5. Найти пользователей потративших на поездки более 3 часов.

In [26]:
users = trips_.filter(lambda x: x.duration > (3 * 60 * 60))\
.map(lambda x: x.zip_code).filter(lambda x: x != "").distinct()

users.take(10)

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080']