<a href="https://colab.research.google.com/github/PiotrMaciejKowalski/kurs-analiza-danych-2022/blob/create-spark-materials/Tydzie%C5%84%206/Preprocessing%20i%20MLLib%20w%20Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Sparka

## Utworzenie środowiska pyspark do obliczeń

Tworzymy swoje środowisko z pysparkiem we wenętrzu naszych zasobów chmurowych

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q ftp.ps.pl/pub/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
!pip install -q findspark

import findspark
findspark.init()

## Utworzenie sesji z pyspark


Utworzymy testowo sesję aby zobaczyć czy działa. Element ten jest wspólny również gdy systemy sparkowe pracują w sposób ciągły, a nie są tworzone przez naszą sesję.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

## Apache Spark 

**Apache Spark** to zunifikowany silnik do obliczeń rozproszonych na licencji open-source. Oferuje interfejs pozwalający na programowanie obliczeń na klastrach z domyślną paralelizacją oraz odpornością na awarie.

Ze Sparkiem pracować można w Scali, Pythonie, Javie oraz R.

Jego przewaga nad model Map-Reduce Hadoopa polega na unikaniu zapisów na hdfs tak długo jak to możliwe - i posługiwaniu się RAMem nodów jak długo go wystarcza.

**Komponenty Sparka:**

<img src="https://raw.githubusercontent.com/PiotrMaciejKowalski/kurs-analiza-danych-2022/main/Tydzie%C5%84%206/cluster-overview.png" alt="title" width="500"/>

* Spark "core" - podstawa Sparka z podstawową abstrakcją danych nazywaną RDD
* Spark SQL - komponent pozwalający na operowanie na ustrukturyzowanych danych z wykorzystaniem operacji znanych z SQL - łatwy w użyciu
* Spark MLlib - komponent zawierający algorytmy ML dostępne w Sparku - ML na skalę klastrów
* Spark Streaming - moduł pozwalający na pracę ze strumnieniami danych
* Spark GraphX - komponent do pracy z grafami

**Architektura Sparka:**

<img src="https://raw.githubusercontent.com/PiotrMaciejKowalski/kurs-analiza-danych-2022/main/Tydzie%C5%84%206/spark-stack.png" alt="title" width="500"/>

* driver - proces uruchamiający główną funkcję aplikacji i tworzący SparkContext
* executor(y) - proces uruchomiony dla aplikacji w węźle roboczym (worker node), który uruchamia zadania i przechowuje dane w pamięci lub na dysku. Każda aplikacja ma własne executory
* cluster manager - dostępne opcje: YARN, Mesos, Kubernetes, Standalone

**SparkSession:**
* wprowadzony w Spark 2.0
* składa się ze SparkContextu, SQLContextu oraz HiveContext
* zwykle nazywany w kodzie `spark`
* kroki niezbędne do utworzenia SparkSession w pySparku:

> from pyspark.sql import SparkSession  
> spark = SparkSession.builder.getOrCreate()

## RDD

Podstawowym formatem danych (coś jak tabela w db) jest RDD. Skrót rozwija się następująco:
* R - resilient (elastyczny)
* D - distributed
* D - dataset

Model RDD jest napisany w sposób wspierający przekstrzałcenia Map-Reduce jako domyślny. W związku z powyższym wykazuje się następującymi własnościami:


* immutable - każdy obiekt jest niezmienniczy. Chcesz coś zmienić - musisz utworzyć nowy rdd
* in-memory - przetwarzany głównie w RAM
* lazy evaluated - silnik obliczeniowy wykonuje obliczenia dopiero gdy okażą się konieczne.
* parallel - współbieżny 

Z RDD stowarzyszone są dwa rodzaje czynności:
* akcje, oraz
* transformacje

### Transformacje 

Modelują czynności jakie możemy chcieć wykonywać na danych. Przekształcenia (map), redukcje (reduce), filtry (filter). Mają charakter opisu skąd się biorą pewne wartości. W naszym ujęciu mogą odpowiadać funkcjom mapper, reducer i podobnym. 

_Dla osób, które kojarzą paradygmat funkcyjny programowania - można dodać, że transformacje dotyczą funkcji czystych._

### Akcje

Modelują czynności z uwagi na wynik jaki oczekujemy. Wyświetl, zapisz, wyszukaj. Mają charakter silnie połączony z wynikiem działania.

Aby obliczenia na danych zostały wykonane - musi zostać uruchomiona akcja. Dopiero ona wykona odpowiednie (i tylko te konieczne) transformacje.

## DataFrame

Choć RDD są wszędzie w Sparku, obecnie już się ich nie widzi. Od Sparka w wersji 2.0 zostały przesłonięte nowym interfejsem (zostały spakowane do wnętrza) czegoś nazywanego Ramką Danych (Dataframe). Skojarzenie z dataframe z R lub Pandas Python jest tutaj bardzo naturalne i prawdziwe. DataFrame Sparka były na nich wzorowane i pokrywają się w dużym obszarze składni.

**DataFrame:**
* abstrakcja danych z modułu Spark SQL
* zawiera dodatkowe informacje o strukturze danych (schema)
* pozwala na pracę z danymi wykorzysując zapytania znane z SQL/Hive

Dalej zaprezentujemy jak to się odbywa w praktyce

## Podłączenie Google Drive do sesji colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Przykładowy processing danych w Spark

## Wczytanie danych do Sparka

W tej części wczytamy sobie nasz plik `flights.csv` do przetwarzania w Spark.

Z uwagi na to, że nasz plik to csv bez nagłówka - trzeba zdefiniować schemat dla danych, które przetwarzamy

Przypomnijmy listę pól
YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,\
DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,\
TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,\
TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,\
CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,\
LATE_AIRCRAFT_DELAY,WEATHER_DELAY

_Sporo ich. Więc najpierw trochę magii (bo nie chce mi się kodować każdego pola ręcznie, a jestem leniwy). Ufam, że przykład pozwoli rozszerzyć zastosowanie do bardziej skomplikowanych zastosowań. Na pocieszenie dodam, że wczytywanie csv bez nagłówka to najgorszy scenariusz w wersji wczytywania w sparku._

In [None]:
pola_zbiorczo = '''YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY'''
pola = pola_zbiorczo.split(',')

A dalej użyjemy funkcji add(Nazwa, Typ pola, czy może być null) do zapisania prostego schematu danych

