# Initializing and getting session

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [4]:
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

spark_session = SparkSession\
    .builder\
    .getOrCreate()

In [5]:
spark_session

In [6]:
sc = spark_session._sc

In [7]:
!hadoop fs -ls /

Found 8 items
drwxr-xr-x   - root root         30 2021-12-17 12:55 /Leon_Result
-rwxr-xr-x   3 root root       3841 2021-12-14 09:31 /Untitled.ipynb
drwxr-xr-x   - mapr mapr          5 2021-12-14 08:03 /apps
drwxr-xr-x   - mapr mapr          0 2021-12-14 08:06 /opt
-rwxr-xr-x   3 root root         24 2021-12-14 10:51 /test.txt
drwxrwxrwx   - mapr mapr          0 2021-12-14 07:54 /tmp
drwxr-xr-x   - mapr mapr          2 2021-12-14 18:01 /user
drwxr-xr-x   - mapr mapr          1 2021-12-14 07:58 /var


In [8]:
from datetime import datetime
from glob import glob
from itertools import islice

import time
import timeit
import os

# _War and Peace_

In [9]:
warandpeace = sc.textFile("file:///home/workspace/data/warandsociety.txt")

In [10]:
warandpeace.count()

12851

In [11]:
try:
    nilFile = sc.textFile("file:///home/workspace/data/nil")
    nilFile.count()
