In [1]:
from pyspark.sql import SparkSession

# start the session
spark = SparkSession.builder.appName("JoinTypes").getOrCreate()

23/08/06 21:19:46 WARN Utils: Your hostname, ihorlukianov-Predator-PHN16-71 resolves to a loopback address: 127.0.1.1; using 192.168.1.103 instead (on interface wlp0s20f3)
23/08/06 21:19:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/06 21:19:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load dataframes

In [6]:
# git clone https://github.com/databricks/LearningSparkV2.git
tripdelaysFilePath = "../databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airportsnaFilePath = "../databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

In [7]:
airportsna = (spark.read
                    .format("csv")
                    .options(header="true", inferSchema="true", sep="\t")
                    .load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")
spark.sql('Select count(*) as total_rows from airports_na').show()

+----------+
|total_rows|
+----------+
|       526|
+----------+



In [8]:
airportsna.show(5)

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows



In [9]:
departureDelays = (spark.read
                        .format("csv")
                        .options(header="true")
                        .load(tripdelaysFilePath))
departureDelays.createOrReplaceTempView("departureDelays")
spark.sql('Select count(*) as total_rows from departureDelays').show()

+----------+
|total_rows|
+----------+
|   1391578|
+----------+



In [10]:
departureDelays.show(5)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows



## JOIN types

### Inner join

In [11]:
from pyspark.sql.functions import sum, col

# get only five flights with the longest delays
airportsna.filter(airportsna.State == 'NY').select('IATA') \
    .join(departureDelays.filter(departureDelays.delay > 0).select('date', col('delay').cast('int'), 'origin', 'destination'),
          airportsna.IATA == departureDelays.destination,
          'inner').sort('delay', ascending=False).show(5)

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
| JFK|01300915| 1500|   EGE|        JFK|
| JFK|01031442| 1167|   SJU|        JFK|
| LGA|01011700| 1017|   MSP|        LGA|
| JFK|01291718|  978|   SJU|        JFK|
| JFK|02121625|  932|   HNL|        JFK|
+----+--------+-----+------+-----------+
only showing top 5 rows



In [12]:
airportsna.filter(airportsna.State == 'NY') \
    .join(departureDelays,
          airportsna.IATA == departureDelays.destination,
          'inner').select(sum('delay')).show(5)

+----------+
|sum(delay)|
+----------+
|  970213.0|
+----------+



### Full/Outer Join

In [13]:
airportsna.filter(airportsna.State == 'GA').select('IATA') \
    .join(departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
                         .filter((departureDelays.destination.isin(['ATL', 'JFK'])) & (departureDelays.delay <= 0)).sort('date').limit(5),
          airportsna.IATA == departureDelays.destination,
          'full').sort('delay', ascending=False).show()

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
|null|01010206|   -4|   SJU|        JFK|
| ATL|01010040|   -6|   SLC|        ATL|
| ATL|01010030|   -8|   LAS|        ATL|
|null|01010500|   -8|   SJU|        JFK|
| ATL|01010059|   -9|   DEN|        ATL|
| ABY|    null| null|  null|       null|
| AGS|    null| null|  null|       null|
| AHN|    null| null|  null|       null|
| BQK|    null| null|  null|       null|
| CSG|    null| null|  null|       null|
| MCN|    null| null|  null|       null|
| SAV|    null| null|  null|       null|
| VLD|    null| null|  null|       null|
+----+--------+-----+------+-----------+



### Cross

In [21]:
# working incorrect
airportsna.filter(airportsna.State.isin(['GA', 'NY'])).select('IATA') \
    .join(departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
                         .filter((departureDelays.destination.isin(['ATL', 'JFK'])) & (departureDelays.delay <= 0)).sort('date').limit(5),
          airportsna.IATA == departureDelays.destination,
          'cross').sort('delay', ascending=False).show()

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
| JFK|01010206|   -4|   SJU|        JFK|
| ATL|01010040|   -6|   SLC|        ATL|
| ATL|01010030|   -8|   LAS|        ATL|
| JFK|01010500|   -8|   SJU|        JFK|
| ATL|01010059|   -9|   DEN|        ATL|
+----+--------+-----+------+-----------+



In [27]:
# correct
airportsna.filter(airportsna.State.isin(['GA', 'NY'])).select('IATA') \
    .crossJoin(departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
                         .filter((departureDelays.destination.isin(['ATL', 'JFK'])) & (departureDelays.delay <= 0)).sort('date').limit(5)).show()

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
| ABY|01010030|   -8|   LAS|        ATL|
| ABY|01010040|   -6|   SLC|        ATL|
| ABY|01010059|   -9|   DEN|        ATL|
| ABY|01010206|   -4|   SJU|        JFK|
| ABY|01010500|   -8|   SJU|        JFK|
| ALB|01010030|   -8|   LAS|        ATL|
| ALB|01010040|   -6|   SLC|        ATL|
| ALB|01010059|   -9|   DEN|        ATL|
| ALB|01010206|   -4|   SJU|        JFK|
| ALB|01010500|   -8|   SJU|        JFK|
| AHN|01010030|   -8|   LAS|        ATL|
| AHN|01010040|   -6|   SLC|        ATL|
| AHN|01010059|   -9|   DEN|        ATL|
| AHN|01010206|   -4|   SJU|        JFK|
| AHN|01010500|   -8|   SJU|        JFK|
| ATL|01010030|   -8|   LAS|        ATL|
| ATL|01010040|   -6|   SLC|        ATL|
| ATL|01010059|   -9|   DEN|        ATL|
| ATL|01010206|   -4|   SJU|        JFK|
| ATL|01010500|   -8|   SJU|        JFK|
+----+--------+-----+------+-----------+
only showing top

### Left

In [28]:
airportsna.filter(airportsna.State == 'GA').select('IATA') \
    .join(departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
                         .filter((departureDelays.destination.isin(['ATL', 'JFK'])) & (departureDelays.delay <= 0)).sort('date').limit(5),
          airportsna.IATA == departureDelays.destination,
          'left_outer').sort('delay', ascending=False).show()

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
| ATL|01010040|   -6|   SLC|        ATL|
| ATL|01010030|   -8|   LAS|        ATL|
| ATL|01010059|   -9|   DEN|        ATL|
| ABY|    null| null|  null|       null|
| AHN|    null| null|  null|       null|
| AGS|    null| null|  null|       null|
| BQK|    null| null|  null|       null|
| CSG|    null| null|  null|       null|
| MCN|    null| null|  null|       null|
| SAV|    null| null|  null|       null|
| VLD|    null| null|  null|       null|
+----+--------+-----+------+-----------+



### Right

In [26]:
airportsna.filter(airportsna.State == 'GA').select('IATA') \
    .join(departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
                         .filter((departureDelays.destination.isin(['ATL', 'JFK'])) & (departureDelays.delay <= 0)).sort('date').limit(5),
          airportsna.IATA == departureDelays.destination,
          'right').sort('delay', ascending=False).show()

+----+--------+-----+------+-----------+
|IATA|    date|delay|origin|destination|
+----+--------+-----+------+-----------+
|null|01010206|   -4|   SJU|        JFK|
| ATL|01010040|   -6|   SLC|        ATL|
| ATL|01010030|   -8|   LAS|        ATL|
|null|01010500|   -8|   SJU|        JFK|
| ATL|01010059|   -9|   DEN|        ATL|
+----+--------+-----+------+-----------+



### Anti

In [63]:
departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
    .join(airportsna.filter(airportsna.Country == 'USA'),
          departureDelays['destination'] == airportsna['IATA'],
          'anti').sort('delay', ascending=False).show(5)

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01110900|  511|   JFK|        SJU|
|01051250|  468|   STT|        SJU|
|01040611|  459|   JFK|        SJU|
|01052359|  447|   JFK|        SJU|
|01030900|  432|   JFK|        SJU|
+--------+-----+------+-----------+
only showing top 5 rows



### Semi

In [64]:
departureDelays.select('date', col('delay').cast('int'), 'origin', 'destination') \
    .join(airportsna.filter(airportsna.Country == 'USA'),
          departureDelays['destination'] == airportsna['IATA'],
          'semi').sort('delay', ascending=False).show(5)

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|03090615| 1642|   TPA|        DFW|
|02190925| 1638|   SFO|        ORD|
|02021245| 1636|   FLL|        DFW|
|03020700| 1592|   RSW|        ORD|
|01180805| 1560|   BNA|        DFW|
+--------+-----+------+-----------+
only showing top 5 rows

