<a href="https://colab.research.google.com/github/PiotrMaciejKowalski/BigData2022-initial-project/blob/main/colabs/Wprowadzenie_do_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Sparka

## Utworzenie środowiska pyspark do obliczeń

Tworzymy swoje środowisko z pysparkiem we wenętrzu naszych zasobów chmurowych

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [11]:
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [14]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [15]:
!pip install -q findspark

import findspark
findspark.init()

## Utworzenie sesji z pyspark


Utworzymy testowo sesję aby zobaczyć czy działa. Element ten jest wspólny również gdy systemy sparkowe pracują w sposób ciągły, a nie są tworzone przez naszą sesję.

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

## Apache Spark 

**Apache Spark** to zunifikowany silnik do obliczeń rozproszonych na licencji open-source. Oferuje interfejs pozwalający na programowanie obliczeń na klastrach z domyślną paralelizacją oraz odpornością na awarie.

Ze Sparkiem pracować można w Scali, Pythonie, Javie oraz R.

Jego przewaga nad model Map-Reduce Hadoopa polega na unikaniu zapisów na hdfs tak długo jak to możliwe - i posługiwaniu się RAMem nodów jak długo go wystarcza.

**Komponenty Sparka:**

<img src="https://raw.githubusercontent.com/PiotrMaciejKowalski/kurs-analiza-danych-2022/main/Tydzie%C5%84%206/cluster-overview.png" alt="title" width="500"/>

* Spark "core" - podstawa Sparka z podstawową abstrakcją danych nazywaną RDD
* Spark SQL - komponent pozwalający na operowanie na ustrukturyzowanych danych z wykorzystaniem operacji znanych z SQL - łatwy w użyciu
* Spark MLlib - komponent zawierający algorytmy ML dostępne w Sparku - ML na skalę klastrów
* Spark Streaming - moduł pozwalający na pracę ze strumnieniami danych
* Spark GraphX - komponent do pracy z grafami

**Architektura Sparka:**

<img src="https://raw.githubusercontent.com/PiotrMaciejKowalski/kurs-analiza-danych-2022/main/Tydzie%C5%84%206/spark-stack.png" alt="title" width="500"/>

* driver - proces uruchamiający główną funkcję aplikacji i tworzący SparkContext
* executor(y) - proces uruchomiony dla aplikacji w węźle roboczym (worker node), który uruchamia zadania i przechowuje dane w pamięci lub na dysku. Każda aplikacja ma własne executory
* cluster manager - dostępne opcje: YARN, Mesos, Kubernetes, Standalone

**SparkSession:**
* wprowadzony w Spark 2.0
* składa się ze SparkContextu, SQLContextu oraz HiveContext
* zwykle nazywany w kodzie `spark`
* kroki niezbędne do utworzenia SparkSession w pySparku:

> from pyspark.sql import SparkSession  
> spark = SparkSession.builder.getOrCreate()

## RDD

Podstawowym formatem danych (coś jak tabela w db) jest RDD. Skrót rozwija się następująco:
* R - resilient (elastyczny)
* D - distributed
* D - dataset

Model RDD jest napisany w sposób wspierający przekstrzałcenia Map-Reduce jako domyślny. W związku z powyższym wykazuje się następującymi własnościami:


* immutable - każdy obiekt jest niezmienniczy. Chcesz coś zmienić - musisz utworzyć nowy rdd
* in-memory - przetwarzany głównie w RAM
* lazy evaluated - silnik obliczeniowy wykonuje obliczenia dopiero gdy okażą się konieczne.
* parallel - współbieżny 

Z RDD stowarzyszone są dwa rodzaje czynności:
* akcje, oraz
* transformacje

### Transformacje 

Modelują czynności jakie możemy chcieć wykonywać na danych. Przekształcenia (map), redukcje (reduce), filtry (filter). Mają charakter opisu skąd się biorą pewne wartości. W naszym ujęciu mogą odpowiadać funkcjom mapper, reducer i podobnym. 

_Dla osób, które kojarzą paradygmat funkcyjny programowania - można dodać, że transformacje dotyczą funkcji czystych._

### Akcje

Modelują czynności z uwagi na wynik jaki oczekujemy. Wyświetl, zapisz, wyszukaj. Mają charakter silnie połączony z wynikiem działania.

Aby obliczenia na danych zostały wykonane - musi zostać uruchomiona akcja. Dopiero ona wykona odpowiednie (i tylko te konieczne) transformacje.

## DataFrame

