In [1]:
!pip install pyspark



In [2]:
!pip install pandas



In [3]:
from pyspark import SparkContext, SparkConf
import pandas as pd
from typing import NamedTuple
from datetime import datetime

<ol>
    <li><b>SparkConf</b> - конфигурация для приложения Spark. Используется для установки различных значений в виде пар "ключ":"значение". Данный метод поддерживает цепочку методов, так в ячейке нижу присутствуют следующие методы:</li>
    <li><i>setAppName()</i> - данный метод позволяет установить имя приложения</li>
    <li><i>setMaster()</i> - данный метод позволяет установить главный URL-адрес для подключения.</li>
</ol>
<br>
<p>========================================================================================================================</p>
<ol>
    <li><b>SparkContext()</b> - основная точка входа функциональности Spark приложения. SparkContext представляет собой соединение с сервером. В качестве параметра передается переменная <i>conf</i>, которая содержит сведения об имени приложения и об URL-адресе</li>
</ol>

In [4]:
app_name = "Labs-1 - Big Data"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)

In [5]:
sc

In [6]:
!hadoop fs -ls /data

ls: `/data': No such file or directory


In [7]:
test = sc.textFile("/mnt/data/warandsociety.txt")
test.count()

12851

## Лабораторная работа №1

### Проведем начальный анализ данных
<p>Для этого создадим два DataFrame-а, с помощью библиотеки pandas. Далее с помощью команды <i>head()</i>.</p>
<p>Первый DataFrame (trip_dataFrame) содержит таблицу для списка поездок</p>
<p>Второй DataFrame (station_dataFrame) содержит таблицу для списка велостоянок для проката велосипедов</p>
<p>* - заголовки для датафреймов взяты по ссылке: <a href="https://www.kaggle.com/benhamner/sf-bay-area-bike-share?select=trip.csv">Sf Bay Area Bike Share</a></p>

In [8]:
trip_dataFrame = pd.read_csv("/mnt/data/trips.csv", names=["ID", "Duration", "Start Date", "Start Station", "Start Terminal", "End Date", "End Station", "End Termonal", "Bike Id", "Subscription Type", "Zip Code"])
trip_dataFrame.head()

Unnamed: 0,ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Termonal,Bike Id,Subscription Type,Zip Code
0,4576,63,2013-08-29 14:13,South Van Ness at Market,66,2013-08-29 14:14,South Van Ness at Market,66,520,Subscriber,94127
1,4607,70,2013-08-29 14:42,San Jose City Hall,10,2013-08-29 14:43,San Jose City Hall,10,661,Subscriber,95138
2,4130,71,2013-08-29 10:16,Mountain View City Hall,27,2013-08-29 10:17,Mountain View City Hall,27,48,Subscriber,97214
3,4251,77,2013-08-29 11:29,San Jose City Hall,10,2013-08-29 11:30,San Jose City Hall,10,26,Subscriber,95060
4,4299,83,2013-08-29 12:02,South Van Ness at Market,66,2013-08-29 12:04,Market at 10th,67,319,Subscriber,94103


In [9]:
station_dataFrame = pd.read_csv("/mnt/data/stations.csv", names=["ID", "Station name", "Latiude", "Longitude", "Number of bikes the station can hold", "City", "Installation Date", "?"])
station_dataFrame.head()

Unnamed: 0,ID,Station name,Latiude,Longitude,Number of bikes the station can hold,City,Installation Date,?
0,2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,2013-08-06,
1,3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,2013-08-05,
2,4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,2013-08-06,
3,5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,2013-08-05,
4,6,San Pedro Square,37.336721,-121.894074,15,San Jose,2013-08-07,


### Загрузка данных
<ul>
    <li>trip_data - в данной переменной храниться список поездок на велосипедах</li> 
    <li>station_data - в данной переменной храниться список велостоянок для проката велосипедов</li> 
<ul>

In [10]:
trip_data = sc.textFile("/mnt/data/trips.csv")
station_data = sc.textFile("/mnt/data/stations.csv")

### Уберем header из каждого набора данных

In [11]:
tripsHeader = trip_data.first()
trips = trip_data.filter(lambda row: row != tripsHeader).map(lambda row: row.split(","))

stationHeader = station_data.first()
stations = station_data.filter(lambda row: row != stationHeader).map(lambda row: row.split(","))

### Проведем присваивание, что бы ID в наборе данных соответствовал порядковому номеру строки, под которым храниться строка

In [34]:
tripsIndexed = trips.keyBy(lambda row: int(row[0]))
stationIndexed = stations.keyBy(lambda row: int(row[0]))

In [35]:
print(f"Station Indexed: {stationIndexed.top(5)}")
print(f"Trips Indexed: {tripsIndexed.top(5)}")

Station Indexed: [(84, ['84', 'Ryland Park', '37.342725', '-121.895617', '15', 'Redwood City', '2014-04-09', '']), (83, ['83', 'Mezes Park', '37.491269', '-122.236234', '15', 'Redwood City', '2014-02-20', '']), (82, ['82', 'Broadway St at Battery St', '37.798541', '-122.400862', '15', 'San Francisco', '2014-01-22', '']), (80, ['80', 'San Jose Government Center', '37.352601', '-121.905733', '15', 'San Jose', '2013-12-31', '']), (77, ['77', 'Market at Sansome', '37.789625', '-122.400811', '27', 'San Francisco', '2013-08-25', ''])]
Trips Indexed: [(432946, ['432946', '406', '2014-08-31 22:31', 'Mountain View Caltrain Station', '28', '2014-08-31 22:38', 'Castro Street and El Camino Real', '32', '17', 'Subscriber', '94040']), (432945, ['432945', '468', '2014-08-31 22:07', 'Beale at Market', '56', '2014-08-31 22:15', 'Market at 4th', '76', '509', 'Customer', '11231']), (432944, ['432944', '534', '2014-08-31 22:06', 'Beale at Market', '56', '2014-08-31 22:15', 'Market at 4th', '76', '342', 'C

In [36]:
startTerminalTrips = trips.keyBy(lambda row: row[3])
endTerminalTrips = trips.keyBy(lambda row: row[6])

startTerminalTrips.take(1), endTerminalTrips.take(1)

([('San Jose City Hall',
   ['4607',
    '70',
    '2013-08-29 14:42',
    'San Jose City Hall',
    '10',
    '2013-08-29 14:43',
    'San Jose City Hall',
    '10',
    '661',
    'Subscriber',
    '95138'])],
 [('San Jose City Hall',
   ['4607',
    '70',
    '2013-08-29 14:42',
    'San Jose City Hall',
    '10',
    '2013-08-29 14:43',
    'San Jose City Hall',
    '10',
    '661',
    'Subscriber',
    '95138'])])

### Создадим сущности (классы), для нашей предметной области

In [37]:
class Trip(NamedTuple):
    trip_id: int
    duration: int
    start_date: datetime
    start_station: str
    start_terminal: int
    end_date: datetime
    end_station: str
    end_terminal: int
    bike_id: int
    subscription_type: str
    zip_code: str
        
    @staticmethod
    def launch_list(row):
        return Trip(
            trip_id = int(row[0]),
            duration = int(row[1]),
            start_date = datetime.strptime(row[2], '%Y-%m-%d %H:%M'),
            start_station = str(row[3]),
            start_terminal = int(row[4]),
            end_date = datetime.strptime(row[5], '%Y-%m-%d %H:%M'),
            end_station = str(row[6]),
            end_terminal = int(row[7]),
            bike_id = int(row[8]),
            subscription_type = str(row[9]),
            zip_code = str(row[10])
        )

In [38]:
class Station(NamedTuple):
    station_id: int
    station_name: str
    latitude: float
    longitude: float
    numOfBikesStat: int
    city: str
    instalation_date: str
    q_field: str
    
    @staticmethod
    def launch_list(row):
        return Station(
            station_id = int(row[0]),
            station_name = str(row[1]),
            latitude = float(row[2]),
            longitude = float(row[3]),
            numOfBikesStat = int(row[4]),
            city = str(row[5]),
            #instalation_date = datetime.strptime(row[7], '%Y-%m-%d %H:%M'),
            instalation_date = str(row[6]),
            q_field = str(row[7])
        )

In [39]:
objsFromTrips = trips.map(Trip.launch_list)
objsFromStations = stations.map(Station.launch_list)

In [40]:
objsFromTrips.first()

Trip(trip_id=4607, duration=70, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station='San Jose City Hall', start_terminal=10, end_date=datetime.datetime(2013, 8, 29, 14, 43), end_station='San Jose City Hall', end_terminal=10, bike_id=661, subscription_type='Subscriber', zip_code='95138')

In [41]:
objsFromStations.first()

Station(station_id=3, station_name='San Jose Civic Center', latitude=37.330698, longitude=-121.888979, numOfBikesStat=15, city='San Jose', instalation_date='2013-08-05', q_field='')

## Задание на лабораторную работу 1
<ol>
    <li>Найти велосипед с максимальным пробегом.</li>
    <li>Найти наибольшее расстояние между станциями.</li>
    <li>Найти путь велосипеда с максимальным пробегом через станции.</li>
    <li>Найти количество велосипедов в системе.</li>
    <li>Найти пользователей потративших на поездки более 3 часов.</li>
</ol>

### Найти велосипед с максимальным пробегом.

Для этого воспользуемся функциями mapValues и reduceByKey

<p>reduceByKey работает следующим образом</p>
<img src="https://images.backtobazics.com/spark/apache-spark-reducebykey-example.gif" width="750" align="center">

In [42]:
max_duration_dict = objsFromTrips.keyBy(lambda idBike: idBike.bike_id)
max_duration = max_duration_dict.mapValues(lambda trip: trip.duration).reduceByKey(lambda trip1, trip2: trip1+trip2)

In [43]:
max_duration_dict.take(5)

[(661,
  Trip(trip_id=4607, duration=70, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station='San Jose City Hall', start_terminal=10, end_date=datetime.datetime(2013, 8, 29, 14, 43), end_station='San Jose City Hall', end_terminal=10, bike_id=661, subscription_type='Subscriber', zip_code='95138')),
 (48,
  Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station='Mountain View City Hall', start_terminal=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station='Mountain View City Hall', end_terminal=27, bike_id=48, subscription_type='Subscriber', zip_code='97214')),
 (26,
  Trip(trip_id=4251, duration=77, start_date=datetime.datetime(2013, 8, 29, 11, 29), start_station='San Jose City Hall', start_terminal=10, end_date=datetime.datetime(2013, 8, 29, 11, 30), end_station='San Jose City Hall', end_terminal=10, bike_id=26, subscription_type='Subscriber', zip_code='95060')),
 (319,
  Trip(trip_id=4299, duration=83, start_date=dateti

In [44]:
# Для поиска максимального пути используем lambda выражение и функцию max
list_dur = max_duration.collect()
maxDurIdBike = max(list_dur,key=lambda item:item[1])[0]
print(f"ID велосипеда для максимального пробега: {maxDurIdBike}")

ID велосипеда для максимального пробега: 465


### Найти наибольшее расстояние между станциями.

1) Уберм круговые поездки (где место начала совподает с местом окончания) с помощью filter<br>
2) Сформируем tuple с начальной и конечной станцией из каждой строки<br>
3) По каждой строке представим результирующие данные в виде расстояния<br>
4) Оставляем только уникальные строки

In [45]:
trips_stations_info = objsFromTrips.filter(lambda t: t.start_station != t.end_station)\
.keyBy(lambda tupleTrip: (tupleTrip.start_station, tupleTrip.end_station))\
.mapValues(lambda tripDuration: tripDuration.duration)\
.distinct()

In [46]:
trips_stations_info.count()

192482

In [47]:
aggregateTrips = trips_stations_info.collect()

In [48]:
finalResultMax = max(aggregateTrips,key=lambda item:item[1])
stantion = finalResultMax[0]
duration = finalResultMax[1]                  
print(f"Максимальное расстояние между станциями: {stantion}, составило: {duration}")

Максимальное расстояние между станциями: ('Harry Bridges Plaza (Ferry Building)', 'Civic Center BART (7th at Market)'), составило: 716480


In [49]:
finalResultMin = min(aggregateTrips,key=lambda item:item[1])
stantion = finalResultMin[0]
duration = finalResultMin[1]                  
print(f"Минимальное расстояние между станциями: {stantion}, составило: {duration}")

Минимальное расстояние между станциями: ('San Francisco Caltrain (Townsend at 4th)', 'San Francisco Caltrain 2 (330 Townsend)'), составило: 63


#### Версия 2
Данный пукт лабораторной работы можно так же сделать через векторное произведение
<img src="https://ds04.infourok.ru/uploads/ex/1388/000d948f-138d696d/img13.jpg" style="width:650px; heigth: 700px">

1) Векторное произведение с самим собой<br>
2) Отфильтровать значения что бы идентификаторы конечной и начальной станций не совпадали<br>
3) mapValues - для получения декартова расстояния между подставим координаты в формулу выше<br>

In [50]:
# Найдем векторы
vec = objsFromStations.cartesian(objsFromStations)\
.filter(lambda vec: vec[0].station_id != vec[1].station_id)\
.keyBy(lambda vec: (vec[0].station_name, vec[1].station_name))\
.mapValues(lambda vec: ((vec[0].latitude - vec[1].latitude)**2 + (vec[0].longitude - vec[1].longitude)**2)**(1/2))

После этого найдем максимальное декартово расстояние, найдя максимальное значение в массиве

In [51]:
maxDistanceList = vec.collect()
maxDistance = max(maxDistanceList,key=lambda item:item[1])
print(f"Максимальная длинна вектора между станциями: {max(maxDistanceList,key=lambda item:item[1])[0]}, составила: {max(maxDistanceList,key=lambda item:item[1])[1]}")

Максимальная длинна вектора между станциями: ('SJSU - San Salvador at 9th', 'Embarcadero at Sansome'), составила: 0.7058482821754244


### Найти путь велосипеда с максимальным пробегом через станции.

1) В первом задании мы получили id велосипеда с максимальным пробегом. Используем его id в фильтрации<br>
2) Выполним сортировку по дате начала движения по данному маршруту<br>
3) Используем map для получения tuple <br>
4) Используем метод collect для преобразования в list

In [52]:
theBigestPath = objsFromTrips.filter(lambda IdBike: IdBike.bike_id == maxDurIdBike)\
.sortBy(lambda startTrip: startTrip.start_date)\
.map(lambda stations: (stations.start_station, stations.end_station))\
.collect()

In [53]:
theBigestPath

[('Market at 10th', 'Market at 4th'),
 ('Market at 4th', 'Harry Bridges Plaza (Ferry Building)'),
 ('Harry Bridges Plaza (Ferry Building)', 'Howard at 2nd'),
 ('Howard at 2nd', 'Townsend at 7th'),
 ('Townsend at 7th', 'Steuart at Market'),
 ('Steuart at Market', '2nd at Townsend'),
 ('2nd at Townsend', 'Clay at Battery'),
 ('Clay at Battery', '2nd at Townsend'),
 ('2nd at Townsend', 'South Van Ness at Market'),
 ('South Van Ness at Market', 'Powell Street BART'),
 ('Powell Street BART', 'South Van Ness at Market'),
 ('South Van Ness at Market', 'Powell at Post (Union Square)'),
 ('Powell at Post (Union Square)', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', '2nd at Townsend'),
 ('2nd at Townsend', 'Market at Sansome'),
 ('Market at Sansome', 'South Van Ness at Market'),
 ('South Van Ness at Market', 'Civic Center BART (7th at Market)'),
 ('Civic C

### Найти количество велосипедов в системе

1) С помощью map достаем все id велосипедов<br>
2) С помощью distinct оставляем только уникальные велосипеды<br>
3) С помощью count() подсчитываем количество уникальных велосипедов

In [54]:
coutBike = objsFromTrips.map(lambda trip: trip.bike_id).distinct().count()
print(f"Количество велосипедов в системе составляет: {coutBike} шт.")

Количество велосипедов в системе составляет: 699 шт.


### Найти пользователей потративших на поездки более 3-х часов

<p>Заметим, что какая-либо информация о пользователях отсутствует. Поэтому стоит взять за основу следующее. Использовать каннотацию задания в виде "Найти bike_id которые ехали более 3-х часов", либо "Найти идентификатор поездки, которая проходила более 3-х часов". Первый вариант кажется не логичным, ибо если брать все поездки вместе, то практически все велосипеды будут иметь накат более 3-х часов. Так что остановимся на втором варианте.</p><br>

<p>3 часа = 3 * 60 (минут) * 60 (секунд). Duration в таблице находится в секундах</p>

In [55]:
ItTripMoreThreeHours = objsFromTrips.filter(lambda tripRow: tripRow.duration > 3 * 60 * 60)\
.map(lambda tripKey: tripKey.trip_id)

In [56]:
ItTripMoreThreeHours.count()

4770

In [57]:
ItTripMoreThreeHours.take(5)

[4639, 4637, 4528, 4363, 4193]