In [1]:
! pip install nb_black



In [2]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [3]:
! pip install fugue[all]



<IPython.core.display.Javascript object>

## Just Like SQL

In [4]:
import warnings
import pandas as pd
import numpy as np
import fugue_spark
from typing import List, Dict, Any
from fugue_notebook import setup
from fugue_sql import fsql

warnings.filterwarnings('ignore')
setup()



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [5]:
df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": ["a", "a", "c", "d"]})
df

Unnamed: 0,col1,col2
0,1,a
1,2,a
2,3,c
3,4,d


<IPython.core.display.Javascript object>

In [6]:
%%fsql

SELECT * FROM df
WHERE col2 = "a"
PRINT

Unnamed: 0,col1:long,col2:str
0,1,a
1,2,a


<IPython.core.display.Javascript object>

In [7]:
%%fsql

SELECT col2, AVG(col1) AS avg_col1
FROM df
GROUP BY col2
PRINT

Unnamed: 0,col2:str,avg_col1:double
0,a,1.5
1,c,3.0
2,d,4.0


<IPython.core.display.Javascript object>

## Enhance SQL Interface

In [8]:
%%fsql  

df2 = SELECT *
FROM df 
WHERE col2="a"

SAVE df2 OVERWRITE '/tmp/df2.csv' (header=true)

<IPython.core.display.Javascript object>

In [9]:

%%fsql  

df3 = LOAD '/tmp/df2.csv' (header=true)

SELECT *
FROM df3
PRINT

Unnamed: 0,col1:str,col2:str
0,1,a
1,2,a


<IPython.core.display.Javascript object>

## Added Keywords

### DROP

In [10]:
%%fsql 

df4 = DROP COLUMNS col2 IF EXISTS FROM df
PRINT df4

Unnamed: 0,col1:long
0,1
1,2
2,3
3,4


<IPython.core.display.Javascript object>

### FILL

In [11]:
null_df = pd.DataFrame({"col1": [np.nan, np.nan, 1], "col2": [2, 3, np.nan]})

<IPython.core.display.Javascript object>

In [12]:
%%fsql
-- Fill nan at col1 with 1 and nan at col2 with 2
df1 = FILL NULLS PARAMS col1:1, col2:2 FROM null_df
PRINT df1

Unnamed: 0,col1:double,col2:double
0,1.0,2.0
1,1.0,3.0
2,1.0,2.0


<IPython.core.display.Javascript object>

### SAMPLE

In [13]:
%%fsql
df2 = SAMPLE 2 ROWS SEED 42 FROM df
PRINT df2
df3 = SAMPLE 50 PERCENT SEED 1 FROM df
PRINT df3

Unnamed: 0,col1:long,col2:str
0,2,a
1,4,d


Unnamed: 0,col1:long,col2:str
0,4,d
1,3,c


<IPython.core.display.Javascript object>

### Intergrate with Python

In [14]:
# schema: *, col3:str
def str_concat(df: pd.DataFrame, delimeter: str) -> pd.DataFrame:
    df = df.assign(col3=df["col1"].astype(str) + delimeter + df["col2"])
    return df

<IPython.core.display.Javascript object>

In [15]:
%%fsql 
SELECT * 
FROM df
PRINT

Unnamed: 0,col1:long,col2:str
0,1,a
1,2,a
2,3,c
3,4,d


<IPython.core.display.Javascript object>

In [16]:
%%fsql 
SELECT * 
FROM df 
TRANSFORM USING str_concat(delimeter="_")
PRINT

Unnamed: 0,col1:long,col2:str,col3:str
0,1,a,1_a
1,2,a,2_a
2,3,c,3_c
3,4,d,4_d


<IPython.core.display.Javascript object>

### Scale to Big Data

In [17]:
%%fsql spark 
SELECT * 
FROM df 
TRANSFORM USING str_concat(delimeter="_")
PRINT

23/01/23 08:39:05 WARN Utils: Your hostname, fmpclt227-Predator-PH315-54 resolves to a loopback address: 127.0.1.1; using 192.168.1.66 instead (on interface wlp0s20f3)
23/01/23 08:39:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/23 08:39:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

Unnamed: 0,col1:long,col2:str,col3:str
0,1,a,1_a
1,2,a,2_a
2,3,c,3_c
3,4,d,4_d


<IPython.core.display.Javascript object>

In [18]:
# schema: *
def get_median(df: pd.DataFrame) -> List[Dict[str, Any]]:
    return [{"col1": df["col1"].median(), "col2": df["col2"].iloc[0]}]

<IPython.core.display.Javascript object>

In [19]:
%%fsql spark 
SELECT * 
FROM df 
TRANSFORM PREPARTITION BY col2 USING get_median
PRINT

Unnamed: 0,col1:long,col2:str
0,1,a
1,3,c
2,4,d


<IPython.core.display.Javascript object>

## FugueSQL in Production

In [20]:
fsql(
    """SELECT * 
        FROM df 
        TRANSFORM PREPARTITION BY col2 USING get_median
        PRINT"""
).run("spark")

Unnamed: 0,col1:long,col2:str
0,1,a
1,3,c
2,4,d


FugueWorkflowResult()

<IPython.core.display.Javascript object>