Choć RDD są wszędzie w Sparku, obecnie już się ich nie widzi. Od Sparka w wersji 2.0 zostały przesłonięte nowym interfejsem (zostały spakowane do wnętrza) czegoś nazywanego Ramką Danych (Dataframe). Skojarzenie z dataframe z R lub Pandas Python jest tutaj bardzo naturalne i prawdziwe. DataFrame Sparka były na nich wzorowane i pokrywają się w dużym obszarze składni.

**DataFrame:**
* abstrakcja danych z modułu Spark SQL
* zawiera dodatkowe informacje o strukturze danych (schema)
* pozwala na pracę z danymi wykorzysując zapytania znane z SQL/Hive

Dalej zaprezentujemy jak to się odbywa w praktyce

## Podłączenie Google Drive do sesji colab

In [17]:
from google.colab import drive
drive.mount('/content/drive')

MessageError: ignored

# Przykładowy processing danych w Spark

## Wczytanie danych do Sparka

W tej części wczytamy sobie nasz plik `flights.csv` do przetwarzania w Spark.

Z uwagi na to, że nasz plik to csv bez nagłówka - trzeba zdefiniować schemat dla danych, które przetwarzamy

Przypomnijmy listę pól
YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,\
DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,\
TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,\
TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,\
CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,\
LATE_AIRCRAFT_DELAY,WEATHER_DELAY

_Sporo ich. Więc najpierw trochę magii (bo nie chce mi się kodować każdego pola ręcznie, a jestem leniwy). Ufam, że przykład pozwoli rozszerzyć zastosowanie do bardziej skomplikowanych zastosowań. Na pocieszenie dodam, że wczytywanie csv bez nagłówka to najgorszy scenariusz w wersji wczytywania w sparku._

In [None]:
pola_zbiorczo = '''YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY'''
pola = pola_zbiorczo.split(',')

A dalej użyjemy funkcji add(Nazwa, Typ pola, czy może być null) do zapisania prostego schematu danych

In [None]:
from pyspark.sql.types import StructType, StringType

schemat = StructType()
for pole in pola:
    schemat = schemat.add(pole, StringType(), True)

Przejdźmy do wczytywania

In [None]:
df = spark.read.format('csv').option("header", False).schema(schemat).load('/content/drive/MyDrive/flights.csv')
df.show(5)

In [None]:
df.columns

In [None]:
df['YEAR','MONTH','DAY','AIRLINE','DISTANCE'].show()

In [None]:
df.printSchema()

In [None]:
%%time
df.count()

In [None]:
%%time
df.show(15)

In [None]:
%%time
df.show(1015)

## Zapytania Spark-SQL

Zapytania do Sparka kierowane są za pomocą 

* składni a.k.a. SQL, lub
* wyrażone w ORM (object relational mapping) czyli obiektowym sposobie na wyrażanie kwerend.

Składniowo wydaje się, że zapytania SQL są łatwiejsze do zapisania. W kilku przypadkach jednak jawne zadanie kolejności obliczeń może pomóc zoptymalizować kształt zapytania.

Zaprezentuje kilka podstawowych sposób na odpytywanie Spark DataFrame kwerendami o różnych naturach. Zawsze podane obe będą w postaci SparkSQL oraz wyrażenia ORM.

### Proste kwerendy

Zanim zacznimy pisać kwerendy należy jeszcze dodać nasz DataFrame do 'przestrzeni nazw tabel' Sparka. Formalnie nazywane jest to widokiem danych 

In [None]:
df.createOrReplaceTempView("df")

Wykonajmy prostego Selecta z tego zbioru.

Przypominam, że do uruchomienia sparka potrzebna jest akcja. Np. taki `show()`

In [None]:
spark.sql('select airline, distance from df').show()

In [None]:
df.select('airline','distance').show()

### Sprawniejsze wyświetlanie danych sparkowych

In [None]:
df.select('airline','distance').limit(20).toPandas()

## Proste grupowania i agregacje

Dalej proste pogrupowanie z polem poddanym agregacji.

In [None]:
%%time
spark.sql('select airline, count(*) as count from df group by airline').show()

In [None]:
%%time
df.groupBy('airline').count().show()

## Klauzala sortująca

Możemy dane uporządkować względem kolumny

In [None]:
%%time
spark.sql('select airline, count(*) as count from df group by airline order by count').show()

In [None]:
%%time
df.groupBy('airline').count().orderBy('count').show()

Możemy zmienić funkcje agregacji na mniej oczywistą lub zadać ich więcej.


In [None]:
%%time
spark.sql('select airline, max(distance) as maks, min(distance) as min from df group by airline').show()

In [None]:
%%time 
from pyspark.sql import functions as sf #spark functions

df.groupBy('airline').agg(sf.max('distance').alias('maks'), sf.min('distance').alias('min')).show()

