In [2]:
# https://mode.com/sql-tutorial/sql-window-functions/

In [2]:
import pyspark as ps

In [3]:
# Establish Spark session

spark = ps.sql.SparkSession.builder \
            .master("local[2]") \
            .appName("df lecture") \
            .getOrCreate()

sc = spark.sparkContext 
print("woot")

woot


## **SQL Window Functions** ##
* Start spark session
* Execute SQL queries
* Next: Add notes and continue

In [4]:
# read CSV
df_sales = spark.read.csv('data/sales.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

# Now create an SQL table and issue SQL queries against it without
# using the sqlContext but through the SparkSession object.
# Creates a temporary view of the DataFrame
df_sales.createOrReplaceTempView("sales")

AnalysisException: Path does not exist: file:/home/jovyan/work/DS/repos_practice/sql-practice/misc/data/sales.csv;

In [None]:
result = spark.sql('''
    SELECT state, AVG(amount) as avg_amount
    FROM sales
    GROUP BY state
    ''')
result.show()

In [5]:
# Load CSV in a Spark dataframe

df_bike = spark.read.csv('data/2012Q1-capitalbikeshare-tripdata.csv', header=True, quote='"', sep=',', inferSchema=False)
df_bike.createOrReplaceTempView('bike_data')

In [6]:
# Check to make sure it worked

result = spark.sql('''
    SELECT * FROM bike_data LIMIT 5
    ''')
result.show()

+----------------+-----------+-----------+--------------------+--------------------+------------------+--------------------+-----------+-----------+
|duration_seconds| start_time|   end_time|start_station_number|       start_station|end_station_number|         end_station|bike_number|member_type|
+----------------+-----------+-----------+--------------------+--------------------+------------------+--------------------+-----------+-----------+
|             475|1/1/12 0:04|1/1/12 0:11|               31245|7th & R St NW / S...|             31109|       7th & T St NW|     W01412|     Member|
|            1162|1/1/12 0:10|1/1/12 0:29|               31400|Georgia & New Ham...|             31103|16th & Harvard St NW|     W00524|     Casual|
|            1145|1/1/12 0:10|1/1/12 0:29|               31400|Georgia & New Ham...|             31103|16th & Harvard St NW|     W00235|     Member|
|             485|1/1/12 0:15|1/1/12 0:23|               31101|      14th & V St NW|             31602|Par

In [6]:
# Demonstrate basic window function.  'OVER' designates the window function, ordered by start_time.

result = spark.sql('''
    SELECT duration_seconds,
       SUM(duration_seconds) OVER (ORDER BY start_time) AS running_total
    FROM bike_data
    ''')
result.show()

+----------------+-------------+
|duration_seconds|running_total|
+----------------+-------------+
|             475|        475.0|
|            1162|       2782.0|
|            1145|       2782.0|
|             485|       3738.0|
|             471|       3738.0|
|             358|       4096.0|
|            1754|       5850.0|
|             259|       6109.0|
|             516|       6625.0|
|             913|       7538.0|
|            1097|       8635.0|
|             490|       9125.0|
|            1045|      11205.0|
|            1035|      11205.0|
|            1060|      14063.0|
|            1039|      14063.0|
|             443|      14063.0|
|             316|      14063.0|
|             506|      14569.0|
|             956|      15525.0|
+----------------+-------------+
only showing top 20 rows



In [7]:
# PARTITION BY separates by starting station, then performs running_total ordered by start_time.  running_total starts over at each new station.

result = spark.sql('''
   SELECT start_station_number,
       duration_seconds,
       SUM(duration_seconds) OVER
         (PARTITION BY start_station_number ORDER BY start_time)
         AS running_total
   FROM bike_data
   WHERE start_time < '2012-01-08'
   ''')
result.show()

+--------------------+----------------+-------------+
|start_station_number|duration_seconds|running_total|
+--------------------+----------------+-------------+
|               31217|             841|       1613.0|
|               31217|             772|       1613.0|
|               31217|            1623|       3236.0|
|               31217|            1260|       5751.0|
|               31217|            1255|       5751.0|
|               31217|            5154|      12076.0|
|               31217|            1171|      12076.0|
|               31217|            4880|      16956.0|
|               31217|             531|      17487.0|
|               31217|            8831|      26318.0|
|               31217|            8684|      35002.0|
|               31217|            8681|      43683.0|
|               31217|            8528|      52211.0|
|               31217|             881|      53092.0|
|               31217|             858|      53950.0|
|               31217|      

In [9]:
# Without ORDER BY, running_total is the total of all seconds (see below without ORDER BY).  ORDER BY moves row by row in order of the designated column.  
# ORDER BY and PARTITION define the window.  Can't include window function in a GROUP BY clause.

result = spark.sql('''
    SELECT start_station_number,
       duration_seconds,
       SUM(duration_seconds) OVER
         (PARTITION BY start_station_number)
         AS running_total
   FROM bike_data
   WHERE start_time < '2012-01-08'
    ''')
result.show()

+--------------------+----------------+-------------+
|start_station_number|duration_seconds|running_total|
+--------------------+----------------+-------------+
|               31217|             841|    2726242.0|
|               31217|             772|    2726242.0|
|               31217|            1623|    2726242.0|
|               31217|            1260|    2726242.0|
|               31217|            1255|    2726242.0|
|               31217|            5154|    2726242.0|
|               31217|            1171|    2726242.0|
|               31217|            4880|    2726242.0|
|               31217|             531|    2726242.0|
|               31217|            8831|    2726242.0|
|               31217|            8684|    2726242.0|
|               31217|            8681|    2726242.0|
|               31217|            8528|    2726242.0|
|               31217|             881|    2726242.0|
|               31217|             858|    2726242.0|
|               31217|      

In [12]:
# Using aggregates with window functions.

result = spark.sql('''
    SELECT start_station_number,
           duration_seconds,
           SUM(duration_seconds) OVER
               (PARTITION BY start_station_number ORDER BY start_time) AS running_total,
           COUNT(duration_seconds) OVER
               (PARTITION BY start_station_number ORDER BY start_time) AS running_count,
           AVG(duration_seconds) OVER
               (PARTITION BY start_station_number ORDER BY start_time) AS running_avg
    FROM bike_data
    WHERE start_time < '2012-01-08'
     ''')
result.show()

+--------------------+----------------+-------------+-------------+------------------+
|start_station_number|duration_seconds|running_total|running_count|       running_avg|
+--------------------+----------------+-------------+-------------+------------------+
|               31217|             841|       1613.0|            2|             806.5|
|               31217|             772|       1613.0|            2|             806.5|
|               31217|            1623|       3236.0|            3|1078.6666666666667|
|               31217|            1260|       5751.0|            5|            1150.2|
|               31217|            1255|       5751.0|            5|            1150.2|
|               31217|            5154|      12076.0|            7| 1725.142857142857|
|               31217|            1171|      12076.0|            7| 1725.142857142857|
|               31217|            4880|      16956.0|            8|            2119.5|
|               31217|             531|    

In [15]:
# ROW_NUMBER() displays row numbers according to ORDER BY.  PARTITION BY will cause row numbers to reset at the partition.

result = spark.sql('''
   SELECT start_station_number,
          start_time,
          duration_seconds,
          ROW_NUMBER() OVER (ORDER BY start_time) AS row_number
   FROM bike_data
   WHERE start_time < '2012-01-08'
   ''')
result.show()

+--------------------+-----------+----------------+----------+
|start_station_number| start_time|duration_seconds|row_number|
+--------------------+-----------+----------------+----------+
|               31245|1/1/12 0:04|             475|         1|
|               31400|1/1/12 0:10|            1162|         2|
|               31400|1/1/12 0:10|            1145|         3|
|               31101|1/1/12 0:15|             485|         4|
|               31102|1/1/12 0:15|             471|         5|
|               31017|1/1/12 0:17|             358|         6|
|               31236|1/1/12 0:18|            1754|         7|
|               31101|1/1/12 0:22|             259|         8|
|               31014|1/1/12 0:24|             516|         9|
|               31101|1/1/12 0:25|             913|        10|
|               31303|1/1/12 0:29|            1097|        11|
|               31222|1/1/12 0:30|             490|        12|
|               31230|1/1/12 0:32|            1045|    

In [16]:
# RANK() is like ROW_NUMBER() but not exclusive.  There can be more than one of the same rank.
# Below, there are duplicate start times (the ORDER BY) and so duplicate rank.
# RANK() - 1, 2, 2, 4, 5, 5, 7....
# DENSE_RANK() - 1, 2, 2, 3, 4, 4, 5.... (no skipping ranks)

result = spark.sql('''
   SELECT start_station_number,
          duration_seconds,
          RANK() OVER (PARTITION BY start_station_number ORDER BY start_time) AS rank
   FROM bike_data
   WHERE start_time < '2012-01-08'
   ''')
result.show()

+--------------------+----------------+----+
|start_station_number|duration_seconds|rank|
+--------------------+----------------+----+
|               31217|             841|   1|
|               31217|             772|   1|
|               31217|            1623|   3|
|               31217|            1260|   4|
|               31217|            1255|   4|
|               31217|            5154|   6|
|               31217|            1171|   6|
|               31217|            4880|   8|
|               31217|             531|   9|
|               31217|            8831|  10|
|               31217|            8684|  11|
|               31217|            8681|  12|
|               31217|            8528|  13|
|               31217|             881|  14|
|               31217|             858|  15|
|               31217|            3029|  16|
|               31217|            2097|  17|
|               31217|            2082|  17|
|               31217|            1997|  19|
|         

In [8]:
# NTILE
# The syntax is NTILE(*# of buckets*). In this case, ORDER BY determines which column to use to determine the quartiles (or whatever number of 'tiles you specify).

result = spark.sql('''
   SELECT start_station_number, duration_seconds,
          NTILE(4) OVER
             (PARTITION BY start_station_number ORDER BY duration_seconds) AS quartile,
          NTILE(5) OVER
             (PARTITION BY start_station_number ORDER BY duration_seconds) AS quintile,
          NTILE(100) OVER
             (PARTITION BY start_station_number ORDER BY duration_seconds) AS percentile
  FROM bike_data
  WHERE start_time < '2012-01-08'
   ''')
result.show()

+--------------------+----------------+--------+--------+----------+
|start_station_number|duration_seconds|quartile|quintile|percentile|
+--------------------+----------------+--------+--------+----------+
|               31217|            1000|       1|       1|         1|
|               31217|            1001|       1|       1|         1|
|               31217|            1004|       1|       1|         1|
|               31217|            1004|       1|       1|         1|
|               31217|            1005|       1|       1|         1|
|               31217|            1005|       1|       1|         1|
|               31217|            1006|       1|       1|         1|
|               31217|            1009|       1|       1|         1|
|               31217|            1010|       1|       1|         1|
|               31217|            1011|       1|       1|         1|
|               31217|            1011|       1|       1|         1|
|               31217|            

In [11]:
# LAG and LEAD
# LAG pulls from previous rows and LEAD pulls from following rows and creates new column.

result = spark.sql('''
   SELECT start_station_number, duration_seconds,
          LAG(duration_seconds, 1) OVER
            (PARTITION BY start_station_number ORDER BY duration_seconds) AS lag,
          LEAD(duration_seconds, 1) OVER
            (PARTITION BY start_station_number ORDER BY duration_seconds) AS lead
  FROM bike_data
  WHERE start_time < '2012-01-08'
  ORDER BY 1, 2
   ''')
result.show()

+--------------------+----------------+----+----+
|start_station_number|duration_seconds| lag|lead|
+--------------------+----------------+----+----+
|               31000|             103|null| 127|
|               31000|             127| 103| 128|
|               31000|             128| 127|1327|
|               31000|            1327| 128|1379|
|               31000|            1379|1327|1422|
|               31000|            1422|1379|1465|
|               31000|            1465|1422|1474|
|               31000|            1474|1465|1476|
|               31000|            1476|1474|1491|
|               31000|            1491|1476|1498|
|               31000|            1498|1491|1505|
|               31000|            1505|1498|1536|
|               31000|            1536|1505|1555|
|               31000|            1555|1536|1568|
|               31000|            1568|1555|1586|
|               31000|            1586|1568|1598|
|               31000|            1598|1586|1602|


In [14]:
# LAG and LEAD
# Useful when calculating differences between rows

result = spark.sql('''
   SELECT start_station_number, duration_seconds,
          duration_seconds - LAG(duration_seconds, 1) OVER
            (PARTITION BY start_station_number ORDER BY duration_seconds) AS difference
   FROM bike_data
   WHERE start_time < '2012-01-08'
   ORDER BY 1, 2
   ''')
result.show()

+--------------------+----------------+----------+
|start_station_number|duration_seconds|difference|
+--------------------+----------------+----------+
|               31000|             103|      null|
|               31000|             127|      24.0|
|               31000|             128|       1.0|
|               31000|            1327|    1199.0|
|               31000|            1379|      52.0|
|               31000|            1422|      43.0|
|               31000|            1465|      43.0|
|               31000|            1474|       9.0|
|               31000|            1476|       2.0|
|               31000|            1491|      15.0|
|               31000|            1498|       7.0|
|               31000|            1505|       7.0|
|               31000|            1536|      31.0|
|               31000|            1555|      19.0|
|               31000|            1568|      13.0|
|               31000|            1586|      18.0|
|               31000|         

In [15]:
# LAG and LEAD
# Wrap in outer query to remove nulls, like above.

result = spark.sql('''
   SELECT *
   FROM (
     SELECT start_station_number, duration_seconds,
          duration_seconds - LAG(duration_seconds, 1) OVER
            (PARTITION BY start_station_number ORDER BY duration_seconds) AS difference
     FROM bike_data
     WHERE start_time < '2012-01-08'
     ORDER BY 1, 2
        ) sub
   WHERE sub.difference IS NOT NULL
   ''')
result.show()

+--------------------+----------------+----------+
|start_station_number|duration_seconds|difference|
+--------------------+----------------+----------+
|               31000|             127|      24.0|
|               31000|             128|       1.0|
|               31000|            1327|    1199.0|
|               31000|            1379|      52.0|
|               31000|            1422|      43.0|
|               31000|            1465|      43.0|
|               31000|            1474|       9.0|
|               31000|            1476|       2.0|
|               31000|            1491|      15.0|
|               31000|            1498|       7.0|
|               31000|            1505|       7.0|
|               31000|            1536|      31.0|
|               31000|            1555|      19.0|
|               31000|            1568|      13.0|
|               31000|            1586|      18.0|
|               31000|            1598|      12.0|
|               31000|         

In [17]:
# Window alias
# NTILE example above can be rewritten as:

result = spark.sql('''
   SELECT start_station_number, duration_seconds,
          NTILE(4) OVER ntile_window AS quartile,
          NTILE(5) OVER ntile_window AS quintile,
          NTILE(100) OVER ntile_window AS percentile
  FROM bike_data
  WHERE start_time < '2012-01-08'
  WINDOW ntile_window AS (PARTITION BY start_station_number ORDER BY duration_seconds)
  ORDER BY 1, 2
   ''')
result.show()

+--------------------+----------------+--------+--------+----------+
|start_station_number|duration_seconds|quartile|quintile|percentile|
+--------------------+----------------+--------+--------+----------+
|               31000|             103|       1|       1|         1|
|               31000|             127|       1|       1|         1|
|               31000|             128|       1|       1|         2|
|               31000|            1327|       1|       1|         2|
|               31000|            1379|       1|       1|         3|
|               31000|            1422|       1|       1|         3|
|               31000|            1465|       1|       1|         4|
|               31000|            1474|       1|       1|         4|
|               31000|            1476|       1|       1|         5|
|               31000|            1491|       1|       1|         5|
|               31000|            1498|       1|       1|         6|
|               31000|            