except Exception as e:
    print(e)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/workspace/data/nil
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:289)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:317)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:273)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:273)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.api.python.PythonRDD.getPartitions(Pyth

In [12]:
warandpeace.take(10)

['Лев Николаевич Толстой',
 'Война и мир. Книга 1',
 '',
 'Война и мир – 1',
 '',
 ' ',
 ' http://www.lib.ru',
 '',
 'Аннотация ',
 '']

In [13]:
warandpeace.getNumPartitions()

2

In [14]:
lines_with_war = warandpeace.filter(lambda x: "война" in x)

In [15]:
lines_with_war.first()

"– Еh bien, mon prince. Genes et Lucques ne sont plus que des apanages, des поместья, de la famille Buonaparte. Non, je vous previens, que si vous ne me dites pas, que nous avons la guerre, si vous vous permettez encore de pallier toutes les infamies, toutes les atrocites de cet Antichrist (ma parole, j'y crois) – je ne vous connais plus, vous n'etes plus mon ami, vous n'etes plus мой верный раб, comme vous dites. [Ну, что, князь, Генуа и Лукка стали не больше, как поместьями фамилии Бонапарте. Нет, я вас предупреждаю, если вы мне не скажете, что у нас война, если вы еще позволите себе защищать все гадости, все ужасы этого Антихриста (право, я верю, что он Антихрист) – я вас больше не знаю, вы уж не друг мой, вы уж не мой верный раб, как вы говорите.] Ну, здравствуйте, здравствуйте. Je vois que je vous fais peur, [Я вижу, что я вас пугаю,] садитесь и рассказывайте."

In [16]:
lines_with_war.cache()

PythonRDD[8] at RDD at PythonRDD.scala:53

In [17]:
%%time
lines_with_war.count()

CPU times: user 14.7 ms, sys: 575 µs, total: 15.3 ms
Wall time: 1.63 s


54

In [18]:
%%time
lines_with_war.count()

CPU times: user 14.9 ms, sys: 588 µs, total: 15.5 ms
Wall time: 177 ms


54

In [19]:
word_сounts = lines_with_war\
            .flatMap(lambda line: line.split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda a, b: a + b)


In [20]:
# word_сounts.saveAsTextFile('warandpeace_histogram.txt')
# !hadoop fs -cat warandpeace_histogram.txt/*

# encoding is trashed ('cat' is stupid)

In [21]:
for file in glob('/home/workspace/data/warandpeace_histogram.txt/*'):
    os.remove(file)

!rm -rf /home/workspace/data/warandpeace_histogram.txt

word_сounts.saveAsTextFile('file:///home/workspace/data/warandpeace_histogram.txt')

In [22]:
lines = []
for file in glob('/home/workspace/data/warandpeace_histogram.txt/*'):
    with open(file, 'r', encoding="utf-8") as f:
        lines.extend(f.readlines())
print(*lines[:10])

('Еh', 1)
 ('mon', 6)
 ('prince.', 1)
 ('sont', 1)
 ('que', 9)
 ('des', 2)
 ('поместья,', 1)
 ('Buonaparte.', 1)
 ('Non,', 2)
 ('previens,', 1)



# Set operations

In [23]:
a = sc.parallelize([1,2,3,4])
b = sc.parallelize([3,4,6,7])


In [24]:
a.union(b).collect()

[1, 2, 3, 4, 3, 4, 6, 7]

In [25]:
a.union(b).distinct().collect()


[4, 1, 2, 6, 3, 7]

In [26]:
a.intersection(b).collect()

[4, 3]

In [27]:
a.subtract(b).collect()


[1, 2]

# Shared variables

## Broadcast vars

In [28]:
broadcast_var = sc.broadcast([1, 2, 3])


In [29]:
broadcast_var.value

[1, 2, 3]

## Accumulator vars

In [30]:
accum = sc.accumulator(0)

In [31]:
sc.parallelize([1,2,3,4]).foreach(lambda x: accum.add(x))

In [32]:
accum.value

10

In [33]:
pair = ('a', 'b')

# Top 10 taxi numbers

In [34]:
taxi_ = sc.textFile("file:///home/workspace/data/nyctaxi.csv")

In [35]:
taxi_.take(5)

['"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 '"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 '"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 '"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.0727

In [36]:
taxi = taxi_.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)

In [37]:
taxi.take(3)

['"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 '"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 '"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"']

In [38]:
taxi_parse = taxi.map(lambda line: line.split(','))

In [39]:
taxi_med_key = taxi_parse.map(lambda row: (row[6], 1))

In [40]:
taxi_med_counts = taxi_med_key.reduceByKey(lambda a, b: a + b)

In [41]:
taxi_med_counts.map(lambda a: a[::-1]).top(10)

[(44, '"AB44AD9A03B7CFAF3925103BDCC0AF23"'),
 (41, '"71CACFBADF9568AAE88A843DB511D172"'),
 (41, '"6483B9BFCB216EC88986EA3AB13064E7"'),
 (41, '"4C73459B430339981D78795300433438"'),
 (40, '"67E71D24AF704D814A0A825005ADA72E"'),
 (40, '"02E5A4136FD0A775A023A005A4EABC62"'),
 (39, '"9DFBCD218E7116F34C044F0680A0FB8A"'),
 (39, '"8DEB70907D00AA1D7FF5E2683240549B"'),
 (39, '"7989C2AB3F345F4AB54D3CF1E0480D67"'),
 (39, '"6C9F67DF658DC5636F9E7752F203F70A"')]

In [42]:
taxiCounts = taxi.map(lambda line: line.split(',')).map(lambda row: (row[6], 1)).reduceByKey(lambda a, b: a + b)

In [43]:
taxiCounts.cache()

PythonRDD[58] at RDD at PythonRDD.scala:53

In [44]:
%%time
taxiCounts.count()

CPU times: user 13.5 ms, sys: 0 ns, total: 13.5 ms
Wall time: 1.82 s


13370

In [45]:
%%time
taxiCounts.count()

CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms
Wall time: 111 ms


13370

# Storage levels

In [46]:
# import pyspark
# taxi.persist(storageLevel=pyspark.StorageLevel.MEMORY_ONLY)


# --------------------------------------------------------------------------------------------------

# Lab 1 TASKS

In [47]:
trips_data = sc.textFile('file:///home/workspace/data/trips.csv')
stations_data = sc.textFile('file:///home/workspace/data/stations.csv')

Let's see the headers

In [48]:
trips_header = trips_data.first()
trips_header

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [49]:
stations_header = stations_data.first()
stations_header

'id,name,lat,long,dock_count,city,installation_date'

Cut off the headers and split the rows

In [50]:
trips = trips_data\
        .mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)\
        .map(lambda row: row.split(","))

trips.take(3)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138'],
 ['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214']]

In [51]:
stations = stations_data\
            .mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)\
            .map(lambda row: row.split(","))

stations.take(3)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013'],
 ['4',
  'Santa Clara at Almaden',
  '37.333988',
  '-121.894902',
  '11',
  'San Jose',
  '8/6/2013']]

