Welcome to the Fugue SQL demo

In this demo, we are going to explore the US Flight Dataset, which is extensively discussed on the internet.

I will show you how to use Fugue and Spark to quickly iterate on big data problems.


First, I will paste a piece of code to initialize the notebook environment.

In [None]:
from fugue_notebook import setup
import fugue_spark

setup()

from pyspark.sql import SparkSession

(SparkSession.builder
         .config("spark.driver.memory", "30g")
         .config("spark.sql.shuffle.partitions", "16")
         .config("fugue.spark.use_pandas_udf", True)
         .getOrCreate()
)





The setup function creates a shortcut for using Spark.

It also enables the Fugue SQL magic and syntax highlighting.

Please pay attention to the config for the Spark session.

We enabled Fugue to use pandas UDFs whenever they are eligible.

In Fugue, your code needs no change whether or not pandas UDFs are used.

It's just a switch.

In this case, enabling pandas UDFs makes certain steps 3-4 times faster.

Also notice that you don't need to write this piece of code every time you use Fugue.

There can be a centralized place to customize your Fugue experience.

-

All data for this demo is stored on Google Cloud

The first Fugue SQL cell loads the CSV files from Google Cloud

In [None]:
%%fsql
airports = 
    LOAD CSV "gs://fugue/demo/flights/airports.csv"
    COLUMNS airport_id:long,name:str,city:str,country:str,iata:str,icao:str,lat:double,lng:double,alt:long,timezone:str,dst:str,type:str,source:str
YIELD DATAFRAME
PRINT

airlines = 
    LOAD CSV "gs://fugue/demo/flights/airlines.csv"
    COLUMNS airline_id:long,name:str,alias:str,iata:str,icao:str,callsign:str,country:str,active:str
YIELD DATAFRAME
PRINT

These CSV files don't have headers, so we define the schema using COLUMNS

Then we yield them so we can consume them in the following cells

We also print them for debugging purposes

Notice that the execution of this cell has nothing to do with Spark.

It uses the default execution engine, which relies only on native Python and pandas
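To picture what declaring a schema for a headerless file amounts to, here is a minimal sketch using only the Python standard library. It is not how Fugue loads CSV files; the sample rows, the shortened schema, and the helper names `parse_schema` and `read_headerless_csv` are all assumptions made for illustration.

```python
import csv
import io

# Hypothetical two-row excerpt shaped like a headerless airlines file.
RAW = '1,"Example Air",EA,EXA\n2,"Sample Jet",SJ,SMJ\n'

# A shortened schema written in the same "name:type" style as COLUMNS.
SCHEMA = "airline_id:long,name:str,iata:str,icao:str"

# Map the schema's type names to Python converters (only the ones used here).
CONVERTERS = {"long": int, "double": float, "str": str}


def parse_schema(expr):
    """Split "a:long,b:str" into [("a", int), ("b", str)]."""
    pairs = []
    for item in expr.split(","):
        name, type_name = item.split(":")
        pairs.append((name.strip(), CONVERTERS[type_name.strip()]))
    return pairs


def read_headerless_csv(text, schema_expr):
    """Name and cast each column by position, returning a list of dicts."""
    columns = parse_schema(schema_expr)
    rows = []
    for record in csv.reader(io.StringIO(text)):
        rows.append(
            {name: cast(value) for (name, cast), value in zip(columns, record)}
        )
    return rows


rows = read_headerless_csv(RAW, SCHEMA)
print(rows[0])
```

Because the file carries no column names, the order of the entries in the schema is what ties each name and type to a column.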

-

Now let's take a look at the underlying dataframe of the yielded airports

In [None]:
airports.native

It is just a pandas dataframe.

-

Now let's explore the main dataset, the flights information.

In [None]:
%%fsql spark
LOAD "gs://fugue/demo/flights/flights.parquet"
PRINT ROWCOUNT

flights.parquet is a folder of parquet files, so using Spark will be much faster

To use Spark, we just specify spark for fsql

It has 2.5 million rows

-
-


Now we need to do some transformation on this big dataset

But how can we verify everything is working fine before applying it on the entire dataset?

We can use Fugue SQL's SAMPLE syntax here and yield a small dataframe to iterate on

In [None]:
%%fsql spark
LOAD "gs://fugue/demo/flights/flights.parquet"
PRINT ROWCOUNT
SAMPLE 0.01 PERCENT SEED 0 PERSIST
YIELD DATAFRAME AS test
PRINT ROWCOUNT

In test, we have only 267 rows
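A quick sanity check on that number: 0.01 percent of roughly 2.5 million rows is about 250, so 267 is in the expected range for a random sample. The sketch below assumes each row is kept independently with the same probability (an assumption about how the engine samples) and uses the rounded row count quoted earlier.

```python
import math

total_rows = 2_500_000   # "2.5 million rows" from the earlier cell (rounded)
fraction = 0.01 / 100    # SAMPLE 0.01 PERCENT

# Under independent per-row sampling the sample size is binomial.
expected = total_rows * fraction
std_dev = math.sqrt(total_rows * fraction * (1 - fraction))

# How many standard deviations the observed 267 rows are from the mean.
z = (267 - expected) / std_dev

print(round(expected), round(std_dev, 1), round(z, 2))
```

Under these assumptions the observed count is roughly one standard deviation above the mean, which is unremarkable.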

-

If you want to iterate quickly or write unit tests, using a small dataset is one thing

Avoiding Spark is also a great idea. In practice, though, if you were not using Fugue, by this point you would already see Spark dependencies everywhere in your code, and bypassing Spark would no longer be an option.

But so far with Fugue, you don't see Spark dependencies anywhere.

-

Let's paste the transformation function

In [None]:
import pandas as pd

#schema: *,ts:datetime,day_of_year:int,hour_of_week:int
def generate_time_metrics(df:pd.DataFrame) -> pd.DataFrame:
    date = df["FL_DATE"].astype(str) + " "+df["CRS_DEP_TIME"].astype(str)
    df["ts"]=pd.to_datetime(date, format="%Y-%m-%d %H%M")
    df["day_of_year"]=df["ts"].dt.dayofyear
    df["hour_of_week"]=df["ts"].dt.dayofweek*24+df["ts"].dt.hour
    return df

Again, this function is unrelated to Spark or Fugue.

It is just a native Python and pandas function that derives a few more columns from the existing ones

The comment line is useful to Fugue

It tells Fugue that the output schema is the input schema, represented by the wildcard, plus three typed columns
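To make the wildcard concrete, here is a small sketch that expands such a hint against an input schema. It only illustrates what the hint means and is not Fugue's own parser; the function name `expand_schema_hint`, the three input columns, and their type names are assumptions.

```python
def expand_schema_hint(input_schema, hint):
    """Expand a hint like "*,ts:datetime" against an input schema.

    input_schema is a list of (name, type) pairs; "*" stands for all of them.
    """
    output = []
    for item in hint.split(","):
        item = item.strip()
        if item == "*":
            # The wildcard copies every input column, in order.
            output.extend(input_schema)
        else:
            name, type_name = item.split(":")
            output.append((name, type_name))
    return output


# Hypothetical subset of the flights columns; the type names are assumed.
input_schema = [("FL_DATE", "str"), ("CRS_DEP_TIME", "str"), ("DEP_DELAY", "double")]
hint = "*,ts:datetime,day_of_year:int,hour_of_week:int"

output_schema = expand_schema_hint(input_schema, hint)
for name, type_name in output_schema:
    print(f"{name}:{type_name}")
```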

But since it's just a comment line, it doesn't build any dependency on Fugue

We all know how to unit test such a simple function on pandas
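For instance, a minimal test might look like the sketch below. The function is copied from the cell above so the block runs on its own; the single input row is made up, and it assumes CRS_DEP_TIME arrives as a zero-padded four-character string such as "0930".

```python
import pandas as pd


# schema: *,ts:datetime,day_of_year:int,hour_of_week:int
def generate_time_metrics(df: pd.DataFrame) -> pd.DataFrame:
    date = df["FL_DATE"].astype(str) + " " + df["CRS_DEP_TIME"].astype(str)
    df["ts"] = pd.to_datetime(date, format="%Y-%m-%d %H%M")
    df["day_of_year"] = df["ts"].dt.dayofyear
    df["hour_of_week"] = df["ts"].dt.dayofweek * 24 + df["ts"].dt.hour
    return df


def test_generate_time_metrics():
    # 2020-01-06 was a Monday, so dayofweek is 0 and hour_of_week equals the hour.
    df = pd.DataFrame({"FL_DATE": ["2020-01-06"], "CRS_DEP_TIME": ["0930"]})
    out = generate_time_metrics(df)
    assert out["ts"].iloc[0] == pd.Timestamp("2020-01-06 09:30")
    assert out["day_of_year"].iloc[0] == 6
    assert out["hour_of_week"].iloc[0] == 9


test_generate_time_metrics()
```

Nothing in this test imports Spark or Fugue, which is the point of keeping the transformation a plain pandas function.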

Here, we just verify the result on the sample test dataframe

In [None]:
generate_time_metrics(test.as_pandas())

OK, this code only works for pandas dataframes.

But with Fugue, this function can work on all supported dataframes, such as Spark and Dask

Let's see how to use this function in Fugue SQL

In [None]:
%%fsql
TRANSFORM test USING generate_time_metrics
PRINT

TRANSFORM can directly apply simple Python functions to a generalized dataframe

The usage has many variations. For example, you can pre-partition the dataframe so that the function is applied to each partition separately.

For more details, you can visit the Fugue repository
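To picture what applying a function to each partition separately means, here is a conceptual sketch in plain Python. It does not use Fugue and is not how Fugue implements partitioning; the helper names and the three rows are hypothetical.

```python
from collections import defaultdict


def transform_per_partition(rows, key, func):
    """Group rows by `key`, then call `func` once per group."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key]].append(row)
    result = []
    for part in partitions.values():
        # func only ever sees the rows of a single partition.
        result.extend(func(part))
    return result


def add_rank_in_group(part):
    """Example per-partition logic: rank rows by delay within one partition."""
    ordered = sorted(part, key=lambda r: r["DEP_DELAY"], reverse=True)
    return [dict(r, rank=i + 1) for i, r in enumerate(ordered)]


# Hypothetical rows using two of the raw flights column names.
rows = [
    {"ORIGIN": "SFO", "DEP_DELAY": 5.0},
    {"ORIGIN": "JFK", "DEP_DELAY": 30.0},
    {"ORIGIN": "SFO", "DEP_DELAY": 12.0},
]
ranked = transform_per_partition(rows, "ORIGIN", add_rank_in_group)
for row in ranked:
    print(row)
```

The per-partition function stays simple because it never needs to know which other partitions exist.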

-

Now we want to do one more thing: the output dataframe has too many columns

We want to select just a few and rename them

This is a typical case where SQL SELECT does an elegant job, so let's just do it

In [None]:
%%fsql
TRANSFORM test USING generate_time_metrics
SELECT 
    ts, 
    day_of_year, 
    hour_of_week, 
    ORIGIN AS origin,
    DEST AS dest,
    OP_UNIQUE_CARRIER AS carrier,
    DEP_DELAY AS delay
PRINT

This still runs on pandas, not Spark

In Fugue, SQL is no longer a privilege of a particular computing framework.

Your SQL statements can run on every computing framework Fugue supports, including the native pandas engine.

-

So this piece of code has a map function plus a SQL statement

You could make it more complicated, and it would still run without Spark.

-


Now let's bring it to Spark and apply on the entire dataset

In [None]:
%%fsql spark
LOAD "gs://fugue/demo/flights/flights.parquet"
TRANSFORM USING generate_time_metrics
SELECT 
    ts, 
    day_of_year, 
    hour_of_week, 
    ORIGIN AS origin,
    DEST AS dest,
    OP_UNIQUE_CARRIER AS carrier,
    DEP_DELAY AS delay
PERSIST 
YIELD DATAFRAME AS flights
PRINT ROWCOUNT

We just replaced the test dataframe with a LOAD statement and specified the spark engine for fsql

We also yield the result to use later

While it is running, let's continue our demo

-

Visualization is very important for data exploration

So how can we enable visualization inside Fugue SQL?

Again, you only need to write very simple Python functions that are independent of Fugue

In [2]:
import matplotlib.pyplot as plt

def plot(df:pd.DataFrame,x,y,sort,**kwargs) -> None:
    df.sort_values(sort).plot(x=x,y=y,**kwargs)
    plt.show()
    
def plot_bar(df:pd.DataFrame,x,y,sort,**kwargs) -> None:
    df.sort_values(sort).plot.bar(x=x,y=y,**kwargs)
    plt.show()

plot and plot_bar will be used as another type of Fugue extension called an outputter, which has to run on the driver.

Here is how to use it

In [None]:
%%fsql spark
SELECT day_of_year, AVG(delay) AS avg_delay FROM flights GROUP BY day_of_year
OUTPUT USING plot(x="day_of_year",y="avg_delay",sort="day_of_year")

SELECT hour_of_week, AVG(delay) AS avg_delay FROM flights GROUP BY hour_of_week
OUTPUT USING plot(x="hour_of_week",y="avg_delay",sort="hour_of_week")

This simple piece of code shows several things

First, it is multi-task: we aggregated and plotted data points by day of year and by hour of week

Second, it is a more balanced case where you use both standard SELECT statements and the Fugue-specific OUTPUT syntax

Third, and again, this code runs on all computing frameworks Fugue supports

-

Now let's see a more SQL intensive case

In [None]:
%%fsql spark
info = 
    SELECT ts
        , carrier
        , B.name AS carrier_name
        , origin
        , C.name AS origin_name      
        , C.country AS origin_country      
        , C.lat AS origin_lat       
        , C.lng AS origin_lng    
        , dest
        , D.name AS dest_name
        , D.country AS dest_country    
        , D.lat AS dest_lat       
        , D.lng AS dest_lng    
        , delay
    FROM flights AS A
    LEFT OUTER JOIN airlines AS B
        ON A.carrier = B.iata
    LEFT OUTER JOIN airports AS C
        ON A.origin = C.iata
    LEFT OUTER JOIN airports AS D
        ON A.dest = D.iata
    WHERE C.lat IS NOT NULL AND C.lng IS NOT NULL
        AND D.lat IS NOT NULL AND D.lng IS NOT NULL
PERSIST YIELD DATAFRAME
PRINT ROWCOUNT

SELECT * WHERE origin_country = dest_country AND origin_country = 'United States'
PERSIST YIELD DATAFRAME AS info_us
PRINT ROWCOUNT

We joined flights, airlines and airports together as info

Then we filtered the dataframe to contain only US domestic flights.

We yielded and printed both
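One detail of the join is worth spelling out: a LEFT OUTER JOIN keeps flights whose airport code has no match, and the WHERE clause on lat and lng then drops them. The sketch below shows this with SQLite as a stand-in engine (not Fugue or Spark); the tables are cut down to a few columns and the rows are hypothetical.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE flights (origin TEXT, delay REAL);
    CREATE TABLE airports (iata TEXT, lat REAL, lng REAL);
    -- Hypothetical rows: 'XXX' has no matching airport record.
    INSERT INTO flights VALUES ('SFO', 5.0), ('XXX', 9.0);
    INSERT INTO airports VALUES ('SFO', 37.6, -122.4);
    """
)

# Without the filter, the unmatched flight survives with NULL coordinates.
left_join = con.execute(
    "SELECT A.origin, C.lat FROM flights AS A "
    "LEFT OUTER JOIN airports AS C ON A.origin = C.iata ORDER BY A.origin"
).fetchall()

# With the filter, only flights with a known airport location remain.
filtered = con.execute(
    "SELECT A.origin, C.lat FROM flights AS A "
    "LEFT OUTER JOIN airports AS C ON A.origin = C.iata "
    "WHERE C.lat IS NOT NULL AND C.lng IS NOT NULL"
).fetchall()

print(left_join)
print(filtered)
```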

-

Now let's do more analysis

In [None]:
%%fsql spark
SELECT origin, AVG(delay) AS delay FROM info_us GROUP BY origin
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="origin",y="delay",sort="delay", title="By Origin")

SELECT dest, AVG(delay) AS delay FROM info_us GROUP BY dest
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="dest",y="delay",sort="delay", title="By Dest")

top = 
    SELECT carrier, COUNT(*) AS ct 
    FROM info_us GROUP BY carrier 
    ORDER BY ct DESC LIMIT 10
    
SELECT info_us.* FROM info_us INNER JOIN top ON info_us.carrier = top.carrier
SELECT carrier_name, AVG(delay) AS delay GROUP BY carrier_name
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="carrier_name",y="delay",sort="delay", title="By Top Carriers")


In this example, notice that we can write SELECT statements one after another

This avoids gigantic embedded (nested) SQL statements

Fugue also supports embedded SQL, so you can still write it that way if you prefer

Also see how we use simple assignment syntax to define intermediate dataframes

and how we can use them in the following steps
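For comparison, here is a sketch of the same kind of two-step logic in standard SQL, once as a nested subquery and once with a named intermediate result. It uses SQLite as a stand-in engine; the table contents and the view name `by_origin` are hypothetical.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE info_us (origin TEXT, delay REAL);
    INSERT INTO info_us VALUES
        ('SFO', 10.0), ('SFO', 20.0), ('JFK', 40.0), ('ORD', 5.0);
    """
)

# Nested form: the aggregation is a subquery inside the ordering query.
nested = con.execute(
    """
    SELECT * FROM (
        SELECT origin, AVG(delay) AS delay FROM info_us GROUP BY origin
    ) ORDER BY delay DESC LIMIT 2
    """
).fetchall()

# Step-by-step form: name the intermediate result, then query it.
con.execute(
    "CREATE TEMP VIEW by_origin AS "
    "SELECT origin, AVG(delay) AS delay FROM info_us GROUP BY origin"
)
stepwise = con.execute(
    "SELECT * FROM by_origin ORDER BY delay DESC LIMIT 2"
).fetchall()

print(nested)
print(stepwise)
```

Both forms return the same rows. The step-by-step form reads top to bottom, which is the style the cell above uses.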

-

Another thing worth mentioning is anonymity

We give names to dataframes only when necessary

For chained operations we can keep them anonymous, and we also omitted a lot of FROM clauses

This causes no ambiguity.

And of course you can be explicit at each step in Fugue:

you can assign variable names and keep the FROM clauses if you prefer.

-

Now imagine implementing the same logic in Spark,

and you may realize how much boilerplate code you would have to write to achieve the same thing

Fugue SQL makes the logic much simpler, cleaner and more intuitive

-

Now let's put everything together

In [None]:
%%fsql spark
LOAD "gs://fugue/demo/flights/flights.parquet"
TRANSFORM USING generate_time_metrics
flights = 
    SELECT 
        ts, 
        day_of_year, 
        hour_of_week, 
        ORIGIN AS origin,
        DEST AS dest,
        OP_UNIQUE_CARRIER AS carrier,
        DEP_DELAY AS delay
    PERSIST 
    
SELECT day_of_year, AVG(delay) AS avg_delay FROM flights GROUP BY day_of_year
OUTPUT USING plot(x="day_of_year",y="avg_delay",sort="day_of_year")

SELECT hour_of_week, AVG(delay) AS avg_delay FROM flights GROUP BY hour_of_week
OUTPUT USING plot(x="hour_of_week",y="avg_delay",sort="hour_of_week")

    
info = 
    SELECT ts
        , carrier
        , B.name AS carrier_name
        , origin
        , C.name AS origin_name      
        , C.country AS origin_country      
        , C.lat AS origin_lat       
        , C.lng AS origin_lng    
        , dest
        , D.name AS dest_name
        , D.country AS dest_country    
        , D.lat AS dest_lat       
        , D.lng AS dest_lng    
        , delay
    FROM flights AS A
    LEFT OUTER JOIN airlines AS B
        ON A.carrier = B.iata
    LEFT OUTER JOIN airports AS C
        ON A.origin = C.iata
    LEFT OUTER JOIN airports AS D
        ON A.dest = D.iata
    WHERE C.lat IS NOT NULL AND C.lng IS NOT NULL
        AND D.lat IS NOT NULL AND D.lng IS NOT NULL

info_us = 
    SELECT * WHERE origin_country = dest_country AND origin_country = 'United States'
    PERSIST
    
SELECT origin, AVG(delay) AS delay FROM info_us GROUP BY origin
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="origin",y="delay",sort="delay", title="By Origin")

SELECT dest, AVG(delay) AS delay FROM info_us GROUP BY dest
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="dest",y="delay",sort="delay", title="By Dest")

top = 
    SELECT carrier, COUNT(*) AS ct 
    FROM info_us GROUP BY carrier 
    ORDER BY ct DESC LIMIT 20
    
SELECT info_us.* FROM info_us INNER JOIN top ON info_us.carrier = top.carrier
SELECT carrier_name, AVG(delay) AS delay GROUP BY carrier_name
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="carrier_name",y="delay",sort="delay", title="By Top Carriers")



We just copy-pasted the code together and removed the YIELD statements; they are unnecessary now that everything is in a single cell

So you can see Fugue SQL is a first-class programming language for describing complex data pipelines

It's multi-task, and it's great for SQL-heavy pipelines where Python can help at a few steps

While it's running, let's talk about productionization

-

Notebook magic is a cool idea, and it makes the development experience much better

However, for production, we need modularized, testable code

Let's see how we modularize the previous logic

In [None]:
metadata_query = """
airports = 
    LOAD CSV "gs://fugue/demo/flights/airports.csv"
    COLUMNS airport_id:long,name:str,city:str,country:str,iata:str,icao:str,lat:double,lng:double,alt:long,timezone:str,dst:str,type:str,source:str
YIELD DATAFRAME

airlines = 
    LOAD CSV "gs://fugue/demo/flights/airlines.csv"
    COLUMNS airline_id:long,name:str,alias:str,iata:str,icao:str,callsign:str,country:str,active:str
YIELD DATAFRAME
"""

flights_query = """
LOAD "gs://fugue/demo/flights/flights.parquet"
TRANSFORM USING generate_time_metrics
flights = 
    SELECT 
        ts, 
        day_of_year, 
        hour_of_week, 
        ORIGIN AS origin,
        DEST AS dest,
        OP_UNIQUE_CARRIER AS carrier,
        DEP_DELAY AS delay
    PERSIST YIELD DATAFRAME
"""

eda_query = """
SELECT day_of_year, AVG(delay) AS avg_delay FROM flights GROUP BY day_of_year
OUTPUT USING plot(x="day_of_year",y="avg_delay",sort="day_of_year")

SELECT hour_of_week, AVG(delay) AS avg_delay FROM flights GROUP BY hour_of_week
OUTPUT USING plot(x="hour_of_week",y="avg_delay",sort="hour_of_week")

    
info = 
    SELECT ts
        , carrier
        , B.name AS carrier_name
        , origin
        , C.name AS origin_name      
        , C.country AS origin_country      
        , C.lat AS origin_lat       
        , C.lng AS origin_lng    
        , dest
        , D.name AS dest_name
        , D.country AS dest_country    
        , D.lat AS dest_lat       
        , D.lng AS dest_lng    
        , delay
    FROM flights AS A
    LEFT OUTER JOIN airlines AS B
        ON A.carrier = B.iata
    LEFT OUTER JOIN airports AS C
        ON A.origin = C.iata
    LEFT OUTER JOIN airports AS D
        ON A.dest = D.iata
    WHERE C.lat IS NOT NULL AND C.lng IS NOT NULL
        AND D.lat IS NOT NULL AND D.lng IS NOT NULL

info_us = 
    SELECT * WHERE origin_country = dest_country AND origin_country = 'United States'
    PERSIST
    
SELECT origin, AVG(delay) AS delay FROM info_us GROUP BY origin
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="origin",y="delay",sort="delay", title="By Origin")

SELECT dest, AVG(delay) AS delay FROM info_us GROUP BY dest
SELECT * ORDER BY delay DESC LIMIT 10
OUTPUT USING plot_bar(x="dest",y="delay",sort="delay", title="By Dest")

top = 
    SELECT carrier, COUNT(*) AS ct 
    FROM info_us GROUP BY carrier 
    ORDER BY ct DESC LIMIT {{n}}
    YIELD DATAFRAME
    
info_top = 
    SELECT info_us.* FROM info_us INNER JOIN top ON info_us.carrier = top.carrier

SELECT carrier_name, AVG(delay) AS delay FROM info_top GROUP BY carrier_name
SELECT * ORDER BY delay DESC LIMIT {{n}}
OUTPUT USING plot_bar(x="carrier_name",y="delay",sort="delay", title="By Top Carriers")

"""

See, we separated the pipeline into 3 parts

The first gets the reference data, and the second processes and cleans up the flights data

The third does the data analysis

We use YIELD to output results from the first and second modules

Notice that in the third one, we have this n in a Jinja template

This is how we parameterize the modules
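To show what the placeholder turns into, here is a simplified stand-in for template rendering written with only the standard library. It is not Jinja itself and covers just the plain `{{name}}` case; the `render` helper and the one-line query snippet are assumptions for illustration.

```python
import re


def render(template, **params):
    """Replace each {{name}} placeholder with str(params[name]).

    A simplified stand-in for template rendering; real Jinja supports far more.
    """
    def substitute(match):
        return str(params[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


snippet = (
    "SELECT carrier, COUNT(*) AS ct FROM info_us "
    "GROUP BY carrier ORDER BY ct DESC LIMIT {{n}}"
)
rendered = render(snippet, n=30)
print(rendered)
```

The query text is fixed once, and the caller decides the value of n at run time.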

-

now let's see how we chain them together

In [None]:
from fugue_sql import fsql

metadata = fsql(metadata_query).run()
flights = fsql(flights_query).run("spark")
fsql(eda_query, metadata, flights, n=30).run("spark");

We use the fsql utility function to execute the SQL strings

We run the first query without specifying an engine, meaning that it runs on the native execution engine

We run the second and third on Spark

If you modularize in this way, you can run each module on a different execution engine

For example, we could run the first step on GPU using BlazingSQL, the second on Dask, and the third on Spark.

This is normally unnecessary, but a more practical case is running modules on Spark clusters of different sizes.

-

Finally, I want to talk about testing

This particular example was not set up for the best testability, but we can still use it as an example

Look at eda_query: it yields the top dataframe

So here is how we test with mock input

In [None]:
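import pandas as pd

# Mock inputs holding only the columns eda_query reads; all values are made up
mock_airports = pd.DataFrame(
    [["AAA", "Airport A", "United States", 1.0, 2.0],
     ["BBB", "Airport B", "United States", 3.0, 4.0]],
    columns=["iata", "name", "country", "lat", "lng"])
mock_airlines = pd.DataFrame([["X1", "Airline X"]], columns=["iata", "name"])
mock_flights = pd.DataFrame(
    [[pd.Timestamp("2020-01-06 09:30"), 6, 9, "AAA", "BBB", "X1", 5.0]],
    columns=["ts", "day_of_year", "hour_of_week", "origin", "dest", "carrier", "delay"])

result = fsql(
    eda_query, 
    airports=mock_airports,
    airlines=mock_airlines,
    flights=mock_flights,
    n=10).run()

df_to_assert_on = result["top"].as_pandas()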

Instead of getting outputs from the first 2 modules, we pass mock pandas dataframes directly into the third module

We run it using the native execution engine

and assert on the yielded top dataframe
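One way to get expected values for that assertion is to compute them independently from the mock input. The sketch below counts carriers with the standard library; the carrier list is hypothetical, and it presumes every mock flight survives the join and the US filter, so the counts carry through to top.

```python
from collections import Counter

# Hypothetical mock flights: only the carrier column matters for `top`.
mock_carriers = ["AA", "AA", "AA", "DL", "DL", "UA"]


def expected_top(carriers, n):
    """Return [(carrier, count)] for the n most frequent carriers."""
    return Counter(carriers).most_common(n)


expected = expected_top(mock_carriers, n=2)
print(expected)  # [('AA', 3), ('DL', 2)]
```

The rows of the yielded top dataframe can then be compared against this list, keeping the mock data free of ties so the order is deterministic.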

Everything happens without Spark; it is just a normal test on pandas

If you want to test on local Spark, you only need to specify it in run, and everything else stays the same

You just need to make sure your test environment can get a Spark session