## Filtrowanie danych

Bardzo ważne jest oczywiście odflitrowanie części dużego zbioru danych. 

_Uwaga pamiętajmy, że leniwie wczytując plik skazałem wszystkie pola na bycie Stringami._

In [None]:
%%time
spark.sql('select airline, count(*) as count from df  where day_of_week = "2" group by airline').show()

In [None]:
%%time 
df.where('day_of_week = "2"').groupBy('airline').count().show()

# Zaawansowany preprocessing danych w Spark

Aby dokładniej poznać możliwości oferowane przez Sparka rozbudujemy przejrzymy listę operacji i sposoby ich wykorzystywania ponownie i dokładniej.

## Perspektywa RDD

RDD jest originalnym interfejsem dostępu do danych w Sparku. I w kryzysowych sytuacjach również z niego można korzystać. W nich (w odróżnieniu do DataFrame) dane składowane są bez porządku jaki dostarcza schemat danych. 

In [None]:
# tworzenie RDD ręcznie
%%time
sc = spark.sparkContext

data = sc.parallelize(['A', 'B', 'C'])

In [None]:
%%time
data.collect()

Aby uzyskać dostęp do danych RDD można wydobyć je z DataFrame

In [None]:
rdd_data = df.rdd
rdd_data_sample = sc.parallelize(df.rdd.take(1000))

In [None]:
%%time 
rdd_data.take(5)

In [None]:
%%time
rdd_data_sample.map(lambda row: row[4]+row[5]).collect()

In [None]:
%%time
rdd_data_sample.filter(lambda row: row[4] == 'DL').collect()

In [None]:
%%time
rdd_data_sample.flatMap(lambda row: row[4]+row[5]).collect()

In [None]:
%%time
rdd_data_sample.map(lambda row: row[4])\
  .distinct()\
  .collect()

In [None]:
%%time
rdd_data_sample.sample(False,0.01).collect()

* .leftOuterJoin
* .intersection
* .repartition

Akcje RDD

* .take
* .takeSample
* .collect
* .reduce
* .count
* .saveAsTextFile
* .foreach

## Przygotowanie Dataframe z pełnym schematem danych

Poprzednio poszliśmy na skróty przypisując każdej ze zmiennych typ ciągu znaków. Tym razem zróbmy to porządnie

In [None]:
import pandas as pd
pd.set_option('display.max_columns', None)
df.limit(5).toPandas()

In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType, FloatType, TimestampType, DateType, ArrayType, MapType
from typing import List, Tuple, Dict, Any
map_python_types_2_spark_types = {
    str : StringType(),
    int : IntegerType(),
    bool : BooleanType(),
    float: FloatType(),
    'timestamp' : TimestampType(),
    'date' : DateType(),
    List[str] : ArrayType(StringType()),
    Tuple[str] : ArrayType(StringType()),
    Dict[str, str] : MapType(StringType(), StringType())
}

column_type_collection = {
    int : ['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'DEPARTURE_DELAY', 'TAXI_OUT', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'TAXI_IN', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED' ],
    str : ['AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'WHEELS_OFF', 'WHEELS_ON', 
      'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY'
    ]
}

map_column_names_2_types = {}

for pole in pola:
  for python_type, column_list in column_type_collection.items():
    if pole in column_list:
      map_column_names_2_types[pole] = map_python_types_2_spark_types[python_type]

print(map_column_names_2_types)



In [None]:
schemat = StructType()
for pole, typ in map_column_names_2_types.items():
    schemat = schemat.add(pole, typ, True)


In [None]:
flights = spark.read.format('csv').option("header", False).schema(schemat).load('/content/drive/MyDrive/flights.csv')
flights.show(5)

In [None]:
flights.printSchema()

In [None]:
flights.select(flights.DAY).distinct().show()

Wykonajmy jeszcze proste statystyki ze zbioru by zobaczyć poprawność jego wczytania

In [None]:
%%time
flights.count()

In [None]:
%%time
for pole in ['YEAR', 'MONTH', 'DAY', 'AIRLINE', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'DIVERTED', 'CANCELLED']:
  print(f'Pole {pole}\n')
  print(flights.select(pole).distinct().sort(pole).toPandas())

In [None]:
flights.createOrReplaceTempView("flights")

## Ćwiczenia 

Zadanie w ćwiczeniu polegać będą na stworzeniu konwersji zapytania Spark-SQL na składnię pyspark

In [None]:
spark.sql('select count(*) from flights where cancelled = 1').toPandas()

In [None]:
spark.sql('select count(*) from flights where diverted = 1').toPandas()

