# Init spark/files/base classes

In [1]:
from pyspark import SparkContext, SparkConf
app_name = "Lab1-project"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc



In [5]:
!hadoop fs -put /mnt/data /data

In [6]:
!hadoop fs -ls /data

Found 11 items
-rwxr-xr-x   3 root root       2915 2022-09-18 11:04 /data/README.md
drwxr-xr-x   - root root         10 2022-09-18 11:08 /data/data
-rwxr-xr-x   3 root root        394 2022-09-18 11:04 /data/list_of_countries_sorted_gini.txt
-rwxr-xr-x   3 root root   19459967 2022-09-18 11:04 /data/nycTaxiFares.gz
-rwxr-xr-x   3 root root   84135506 2022-09-18 11:04 /data/nycTaxiRides.gz
-rwxr-xr-x   3 root root   79500408 2022-09-18 11:05 /data/nyctaxi.csv
-rwxr-xr-x   3 root root   74162295 2022-09-18 11:05 /data/posts_sample.xml
-rwxr-xr-x   3 root root      40269 2022-09-18 11:05 /data/programming-languages.csv
-rwxr-xr-x   3 root root       5647 2022-09-18 11:06 /data/stations.csv
-rwxr-xr-x   3 root root   80208831 2022-09-18 11:06 /data/trips.csv
-rwxr-xr-x   3 root root    5315699 2022-09-18 11:06 /data/warandsociety.txt


In [154]:
from typing import NamedTuple
from datetime import datetime

class Station(NamedTuple):
    station_id: int
    name: str
    lat: float
    long: float
    dockcount: int
    landmark: str
    installation: str

    @staticmethod
    def init_from_list(rows):
        for row in rows:
            yield Station(
                station_id = int(row[0]),
                name = row[1],
                lat = float(row[2]),
                long = float(row[3]),
                dockcount = int(row[4]),
                landmark = row[5],
                installation = datetime.strptime(row[6], '%m/%d/%Y') if row[6] != '' else None
            )

class Trip(NamedTuple):
    trip_id: int
    duration: int
    start_date: datetime
    start_station_name: str
    start_station_id: int
    end_date: datetime
    end_station_name: str
    end_station_id: int
    bike_id: int
    subscription_type: str
    zip_code: str

    @staticmethod
    def init_from_list(rows):
        for row in rows:
            yield Trip(                             
                 trip_id = int(row[0]),
                 duration = int(row[1]) if row[1] != '' else 0,
                 start_date = datetime.strptime(row[2], '%m/%d/%Y %H:%M')  if row[2] != '' else None,
                 start_station_name = row[3],
                 start_station_id = int(row[4]),
                 end_date = datetime.strptime(row[5], '%m/%d/%Y %H:%M')  if row[5] != '' else None,
                 end_station_name = row[6],
                 end_station_id = row[7],
                 bike_id = int(row[8]),
                 subscription_type = row[9],
                 zip_code = row[10]
            )

### Some test and init stuff
### Link to [tasks](#tasks)

In [155]:
test = sc.textFile("/mnt/data/warandsociety.txt")
test.count()

12851

In [156]:
trip_data = sc.textFile("/mnt/data/trips.csv")
station_data = sc.textFile("/mnt/data/stations.csv")

In [157]:
!head /mnt/data/trips.csv

id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127
4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138
4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214
4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060
4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103
4927,103,8/29/2013 18:54,Golden Gate at Polk,59,8/29/2013 18:56,Golden Gate at Polk,59,527,Subscriber,94109
4500,109,8/29/2013 13:25,Santa Clara at Almaden,4,8/29/2013 13:27,Adobe on Almaden,5,679,Subscriber,95112
4563,111,8/29/2013 14:02,San Salvador at 1st,8,8/29/2013 14:04,San Salvador at 1st,8,687,Subscriber,95112
4760,1

In [158]:
trip_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [159]:
def split_header_and_table(data):
    columns = data.first()
    table = data.filter(
        lambda row: row != columns
    ).map(
        lambda row: row.split(",")
    )
    return columns, table