Keying stations by id

In [52]:
stations_indexed = stations.keyBy(lambda row: int(row[0]))
stations_indexed.take(1)

[(2,
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013'])]

Keying trips by station id (both by start station and by end station)

In [53]:
trips_by_start = trips.keyBy(lambda row: row[4])
trips_by_end = trips.keyBy(lambda row: row[7])

trips_by_start.take(1), trips_by_end.take(1)

([('66',
   ['4576',
    '63',
    '',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127'])],
 [('66',
   ['4576',
    '63',
    '',
    'South Van Ness at Market',
    '66',
    '8/29/2013 14:14',
    'South Van Ness at Market',
    '66',
    '520',
    'Subscriber',
    '94127'])])

In [54]:
stations_indexed_df = stations_indexed.toDF()
trips_by_start_df = trips_by_start.toDF()
trips_by_end_df = trips_by_end.toDF()

start_trips_df = stations_indexed_df.join(trips_by_start_df, stations_indexed_df._1 == trips_by_start_df._1)
end_trips_df = stations_indexed_df.join(trips_by_end_df, stations_indexed_df._1 == trips_by_end_df._1)

start_trips_df.take(2)

[Row(_1=26, _2=['26', 'Redwood City Medical Center', '37.487682', '-122.223492', '15', 'Redwood City', '8/12/2013'], _1='26', _2=['4917', '169', '8/29/2013 18:45', 'Redwood City Medical Center', '26', '8/29/2013 18:48', 'Broadway at Main', '25', '229', 'Subscriber', '94041']),
 Row(_1=26, _2=['26', 'Redwood City Medical Center', '37.487682', '-122.223492', '15', 'Redwood City', '8/12/2013'], _1='26', _2=['6212', '385', '8/30/2013 17:18', 'Redwood City Medical Center', '26', '8/30/2013 17:24', 'Redwood City Caltrain Station', '22', '249', 'Subscriber', '94041'])]

As you can see, we can do the tasks operating with dataframes (but we would need to work with them from the very beginning (not keying anything, not losing the columns names etc.)). A good dataframe approach was shown in the interactive example.

However, in this lab I would like to use Data Models approach (creating data classes in Python manually). This allows more convinient control over the data in terms of type casting, changing date format etc. due to the fact that everything related to this will be in one place -- in the class description.

## Creating data models

In [55]:
from typing import NamedTuple
from datetime import datetime

In [56]:
stations_header

'id,name,lat,long,dock_count,city,installation_date'

In [57]:
class Station(NamedTuple):
    station_id: int
    name: str
    lat: float
    long: float
    dock_count: int
    city: str
    installation_date: str

    @staticmethod
    def init_from_list(row):
        try:
            return Station(
                station_id=int(row[0]),
                name=row[1],
                lat=float(row[2]),
                long=float(row[3]),
                dock_count=int(row[4]),
                city=row[5],
#                 installation_date=row[6],
                instalation_date = datetime.strptime(row[6], '%m/%/d/%Y'),
            )
        except Exception as e:
            print(e)
            return None  # if some field except str if empty, we'll return None

In [58]:
trips_header

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [59]:
class Trip(NamedTuple):
    trip_id: int
    duration: int
    start_date: datetime
    start_station_name: str
    start_station_id: int
    end_date: datetime
    end_station_name: str
    end_station_id: int
    bike_id: int
    subscription_type: str
    zip_code: str
        
    @staticmethod
    def init_from_list(row):
        try:
            return Trip(                             
                 trip_id=int(row[0]),
                 duration=int(row[1]),
                 start_date=datetime.strptime(row[2], '%m/%d/%Y %H:%M'),
                 start_station_name=row[3],
                 start_station_id=int(row[4]),
                 end_date=datetime.strptime(row[5], '%m/%d/%Y %H:%M'),
                 end_station_name=row[6],
                 end_station_id=row[7],
                 bike_id=int(row[8]),
                 subscription_type=row[9],
                 zip_code=row[10]
            )
        except Exception as e:
            print(e)
            return None  # if some field except str if empty, we'll return None

In [60]:
trips_internal = trips.map(Trip.init_from_list).filter(lambda x: x is not None)

In [61]:
trips_internal.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

In [62]:
trips_internal.first().start_date

datetime.datetime(2013, 8, 29, 10, 16)

In [63]:
stations_internal = stations.map(Station.init_from_list).filter(lambda x: x is not None)

In [64]:
stations_internal.take(3)

[]

In [65]:
stations_internal.count()

0

## Recall the tasks:

1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.

1. Find a bike with the largest total trip time

In [66]:
trips_by_bike_id = trips_internal.keyBy(lambda x: x.bike_id)

In [67]:
durations_by_bike_id = trips_by_bike_id.mapValues(lambda x: x.duration)

In [68]:
total_durations_by_bike_id = durations_by_bike_id.reduceByKey(lambda a, b: a + b)

In [69]:
top10_total_durations = total_durations_by_bike_id.top(10, key=lambda x: x[1])
top10_total_durations

[(535, 18611693),
 (466, 3933272),
 (613, 2409014),
 (526, 2253019),
 (415, 2248886),
 (572, 2234149),
 (524, 2214314),
 (542, 2213422),
 (465, 2185170),
 (376, 2178177)]

In [70]:
top_bike_id = top10_total_durations[0][0]
top_bike_id

535

2. Find the largest distance between stations

In [71]:
from math import sqrt

In [72]:
coords_by_st = stations_internal.keyBy(lambda x: (x.station_id, x.name)).mapValues(lambda x: (x.lat, x.long))

In [73]:
pairs_coords_by_st = coords_by_st.cartesian(coords_by_st)  
# elements of type: (((station_id, name), (lat, long)), ((station_id, name), (lat, long)))

In [74]:
pairs_coords_by_st.take(3)

[]

In [75]:
def dist(a, b):
    return sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)

In [76]:
distances = pairs_coords_by_st.keyBy(lambda x: (x[0][0], x[1][0])).mapValues(lambda x: dist(x[0][1], x[1][1]))

In [77]:
distances.top(1, key=lambda x: x[1])

[]

3. Find the path went by a bicycle with the largest total trip time

In [78]:
top_bike_trips_sorted = trips_internal.filter(lambda trip: trip.bike_id == top_bike_id).sortBy(lambda trip: trip.start_date)

In [79]:
path = []

In [80]:
path.append(top_bike_trips_sorted.first().start_station_name)

In [81]:
path.extend(top_bike_trips_sorted.map(lambda trip: trip.end_station_name).collect())

In [82]:
path

['Post at Kearney',
 'San Francisco Caltrain (Townsend at 4th)',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Market at Sansome',
 '2nd at South Park',
 'Davis at Jackson',
 'Civic Center BART (7th at Market)',
 'Post at Kearney',
 'Embarcadero at Sansome',
 'Washington at Kearney',
 'Market at Sansome',
 'Market at Sansome',
 '2nd at Folsom',
 '2nd at Townsend',
 '2nd at Townsend',
 'Embarcadero at Sansome',
 'Clay at Battery',
 'Harry Bridges Plaza (Ferry Building)',
 'Clay at Battery',
 'San Francisco Caltrain (Townsend at 4th)',
 'Steuart at Market',
 '2nd at Townsend',
 'Harry Bridges Plaza (Ferry Building)',
 'Townsend at 7th',
 'San Francisco Caltrain (Townsend at 4th)',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Townsend at 7th',
 'San Francisco Caltrain (Townsend at 4th)',
 '2nd at South Park',
 '5th at Howard',
 'San Francisco Caltrain 2 (330 Townsend)',
 'Mechanics Plaza (Market at Battery)',
 'Powell at Post (Union Square)',
 'Powell at Post (Union Square)',
 '5th at Ho

4. Find the total number of bikes in the whole system

In [83]:
trips_internal.map(lambda trip: trip.bike_id).distinct().count()

700

5. Find users that spent more than 3 hours on their trips

In [84]:
# one trip is considered as one user

time_sec = 3 * 60 * 60
required_users = trips_internal.filter(lambda trip: (trip.duration > time_sec)).map(lambda trip: trip.trip_id)

In [85]:
required_users.count()

8322

In [86]:
required_users.take(10)

[4639, 4637, 4528, 4363, 4193, 4190, 4225, 4663, 4532, 4521]