### Initialization

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
import pyspark as ps
from glob import glob

In [3]:
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

# Создаем объект SparkSession,точка входа для взаимодействия с оболочкой Spark
spark_session = SparkSession\
    .builder\
    .getOrCreate()

In [4]:
spark_session

In [5]:
# Создаем SparkContext, который устанавливает взаимодействие кластера и resource manager (yarn)
sc = spark_session._sc

### Введение

#### Перемещаем файлы в hdfs

In [7]:
!hadoop fs -put data/* data

#### Работа с файлом warandsociety.txt

In [8]:
# create RDD
warandpeace = sc.textFile("data/warandsociety.txt")

In [9]:
# output amountof lines
warandpeace.count()

12851

In [11]:
# show first 10 lines 
warandpeace.take(10)

['Лев Николаевич Толстой',
 'Война и мир. Книга 1',
 '',
 'Война и мир – 1',
 '',
 ' ',
 ' http://www.lib.ru',
 '',
 'Аннотация ',
 '']

In [17]:
# show number of partitions on cluster
warandpeace.getNumPartitions()

2

In [24]:
# find lines that contain word "war"
linesWithWar = warandpeace.filter(lambda x: "война" in x)

In [25]:
# show first line (till eol)
linesWithWar.first()

"– Еh bien, mon prince. Genes et Lucques ne sont plus que des apanages, des поместья, de la famille Buonaparte. Non, je vous previens, que si vous ne me dites pas, que nous avons la guerre, si vous vous permettez encore de pallier toutes les infamies, toutes les atrocites de cet Antichrist (ma parole, j'y crois) – je ne vous connais plus, vous n'etes plus mon ami, vous n'etes plus мой верный раб, comme vous dites. [Ну, что, князь, Генуа и Лукка стали не больше, как поместьями фамилии Бонапарте. Нет, я вас предупреждаю, если вы мне не скажете, что у нас война, если вы еще позволите себе защищать все гадости, все ужасы этого Антихриста (право, я верю, что он Антихрист) – я вас больше не знаю, вы уж не друг мой, вы уж не мой верный раб, как вы говорите.] Ну, здравствуйте, здравствуйте. Je vois que je vous fais peur, [Я вижу, что я вас пугаю,] садитесь и рассказывайте."

In [26]:
# move data to cache
linesWithWar.cache()

PythonRDD[10] at RDD at PythonRDD.scala:53

In [30]:
%%time
linesWithWar.count()

CPU times: user 6.01 ms, sys: 10.3 ms, total: 16.3 ms
Wall time: 662 ms


54

In [31]:
%%time
linesWithWar.count()

CPU times: user 10.3 ms, sys: 2.71 ms, total: 13 ms
Wall time: 151 ms


54

In [37]:
# fing histogramm of words
wordCounts = linesWithWar.flatMap(lambda x: map(lambda xx: (xx, 1), x.split(' '))).reduceByKey(lambda a, b: a + b)

In [40]:
wordCounts.saveAsTextFile("data/warandpeace_histogram.txt")

In [None]:
!hadoop fs -cat data/warandpeace_histogram.txt/*

In [43]:
!!hadoop fs -rm -r data/warandpeace_histogram.txt

['Deleted data/warandpeace_histogram.txt']

#### Операции с множествами

In [44]:
a = sc.parallelize([1,2,3,4])
b = sc.parallelize([3,4,6,7])

In [47]:
# find union of a and b, collect it to the driver node
a.union(b).collect()

[1, 2, 3, 4, 3, 4, 6, 7]

In [48]:
# remove duplicates
a.union(b).distinct().collect()

[4, 1, 2, 6, 3, 7]

#### Общие переменные

In [50]:
# Общие переменные удобны если вы обращаетесь к небольшому объёму данных на всех узлах.
broadcastVar = sc.broadcast([1,2,3])

In [51]:
broadcastVar.value

[1, 2, 3]

In [53]:
# Аккумулирующие переменные являются объектами, которые могут быть изменены только ассоциативной операцией добавления. Они используются для эффективной реализации счётчиков и суммарных значений. Вы можете также использовать свой тип, над котором определена ассоциативная операция при необходимости.
#Особенностью использования переменной является возможность доступа к значению только на узле в driver процессе.
accum = sc.accumulator(0)

In [54]:
# processing array in || in each task add element to acc value
sc.parallelize([1,2,3,4]).foreach(lambda x: accum.add(x))

In [55]:
accum.value

10

In [56]:
# (key, value)
pair = ('a', 'b')
pair[1]

'b'

#### Топ-10 популярных номеров такси

In [60]:
# read file to RDD
taxi = sc.textFile("data/nyctaxi.csv")

In [61]:
for t in taxi.take(3):
    print(t)

"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"
"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"
"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"


In [62]:
import itertools
# remove first row that is a title
taxi.mapPartitionsWithIndex(lambda idx, it:  itertools.islice(it,1,None) if (idx==0) else it  )

PythonRDD[52] at RDD at PythonRDD.scala:53

In [63]:
# create RDD where each line is splited into collection of sublines separated by comma
taxiParse = taxi.map(lambda line: line.split(","))

In [64]:
# array of lines -> (taxi number, 1)
taxiMedKey = taxiParse.map(lambda row: (row[6], 1))

In [65]:
# -> (taxi number, amount of rides)
taxiMedCounts = taxiMedKey.reduceByKey(lambda v1, v2: v1+v2)

In [69]:
top10 = taxiMedCounts.map(lambda x: x[::-1]).top(10)
for x in top10:
    print(x[::-1])

('"AB44AD9A03B7CFAF3925103BDCC0AF23"', 44)
('"71CACFBADF9568AAE88A843DB511D172"', 41)
('"6483B9BFCB216EC88986EA3AB13064E7"', 41)
('"4C73459B430339981D78795300433438"', 41)
('"67E71D24AF704D814A0A825005ADA72E"', 40)
('"02E5A4136FD0A775A023A005A4EABC62"', 40)
('"9DFBCD218E7116F34C044F0680A0FB8A"', 39)
('"8DEB70907D00AA1D7FF5E2683240549B"', 39)
('"7989C2AB3F345F4AB54D3CF1E0480D67"', 39)
('"6C9F67DF658DC5636F9E7752F203F70A"', 39)


In [70]:
# compact version
taxiCounts = taxi.map(lambda line: line.split(",")).map(lambda row: (row[6],1)).reduceByKey(lambda a,b: a + b)

In [73]:
taxiCounts.cache()

PythonRDD[70] at RDD at PythonRDD.scala:53

In [74]:
%%time
taxiCounts.count()

CPU times: user 2.16 ms, sys: 9.68 ms, total: 11.8 ms
Wall time: 138 ms


13371

In [75]:
%%time
taxiCounts.count()

CPU times: user 8.28 ms, sys: 2.79 ms, total: 11.1 ms
Wall time: 65.4 ms


13371

### Анализ данных велопарковок на языке Python из Jupyter книг:
L1_interactive_bike_analysis_python_with_rdd.ipynb, L1_interactive_bike_analysis_python_with_dataframes.ipynb.

#### with rdd

In [9]:
tripData = sc.textFile("data/trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

In [10]:
stationData = sc.textFile("data/stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))

In [11]:
trips.take(1)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127']]

In [12]:
list(enumerate(tripsHeader.split(",")))

[(0, 'id'),
 (1, 'duration'),
 (2, 'start_date'),
 (3, 'start_station_name'),
 (4, 'start_station_id'),
 (5, 'end_date'),
 (6, 'end_station_name'),
 (7, 'end_station_id'),
 (8, 'bike_id'),
 (9, 'subscription_type'),
 (10, 'zip_code')]

In [13]:
list(enumerate(stationsHeader.split(",")))

[(0, 'id'),
 (1, 'name'),
 (2, 'lat'),
 (3, 'long'),
 (4, 'dock_count'),
 (5, 'city'),
 (6, 'installation_date')]

In [14]:
# -> (id, [id, name, ...])
stationsIndexed = stations.keyBy(lambda station: station[0])

In [15]:
tripsByStartTerminals = trips.keyBy(lambda trip: trip[4])
tripsByEndTerminals = trips.keyBy(lambda trip: trip[7])

In [16]:
# join trips and stations by key
# (station id, ([station info], [trip info]))
startTrips = stationsIndexed.join(tripsByStartTerminals)
endTrips = stationsIndexed.join(tripsByEndTerminals)

In [17]:
startTrips.take(1)

[('4',
  (['4',
    'Santa Clara at Almaden',
    '37.333988',
    '-121.894902',
    '11',
    'San Jose',
    '8/6/2013'],
   ['4500',
    '109',
    '8/29/2013 13:25',
    'Santa Clara at Almaden',
    '4',
    '8/29/2013 13:27',
    'Adobe on Almaden',
    '5',
    '679',
    'Subscriber',
    '95112']))]

Lineage graph (chain of every individual step that happened i.e type of RDD created and method used to create it) shows every step from bottoms up, each change in indentation is an indication of shuffle operation.

In [18]:
print(startTrips.toDebugString().decode("utf-8")) 

(4) PythonRDD[22] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[11] at join at <ipython-input-16-819cbef38202>:3 []
    |  PythonRDD[10] at join at <ipython-input-16-819cbef38202>:3 []
    |  UnionRDD[9] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[7] at RDD at PythonRDD.scala:53 []
    |  data/stations.csv MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/stations.csv HadoopRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[8] at RDD at PythonRDD.scala:53 []
    |  data/trips.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/trips.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [19]:
 print(endTrips.toDebugString().decode("utf-8"))

(4) PythonRDD[23] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[18] at join at <ipython-input-16-819cbef38202>:4 []
    |  PythonRDD[17] at join at <ipython-input-16-819cbef38202>:4 []
    |  UnionRDD[16] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[14] at RDD at PythonRDD.scala:53 []
    |  data/stations.csv MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/stations.csv HadoopRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[15] at RDD at PythonRDD.scala:53 []
    |  data/trips.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  data/trips.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [20]:
# number of rows
startTrips.count()

669959

In [21]:
from pyspark.rdd import portable_hash

In [22]:
stationsIndexed.partitionBy(numPartitions=trips.getNumPartitions(), partitionFunc=lambda x: portable_hash(x[0]))

MapPartitionsRDD[28] at mapPartitions at PythonRDD.scala:145

In [23]:
print(stationsIndexed.partitioner)

None


##### Create a data model
Transform original string data into class objects

In [24]:
from typing import NamedTuple
from datetime import datetime

In [25]:
 def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

In [26]:
# runs separately on each partition (block) of the RDD
stationsInternal = stations.mapPartitions(initStation)
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [27]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

In [28]:
tripsInternal = trips.mapPartitions(initTrip)
tripsInternal.take(2)

[Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'),
 Trip(trip_id=4251, duration=77, start_date=datetime.datetime(2013, 8, 29, 11, 29), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 30), end_station_name='San Jose City Hall', end_station_id='10', bike_id=26, subscription_type='Subscriber', zip_code='95060')]

Find trip avarage time for each start station:

In [29]:
# -> (station name, Trip[trip info])
tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)

In [30]:
import sys
sys.path.append("./.local/lib/python3.9/site-packages/numpy")
import numpy as np

In [31]:
from pyspark.sql.functions import avg

In [32]:
? tripsByStartStation.aggregateByKey

[0;31mSignature:[0m
 [0mtripsByStartStation[0m[0;34m.[0m[0maggregateByKey[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mzeroValue[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mseqFunc[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcombFunc[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mnumPartitions[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpartitionFunc[0m[0;34m=[0m[0;34m<[0m[0mfunction[0m [0mportable_hash[0m [0mat[0m [0;36m0x7f4b6cf93040[0m[0;34m>[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Aggregate the values of each key, using given combine functions and a neutral
"zero value". This function can return a different result type, U, than the type
of the values in this RDD, V. Thus, we need one operation for merging a V into
a U and one operation for merging two U's, The former operation is used for merging
values within a partition, and the latter is used for merging values betwee

In [33]:
def seqFunc(acc, duration):
    duration_sum, count = acc
    return (duration_sum + duration, count + 1)

def combFunc(acc1, acc2):
    duration_sum1, count1 = acc1
    duration_sum2, count2 = acc2
    return (duration_sum1+duration_sum2, count1+count2)

def meanFunc(acc):
    duration_sum, count = acc
    return duration_sum/count

avgDurationByStartStation2 = tripsByStartStation\
  .mapValues(lambda trip: trip.duration)\
  .aggregateByKey(
    zeroValue=(0,0),
    seqFunc=seqFunc,
    combFunc=combFunc)\
  .mapValues(meanFunc)

In [34]:
%%time
avgDurationByStartStation2.top(10, key=lambda x: x[1])

CPU times: user 10.8 ms, sys: 6.76 ms, total: 17.6 ms
Wall time: 17.8 s


[('University and Emerson', 7090.239417989418),
 ('California Ave Caltrain Station', 4628.005847953216),
 ('Redwood City Public Library', 4579.234741784037),
 ('Park at Olive', 4438.1613333333335),
 ('San Jose Civic Center', 4208.016938519448),
 ('Rengstorff Avenue / California Street', 4174.082373782108),
 ('Redwood City Medical Center', 3959.491961414791),
 ('Palo Alto Caltrain Station', 3210.6489815253435),
 ('San Mateo County Center', 2716.7700348432054),
 ('Broadway at Main', 2481.2537313432836)]

Find the first trip for each station:

In [35]:
 def earliestTrip(trips):
    if trips is None:
        return None
    if len(trips)==0:
        return trips
    trips = list(trips)
    min_date = trips[0].start_date
    min_trip = trips[0]
    for trip in trips[1:]:
        if min_date > trip.start_date:
            min_date = trip.start_date
            min_trip = trip
    return min_trip

firstGrouped = tripsByStartStation\
  .groupByKey()\
  .mapValues(lambda trips: earliestTrip(trips))

In [36]:
%%time
firstGrouped.take(5)

CPU times: user 12 ms, sys: 3.74 ms, total: 15.8 ms
Wall time: 27 s


[('Mountain View City Hall',
  Trip(trip_id=4081, duration=218, start_date=datetime.datetime(2013, 8, 29, 9, 38), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 9, 41), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=150, subscription_type='Subscriber', zip_code='97214')),
 ('Santa Clara at Almaden',
  Trip(trip_id=4500, duration=109, start_date=datetime.datetime(2013, 8, 29, 13, 25), start_station_name='Santa Clara at Almaden', start_station_id=4, end_date=datetime.datetime(2013, 8, 29, 13, 27), end_station_name='Adobe on Almaden', end_station_id='5', bike_id=679, subscription_type='Subscriber', zip_code='95112')),
 ('Mountain View Caltrain Station',
  Trip(trip_id=4505, duration=97713, start_date=datetime.datetime(2013, 8, 29, 13, 30), start_station_name='Mountain View Caltrain Station', start_station_id=28, end_date=datetime.datetime(2013, 8, 30, 16, 38), end_station_name='Mountain View City Hall', 

In [37]:
firstGrouped = tripsByStartStation\
  .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)

In [38]:
%%time

firstGrouped.take(5)

CPU times: user 962 µs, sys: 12.7 ms, total: 13.7 ms
Wall time: 17 s


[('Mountain View City Hall',
  Trip(trip_id=4081, duration=218, start_date=datetime.datetime(2013, 8, 29, 9, 38), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 9, 41), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=150, subscription_type='Subscriber', zip_code='97214')),
 ('Santa Clara at Almaden',
  Trip(trip_id=4500, duration=109, start_date=datetime.datetime(2013, 8, 29, 13, 25), start_station_name='Santa Clara at Almaden', start_station_id=4, end_date=datetime.datetime(2013, 8, 29, 13, 27), end_station_name='Adobe on Almaden', end_station_id='5', bike_id=679, subscription_type='Subscriber', zip_code='95112')),
 ('Mountain View Caltrain Station',
  Trip(trip_id=4505, duration=97713, start_date=datetime.datetime(2013, 8, 29, 13, 30), start_station_name='Mountain View Caltrain Station', start_station_id=28, end_date=datetime.datetime(2013, 8, 30, 16, 38), end_station_name='Mountain View City Hall', 

#### with dataframes

In [39]:
tripData = spark_session.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("data/trips.csv")

tripData

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [40]:
 tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [41]:
 tripData.show(n=3)

+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|         start_date|  start_station_name|start_station_id|           end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|               null|South Van Ness at...|              66|2013-08-29 14:14:00|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    null|2013-08-29 14:42:00|  San Jose City Hall|              10|2013-08-29 14:43:00|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|2013-08-29 10:16:00|Mountain View Cit...|              27|2013-08-29 10:17:00|Mountain View Cit...|            27|     48|       Subscriber|   97214|
+----+----

In [42]:
? tripData.dropna

[0;31mSignature:[0m  [0mtripData[0m[0;34m.[0m[0mdropna[0m[0;34m([0m[0mhow[0m[0;34m=[0m[0;34m'any'[0m[0;34m,[0m [0mthresh[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0msubset[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Returns a new :class:`DataFrame` omitting rows with null values.
:func:`DataFrame.dropna` and :func:`DataFrameNaFunctions.drop` are aliases of each other.

.. versionadded:: 1.3.1

Parameters
----------
how : str, optional
    'any' or 'all'.
    If 'any', drop a row if it contains any nulls.
    If 'all', drop a row only if all its values are null.
thresh: int, optional
    default None
    If specified, drop rows that have less than `thresh` non-null values.
    This overwrites the `how` parameter.
subset : str, tuple or list, optional
    optional list of column names to consider.

Examples
--------
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
+---

In [43]:
stationData = spark_session.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("data/stations.csv")

stationData.printSchema()
stationData.show(n=3)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)

+---+--------------------+---------+-------------------+----------+--------+-------------------+
| id|                name|      lat|               long|dock_count|    city|  installation_date|
+---+--------------------+---------+-------------------+----------+--------+-------------------+
|  2|San Jose Diridon ...|37.329732|-121.90178200000001|        27|San Jose|2013-08-06 00:00:00|
|  3|San Jose Civic Ce...|37.330698|        -121.888979|        15|San Jose|2013-08-05 00:00:00|
|  4|Santa Clara at Al...|37.333988|        -121.894902|        11|San Jose|2013-08-06 00:00:00|
+---+--------------------+---------+-------------------+----------+--------+-------------------+
only showing top 3 rows



#### DataFrame API usage example

In [44]:
stationsView = stationData.select(stationData['id'], stationData['name'], stationData['lat'], stationData['long'])
stationsView.show()

+---+--------------------+------------------+-------------------+
| id|                name|               lat|               long|
+---+--------------------+------------------+-------------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|
|  8| San Salvador at 1st|         37.330165|-121.88583100000001|
|  9|           Japantown|         37.348742|-121.89471499999999|
| 10|  San Jose City Hall|         37.337391|        -121.886995|
| 11|         MLK Library|         37.335885|-121.88566000000002|
| 12|SJSU 4th at San C...|         37.332808|-121.88389099999999|
| 13|       St James Park|         37.339301|-121.88993700000002|
| 14|Arena

In [45]:
startTrips = tripData.select(tripData.id, tripData.duration, tripData.start_station_id).withColumnRenamed('id', 'trip_id').join(stationsView, tripData.start_station_id == stationsView.id)
startTrips = startTrips.drop('id') 

In [46]:
 startTrips.show()

+-------+--------+----------------+--------------------+------------------+-------------------+
|trip_id|duration|start_station_id|                name|               lat|               long|
+-------+--------+----------------+--------------------+------------------+-------------------+
|   4576|      63|              66|South Van Ness at...|         37.774814|        -122.418954|
|   4607|    null|              10|  San Jose City Hall|         37.337391|        -121.886995|
|   4130|      71|              27|Mountain View Cit...|         37.389218|        -122.081896|
|   4251|      77|              10|  San Jose City Hall|         37.337391|        -121.886995|
|   4299|      83|              66|South Van Ness at...|         37.774814|        -122.418954|
|   4927|     103|              59| Golden Gate at Polk|         37.781332|        -122.418603|
|   4500|     109|               4|Santa Clara at Al...|         37.333988|        -121.894902|
|   4563|     111|               8| San 

## Tasks
Набор данных: данные велопарковок Сан-Франциско (trips.csv, stations.csv)

1. Найти велосипед с максимальным временем пробега.  
2. Найти наибольшее геодезическое расстояние между станциями.  
3. Найти путь велосипеда с максимальным временем пробега через станции.  
4. Найти количество велосипедов в системе.  
5. Найти пользователей потративших на поездки более 3 часов.  

### 1 Найти велосипед с максимальным временем пробега.

In [197]:
type(tripsInternal)

pyspark.rdd.PipelinedRDD

In [198]:
# -> (bike_id, Trip[])
# -> (bike_id, one of duration)
# -> (bike_id, sum of durations)
tmp1 = tripsInternal\
    .keyBy(lambda x: x.bike_id)\
    .mapValues(lambda x: x.duration)\
    .reduceByKey(lambda x, y: x+y)

In [199]:
tmp2 = tmp1.map(lambda x: x[::-1]).top(1)
tmp2

[(18611693, 535)]

In [200]:
print("Answer (bike id): ", tmp2[0][1])

Answer (bike id):  535


### 2 Найти наибольшее геодезическое расстояние между станциями.

Distance from a to b = d* R  
d = arccos(COS(a.Latitude)* COS(b.Latitude)* COS(a.Longitude - b.Longitude) + SIN(a.Latitude)* SIN(b.Latitude))  
R = 6371 км — средний радиус земного шара  
from http://osiktakan.ru/geo_koor.htm

In [161]:
import numpy as np

In [193]:
def distance(a, b):
    a_lat = a[0]
    a_long = a[1]
    b_lat = b[0]
    b_long = b[1]
    delta = a_long - b_long
    return 111.111*np.arccos(np.cos(a_lat)*np.cos(b_lat)*np.cos(delta) + np.sin(a_lat)*np.sin(b_lat))

In [180]:
stationscoord = stationsView.rdd\
    .keyBy(lambda x: (x.name))\
    .mapValues(lambda x: (x.lat, x.long))

In [181]:
# rdd
grid = stationscoord.cartesian(stationscoord)

In [182]:
grid.take(1)

[(('San Jose Diridon Caltrain Station', (37.329732, -121.90178200000001)),
  ('San Jose Diridon Caltrain Station', (37.329732, -121.90178200000001)))]

In [183]:
# -> ((station1, station2), (coord1, coord2))
routes = grid\
    .keyBy(lambda x: (x[0][0], x[1][0]))\
    .mapValues(lambda x: (x[0][1], x[1][1]))

In [184]:
routes.take(1)

[(('San Jose Diridon Caltrain Station', 'San Jose Diridon Caltrain Station'),
  ((37.329732, -121.90178200000001), (37.329732, -121.90178200000001)))]

In [187]:
# (station: x[0][], coord: x[1][])
distances = routes.mapValues(lambda x: distance(x[0], x[1]))

In [188]:
distances.take(4)

[(('San Jose Diridon Caltrain Station', 'San Jose Diridon Caltrain Station'),
  0.0),
 (('San Jose Diridon Caltrain Station', 'San Jose Civic Center'),
  1.3311857254441992),
 (('San Jose Diridon Caltrain Station', 'Santa Clara at Almaden'),
  0.8559554481844344),
 (('San Jose Diridon Caltrain Station', 'Adobe on Almaden'),
  0.9089709851794548)]

In [190]:
ans = distances.map(lambda x: x[1]).top(1)

In [192]:
print(f"Answer (max distance): {ans} km") 

Answer (max distance): [77.63109804183553] km


### 3 Найти путь велосипеда с максимальным временем пробега через станции.

In [195]:
tripsInternal.take(1)

[Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')]

In [210]:
# tmp2[0][1] - id велосипеда с макс временем пробега из задания1
path = tripsInternal\
    .filter(lambda x: x.bike_id == tmp2[0][1])\
    .keyBy(lambda x: x.start_date)\
    .sortByKey()

In [211]:
path.take(2)

[(datetime.datetime(2013, 8, 29, 19, 32),
  Trip(trip_id=4966, duration=1245, start_date=datetime.datetime(2013, 8, 29, 19, 32), start_station_name='Post at Kearney', start_station_id=47, end_date=datetime.datetime(2013, 8, 29, 19, 53), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=535, subscription_type='Customer', zip_code='94123')),
 (datetime.datetime(2013, 8, 29, 21, 38),
  Trip(trip_id=5067, duration=423, start_date=datetime.datetime(2013, 8, 29, 21, 38), start_station_name='San Francisco Caltrain (Townsend at 4th)', start_station_id=70, end_date=datetime.datetime(2013, 8, 29, 21, 45), end_station_name='San Francisco Caltrain 2 (330 Townsend)', end_station_id='69', bike_id=535, subscription_type='Subscriber', zip_code='94133'))]

In [212]:
path_stations = path.map(lambda x: (x[1].start_station_name, x[1].end_station_name))

In [218]:
path_stations.collect()[:10]

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome'),
 ('Market at Sansome', '2nd at South Park'),
 ('2nd at Townsend', 'Davis at Jackson'),
 ('San Francisco City Hall', 'Civic Center BART (7th at Market)'),
 ('Civic Center BART (7th at Market)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Washington at Kearney'),
 ('Washington at Kearney', 'Market at Sansome')]

### 4 Найти количество велосипедов в системе.

In [219]:
tripsInternal.map(lambda x: x.bike_id).distinct().count()

700

### 5 Найти пользователей потративших на поездки более 3 часов.

In [228]:
period = 3*3600 
# let trip_id = user_id
users = tripsInternal\
    .filter(lambda x: x.duration > period)\
    .map(lambda x: x.trip_id)

In [231]:
print(f"Answer (trip ids): {users.take(10)} ...")

Answer (trip ids): [4639, 4637, 4528, 4363, 4193, 4190, 4225, 4663, 4532, 4521] ...


In [232]:
sc.stop()