In [1]:
!python3 -m venv venv
!source venv/bin/activate 
!python3 -m pip install numpy

Defaulting to user installation because normal site-packages is not writeable


In [2]:
!python --version

Python 3.9.5


In [3]:
#!hadoop fs -put ~/ /user

In [4]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from typing import NamedTuple
from datetime import datetime


conf = SparkConf().setAppName("lab 1").setMaster('yarn')

In [5]:
sc = SparkContext.getOrCreate(conf = conf)

In [23]:
#os.path.join(path, 'trips (1).csv')

#tripData = sc.textFile(os.path.join(path, 'trips.csv'))
tripData = sc.textFile("trips.csv")

tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

1. Найти велосипед с максимальным временем пробега.

In [7]:
trips.count()

669959

In [8]:
trips.take(2)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138']]

In [9]:
from typing import NamedTuple
from datetime import datetime

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [10]:
tripsInternal = trips.mapPartitions(initTrip)

In [11]:
trips_by_bike_id = tripsInternal.keyBy(lambda trip: trip.bike_id)
trips_by_bike_id.take(1)

[(48,
  Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'))]

In [12]:
trips_by_duration = trips_by_bike_id.mapValues(lambda trip: trip.duration)

In [13]:
trips_by_duration.groupByKey().mapValues(sum).top(1,  key=lambda x: x[1])

[(535, 18611693)]

In [14]:
t = trips_by_duration.groupByKey().mapValues(sum)

In [15]:
%%time
m = t.map(lambda x: x).top(1, key=lambda x: x[1])

CPU times: user 15.3 ms, sys: 3.44 ms, total: 18.7 ms
Wall time: 18 s


In [16]:
print(f"Id: {m[0][0]}, max of duration: {int(m[0][1])/60} min")

Id: 535, max of duration: 310194.88333333336 min


2. Найти наибольшее геодезическое расстояние между станциями.

In [17]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [32]:
station_data = sc.textFile("stations.csv")

stationsHeader = station_data.first()
stations = station_data.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))
stationsInternal = stations.mapPartitions(initStation)

In [33]:
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [34]:
def dist(x, y):
    return ((x.lat - y.lat)**2 + (x.long - y.long)**2)**0.5

In [35]:
all_distances = stationsInternal.cartesian(stationsInternal).map(lambda pair: (pair[0].station_id, pair[1].station_id, dist(pair[0], pair[1])))

In [36]:
max_dist = all_distances.top(1, key=lambda x: x[2])[0]

In [37]:
print(f'Max distance:\t{max_dist[2]}\nStations:\t{max_dist[0]}-{max_dist[1]}')

Max distance:	0.7058482821754397
Stations:	16-60


3. Найти путь велосипеда с максимальным временем пробега через станции

In [38]:
id = m[0][0]
print("Велосипед с максимальным временем пробега: ", id)

Велосипед с максимальным временем пробега:  535


In [39]:
tripData.take(5)

['id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 '4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127',
 '4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138',
 '4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214',
 '4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060']

In [40]:
# filtering for map
trip_header=tripData.first()

trip=tripData.filter(lambda x: x != trip_header).map(lambda x: x.split(","))

In [46]:
trip_mp = trip.mapPartitions(initTrip)
trip_mp.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

In [47]:
bike_max_time = trip_mp \
.map(lambda trip: (trip.bike_id, trip.duration)) \
.reduceByKey(lambda a, b: a + b) \
.top(1, key = lambda x: x[1])[0][0]

bike_max_time

535

In [48]:
#trip_header=tripData.first()
#trip=tripData.filter(lambda x: x != trip_header).map(lambda x: x.split(","))


result = trip_mp \
    .filter(lambda x: x.bike_id == bike_max_time) \
    .sortBy(lambda x: x.start_date) \
    .map(lambda x: (x.start_station_name, x.end_station_name)) \
    .first() 

result

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

4. Найти количество велосипедов в системе

In [49]:
amount = trip_mp \
.map(lambda x: x.bike_id) \
.distinct() \
.count()

amount

700

5. Найти пользователей потративших на поездки более 3 часов

In [50]:
users3h = trip_mp \
.filter(lambda x: x.duration > (3 * 60 * 60)) \
.map(lambda x: x.zip_code) \
.filter(lambda x: x != "") \
.distinct() 

users3h.take(3)

['94306', '49423', '49721']