In [None]:
from pyspark.sql.types import StructType, StringType

schemat = StructType()
for pole in pola:
    schemat = schemat.add(pole, StringType(), True)

Przejdźmy do wczytywania

In [None]:
df = spark.read.format('csv').option("header", False).schema(schemat).load('/content/drive/MyDrive/flights.csv')
df.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [None]:
df.columns

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

In [None]:
df['YEAR','MONTH','DAY','AIRLINE','DISTANCE'].show()

+----+-----+---+-------+--------+
|YEAR|MONTH|DAY|AIRLINE|DISTANCE|
+----+-----+---+-------+--------+
|2015|    1|  1|     AS|    1448|
|2015|    1|  1|     AA|    2330|
|2015|    1|  1|     US|    2296|
|2015|    1|  1|     AA|    2342|
|2015|    1|  1|     AS|    1448|
|2015|    1|  1|     DL|    1589|
|2015|    1|  1|     NK|    1299|
|2015|    1|  1|     US|    2125|
|2015|    1|  1|     AA|    1464|
|2015|    1|  1|     DL|    1747|
|2015|    1|  1|     DL|    1199|
|2015|    1|  1|     AA|    2174|
|2015|    1|  1|     DL|    1535|
|2015|    1|  1|     DL|    1590|
|2015|    1|  1|     DL|    1399|
|2015|    1|  1|     AS|    1448|
|2015|    1|  1|     DL|    1448|
|2015|    1|  1|     UA|    1635|
|2015|    1|  1|     AS|    1542|
|2015|    1|  1|     DL|    1426|
+----+-----+---+-------+--------+
only showing top 20 rows



In [None]:
df.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [None]:
%%time
df.count()

CPU times: user 82 ms, sys: 19.7 ms, total: 102 ms
Wall time: 11.4 s


5819079

## Zapytania Spark-SQL

Zapytania do Sparka kierowane są za pomocą 

* składni a.k.a. SQL, lub
* wyrażone w ORM (object relational mapping) czyli obiektowym sposobie na wyrażanie kwerend.

Składniowo wydaje się, że zapytania SQL są łatwiejsze do zapisania. W kilku przypadkach jednak jawne zadanie kolejności obliczeń może pomóc zoptymalizować kształt zapytania.

Zaprezentuje kilka podstawowych sposób na odpytywanie Spark DataFrame kwerendami o różnych naturach. Zawsze podane obe będą w postaci SparkSQL oraz wyrażenia ORM.

### Proste kwerendy

Zanim zacznimy pisać kwerendy należy jeszcze dodać nasz DataFrame do 'przestrzeni nazw tabel' Sparka. Formalnie nazywane jest to widokiem danych 

In [None]:
df.createOrReplaceTempView("df")

Wykonajmy prostego Selecta z tego zbioru.

Przypominam, że do uruchomienia sparka potrzebna jest akcja. Np. taki `show()`

In [None]:
spark.sql('select airline, distance from df').show()

+-------+--------+
|airline|distance|
+-------+--------+
|     AS|    1448|
|     AA|    2330|
|     US|    2296|
|     AA|    2342|
|     AS|    1448|
|     DL|    1589|
|     NK|    1299|
|     US|    2125|
|     AA|    1464|
|     DL|    1747|
|     DL|    1199|
|     AA|    2174|
|     DL|    1535|
|     DL|    1590|
|     DL|    1399|
|     AS|    1448|
|     DL|    1448|
|     UA|    1635|
|     AS|    1542|
|     DL|    1426|
+-------+--------+
only showing top 20 rows



In [None]:
df.select('airline','distance').show()

+-------+--------+
|airline|distance|
+-------+--------+
|     AS|    1448|
|     AA|    2330|
|     US|    2296|
|     AA|    2342|
|     AS|    1448|
|     DL|    1589|
|     NK|    1299|
|     US|    2125|
|     AA|    1464|
|     DL|    1747|
|     DL|    1199|
|     AA|    2174|
|     DL|    1535|
|     DL|    1590|
|     DL|    1399|
|     AS|    1448|
|     DL|    1448|
|     UA|    1635|
|     AS|    1542|
|     DL|    1426|
+-------+--------+
only showing top 20 rows



### Sprawniejsze wyświetlanie danych sparkowych

In [None]:
df.select('airline','distance').limit(20).toPandas()

Unnamed: 0,airline,distance
0,AS,1448
1,AA,2330
2,US,2296
3,AA,2342
4,AS,1448
5,DL,1589
6,NK,1299
7,US,2125
8,AA,1464
9,DL,1747


## Proste grupowania i agregacje

Dalej proste pogrupowanie z polem poddanym agregacji.

In [None]:
%%time
spark.sql('select airline, count(*) as count from df group by airline').show()

+-------+-------+
|airline|  count|
+-------+-------+
|     UA| 515723|
|     NK| 117379|
|     AA| 725984|
|     EV| 571977|
|     B6| 267048|
|     DL| 875881|
|     OO| 588353|
|     F9|  90836|
|     US| 198715|
|     MQ| 294632|
|     HA|  76272|
|     AS| 172521|
|     VX|  61903|
|     WN|1261855|
+-------+-------+

CPU times: user 111 ms, sys: 9.64 ms, total: 121 ms
Wall time: 16.7 s


In [None]:
%%time
df.groupBy('airline').count().show()

+-------+-------+
|airline|  count|
+-------+-------+
|     UA| 515723|
|     NK| 117379|
|     AA| 725984|
|     EV| 571977|
|     B6| 267048|
|     DL| 875881|
|     OO| 588353|
|     F9|  90836|
|     US| 198715|
|     MQ| 294632|
|     HA|  76272|
|     AS| 172521|
|     VX|  61903|
|     WN|1261855|
+-------+-------+

CPU times: user 100 ms, sys: 5.72 ms, total: 106 ms
Wall time: 14 s


## Klauzala sortująca

Możemy dane uporządkować względem kolumny

In [None]:
%%time
spark.sql('select airline, count(*) as count from df group by airline order by count').show()

+-------+-------+
|airline|  count|
+-------+-------+
|     VX|  61903|
|     HA|  76272|
|     F9|  90836|
|     NK| 117379|
|     AS| 172521|
|     US| 198715|
|     B6| 267048|
|     MQ| 294632|
|     UA| 515723|
|     EV| 571977|
|     OO| 588353|
|     AA| 725984|
|     DL| 875881|
|     WN|1261855|
+-------+-------+

