# ЛР 1 Одобеску 6132

# 0. Подготовительные действия

In [1]:
import warnings
warnings.filterwarnings('ignore')

Задаём начальную конфигурацию

In [2]:
from pyspark import SparkContext, SparkConf
app_name = "Lab1-project"

In [3]:
conf = SparkConf().setAppName(app_name).setMaster('local[1]')

In [4]:
sc = SparkContext(conf=conf)



In [5]:
sc

Копируем файлы из локальной файловой системы в hdfs

In [6]:
!hadoop fs -put /mnt/data /data

put: `/data/data/list_of_countries_sorted_gini.txt': File exists
put: `/data/data/nycTaxiFares.gz': File exists
put: `/data/data/nycTaxiRides.gz': File exists
put: `/data/data/nyctaxi.txt': File exists
put: `/data/data/posts_sample.xml': File exists
put: `/data/data/programming-languages.csv': File exists
put: `/data/data/stations.csv': File exists
put: `/data/data/stations.txt': File exists
put: `/data/data/trips.csv': File exists
put: `/data/data/trips.txt': File exists
put: `/data/data/warandsociety.txt': File exists


In [7]:
!hadoop fs -ls /data

Found 10 items
drwxr-xr-x   - root root         11 2022-11-24 13:48 /data/data
-rwxr-xr-x   3 root root        394 2022-11-23 10:52 /data/list_of_countries_sorted_gini.txt
-rwxr-xr-x   3 root root   19459967 2022-11-23 10:52 /data/nycTaxiFares.gz
-rwxr-xr-x   3 root root   84135506 2022-11-23 10:52 /data/nycTaxiRides.gz
-rwxr-xr-x   3 root root   79500408 2022-11-23 10:52 /data/nyctaxi.txt
-rwxr-xr-x   3 root root   74162295 2022-11-23 10:52 /data/posts_sample.xml
-rwxr-xr-x   3 root root      40269 2022-11-23 10:52 /data/programming-languages.txt
-rwxr-xr-x   3 root root       5647 2022-11-23 10:52 /data/stations.txt
-rwxr-xr-x   3 root root   80208831 2022-11-23 10:52 /data/trips.txt
-rwxr-xr-x   3 root root    5315699 2022-11-23 10:52 /data/warandsociety.txt


Создаём класс-обертку для более легкой работы с данными

In [8]:
from typing import NamedTuple
from datetime import datetime

class Station(NamedTuple):
    station_id: int
    name: str
    lat: float
    long: float
    dockcount: int
    landmark: str
    installation: str

    @staticmethod
    def init_from_list(rows):
        for row in rows:
            yield Station(
                station_id = int(row[0]),
                name = row[1],
                lat = float(row[2]),
                long = float(row[3]),
                dockcount = int(row[4]),
                landmark = row[5],
                installation = datetime.strptime(row[6], '%m/%d/%Y') if row[6] != '' else None
            )

class Trip(NamedTuple):
    trip_id: int
    duration: int
    start_date: datetime
    start_station_name: str
    start_station_id: int
    end_date: datetime
    end_station_name: str
    end_station_id: int
    bike_id: int
    subscription_type: str
    zip_code: str

    @staticmethod
    def init_from_list(rows):
        for row in rows:
            yield Trip(                             
                 trip_id = int(row[0]),
                 duration = int(row[1]) if row[1] != '' else 0,
                 start_date = datetime.strptime(row[2], '%m/%d/%Y %H:%M')  if row[2] != '' else None,
                 start_station_name = row[3],
                 start_station_id = int(row[4]),
                 end_date = datetime.strptime(row[5], '%m/%d/%Y %H:%M')  if row[5] != '' else None,
                 end_station_name = row[6],
                 end_station_id = int(row[7]),
                 bike_id = int(row[8]),
                 subscription_type = row[9],
                 zip_code = row[10]
            )

Считываем данные

In [9]:
trip_data = sc.textFile("/mnt/data/trips.csv")
station_data = sc.textFile("/mnt/data/stations.csv")

Напишем функцию, которая будет отделять первую строчку, в которой находятся названия столбцов и разделять каждую строку по символу ','

In [10]:
def data_columns_split(table):
    columns = table.first()
    data = table.filter(lambda row: row != columns).map(lambda row: row.split(","))
    return columns, data

In [11]:
trip_columns, trips = data_columns_split(trip_data)
station_columns, stations = data_columns_split(station_data)

In [12]:
trip_columns

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [13]:
trips.take(1)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127']]

In [14]:
station_columns

'id,name,lat,long,dock_count,city,installation_date'

In [15]:
stations.take(1)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013']]

Добавляем к данным ключ, который представляет из себя id станции

In [16]:
station_by_id = stations.keyBy(lambda row: int(row[0]))
station_by_id.take(1)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013'])]

Преобразуем каждую строчку из датасетов в объекты созданных выше классов для простоты работы

In [17]:
trips_objects = trips.mapPartitions(Trip.init_from_list)
trips_objects.first()

Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [18]:
station_objects = stations.mapPartitions(Station.init_from_list)
station_objects.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

# 1. Найти велосипед с максимальным временем пробега

Добавим bike_id в качестве ключа

In [19]:
trips_by_bike = trips_objects.keyBy(lambda trip: trip.bike_id)
trips_by_bike.first()

(520,
 Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127'))

Поставим каждому ключу в соответствие время пробега за этот заезд, затем с помощью reduceByKey просуммирем все значения с одинаковыми ключами

In [20]:
query = trips_by_bike.mapValues(lambda trip: trip.duration).reduceByKey(lambda trip1, trip2: trip1 + trip2)

Отсортируем по значению времени пробега

In [21]:
query.top(5, key=lambda x: x[1])

[(535, 18611693),
 (466, 3933272),
 (613, 2409014),
 (526, 2253019),
 (415, 2248886)]

Выведем bike_id с максимальным временем пробега

In [22]:
top_1_bike_id = query.top(1, key=lambda x: x[1])[0][0]

In [23]:
top_1_bike_id

535

# 2 Найти наибольшее геодезическое расстояние между станциями.

Геодезическое расстояние - расстояние по прямой. Вычислим его через координаты станций

In [24]:
import math

In [25]:
def degrees_to_radians(degrees):
    return degrees*math.pi/180

In [26]:
def distance_in_km_between_earth_coordinates(lat1, lon1, lat2, lon2):
    earth_radius_km = 6371

    dLat = degrees_to_radians(lat2-lat1)
    dLon = degrees_to_radians(lon2-lon1)

    lat1 = degrees_to_radians(lat1)
    lat2 = degrees_to_radians(lat2)

    a = math.sin(dLat/2) * math.sin(dLat/2) + math.sin(dLon/2) * math.sin(dLon/2) * math.cos(lat1) * math.cos(lat2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    return earth_radius_km * c

Создадим RDD, в которой ключ будет 1, а значения - id станции, широта и долгота. Это нужно чтобы через join создать все пары станций

In [27]:
all_id = station_objects.map(lambda row: (1, (row.station_id, row.lat, row.long)))

In [28]:
all_id.take(2)

[(1, (2, 37.329732, -121.90178200000001)), (1, (3, 37.330698, -121.888979))]

Сджоиним таблицу с собой, удалим ненужный ключ и лишние пары

In [29]:
pairs = all_id.join(all_id).map(lambda row: row[1]).filter(lambda row: row[0][0]<row[1][0])
pairs.take(2)

[((2, 37.329732, -121.90178200000001), (3, 37.330698, -121.888979)),
 ((2, 37.329732, -121.90178200000001), (4, 37.333988, -121.894902))]

Для каждой пары посчитаем расстояние между станциями и выведем пары с максимальным расстоянием

In [30]:
pair_distance = pairs.map(lambda row: ((row[0][0], row[1][0]), distance_in_km_between_earth_coordinates(row[0][1], row[0][2], row[1][1], row[1][2],)))

Максимальное геодезическое расстояние между станциями 16 и 60, оно равняется 69.9 км

# 3 Найти путь велосипеда с максимальным временем пробега через станции.

Найдём все пути, отсноящиеся к bike_id с максимальным временем пробега и отсортируем их по времени начала движения 

In [31]:
top_1_bike_trip_sorted = trips_objects.filter(lambda trip: trip.bike_id == top_1_bike_id).sortBy(lambda trip: trip.start_date)

In [32]:
top_1_bike_trip_sorted.take(2)

[Trip(trip_id=4966, duration=1245, start_date=datetime.datetime(2013, 8, 29, 19, 32), start_station_name='Post at Kearney', start_station_id=47, end_date=datetime.datetime(2013, 8, 29, 19, 53), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id=70, bike_id=535, subscription_type='Customer', zip_code='94123'),
 Trip(trip_id=5067, duration=423, start_date=datetime.datetime(2013, 8, 29, 21, 38), start_station_name='San Francisco Caltrain (Townsend at 4th)', start_station_id=70, end_date=datetime.datetime(2013, 8, 29, 21, 45), end_station_name='San Francisco Caltrain 2 (330 Townsend)', end_station_id=69, bike_id=535, subscription_type='Subscriber', zip_code='94133')]

Оставим в этих путях только start_station_name и end_station_name

In [33]:
longest_trip = top_1_bike_trip_sorted.map(lambda trip: (trip.start_station_name, trip.end_station_name)) 

In [34]:
longest_trip.collect()

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome'),
 ('Market at Sansome', '2nd at South Park'),
 ('2nd at Townsend', 'Davis at Jackson'),
 ('San Francisco City Hall', 'Civic Center BART (7th at Market)'),
 ('Civic Center BART (7th at Market)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Washington at Kearney'),
 ('Washington at Kearney', 'Market at Sansome'),
 ('Market at Sansome', 'Market at Sansome'),
 ('Market at Sansome', '2nd at Folsom'),
 ('2nd at Folsom', '2nd at Townsend'),
 ('Temporary Transbay Terminal (Howard at Beale)', '2nd at Townsend'),
 ('2nd at Townsend', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Clay at Battery'),
 ('Clay at Battery', 'Harry Bridges Plaza (Ferry Building)'),
 ('Harry Bridges Plaza (Ferry Building)', 'Clay at Batter

Количество станций в самом длинном пути

In [35]:
longest_trip.count()

1328

# 4 Найти количество велосипедов в системе.

Из trips оставим только bike_id

In [36]:
bike_ids = trips_objects.map(lambda trip: trip.bike_id)

Уберём дубликаты и посчитаем количество уникальных

In [37]:
bike_ids.distinct().count()

700

# 5 Найти пользователей потративших на поездки более 3 часов.

Вообще информации о пользователях в таблицах нет, поэтому выведем все поездки длительностью более 3 часов

In [38]:
trip_ids = trips_objects.map(lambda trip: trip.trip_id)
print(trip_ids.distinct().count())
print(trip_ids.count())

669959
669959


Все поездки уникальны по id, поэтому будем считать, что каждая поездка совершалась отдельным пользователем

Время в секундах, поэтому переведём 3 часа в секунды

Оставим только те поездки, у которых время больше 3 часов и возьмём только их id

In [39]:
id_3_hours_trip = trips_objects.filter(lambda trip: trip.duration > 60*60*3).map(lambda trip: trip.trip_id)

In [40]:
id_3_hours_trip.count()

8322

Выводим все поездки больше 3 часов

In [41]:
id_3_hours_trip.collect()

[4639,
 4637,
 4528,
 4363,
 4193,
 4190,
 4225,
 4663,
 4532,
 4521,
 5069,
 4505,
 5539,
 6032,
 6409,
 6408,
 5697,
 5683,
 5218,
 5716,
 5717,
 5241,
 5537,
 5253,
 5531,
 6131,
 6098,
 6091,
 5115,
 5114,
 5250,
 5249,
 5508,
 5506,
 5167,
 5166,
 5640,
 5638,
 6003,
 5380,
 5222,
 5120,
 5113,
 5182,
 6412,
 6178,
 6462,
 5493,
 6282,
 7069,
 6626,
 6963,
 7074,
 6962,
 7067,
 6792,
 7143,
 6613,
 7176,
 7081,
 7079,
 7021,
 7016,
 6877,
 6876,
 6828,
 7110,
 7106,
 6806,
 6821,
 7005,
 6756,
 7085,
 7084,
 6748,
 6654,
 6741,
 7015,
 6798,
 6796,
 6769,
 6771,
 6968,
 6966,
 6752,
 6745,
 6867,
 6864,
 7182,
 7179,
 6906,
 7178,
 6824,
 6679,
 6656,
 6822,
 7434,
 7437,
 7424,
 7180,
 6633,
 6631,
 7208,
 7044,
 7466,
 7495,
 7497,
 7499,
 7501,
 7503,
 7521,
 7529,
 7550,
 7551,
 7553,
 7557,
 7559,
 7569,
 7579,
 7624,
 7625,
 7630,
 7639,
 7645,
 7647,
 7655,
 7686,
 7692,
 7735,
 7767,
 7774,
 7799,
 7805,
 7832,
 7849,
 7851,
 7862,
 7929,
 7931,
 7948,
 7956,
 8085,
 8197,