Отключение предупреждений и создание SparkContext

In [1]:
import warnings
warnings.filterwarnings('ignore')
from pyspark import SparkContext, SparkConf
app_name = "Lab1-Penskiy"
# Local master with a single worker thread.
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
# getOrCreate() hands back the already-running context when this cell is re-run;
# calling SparkContext(conf=conf) a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
sc



Перемещение данных

In [2]:
# Copy the local /mnt/data directory into the Hadoop file system under /data.
# Re-running the cell prints "File exists" for every file that was already uploaded
# (see the captured output below).
!hadoop fs -put /mnt/data /data

put: `/data/data/.DS_Store': File exists
put: `/data/data/docker-compose.yml': File exists
put: `/data/data/list_of_countries_sorted_gini.txt': File exists
put: `/data/data/nycTaxiFares.gz': File exists
put: `/data/data/nycTaxiRides.gz': File exists
put: `/data/data/nyctaxi.csv': File exists
put: `/data/data/posts_sample.xml': File exists
put: `/data/data/programming-languages.csv': File exists
put: `/data/data/stations.csv': File exists
put: `/data/data/trips.csv': File exists
put: `/data/data/warandsociety.txt': File exists


Проверка данных

In [3]:
# List /data to check that the uploaded files are in place.
!hadoop fs -ls /data

Found 12 items
-rwxr-xr-x   3 root root       6148 2022-12-26 20:34 /data/.DS_Store
drwxr-xr-x   - root root         11 2022-12-26 20:49 /data/data
-rwxr-xr-x   3 root root       1304 2022-12-26 20:34 /data/docker-compose.yml
-rwxr-xr-x   3 root root        394 2022-12-26 20:34 /data/list_of_countries_sorted_gini.txt
-rwxr-xr-x   3 root root   19459967 2022-12-26 20:35 /data/nycTaxiFares.gz
-rwxr-xr-x   3 root root   84135506 2022-12-26 20:35 /data/nycTaxiRides.gz
-rwxr-xr-x   3 root root   79500408 2022-12-26 20:35 /data/nyctaxi.csv
-rwxr-xr-x   3 root root   74162295 2022-12-26 20:35 /data/posts_sample.xml
-rwxr-xr-x   3 root root      40269 2022-12-26 20:35 /data/programming-languages.csv
-rwxr-xr-x   3 root root       5647 2022-12-26 20:35 /data/stations.csv
-rwxr-xr-x   3 root root   80208831 2022-12-26 20:35 /data/trips.csv
-rwxr-xr-x   3 root root    5315699 2022-12-26 20:35 /data/warandsociety.txt


Создание классов с методами для работы с данными

In [4]:
from datetime import datetime
from typing import NamedTuple, Optional

class Station(NamedTuple):
    """One bike-share station parsed from a comma-split row of stations.csv.

    Fields are filled positionally from the CSV columns
    ``id,name,lat,long,dock_count,city,installation_date``.
    """
    station_id: int
    name: str
    lat: float
    long: float
    dockcount: int
    landmark: str  # taken from the CSV "city" column
    # Parsed installation date; None when the CSV cell is empty.
    # (Was annotated as ``str`` although a string is never stored here.)
    installation: Optional[datetime]

    @staticmethod
    def init_from_list(rows):
        """Lazily turn an iterable of split CSV rows into ``Station`` records.

        Args:
            rows: iterable of sequences of strings with at least seven items each.

        Yields:
            One ``Station`` per input row, in input order.

        Raises:
            ValueError: a numeric field or a non-empty date field cannot be parsed.
            IndexError: a row has fewer than seven fields.
        """
        date_format = '%m/%d/%Y'
        for row in rows:
            yield Station(
                station_id=int(row[0]),
                name=row[1],
                lat=float(row[2]),
                long=float(row[3]),
                dockcount=int(row[4]),
                landmark=row[5],
                # An empty cell means "date unknown" rather than a parse error.
                installation=datetime.strptime(row[6], date_format) if row[6] != '' else None,
            )

class Trip(NamedTuple):
    """One bike trip parsed from a comma-split row of trips.csv.

    Fields are filled positionally from the CSV columns
    ``id,duration,start_date,start_station_name,start_station_id,end_date,``
    ``end_station_name,end_station_id,bike_id,subscription_type,zip_code``.
    """
    trip_id: int
    duration: int  # 0 when the CSV cell is empty; presumably seconds — TODO confirm
    # Dates are None when the CSV cell is empty (was annotated as plain ``datetime``).
    start_date: Optional[datetime]
    start_station_name: str
    start_station_id: int
    end_date: Optional[datetime]
    end_station_name: str
    end_station_id: int
    bike_id: int
    subscription_type: str
    zip_code: str

    @staticmethod
    def init_from_list(rows):
        """Lazily turn an iterable of split CSV rows into ``Trip`` records.

        Args:
            rows: iterable of sequences of strings with at least eleven items each.

        Yields:
            One ``Trip`` per input row, in input order.

        Raises:
            ValueError: an id field, a non-empty duration or a non-empty date
                cannot be parsed.
            IndexError: a row has fewer than eleven fields.
        """
        date_format = '%m/%d/%Y %H:%M'
        for row in rows:
            yield Trip(
                trip_id=int(row[0]),
                # A missing duration is recorded as 0 rather than rejected.
                duration=int(row[1]) if row[1] != '' else 0,
                start_date=datetime.strptime(row[2], date_format) if row[2] != '' else None,
                start_station_name=row[3],
                start_station_id=int(row[4]),
                end_date=datetime.strptime(row[5], date_format) if row[5] != '' else None,
                end_station_name=row[6],
                end_station_id=int(row[7]),
                bike_id=int(row[8]),
                subscription_type=row[9],
                zip_code=row[10],
            )

In [5]:
# Load the raw CSV files as RDDs of text lines; the header line is still included here.
# NOTE(review): these paths point at /mnt/data, not at the /data location that
# `hadoop fs -put` fills — confirm which file system the default scheme resolves to.
trip_data = sc.textFile("/mnt/data/trips.csv")
station_data = sc.textFile("/mnt/data/stations.csv")

Функция для отделения строки заголовка от строк данных

In [6]:
def data_columns_split(table):
    """Separate the header line of a text RDD from its data rows.

    Args:
        table: RDD-like object of text lines offering ``first``, ``filter`` and ``map``.

    Returns:
        A pair ``(header, rows)``: ``header`` is the first line as an unsplit string,
        ``rows`` contains every line different from the header, split on ",".
    """
    header = table.first()
    # Every line equal to the header is dropped, not only the first one.
    body = table.filter(lambda line: line != header)
    parsed = body.map(lambda line: line.split(","))
    return header, parsed

In [7]:
# Split both tables into their header string and an RDD of field lists.
trip_columns, trips = data_columns_split(trip_data)
station_columns, stations = data_columns_split(station_data)

In [8]:
# Header line of trips.csv (unsplit string).
trip_columns

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [9]:
# Peek at the first data row: a list of string fields, some of which may be empty.
trips.take(1)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127']]

In [10]:
# Header line of stations.csv (unsplit string).
station_columns

'id,name,lat,long,dock_count,city,installation_date'

In [11]:
# Peek at the first station row (list of string fields).
stations.take(1)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013']]

In [12]:
# Pair every raw station row with its numeric id: (id, row).
station_by_id = stations.keyBy(lambda fields: int(fields[0]))
station_by_id.take(1)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013'])]

# 1. Найти велосипед с максимальным временем пробега

In [15]:
# `trips_objects` is not defined by any earlier cell of this notebook (execution
# counts jump from In [12] to In [15]), so "Restart & Run All" stops here with a
# NameError. Build it from the split rows; mapPartitions fits because
# Trip.init_from_list consumes an iterable of rows and yields Trip records.
trips_objects = trips.mapPartitions(Trip.init_from_list)

# Key every trip by the bike that made it: (bike_id, Trip).
trips_by_bike = trips_objects.keyBy(lambda trip: trip.bike_id)
trips_by_bike.first()

(520,
 Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127'))

In [16]:
# Sum the trip durations per bike: (bike_id, total duration).
query = (
    trips_by_bike
    .mapValues(lambda trip: trip.duration)
    .reduceByKey(lambda total, duration: total + duration)
)

Сортировка по уменьшению времени пробега

In [17]:
# Five bikes with the largest total duration, largest first.
query.top(5, key=lambda bike_total: bike_total[1])

[(535, 18611693),
 (466, 3933272),
 (613, 2409014),
 (526, 2253019),
 (415, 2248886)]

In [18]:
# Keep only the id of the bike with the largest total duration.
longest_running = query.top(1, key=lambda bike_total: bike_total[1])
top_1_bike_id = longest_running[0][0]

Решение задачи

In [19]:
# Answer to task 1: id of the bike with the largest total trip duration.
top_1_bike_id

535

# 2 Найти наибольшее геодезическое расстояние между станциями.

In [20]:
import math

In [21]:
def degrees_to_radians(degrees):
    """Convert an angle from degrees to radians.

    Args:
        degrees: angle in degrees.

    Returns:
        The same angle in radians, computed as ``degrees * pi / 180``.
    """
    scaled = degrees * math.pi
    return scaled / 180

In [22]:
def distance_in_km_between_earth_coordinates(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points, using the haversine formula.

    Args:
        lat1, lon1: latitude and longitude of the first point, in degrees.
        lat2, lon2: latitude and longitude of the second point, in degrees.

    Returns:
        Distance in kilometres on a sphere of radius 6371 km.
    """
    earth_radius_km = 6371

    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlon = math.radians(lon2 - lon1) / 2
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)

    a = math.sin(half_dlat) ** 2 + math.sin(half_dlon) ** 2 * math.cos(phi1) * math.cos(phi2)
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points; sqrt(1 - a) would then raise "math domain error".
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * c

In [23]:
# `station_objects` is not defined by any earlier cell of this notebook, so a fresh
# "Restart & Run All" fails here with a NameError. Build it from the split rows;
# Station.init_from_list consumes an iterable of rows, hence mapPartitions.
station_objects = stations.mapPartitions(Station.init_from_list)

# Put every station under the same dummy key 1 so that a self-join pairs each
# station with every other one: (1, (station_id, lat, long)).
all_id = station_objects.map(lambda x: (1, (x.station_id, x.lat, x.long)))

In [24]:
# Pairwise distances between stations: ((id_a, id_b), distance in km).
# cartesian() yields the same (a, b) pairs as the former self-join on the constant
# key 1, without routing every record through a single join key.
station_points = all_id.values()
greatest_dist = (
    station_points.cartesian(station_points)
    # Keep each unordered pair once and drop the (a, a) pairs.
    .filter(lambda pair: pair[0][0] < pair[1][0])
    .map(lambda pair: (
        (pair[0][0], pair[1][0]),
        distance_in_km_between_earth_coordinates(pair[0][1], pair[0][2], pair[1][1], pair[1][2]),
    ))
)

In [25]:
# Take the pair with the largest distance and report that distance.
farthest_pair = greatest_dist.top(1, key=lambda pair_distance: pair_distance[1])[0]
print("Наибольшее расстояние: %.2f км" % farthest_pair[1])

Наибольшее расстояние: 69.92 км


# 3 Найти путь велосипеда с максимальным временем пробега через станции.

In [26]:
# Trips of the longest-running bike in chronological order.
top_1_bike_trip_sorted = (
    trips_objects
    .filter(lambda trip: trip.bike_id == top_1_bike_id)
    # Trip.init_from_list stores None for an empty start_date, and None cannot be
    # compared with datetime (TypeError inside sortBy). Fall back to the end date,
    # then to datetime.min, so such trips are ordered instead of crashing the sort.
    .sortBy(lambda trip: trip.start_date or trip.end_date or datetime.min)
)

In [27]:
# Show the two earliest trips of that bike.
top_1_bike_trip_sorted.take(2)

[Trip(trip_id=4966, duration=1245, start_date=datetime.datetime(2013, 8, 29, 19, 32), start_station_name='Post at Kearney', start_station_id=47, end_date=datetime.datetime(2013, 8, 29, 19, 53), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id=70, bike_id=535, subscription_type='Customer', zip_code='94123'),
 Trip(trip_id=5067, duration=423, start_date=datetime.datetime(2013, 8, 29, 21, 38), start_station_name='San Francisco Caltrain (Townsend at 4th)', start_station_id=70, end_date=datetime.datetime(2013, 8, 29, 21, 45), end_station_name='San Francisco Caltrain 2 (330 Townsend)', end_station_id=69, bike_id=535, subscription_type='Subscriber', zip_code='94133')]

In [28]:
# Reduce every ordered trip to its (start station, end station) leg.
longest_trip = top_1_bike_trip_sorted.map(
    lambda leg: (leg.start_station_name, leg.end_station_name)
)

In [29]:
# Answer to task 3: the full list of legs, in trip order.
longest_trip.collect()

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome'),
 ('Market at Sansome', '2nd at South Park'),
 ('2nd at Townsend', 'Davis at Jackson'),
 ('San Francisco City Hall', 'Civic Center BART (7th at Market)'),
 ('Civic Center BART (7th at Market)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Washington at Kearney'),
 ('Washington at Kearney', 'Market at Sansome'),
 ('Market at Sansome', 'Market at Sansome'),
 ('Market at Sansome', '2nd at Folsom'),
 ('2nd at Folsom', '2nd at Townsend'),
 ('Temporary Transbay Terminal (Howard at Beale)', '2nd at Townsend'),
 ('2nd at Townsend', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Clay at Battery'),
 ('Clay at Battery', 'Harry Bridges Plaza (Ferry Building)'),
 ('Harry Bridges Plaza (Ferry Building)', 'Clay at Batter

In [30]:
# Number of legs (trips) made by that bike.
longest_trip.count()

1328

# 4 Найти количество велосипедов в системе.

In [31]:
# One bike id per trip; duplicates are removed in the next cell.
bike_ids = trips_objects.map(lambda record: record.bike_id)

In [32]:
# Answer to task 4: number of distinct bike ids seen in the trips.
bike_ids.distinct().count()

700

# 5 Найти пользователей, потративших на поездки более 3 часов.

In [None]:
# Sanity check: print the number of distinct trip ids and the total number of trips.
# NOTE(review): presumably trip_id stands in for the user because the data has no
# user id column — confirm against the task statement.
trip_ids = trips_objects.map(lambda trip: trip.trip_id)
print(trip_ids.distinct().count())
print(trip_ids.count())

669959


In [None]:
# Ids of the trips whose duration exceeds 60*60*3, i.e. three hours if duration is in seconds.
id_3_hours_trip = (
    trips_objects
    .filter(lambda record: record.duration > 60*60*3)
    .map(lambda record: record.trip_id)
)

In [None]:
# Number of trips above the three-hour threshold.
id_3_hours_trip.count()

Поездки, длившиеся более 3 часов

In [None]:
# Bring all matching trip ids to the driver and display them.
id_3_hours_trip.collect()