In [None]:
spark.sql('select count(*) from flights where cancelled = 1 and diverted = 1').toPandas()

In [None]:
spark.sql('select avg(distance) from flights').toPandas()

In [None]:
spark.sql('select min(DEPARTURE_DELAY), max(DEPARTURE_DELAY) from flights').toPandas()

In [None]:
spark.sql('select min(ARRIVAL_DELAY), max(ARRIVAL_DELAY) from flights').toPandas()

## Mieszanie składni SparkSQL i pyspark oraz praca z wieloma zbiorami danych

Zagadnienie związane z tym która ze składni jest bardziej odpowiednia to dyskusja która nie jako toczy się od początku istnienia jej dualizmu w pyspark.
Należy mieć na uwadze, że niezależnie od tego na który ze sposobów wyrazimy swoją kwerendę - spark i tak przetłumaczy ją na ciąg operacji na RDD i odpowiednio skolejkuje. Oznacza, że w każdej niemal sytuacji zadanie kwerendy poprzez formułę SQL oraz pysparka oznacza wykonanie tych samych operacji. Pozostawia to miejsce na używanie tej składni według preferencji użytkownika. Ale jednak każda z tych składni wnosi pewien narzut.

Za składnią SQL przemawiają następujące argumenty:

* Jest prostsza i często dużo łatwiej jest napisać kwerendę,
* Zyskujemy na czasie pisania kwerendy.

Za składnią pysparka natomiast:

* zapisywanie kwerend w postaci funkcji pysparka pozwala lepiej zrozumieć kolejność operacji i dbałość o zmniejszanie obciążenia,
* mamy możliwość reagowania na różnych etapach działania. Możemy tworzyć naszą kwerendę etapami i obserwować jej rozwój.
* jeśli mamy wiele różnych zaawansowanych kwerend o wspólnej bazie to szybciej napiszemy je w pyspark
* pyspark pozwala nam dużo wygodniej zarządzać tworzeniem kolumn



## Praca z tworzeniem kolumn w pyspark

Załóżmy, że dla naszych danych chcemy teraz utworzyć kolumną z wyliczonym na podstawie czasu przylotu i odlotu czasem. Pamiętamy, że część kolumn nie udała się zrzutować w czasie ładowania. Mamy zatem w bazie ciągi znaków jak 0815 odpowiadające godzinie 8:15, oraz inne które odpowiadają datom w sposób zrzutowany

In [None]:
time_flights = df.select('year', 'month', 'day', 'airline', 'flight_number', 'tail_number', 'scheduled_departure', 'scheduled_time', 'scheduled_arrival')
time_flights.limit(5).toPandas()

In [None]:
from pyspark.sql.functions import substring, col, expr
time_flights2 = time_flights.withColumn('scheduled_departure_in_hours_str', substring('scheduled_departure',1,2))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_hours', col('scheduled_departure_in_hours_str').cast('integer'))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes_str', substring('scheduled_departure',3,4))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes', col('scheduled_departure_in_minutes_str').cast('integer'))
time_flights2 = time_flights2.withColumn('scheduled_departure_in_minutes_from_midnight',expr('scheduled_departure_in_hours*60 + scheduled_departure_in_minutes '))
time_flights2 = time_flights2.drop(
    'scheduled_departure_in_hours_str','scheduled_departure_in_minutes_str', 'scheduled_departure_in_minutes','scheduled_departure_in_hours')
time_flights2.limit(60).toPandas()

In [None]:
time_flights3 = time_flights2.withColumn('scheduled_arrival_in_hours_str', substring('scheduled_arrival',1,2))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_hours', col('scheduled_arrival_in_hours_str').cast('integer'))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes_str', substring('scheduled_arrival',3,4))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes', col('scheduled_arrival_in_minutes_str').cast('integer'))
time_flights3 = time_flights3.withColumn('scheduled_arrival_in_minutes_from_midnight',expr('scheduled_arrival_in_hours*60 + scheduled_arrival_in_minutes '))
time_flights3 = time_flights3.drop(
    'scheduled_arrival_in_hours_str','scheduled_arrival_in_minutes_str', 'scheduled_arrival_in_minutes','scheduled_arrival_in_hours')
time_flights3.limit(60).toPandas()

In [None]:
time_flights3.where('scheduled_departure_in_minutes_from_midnight > scheduled_arrival_in_minutes_from_midnight').toPandas()

In [None]:
time_flights4 = time_flights3.withColumn('flight_time_diff',expr('scheduled_departure_in_minutes_from_midnight+scheduled_time-scheduled_arrival_in_minutes_from_midnight'))
time_flights4.select('flight_time_diff').distinct().sort('flight_time_diff').toPandas()