In [160]:
trip_columns, trips = split_header_and_table(trip_data)
station_columns, stations = split_header_and_table(station_data)

In [161]:
trip_columns, trips.take(1)

('id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 [['4576',
   '63',
   '',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']])

In [162]:
station_columns, stations.take(1)

('id,name,lat,long,dock_count,city,installation_date',
 [['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']])

In [163]:
# key as id
station_by_id = stations.keyBy(lambda row: int(row[0]))
station_by_id.take(2)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 (3,
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013'])]

# Tasks <a id='tasks'></a>

In [165]:
trips_objects = trips.mapPartitions(Trip.init_from_list)
trips_objects.first()

Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [167]:
station_objects = stations.mapPartitions(Station.init_from_list)
station_objects.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

## 1 Найти велосипед с максимальным временем пробега.

In [170]:
trips_by_bike = trips_objects.keyBy(lambda trip: trip.bike_id)
trips_by_bike.first()

(520,
 Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127'))

In [173]:
query = trips_by_bike.mapValues(
    lambda trip: trip.duration
).reduceByKey(
    lambda trip1, trip2: trip1 + trip2
)

In [174]:
query.top(5)

[(878, 1343120), (877, 1031136), (876, 3304), (740, 82046), (717, 78645)]

In [175]:
query.map(lambda x: x[::-1]).top(5)

[(18611693, 535),
 (3933272, 466),
 (2409014, 613),
 (2253019, 526),
 (2248886, 415)]

In [176]:
top_1_bike_id = query.map(lambda x: x[::-1]).top(1)[0][1]
top_1_bike_id

535

## 2 Найти наибольшее геодезическое расстояние между станциями.

In [180]:
trips_stations = trips_objects.filter(
    lambda trip: trip.start_station_id != trip.end_station_id
).keyBy(
    lambda trip: (trip.start_station_id, trip.end_station_id)
).mapValues(lambda trip: trip.duration)

In [181]:
query = trips_stations.aggregateByKey(
    (0.0, 0.0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
).mapValues(lambda values: values[0] / values[1])

In [182]:
query.map(lambda x: x[::-1]).top(5)

[(229914.0, (26, '16')),
 (179212.5, (32, '63')),
 (169308.0, (80, '36')),
 (156461.03603603604, (66, '62')),
 (101207.5, (28, '2'))]

## 3 Найти путь велосипеда с максимальным временем пробега через станции.

In [185]:
query = trips_objects.filter(
    lambda trip: trip.bike_id == top_1_bike_id
).sortBy(
    lambda trip: trip.start_date
).map(lambda trip: (trip.start_station_name, trip.end_station_name)) 

In [186]:
query.collect()

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome'),
 ('Market at Sansome', '2nd at South Park'),
 ('2nd at Townsend', 'Davis at Jackson'),
 ('San Francisco City Hall', 'Civic Center BART (7th at Market)'),
 ('Civic Center BART (7th at Market)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Washington at Kearney'),
 ('Washington at Kearney', 'Market at Sansome'),
 ('Market at Sansome', 'Market at Sansome'),
 ('Market at Sansome', '2nd at Folsom'),
 ('2nd at Folsom', '2nd at Townsend'),
 ('Temporary Transbay Terminal (Howard at Beale)', '2nd at Townsend'),
 ('2nd at Townsend', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Clay at Battery'),
 ('Clay at Battery', 'Harry Bridges Plaza (Ferry Building)'),
 ('Harry Bridges Plaza (Ferry Building)', 'Clay at Batter

## 4 Найти количество велосипедов в системе.

In [187]:
query = trips_objects.map(lambda trip: trip.bike_id)

In [188]:
query.distinct().count()

700

## 5 Найти пользователей потративших на поездки более 3 часов.

In [189]:
max_time = 60 * 60 * 3

In [190]:
query = trips_objects.filter(
    lambda trip: trip.duration > max_time
).map(lambda trip: trip.trip_id)

In [191]:
query.take(5)

[4639, 4637, 4528, 4363, 4193]