CPU times: user 95.2 ms, sys: 17.4 ms, total: 113 ms
Wall time: 14.2 s


In [None]:
%%time
df.groupBy('airline').count().orderBy('count').show()

+-------+-------+
|airline|  count|
+-------+-------+
|     VX|  61903|
|     HA|  76272|
|     F9|  90836|
|     NK| 117379|
|     AS| 172521|
|     US| 198715|
|     B6| 267048|
|     MQ| 294632|
|     UA| 515723|
|     EV| 571977|
|     OO| 588353|
|     AA| 725984|
|     DL| 875881|
|     WN|1261855|
+-------+-------+

CPU times: user 101 ms, sys: 13.8 ms, total: 115 ms
Wall time: 13.8 s


Możemy zmienić funkcje agregacji na mniej oczywistą lub zadać ich więcej.


In [None]:
%%time
spark.sql('select airline, max(distance) as maks, min(distance) as min from df group by airline').show()

+-------+----+----+
|airline|maks| min|
+-------+----+----+
|     UA| 997| 100|
|     NK| 986|1005|
|     AA| 993|1005|
|     EV| 999| 100|
|     B6| 997|1005|
|     DL| 991|1005|
|     OO| 996|1004|
|     F9| 993|1005|
|     US| 993|1009|
|     MQ| 999|1013|
|     HA|  84| 100|
|     AS| 987|1009|
|     VX| 954|1067|
|     WN| 999|1005|
+-------+----+----+

CPU times: user 136 ms, sys: 15.7 ms, total: 152 ms
Wall time: 19.4 s


In [None]:
%%time 
from pyspark.sql import functions as sf #spark functions

df.groupBy('airline').agg(sf.max('distance').alias('maks'), sf.min('distance').alias('min')).show()

+-------+----+----+
|airline|maks| min|
+-------+----+----+
|     UA| 997| 100|
|     NK| 986|1005|
|     AA| 993|1005|
|     EV| 999| 100|
|     B6| 997|1005|
|     DL| 991|1005|
|     OO| 996|1004|
|     F9| 993|1005|
|     US| 993|1009|
|     MQ| 999|1013|
|     HA|  84| 100|
|     AS| 987|1009|
|     VX| 954|1067|
|     WN| 999|1005|
+-------+----+----+

CPU times: user 155 ms, sys: 26.2 ms, total: 182 ms
Wall time: 17.3 s


## Filtrowanie danych

Bardzo ważne jest oczywiście odflitrowanie części dużego zbioru danych. 

_Uwaga pamiętajmy, że leniwie wczytując plik skazałem wszystkie pola na bycie Stringami._

In [None]:
%%time
spark.sql('select airline, count(*) as count from df  where day_of_week = "2" group by airline').show()

+-------+------+
|airline| count|
+-------+------+
|     UA| 74945|
|     NK| 16359|
|     AA|103401|
|     EV| 83541|
|     B6| 37753|
|     DL|128412|
|     OO| 84054|
|     F9| 12991|
|     US| 28496|
|     MQ| 42970|
|     HA| 10516|
|     AS| 24165|
|     VX|  8990|
|     WN|188007|
+-------+------+

CPU times: user 99.3 ms, sys: 7.21 ms, total: 106 ms
Wall time: 12.8 s


In [None]:
%%time 
df.where('day_of_week = "2"').groupBy('airline').count().show()

+-------+------+
|airline| count|
+-------+------+
|     UA| 74945|
|     NK| 16359|
|     AA|103401|
|     EV| 83541|
|     B6| 37753|
|     DL|128412|
|     OO| 84054|
|     F9| 12991|
|     US| 28496|
|     MQ| 42970|
|     HA| 10516|
|     AS| 24165|
|     VX|  8990|
|     WN|188007|
+-------+------+

CPU times: user 112 ms, sys: 10.6 ms, total: 123 ms
Wall time: 14.1 s


# Zaawansowany preprocessing danych w Spark

Aby dokładniej poznać możliwości oferowane przez Sparka rozbudujemy przejrzymy listę operacji i sposoby ich wykorzystywania ponownie i dokładniej.

## Perspektywa RDD

RDD jest originalnym interfejsem dostępu do danych w Sparku. I w kryzysowych sytuacjach również z niego można korzystać. W nich (w odróżnieniu do DataFrame) dane składowane są bez porządku jaki dostarcza schemat danych. 

In [None]:
# tworzenie RDD ręcznie
%%time
sc = spark.sparkContext

data = sc.parallelize(['A', 'B', 'C'])

CPU times: user 7.92 ms, sys: 0 ns, total: 7.92 ms
Wall time: 37.8 ms


In [None]:
%%time
data.collect()

CPU times: user 8.9 ms, sys: 1.24 ms, total: 10.1 ms
Wall time: 53.9 ms


['A', 'B', 'C']

Aby uzyskać dostęp do danych RDD można wydobyć je z DataFrame

In [None]:
rdd_data = df.rdd
rdd_data_sample = sc.parallelize(df.rdd.take(1000))

In [None]:
%%time 
rdd_data.take(5)

CPU times: user 9.03 ms, sys: 548 µs, total: 9.58 ms
Wall time: 134 ms


[Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='AS', FLIGHT_NUMBER='98', TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE='0005', DEPARTURE_TIME='2354', DEPARTURE_DELAY='-11', TAXI_OUT='21', WHEELS_OFF='0015', SCHEDULED_TIME='205', ELAPSED_TIME='194', AIR_TIME='169', DISTANCE='1448', WHEELS_ON='0404', TAXI_IN='4', SCHEDULED_ARRIVAL='0430', ARRIVAL_TIME='0408', ARRIVAL_DELAY='-22', DIVERTED='0', CANCELLED='0', CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='AA', FLIGHT_NUMBER='2336', TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE='0010', DEPARTURE_TIME='0002', DEPARTURE_DELAY='-8', TAXI_OUT='12', WHEELS_OFF='0014', SCHEDULED_TIME='280', ELAPSED_TIME='279', AIR_TIME='263', DISTANCE='2330', WHEELS_ON='0737', TAXI_IN='4', SCHEDULED_ARRIVAL

In [None]:
%%time
rdd_data_sample.map(lambda row: row[4]+row[5]).collect()

CPU times: user 12.5 ms, sys: 3.65 ms, total: 16.2 ms
Wall time: 133 ms


['AS98',
 'AA2336',
 'US840',
 'AA258',
 'AS135',
 'DL806',
 'NK612',
 'US2013',
 'AA1112',
 'DL1173',
 'DL2336',
 'AA1674',
 'DL1434',
 'DL2324',
 'DL2440',
 'AS108',
 'DL1560',
 'UA1197',
 'AS122',
 'DL1670',
 'NK520',
 'AA371',
 'NK214',
 'AA115',
 'DL1450',
 'UA1545',
 'AS130',
 'NK597',
 'US413',
 'AA2392',
 'NK168',
 'AA2211',
 'AS136',
 'DL95',
 'NK298',
 'HA17',
 'US617',
 'UA1528',
 'AS134',
 'B6304',
 'NK451',
 'NK972',
 'AA2459',
 'AS144',
 'NK647',
 'B61990',
 'US699',
 'AS114',
 'B6668',
 'UA1162',
 'B61030',
 'B6262',
 'B62134',
 'B6730',
 'B6768',
 'B62276',
 'US602',
 'AS695',
 'HA102',
 'OO5467',
 'HA108',
 'AS730',
 'HA206',
 'UA1500',
 'AA1323',
 'NK103',
 'OO7404',
 'OO7419',
 'OO5254',
 'US480',
 'AA1057',
 'AA2454',
 'EV4354',
 'US425',
 'AA89',
 'DL2099',
 'EV4685',
 'EV5583',
 'OO5484',
 'OO7370',
 'HA103',
 'EV5187',
 'MQ2859',
 'UA1201',
 'UA1221',
 'UA1607',
 'AA328',
 'EV5764',
 'F91279',
 'NK409',
 'OO5460',
 'OO6391',
 'UA1532',
 'UA1167',
 'UA1012',
 'B62

In [None]:
%%time
rdd_data_sample.filter(lambda row: row[4] == 'DL').collect()

CPU times: user 11.1 ms, sys: 2.15 ms, total: 13.2 ms
Wall time: 85.7 ms


[Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='DL', FLIGHT_NUMBER='806', TAIL_NUMBER='N3730B', ORIGIN_AIRPORT='SFO', DESTINATION_AIRPORT='MSP', SCHEDULED_DEPARTURE='0025', DEPARTURE_TIME='0020', DEPARTURE_DELAY='-5', TAXI_OUT='18', WHEELS_OFF='0038', SCHEDULED_TIME='217', ELAPSED_TIME='230', AIR_TIME='206', DISTANCE='1589', WHEELS_ON='0604', TAXI_IN='6', SCHEDULED_ARRIVAL='0602', ARRIVAL_TIME='0610', ARRIVAL_DELAY='8', DIVERTED='0', CANCELLED='0', CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='DL', FLIGHT_NUMBER='1173', TAIL_NUMBER='N826DN', ORIGIN_AIRPORT='LAS', DESTINATION_AIRPORT='ATL', SCHEDULED_DEPARTURE='0030', DEPARTURE_TIME='0033', DEPARTURE_DELAY='3', TAXI_OUT='12', WHEELS_OFF='0045', SCHEDULED_TIME='221', ELAPSED_TIME='203', AIR_TIME='186', DISTANCE='1747', WHEELS_ON='0651', TAXI_IN='5', SCHEDULED_ARRIVAL='0

In [None]:
%%time
rdd_data_sample.flatMap(lambda row: row[4]+row[5]).collect()

CPU times: user 11.3 ms, sys: 911 µs, total: 12.2 ms
Wall time: 109 ms


['A',
 'S',
 '9',
 '8',
 'A',
 'A',
 '2',
 '3',
 '3',
 '6',
 'U',
 'S',
 '8',
 '4',
 '0',
 'A',
 'A',
 '2',
 '5',
 '8',
 'A',
 'S',
 '1',
 '3',
 '5',
 'D',
 'L',
 '8',
 '0',
 '6',
 'N',
 'K',
 '6',
 '1',
 '2',
 'U',
 'S',
 '2',
 '0',
 '1',
 '3',
 'A',
 'A',
 '1',
 '1',
 '1',
 '2',
 'D',
 'L',
 '1',
 '1',
 '7',
 '3',
 'D',
 'L',
 '2',
 '3',
 '3',
 '6',
 'A',
 'A',
 '1',
 '6',
 '7',
 '4',
 'D',
 'L',
 '1',
 '4',
 '3',
 '4',
 'D',
 'L',
 '2',
 '3',
 '2',
 '4',
 'D',
 'L',
 '2',
 '4',
 '4',
 '0',
 'A',
 'S',
 '1',
 '0',
 '8',
 'D',
 'L',
 '1',
 '5',
 '6',
 '0',
 'U',
 'A',
 '1',
 '1',
 '9',
 '7',
 'A',
 'S',
 '1',
 '2',
 '2',
 'D',
 'L',
 '1',
 '6',
 '7',
 '0',
 'N',
 'K',
 '5',
 '2',
 '0',
 'A',
 'A',
 '3',
 '7',
 '1',
 'N',
 'K',
 '2',
 '1',
 '4',
 'A',
 'A',
 '1',
 '1',
 '5',
 'D',
 'L',
 '1',
 '4',
 '5',
 '0',
 'U',
 'A',
 '1',
 '5',
 '4',
 '5',
 'A',
 'S',
 '1',
 '3',
 '0',
 'N',
 'K',
 '5',
 '9',
 '7',
 'U',
 'S',
 '4',
 '1',
 '3',
 'A',
 'A',
 '2',
 '3',
 '9',
 '2',
 'N',
 'K',
 '1'

In [None]:
%%time
rdd_data_sample.map(lambda row: row[4])\
  .distinct()\
  .collect()

CPU times: user 28.6 ms, sys: 2.06 ms, total: 30.6 ms
Wall time: 636 ms


['AS',
 'AA',
 'US',
 'DL',
 'NK',
 'UA',
 'HA',
 'B6',
 'OO',
 'EV',
 'MQ',
 'F9',
 'WN',
 'VX']

In [None]:
%%time
rdd_data_sample.sample(False,0.01).collect()

CPU times: user 9.1 ms, sys: 0 ns, total: 9.1 ms
Wall time: 62.6 ms


[Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='OO', FLIGHT_NUMBER='6224', TAIL_NUMBER='N956SW', ORIGIN_AIRPORT='RDM', DESTINATION_AIRPORT='DEN', SCHEDULED_DEPARTURE='0550', DEPARTURE_TIME='0549', DEPARTURE_DELAY='-1', TAXI_OUT='12', WHEELS_OFF='0601', SCHEDULED_TIME='143', ELAPSED_TIME='150', AIR_TIME='130', DISTANCE='898', WHEELS_ON='0911', TAXI_IN='8', SCHEDULED_ARRIVAL='0913', ARRIVAL_TIME='0919', ARRIVAL_DELAY='6', DIVERTED='0', CANCELLED='0', CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='B6', FLIGHT_NUMBER='371', TAIL_NUMBER='N708JB', ORIGIN_AIRPORT='LGA', DESTINATION_AIRPORT='FLL', SCHEDULED_DEPARTURE='0600', DEPARTURE_TIME='0600', DEPARTURE_DELAY='0', TAXI_OUT='22', WHEELS_OFF='0622', SCHEDULED_TIME='183', ELAPSED_TIME='177', AIR_TIME='150', DISTANCE='1076', WHEELS_ON='0852', TAXI_IN='5', SCHEDULED_ARRIVAL='09

* .leftOuterJoin
* .intersection
* .repartition

Akcje RDD

* .take
* .takeSample
* .collect
* .reduce
* .count
* .saveAsTextFile
* .foreach

## Przygotowanie Dataframe z pełnym schematem danych

Poprzednio poszliśmy na skróty przypisując każdej ze zmiennych typ ciągu znaków. Tym razem zróbmy to porządnie

In [None]:
import pandas as pd
pd.set_option('display.max_columns', None)
df.limit(5).toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,2354,-11,21,15,205,194,169,1448,404,4,430,408,-22,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,2,-8,12,14,280,279,263,2330,737,4,750,741,-9,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,18,-2,16,34,286,293,266,2296,800,11,806,811,5,0,0,,,,,,
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,15,-5,15,30,285,281,258,2342,748,8,805,756,-9,0,0,,,,,,
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,24,-1,11,35,235,215,199,1448,254,5,320,259,-21,0,0,,,,,,


In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType, FloatType, TimestampType, DateType, ArrayType, MapType
from typing import List, Tuple, Dict, Any
map_python_types_2_spark_types = {
    str : StringType(),
    int : IntegerType(),
    bool : BooleanType(),
    float: FloatType(),
    'timestamp' : TimestampType(),
    'date' : DateType(),
    List[str] : ArrayType(StringType()),
    Tuple[str] : ArrayType(StringType()),
    Dict[str, str] : MapType(StringType(), StringType())
}

column_type_collection = {
    int : ['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'DEPARTURE_DELAY', 'TAXI_OUT', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'TAXI_IN', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED' ],
    str : ['AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'WHEELS_OFF', 'WHEELS_ON', 
      'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY'
    ]
}

map_column_names_2_types = {}

for pole in pola:
  for python_type, column_list in column_type_collection.items():
    if pole in column_list:
      map_column_names_2_types[pole] = map_python_types_2_spark_types[python_type]

print(map_column_names_2_types)



{'YEAR': IntegerType, 'MONTH': IntegerType, 'DAY': IntegerType, 'DAY_OF_WEEK': IntegerType, 'AIRLINE': StringType, 'FLIGHT_NUMBER': StringType, 'TAIL_NUMBER': StringType, 'ORIGIN_AIRPORT': StringType, 'DESTINATION_AIRPORT': StringType, 'SCHEDULED_DEPARTURE': StringType, 'DEPARTURE_TIME': StringType, 'DEPARTURE_DELAY': IntegerType, 'TAXI_OUT': IntegerType, 'WHEELS_OFF': StringType, 'SCHEDULED_TIME': IntegerType, 'ELAPSED_TIME': IntegerType, 'AIR_TIME': IntegerType, 'DISTANCE': IntegerType, 'WHEELS_ON': StringType, 'TAXI_IN': IntegerType, 'SCHEDULED_ARRIVAL': StringType, 'ARRIVAL_TIME': StringType, 'ARRIVAL_DELAY': IntegerType, 'DIVERTED': IntegerType, 'CANCELLED': IntegerType, 'CANCELLATION_REASON': StringType, 'AIR_SYSTEM_DELAY': StringType, 'SECURITY_DELAY': StringType, 'AIRLINE_DELAY': StringType, 'LATE_AIRCRAFT_DELAY': StringType, 'WEATHER_DELAY': StringType}


In [None]:
schemat = StructType()
for pole, typ in map_column_names_2_types.items():
    schemat = schemat.add(pole, typ, True)


In [None]:
flights = spark.read.format('csv').option("header", False).schema(schemat).load('/content/drive/MyDrive/flights.csv')
flights.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [None]:
flights.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = 

In [None]:
flights.select(flights.DAY).distinct().show()

+---+
|DAY|
+---+
| 31|
| 28|
| 26|
| 27|
| 12|
| 22|
|  1|
| 13|
|  6|
| 16|
|  3|
| 20|
|  5|
| 19|
| 15|
|  9|
| 17|
|  4|
|  8|
| 23|
+---+
only showing top 20 rows



Wykonajmy jeszcze proste statystyki ze zbioru by zobaczyć poprawność jego wczytania

In [None]:
%%time
flights.count()

CPU times: user 33.9 ms, sys: 1.76 ms, total: 35.7 ms
Wall time: 4.11 s


5819079

In [None]:
%%time
for pole in ['YEAR', 'MONTH', 'DAY', 'AIRLINE', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'DIVERTED', 'CANCELLED']:
  print(f'Pole {pole}\n')
  print(flights.select(pole).distinct().sort(pole).toPandas())

Pole YEAR

   YEAR
0  2015
Pole MONTH

    MONTH
0       1
1       2
2       3
3       4
4       5
5       6
6       7
7       8
8       9
9      10
10     11
11     12
Pole DAY

    DAY
0     1
1     2
2     3
3     4
4     5
5     6
6     7
7     8
8     9
9    10
10   11
11   12
12   13
13   14
14   15
15   16
16   17
17   18
18   19
19   20
20   21
21   22
22   23
23   24
24   25
25   26
26   27
27   28
28   29
29   30
30   31
Pole AIRLINE

   AIRLINE
0       AA
1       AS
2       B6
3       DL
4       EV
5       F9
6       HA
7       MQ
8       NK
9       OO
10      UA
11      US
12      VX
13      WN
Pole TAIL_NUMBER

     TAIL_NUMBER
0           None
1          7819A
2          7820L
3         D942DN
4         N001AA
...          ...
4893      N997DL
4894      N998AT
4895      N998DL
4896      N999DN
4897      N9EAMQ

[4898 rows x 1 columns]
Pole ORIGIN_AIRPORT

    ORIGIN_AIRPORT
0            10135
1            10136
2            10140
3            10141
4            10146
..  

In [None]:
flights.createOrReplaceTempView("flights")

## Ćwiczenia 

Zadanie w ćwiczeniu polegać będą na stworzeniu konwersji zapytania Spark-SQL na składnię pyspark

In [None]:
spark.sql('select count(*) from flights where cancelled = 1').toPandas()

Unnamed: 0,count(1)
0,89884


In [None]:
spark.sql('select count(*) from flights where diverted = 1').toPandas()

Unnamed: 0,count(1)
0,15187


In [None]:
spark.sql('select count(*) from flights where cancelled = 1 and diverted = 1').toPandas()

Unnamed: 0,count(1)
0,0


In [None]:
spark.sql('select avg(distance) from flights').toPandas()

Unnamed: 0,avg(distance)
0,822.356495


In [None]:
spark.sql('select min(DEPARTURE_DELAY), max(DEPARTURE_DELAY) from flights').toPandas()

Unnamed: 0,min(DEPARTURE_DELAY),max(DEPARTURE_DELAY)
0,-82,1988


In [None]:
spark.sql('select min(ARRIVAL_DELAY), max(ARRIVAL_DELAY) from flights').toPandas()

Unnamed: 0,min(ARRIVAL_DELAY),max(ARRIVAL_DELAY)
0,-87,1971


## Mieszanie składni SparkSQL i pyspark oraz praca z wieloma zbiorami danych

Zagadnienie związane z tym która ze składni jest bardziej odpowiednia to dyskusja która nie jako toczy się od początku istnienia jej dualizmu w pyspark.
Należy mieć na uwadze, że niezależnie od tego na który ze sposobów wyrazimy swoją kwerendę - spark i tak przetłumaczy ją na ciąg operacji na RDD i odpowiednio skolejkuje. Oznacza, że w każdej niemal sytuacji zadanie kwerendy poprzez formułę SQL oraz pysparka oznacza wykonanie tych samych operacji. Pozostawia to miejsce na używanie tej składni według preferencji użytkownika. Ale jednak każda z tych składni wnosi pewien narzut.

Za składnią SQL przemawiają następujące argumenty:

* Jest prostsza i często dużo łatwiej jest napisać kwerendę,
* Zyskujemy na czasie pisania kwerendy.

Za składnią pysparka natomiast:

* zapisywanie kwerend w postaci funkcji pysparka pozwala lepiej zrozumieć kolejność operacji i dbałość o zmniejszanie obciążenia,
* mamy możliwość reagowania na różnych etapach działania. Możemy tworzyć naszą kwerendę etapami i obserwować jej rozwój.
* jeśli mamy wiele różnych zaawansowanych kwerend o wspólnej bazie to szybciej napiszemy je w pyspark
* pyspark pozwala nam dużo wygodniej zarządzać tworzeniem kolumn



## Praca z tworzeniem kolumn w pyspark

Załóżmy, że dla naszych danych chcemy teraz utworzyć kolumną z wyliczonym na podstawie czasu przylotu i odlotu czasem. Pamiętamy, że część kolumn nie udała się zrzutować w czasie ładowania. Mamy zatem w bazie ciągi znaków jak 0815 odpowiadające godzinie 8:15, oraz inne które odpowiadają datom w sposób zrzutowany

In [None]:
time_flights = df.select('year', 'month', 'day', 'airline', 'flight_number', 'tail_number', 'scheduled_departure', 'scheduled_time', 'scheduled_arrival')
time_flights.limit(5).toPandas()

Unnamed: 0,year,month,day,airline,flight_number,tail_number,scheduled_departure,scheduled_time,scheduled_arrival
0,2015,1,1,AS,98,N407AS,5,205,430
1,2015,1,1,AA,2336,N3KUAA,10,280,750
2,2015,1,1,US,840,N171US,20,286,806
3,2015,1,1,AA,258,N3HYAA,20,285,805
4,2015,1,1,AS,135,N527AS,25,235,320


In [None]:
from pyspark.sql.functions import substring, col, expr
time_flights2 = time_flights.withColumn('scheduled_departure_in_hours_str', substring('scheduled_departure',1,2))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_hours', col('scheduled_departure_in_hours_str').cast('integer'))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes_str', substring('scheduled_departure',3,4))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes', col('scheduled_departure_in_minutes_str').cast('integer'))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes_from_midnight',expr('scheduled_departure_in_hours*60 + scheduled_departure_in_minutes '))
time_flights2 = time_flights2.drop(
    'scheduled_departure_in_hours_str','scheduled_departure_in_minutes_str', 'scheduled_departure_in_minutes','scheduled_departure_in_hours')
time_flights2.limit(60).toPandas()

Unnamed: 0,year,month,day,airline,flight_number,tail_number,scheduled_departure,scheduled_time,scheduled_arrival,scheduled_departure_in_minutes_from_midnight
0,2015,1,1,AS,98,N407AS,5,205,430,5
1,2015,1,1,AA,2336,N3KUAA,10,280,750,10
2,2015,1,1,US,840,N171US,20,286,806,20
3,2015,1,1,AA,258,N3HYAA,20,285,805,20
4,2015,1,1,AS,135,N527AS,25,235,320,25
5,2015,1,1,DL,806,N3730B,25,217,602,25
6,2015,1,1,NK,612,N635NK,25,181,526,25
7,2015,1,1,US,2013,N584UW,30,273,803,30
8,2015,1,1,AA,1112,N3LAAA,30,195,545,30
9,2015,1,1,DL,1173,N826DN,30,221,711,30


In [None]:
time_flights3 = time_flights2.withColumn('scheduled_arrival_in_hours_str', substring('scheduled_arrival',1,2))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_hours', col('scheduled_arrival_in_hours_str').cast('integer'))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes_str', substring('scheduled_arrival',3,4))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes', col('scheduled_arrival_in_minutes_str').cast('integer'))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes_from_midnight',expr('scheduled_arrival_in_hours*60 + scheduled_arrival_in_minutes '))
time_flights3 = time_flights3.drop(
    'scheduled_arrival_in_hours_str','scheduled_arrival_in_minutes_str', 'scheduled_arrival_in_minutes','scheduled_arrival_in_hours')
time_flights3.limit(60).toPandas()

Unnamed: 0,year,month,day,airline,flight_number,tail_number,scheduled_departure,scheduled_time,scheduled_arrival,scheduled_departure_in_minutes_from_midnight,scheduled_arrival_in_minutes_from_midnight
0,2015,1,1,AS,98,N407AS,5,205,430,5,270
1,2015,1,1,AA,2336,N3KUAA,10,280,750,10,470
2,2015,1,1,US,840,N171US,20,286,806,20,486
3,2015,1,1,AA,258,N3HYAA,20,285,805,20,485
4,2015,1,1,AS,135,N527AS,25,235,320,25,200
5,2015,1,1,DL,806,N3730B,25,217,602,25,362
6,2015,1,1,NK,612,N635NK,25,181,526,25,326
7,2015,1,1,US,2013,N584UW,30,273,803,30,483
8,2015,1,1,AA,1112,N3LAAA,30,195,545,30,345
9,2015,1,1,DL,1173,N826DN,30,221,711,30,431


In [None]:
time_flights3.where('scheduled_departure_in_minutes_from_midnight > scheduled_arrival_in_minutes_from_midnight').toPandas()

Unnamed: 0,year,month,day,airline,flight_number,tail_number,scheduled_departure,scheduled_time,scheduled_arrival,scheduled_departure_in_minutes_from_midnight,scheduled_arrival_in_minutes_from_midnight
0,2015,1,1,EV,5764,N832AS,0530,55,0525,330,325
1,2015,1,1,EV,5234,N884AS,0908,53,0901,548,541
2,2015,1,1,DL,1224,N981AT,1030,56,1026,630,626
3,2015,1,1,EV,5183,N853AS,1120,52,1112,680,672
4,2015,1,1,EV,5692,N834AS,1228,55,1223,748,743
...,...,...,...,...,...,...,...,...,...,...,...
183469,2015,12,31,B6,688,N657JB,2359,320,0819,1439,499
183470,2015,12,31,B6,745,N828JB,2359,227,0446,1439,286
183471,2015,12,31,B6,1503,N913JB,2359,221,0440,1439,280
183472,2015,12,31,B6,333,N527JB,2359,161,0340,1439,220


In [None]:
time_flights4 = time_flights3.withColumn('flight_time_diff',expr('scheduled_departure_in_minutes_from_midnight+scheduled_time-scheduled_arrival_in_minutes_from_midnight'))
time_flights4.select('flight_time_diff').distinct().sort('flight_time_diff').toPandas()

Unnamed: 0,flight_time_diff
0,
1,-240.0
2,-180.0
3,-150.0
4,-120.0
5,-105.0
6,-78.0
7,-60.0
8,-45.0
9,-43.0


In [None]:
time_flights4.where('flight_time_diff >= 1000').select('scheduled_time', 'flight_time_diff', 'scheduled_departure', 'scheduled_arrival').limit(100).toPandas()

Unnamed: 0,scheduled_time,flight_time_diff,scheduled_departure,scheduled_arrival
0,314,1260.0,1559,0013
1,313,1260.0,1600,0013
2,323,1260.0,1605,0028
3,314,1260.0,1620,0034
4,283,1260.0,1622,0005
...,...,...,...,...
95,229,1500.0,2115,0004
96,195,1380.0,2117,0132
97,217,1440.0,2118,0055
98,330,1320.0,2119,0449


In [None]:
time_flights4.where('flight_time_diff < 1000').select('scheduled_time', 'flight_time_diff', 'scheduled_departure', 'scheduled_arrival').limit(100).toPandas()

Unnamed: 0,scheduled_time,flight_time_diff,scheduled_departure,scheduled_arrival
0,205,-60.0,0005,0430
1,280,-180.0,0010,0750
2,286,-180.0,0020,0806
3,285,-180.0,0020,0805
4,235,60.0,0025,0320
...,...,...,...,...
95,88,0.0,0535,0703
96,85,0.0,0535,0700
97,225,-60.0,0535,1020
98,61,0.0,0535,0636


## Tworzenie zmiennych okienkowych

Troszeczkę inaczej wygląda zadanie w którym chcemy przeprowadzić jakieś obliczenia związane ze zmiennymi reprezentowanymi przez tak zwane okna. Okna to przedziały (z reguły czasu) w których obliczane są statystyki dla poszczególnych elementów celem składowania ich w danych.

Wiemy np. niektóre kierunki są popularniejsze od innych. Możemy spróbować pogrupować nasze dane tak aby zobaczyć na które lotniska w ostatnim przedziale czasowym przyleciało najwięcej samolotów. Kiedy chcemy aby dotyczyło to pojedynczego dnia zadanie jest proste.

In [None]:
popular_airports = flights.groupBy('Year', 'Month', 'Day', 'Destination_airport').count().orderBy(col('count').desc())
popular_airports.limit(20).toPandas()

Unnamed: 0,Year,Month,Day,Destination_airport,count
0,2015,11,29,ATL,1165
1,2015,8,14,ATL,1143
2,2015,8,7,ATL,1143
3,2015,8,17,ATL,1142
4,2015,7,27,ATL,1141
5,2015,7,31,ATL,1141
6,2015,8,3,ATL,1141
7,2015,7,6,ATL,1141
8,2015,8,10,ATL,1141
9,2015,7,10,ATL,1141


In [None]:
flights.where('Month = 11 and day = 29 and destination_airport = "ATL" ').toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,11,29,7,AA,1675,N557UW,CLT,ATL,0025,0019,-6.0,12.0,0031,68,60.0,41.0,226,0112,7.0,0133,0119,-14.0,0,0,,,,,,
1,2015,11,29,7,AA,774,N542UW,PHX,ATL,0040,0048,8.0,14.0,0102,205,199.0,180.0,1587,0602,5.0,0605,0607,2.0,0,0,,,,,,
2,2015,11,29,7,F9,1456,N220FR,LAS,ATL,0040,0043,3.0,17.0,0100,225,209.0,185.0,1747,0705,7.0,0725,0712,-13.0,0,0,,,,,,
3,2015,11,29,7,DL,1646,N557NW,PHX,ATL,0055,0053,-2.0,12.0,0105,202,180.0,165.0,1587,0550,3.0,0617,0553,-24.0,0,0,,,,,,
4,2015,11,29,7,DL,2324,N838DN,SLC,ATL,0059,0055,-4.0,23.0,0118,202,201.0,173.0,1590,0611,5.0,0621,0616,-5.0,0,0,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1160,2015,11,29,7,DL,1254,N1610D,LAX,ATL,2340,2343,3.0,31.0,0014,249,234.0,199.0,1947,0633,4.0,0649,0637,-12.0,0,0,,,,,,
1161,2015,11,29,7,DL,2258,N829DN,SMF,ATL,2352,2358,6.0,12.0,0010,251,238.0,219.0,2092,0649,7.0,0703,0656,-7.0,0,0,,,,,,
1162,2015,11,29,7,F9,420,N920FR,DEN,ATL,2355,0028,33.0,37.0,0105,170,178.0,134.0,1199,0519,7.0,0445,0526,41.0,0,0,,8,0,33,0,0
1163,2015,11,29,7,DL,2446,N343NW,SFO,ATL,2359,2354,-5.0,19.0,0013,273,255.0,230.0,2139,0703,6.0,0732,0709,-23.0,0,0,,,,,,


Troche trudniej sprawa wygląda jeśli podsumowanie ma dotyczyć np. ostatniego tygodnia.

Najpierw wygenerujemy tabelę dziennych lotów na poszczególne dni

In [None]:
popular_airports = flights.groupBy('Year', 'Month', 'Day', 'Destination_airport').count().orderBy(col('count').desc())
popular_airports.limit(10).toPandas()

Unnamed: 0,Year,Month,Day,Destination_airport,count
0,2015,11,29,ATL,1165
1,2015,8,14,ATL,1143
2,2015,8,7,ATL,1143
3,2015,8,17,ATL,1142
4,2015,7,31,ATL,1141
5,2015,7,10,ATL,1141
6,2015,7,27,ATL,1141
7,2015,8,10,ATL,1141
8,2015,7,6,ATL,1141
9,2015,8,3,ATL,1141


Dalej musimy ją przetworzyć tak aby policzyć większe okna czasowe

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import concat_ws, to_timestamp, to_date, rank, count, sum

days = lambda x: x * 86400

destinations = popular_airports
destinations = destinations.withColumn('Year_str', col('Year').cast('string') )
destinations = destinations.withColumn('Month_str', col('Month').cast('string') )
destinations = destinations.withColumn('Day_str', col('Day').cast('string') )
destinations = destinations.withColumn('Date_str', concat_ws('-', 'Day_str', 'Month_str', 'Year_str') )
destinations = destinations.withColumn('Date', to_timestamp('Date_str', 'd-M-yyyy' ))
destinations = destinations.drop( 'Year_str', 'Month_str', 'Day_str', 'Date_str') #'Year','Month','Day',
windSpec = Window.partitionBy('Destination_Airport').orderBy(col('Date').cast('long')).rangeBetween(-days(7),0)
destinations = destinations.withColumn('Cumulative_flights', sum(col('count')).over(windSpec))
destinations = destinations.drop('count')
windSpec2 = Window.partitionBy('Date').orderBy(col('Cumulative_flights').desc())
destinations = destinations.withColumn('Rank' , rank().over(windSpec2))
destinations = destinations.drop('Date', 'Cumulative_flights')
destinations.limit(100).toPandas()

Unnamed: 0,Year,Month,Day,Destination_airport,Rank
0,2015,4,26,ATL,1
1,2015,4,26,ORD,2
2,2015,4,26,DFW,3
3,2015,4,26,DEN,4
4,2015,4,26,LAX,5
...,...,...,...,...,...
95,2015,4,26,ALB,95
96,2015,4,26,FAT,97
97,2015,4,26,TYS,97
98,2015,4,26,PNS,99


In [None]:
%%time
print(f'flights size ')
flights = flights.alias("left").join(destinations.alias('right'), (flights.YEAR == destinations.Year)  & (flights.MONTH == destinations.Month) 
& (flights.DAY == destinations.Day)  & (flights.DESTINATION_AIRPORT == destinations.Destination_airport)).select("left.*", 'right.rank') # 

flights size 
CPU times: user 10.5 ms, sys: 803 µs, total: 11.3 ms
Wall time: 111 ms


In [None]:
flights.limit(5).toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,rank
0,2015,1,1,4,EV,4872,N850AS,ATL,AVL,1100,1103,3,26,1129,53,59,30,164,1159,3,1153,1202,9,0,0,,,,,,,165
1,2015,1,1,4,OO,5353,N963SW,ORD,AVL,1311,1320,9,20,1340,104,96,72,536,1552,4,1555,1556,1,0,0,,,,,,,165
2,2015,1,1,4,EV,4175,N13132,EWR,AVL,1320,1316,-4,10,1326,119,113,97,583,1503,6,1519,1509,-10,0,0,,,,,,,165
3,2015,1,1,4,DL,1729,N935AT,ATL,AVL,1505,1502,-3,14,1516,54,51,33,164,1549,4,1559,1553,-6,0,0,,,,,,,165
4,2015,1,1,4,OO,5329,N963SW,ORD,AVL,1800,1848,48,13,1901,104,87,71,536,2112,3,2044,2115,31,0,0,,0.0,0.0,19.0,12.0,0.0,165


Można jeszcze użyć operacji `withColumnRenamed` do uporządkowania nazw kolumn

# Ćwiczenie warsztatowe

## Zadanie 1 

Utworzyć podzbiór zawierający tylko loty, które się odbyły. Znaleźć lot najkrótszy oraz najdłuższy.

## Zadanie 2

Wyszukać liczbę przewozników w danych i znaleźć łączną liczbę lotów wykonanych przez każdego z nich

## Zadanie 3

Dla każdej trasy (Lotnisko początkowe -> Lotnisko końcowe) znaleźć minimalny, przeciętny i maksymalny (rzeczywisty) czas przelotu

## Zadanie 4

Utworzyć nową kolumnę opisującą trasę (Lotnisko początkowe -> Lotnisko końcowe). Następnie sprawdzić czy w danych podanych jest zgodna odległość jego łączące

## Zadanie 5

Wygenerować tabelę z popularnością poszczególnych przewoźników na podstawie całego zbioru danych, wygenerować ich ranking i dołączyć go do danych flights jako kolumnę AIRLINE_RANK.
