<a href="https://colab.research.google.com/github/Won20/Big-Data/blob/main/LR1_Dubman__Introduction%20to%20Apache%20Spark/LR1_Dubman_Introduction_to_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1) Установка PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=c18054741896062dcbd5311901665c15ea6a415483dbed5ce1eeb9bc73d50cce
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# 2) Импорт необходимых библиотек и создание проекта

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf

In [3]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

In [4]:
from math import *

In [5]:
#SparkContext-Основная точка входа для функциональности Spark. SparkContext представляет подключение к кластеру Spark
##и может использоваться для создания переменных RDD и широковещательной рассылки в этом кластере.
##когда вы создаете новый SparkContext, необходимо указать как минимум имя мастера и приложения либо через именованные параметры здесь, либо через conf.
#SparkConf - для конфигурации приложения Spark. Используется для установки различных параметров Spark в виде пар ключ-значение.
#setAppName - установка имени приложения
#setMaster-установка главного URL-адреса для подключения
sc = SparkContext(conf=SparkConf().setAppName("L1").setMaster("local[*]"))

In [6]:
sc

# 3) Считывание данных

In [7]:
data_trip = sc.textFile("/content/data/trips.csv") #считывает текстовый файл из HDFS, локальной файловой системы (доступной на всех узлах) или любого URI файловой системы,
# поддерживаемой Hadoop, и везвращает его как RDD строк. Текстовые файлы должны быть в кодировке UTF-8.
data_station = sc.textFile("/content/data/stations.csv") #то есть это RDD

# 4) Предобработка данных

In [8]:
trip_header=data_trip.first()#возвращает первый элемент в этом RDD.
station_header=data_station.first()

In [9]:
#filter- возвращает новый RDD, содержащий только элементы, удовлетворяющие предикату (не равен первому элементу).
#map- возвращает новый RDD, применив функцию к каждому элементу этого RDD.
trips=data_trip.filter(lambda x: x != trip_header).map(lambda x: x.split(","))
stations=data_station.filter(lambda x: x != station_header).map(lambda x: x.split(","))

In [10]:
trips.take(3)
#возвращает первые 3 элементa. Он работает путем первого сканирования одного раздела и использования результатов
#этого раздела для оценки количества дополнительных разделов, необходимых для удовлетворения ограничения. (list)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138'],
 ['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214']]

In [11]:
stations.take(3)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013'],
 ['4',
  'Santa Clara at Almaden',
  '37.333988',
  '-121.894902',
  '11',
  'San Jose',
  '8/6/2013']]

In [12]:
# Индексирование по нулевому элементу
station_idx = stations.keyBy(lambda x: int(x[0]))
#Создает кортежи элементов в этом RDD, применяя x.

In [13]:
station_idx.take(3)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 (3,
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013']),
 (4,
  ['4',
   'Santa Clara at Almaden',
   '37.333988',
   '-121.894902',
   '11',
   'San Jose',
   '8/6/2013'])]

In [14]:
# Индексирование по начальной и конечной станции
trips_start = trips.keyBy(lambda x: x[3])
trips_end = trips.keyBy(lambda x: x[6])

In [15]:
trips_start.take(3), trips_end.take(3)