In [None]:
time_flights4.where('flight_time_diff >= 1000').select('scheduled_time', 'flight_time_diff', 'scheduled_departure', 'scheduled_arrival').limit(100).toPandas()

In [None]:
time_flights4.where('flight_time_diff < 1000').select('scheduled_time', 'flight_time_diff', 'scheduled_departure', 'scheduled_arrival').limit(100).toPandas()

## Tworzenie zmiennych okienkowych

Troszeczkę inaczej wygląda zadanie w którym chcemy przeprowadzić jakieś obliczenia związane ze zmiennymi reprezentowanymi przez tak zwane okna. Okna to przedziały (z reguły czasu) w których obliczane są statystyki dla poszczególnych elementów celem składowania ich w danych.

Wiemy np. niektóre kierunki są popularniejsze od innych. Możemy spróbować pogrupować nasze dane tak aby zobaczyć na które lotniska w ostatnim przedziale czasowym przyleciało najwięcej samolotów. Kiedy chcemy aby dotyczyło to pojedynczego dnia zadanie jest proste.

In [None]:
popular_airports = flights.groupBy('Year', 'Month', 'Day', 'Destination_airport').count().orderBy(col('count').desc())
popular_airports.limit(20).toPandas()

In [None]:
flights.where('Month = 11 and day = 29 and destination_airport = "ATL" ').toPandas()

Troche trudniej sprawa wygląda jeśli podsumowanie ma dotyczyć np. ostatniego tygodnia.

Najpierw wygenerujemy tabelę dziennych lotów na poszczególne dni

In [None]:
popular_airports = flights.groupBy('Year', 'Month', 'Day', 'Destination_airport').count().orderBy(col('count').desc())
popular_airports.limit(10).toPandas()

Dalej musimy ją przetworzyć tak aby policzyć większe okna czasowe

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import concat_ws, to_timestamp, to_date, rank, count, sum

days = lambda x: x * 86400

destinations = popular_airports
destinations = destinations.withColumn('Year_str', col('Year').cast('string') )
destinations = destinations.withColumn('Month_str', col('Month').cast('string') )
destinations = destinations.withColumn('Day_str', col('Day').cast('string') )
destinations = destinations.withColumn('Date_str', concat_ws('-', 'Day_str', 'Month_str', 'Year_str') )
destinations = destinations.withColumn('Date', to_timestamp('Date_str', 'd-M-yyyy' ))
destinations = destinations.drop( 'Year_str', 'Month_str', 'Day_str', 'Date_str') #'Year','Month','Day',
windSpec = Window.partitionBy('Destination_Airport').orderBy(col('Date').cast('long')).rangeBetween(-days(7),0)
destinations = destinations.withColumn('Cumulative_flights', sum(col('count')).over(windSpec))
destinations = destinations.drop('count')
windSpec2 = Window.partitionBy('Date').orderBy(col('Cumulative_flights').desc())
destinations = destinations.withColumn('Rank' , rank().over(windSpec2))
destinations = destinations.drop('Date', 'Cumulative_flights')
destinations.limit(100).toPandas()

In [None]:
%%time
print(f'flights size ')
flights = flights.alias("left").join(destinations.alias('right'), (flights.YEAR == destinations.Year)  & (flights.MONTH == destinations.Month) 
& (flights.DAY == destinations.Day)  & (flights.DESTINATION_AIRPORT == destinations.Destination_airport)).select("left.*", 'right.rank') # 

In [None]:
flights.limit(5).toPandas()

Można jeszcze użyć operacji `withColumnRenamed` do uporządkowania nazw kolumn

# Ćwiczenie warsztatowe

## Zadanie 1 

Utworzyć podzbiór zawierający tylko loty, które się odbyły. Znaleźć lot najkrótszy oraz najdłuższy.

## Zadanie 2

Wyszukać liczbę przewozników w danych i znaleźć łączną liczbę lotów wykonanych przez każdego z nich

## Zadanie 3

Dla każdej trasy (Lotnisko początkowe -> Lotnisko końcowe) znaleźć minimalny, przeciętny i maksymalny (rzeczywisty) czas przelotu

## Zadanie 4

Utworzyć nową kolumnę opisującą trasę (Lotnisko początkowe -> Lotnisko końcowe). Następnie sprawdzić czy w danych podanych jest zgodna odległość jego łączące

## Zadanie 5

Wygenerować tabelę z popularnością poszczególnych przewoźników na podstawie całego zbioru danych, wygenerować ich ranking i dołączyć go do danych flights jako kolumnę AIRLINE_RANK.