([('South Van Ness at Market',
   ['4576',
    '63',
    '',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127']),
  ('San Jose City Hall',
   ['4607',
    '',
    '8/29/2013 14:42',
    'San Jose City Hall',
    '10',
    '8/29/2013 14:43',
    'San Jose City Hall',
    '10',
    '661',
    'Subscriber',
    '95138']),
  ('Mountain View City Hall',
   ['4130',
    '71',
    '8/29/2013 10:16',
    'Mountain View City Hall',
    '27',
    '8/29/2013 10:17',
    'Mountain View City Hall',
    '27',
    '48',
    'Subscriber',
    '97214'])],
 [('South Van Ness at Market',
   ['4576',
    '63',
    '',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127']),
  ('San Jose City Hall',
   ['4607',
    '',
    '8/29/2013 14:42',
    'San Jose City Hall',
    '10',
    '8/29/2013 14:43',
    'San Jose City

In [16]:
# Объединение по ключу
start_trips = station_idx.join(trips_start)
end_trips = station_idx.join(trips_end)
#join - возвращает RDD, содержащий все пары элементов с совпадающими ключами в self и other.
#Каждая пара элементов будет возвращена как кортеж (k, (v1, v2)) где (k, v1) находится в себе, а (k, v2) — в другом.
#Выполняет хеш-соединение по всему кластеру.

# 5) Создание модели для Trip and Station

In [17]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int #идентификатор станции
        name: str # название станции
        lat: float #широта стадиона
        long: float # долгота стадиона
        dockcount: int #
        landmark: str # ориентир
        installation: str #дата

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y') # создает соответствующий объект даты/времени из строки, соответствующей формату format=%m/%d/%Y
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int # идентификатор велосипеда
        duration: int # длительность заезда
        start_date: datetime # начальная дата
        start_station_name: str # имя станции, с которой начался заезд
        start_station_id: int # идентификатор станции, где стартовали
        end_date: datetime # конечная дата (дата финиша)
        end_station_name: str # имя станции, где финишировали
        end_station_id: int # идентификатор станции, где финишировали
        bike_id: int # идентификатор велосипеда
        subscription_type: str # (Подписчик)
        zip_code: str # идентификатор пользователя/участника

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [18]:
# Применяем модель данных для Stations
stations_ = stations.mapPartitions(initStation)
#возвращает новый RDD, применяя функцию к каждому разделу этого RDD.

In [19]:
stations_.first() # возвращает первый элемент в этом RDD.

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [20]:
# Применяем модель данных для Trip
trips_= trips.mapPartitions(initTrip)
#mapPartitions - возвращает новый RDD, применяя функцию к каждому разделу этого RDD.

In [21]:
trips_.first() # возвращает первый элемент в этом RDD.

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

In [23]:
trips_start = trips_.keyBy(lambda x: x.start_station_name) #Создает кортежи элементов в этом RDD, применяя x

In [24]:
# Расчет среднего времени для каждого парковочного места
avg = trips_start.mapValues(lambda x: x.duration).groupByKey()\
.mapValues(lambda y: reduce(lambda a, b: (a+b), y)/len(y))
#mapValues - передаёт каждое значение в паре ключ-значение RDD через map функцию, не меняя ключи; при этом также сохраняется исходное разделение RDD.
#groupByKey - группирует значения для каждого ключа в RDD в одну последовательность.
#reduce - кумулятивно применяет функцию к элементам итерируемой iterable последовательности, сводя её к единственному значению.

In [25]:
avg.top(10, lambda x: x[1]) # Получение 10 верхних элементов из RDD; возвращает list, отсортированный в порядке убывания.

[('University and Emerson', 7090.239417989418),
 ('California Ave Caltrain Station', 4628.005847953216),
 ('Redwood City Public Library', 4579.234741784037),
 ('Park at Olive', 4438.1613333333335),
 ('San Jose Civic Center', 4208.016938519448),
 ('Rengstorff Avenue / California Street', 4174.082373782108),
 ('Redwood City Medical Center', 3959.491961414791),
 ('Palo Alto Caltrain Station', 3210.6489815253435),
 ('San Mateo County Center', 2716.7700348432054),
 ('Broadway at Main', 2481.2537313432836)]

In [26]:
# Найдем первую поездку для каждой из стоянок
first_trip = trips_start.reduceByKey(lambda x, y: x if x.start_date < y.start_date else y)
#reduceByKey - объединяет значения для каждого ключа, используя ассоциативную и коммутативную функцию сокращения.
#также будет выполнять слияние локально на каждом преобразователе перед отправкой результатов в редуктор, аналогично «объединителю» в MapReduce.
#вывод будет разделен с помощью разделов numPartitions или уровня параллелизма по умолчанию. Разделитель по умолчанию — хеш-раздел.

In [27]:
first_trip.collect() # вернёт list, содержащий все элементы в этом RDD.

[('South Van Ness at Market',
  Trip(trip_id=4074, duration=1131, start_date=datetime.datetime(2013, 8, 29, 9, 24), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 9, 43), end_station_name='San Francisco Caltrain 2 (330 Townsend)', end_station_id='69', bike_id=317, subscription_type='Subscriber', zip_code='94115')),
 ('Santa Clara at Almaden',
  Trip(trip_id=4500, duration=109, start_date=datetime.datetime(2013, 8, 29, 13, 25), start_station_name='Santa Clara at Almaden', start_station_id=4, end_date=datetime.datetime(2013, 8, 29, 13, 27), end_station_name='Adobe on Almaden', end_station_id='5', bike_id=679, subscription_type='Subscriber', zip_code='95112')),
 ('Clay at Battery',
  Trip(trip_id=4283, duration=1712, start_date=datetime.datetime(2013, 8, 29, 11, 55), start_station_name='Clay at Battery', start_station_id=41, end_date=datetime.datetime(2013, 8, 29, 12, 23), end_station_name='Harry Bridges Plaza (Ferry Building)',

# 6) Задание на лабораторную работу

1) Найти велосипед с максимальным временем пробега.

2) Найти наибольшее геодезическое расстояние между станциями.

3) Найти путь велосипеда с максимальным временем пробега через станции.

4) Найти количество велосипедов в системе.

5) Найти пользователей потративших на поездки более 3 часов.

## 1) Найти велосипед с максимальным временем пробега

In [28]:
bike_max_mileage = trips_.keyBy(lambda x: x.bike_id)
#keyBy - создает кортежи элементов в этом RDD, применяя x

In [29]:
bike_duration = bike_max_mileage.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)
#mapValues - передаёт каждое значение в паре ключ-значение RDD через map функцию, не меняя ключи; при этом также сохраняется исходное разделение RDD.
#reduceByKey - объединяет значения для каждого ключа, используя ассоциативную и коммутативную функцию сокращения.
##также будет выполнять слияние локально на каждом преобразователе перед отправкой результатов в редуктор, аналогично «объединителю» в MapReduce.
##вывод будет разделен с помощью разделов numPartitions или уровня параллелизма по умолчанию. Разделитель по умолчанию — хеш-раздел.

In [30]:
bike_duration_top = bike_duration.top(1, key=lambda x: x[1])[0][0]
#Получение 1 верхнего элемента из RDD

In [31]:
print(f"Идентификатор велосипеда с максимальным временем пробега = {bike_duration_top}")

Идентификатор велосипеда с максимальным временем пробега = 535


## 2) Найти наибольшее геодезическое расстояние между станциями

In [32]:

def distance_between_stations(latitude1, longitude1, latitude2, longitude2):
  earth_radius = 6371.0088
  latitude1, longitude1 = radians(latitude1), radians(longitude1)
  latitude2, longitude2 = radians(latitude2), radians(longitude2)

  difference_latitude = latitude2 - latitude1
  difference_longitude = longitude2 - longitude1

  haversinus_latitude = (sin(difference_latitude / 2)) ** 2
  haversinus_longitude = (sin(difference_longitude / 2)) ** 2

  return 2 * earth_radius * sqrt(haversinus_latitude + cos(latitude1)\
                                 * cos(latitude2) * haversinus_longitude)

In [33]:
distance = stations_.cartesian(stations_)\
                         .filter(lambda x: x[0].station_id != x[1].station_id)\
                         .map(lambda x: [x[0], x[1], distance_between_stations(x[0].lat, x[0].long, x[1].lat, x[1].long)])\
                         .keyBy(lambda x: (x[0].name, x[1].name))\
                         .reduce(lambda x1, x2: x1 if x1[1] > x2[1] else x2)

#cartesian - возвращает декартово произведение этого RDD и еще одного, то есть RDD всех пар элементов (a, b), где a находится в self, а b находится в other.
#filter - возвращает новый RDD, содержащий только элементы, удовлетворяющие предикату.
#map - возвращает новый RDD, применив функцию к каждому элементу этого RDD.
#keyBy - создает кортежи элементов в этом RDD, применяя x
#reduce - кумулятивно применяет функцию к элементам итерируемой iterable последовательности, сводя её к единственному значению.

In [34]:
print(f"Максимальное расстояние между станциями {distance[0]} = {distance[1][2]}")

Максимальное расстояние между станциями ('Ryland Park', 'Mezes Park') = 34.317914350160784


## 3) Найти путь велосипеда с максимальным временем пробега через станции

In [35]:
bike_path = trips_.filter(lambda x: x.bike_id == bike_duration_top)\
.sortBy(lambda x: x.start_date).map(lambda x: (x.start_station_name, x.end_station_name))
#filter - возвращает новый RDD, содержащий только элементы, удовлетворяющие предикату. (идентификатор велосипеда = инденфикатору велосипеда с максимальным временем пробега)
#map - возвращает новый RDD, применив функцию к каждому элементу этого RDD.

In [36]:
print(f"Путь велосипеда с максимальным временем пробега через станции: {bike_path.first()}")

Путь велосипеда с максимальным временем пробега через станции: ('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')


## 4) Найти количество велосипедов в системе

In [37]:
count_bikes = trips_.map(lambda x: x.bike_id).distinct().count()
#map - возвращает новый RDD, применив функцию к каждому элементу этого RDD.
#distinct - возвращает новый RDD, содержащий отдельные элементы в этом RDD.
#count - возвращает количество элементов в этом RDD.

In [38]:
print(f"Количество велосипедов в системе = {count_bikes}")

Количество велосипедов в системе = 700


## 5) Найти пользователей потративших на поездки более 3 часов.

In [39]:
users = trips_.filter(lambda x: x.duration > (3 * 60 * 60))\
.map(lambda x: x.zip_code).filter(lambda x: x != "").distinct()
#filter - возвращает новый RDD, содержащий только элементы, удовлетворяющие предикату. (длительность заезда > 3 часов)
#map - возвращает новый RDD, применив функцию к каждому элементу этого RDD.
#distinct - возвращает новый RDD, содержащий отдельные элементы в этом RDD.

In [40]:
print(f"Пользователи, потратившие на поездки более 3 часов: {users.take(10)}")
#take - возвращает первые 10 элементов

Пользователи, потратившие на поездки более 3 часов: ['58553', '94301', '94039', '94133', '93726', '94123', '4517', '29200', '45322', '94080']
