# Fetch first/last occurrence of a value

This notebook shows how to find the first or last occurrence of a desired value in a grouped column, without using window functions, and report the corresponding value from another column.

In this example, the speed of two cars (two IDs) is recorded as a function of time. We want to find the first time each car reaches a speed of 4 and the last time it is at a speed of 6.

Two methods are used:
- Filter by speed, sort by time, then take the first (or last) time per group
- Filter by speed, then take the min (or max) of time per group directly

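As expected, the sort-based method is slower: in the timings at the end of this notebook it takes about 1.5 to 1.6 s, versus 0.9 to 1.1 s without the sort, on this small sample.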

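Also note that, depending on the dataset size, the first method may give a wrong answer. The likely cause is that `groupBy` shuffles the data and the row order produced by `sort` is not guaranteed to survive that shuffle, so `F.first` and `F.last` are non-deterministic. If you do use the sort, grouping by several columns may help, but the min/max method does not depend on row order and is the safer choice.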
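
To make the caveat above concrete, here is a minimal plain-Python sketch (no Spark involved). The helper names `first_per_group` and `min_per_group` are hypothetical, and reversing the list is only a stand-in for a row order that was lost during a shuffle. It is meant to illustrate the idea, not to reproduce Spark's actual behavior.

```python
# Rows mirror part of the notebook's sample data, already filtered to Speed == 4.
# Each row is (Time, ID, Speed).
rows = [(1, "10A", 4), (2, "10A", 4), (3, "10A", 4), (4, "10A", 4),
        (10, "58R", 4), (11, "58R", 4)]


def first_per_group(rows):
    """Keep the first Time seen for each ID: depends on arrival order."""
    result = {}
    for t, car_id, _speed in rows:
        result.setdefault(car_id, t)
    return result


def min_per_group(rows):
    """Keep the smallest Time for each ID: independent of arrival order."""
    result = {}
    for t, car_id, _speed in rows:
        result[car_id] = min(t, result.get(car_id, t))
    return result


sorted_rows = sorted(rows)         # order right after sorting by Time
shuffled_rows = sorted_rows[::-1]  # assumed stand-in for an order lost in a shuffle

first_sorted = first_per_group(sorted_rows)      # correct only because order is intact
first_shuffled = first_per_group(shuffled_rows)  # wrong once the order is lost
min_shuffled = min_per_group(shuffled_rows)      # still correct
```

With the order intact, "first" and "min" agree. Once the order is disturbed, only the min-based aggregation still returns the earliest time per ID.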

In [9]:
import time
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window

In [10]:
fm = get_foundry_manager()
# Get the SQL context from the Foundry manager (named sc here)
sc = fm._sql_context



## Example: first time at speed 4, last time at speed 6

In [11]:
# Sample DataFrame
a = sc.createDataFrame([(1, '10A', 4), 
                        (2, '10A', 4),
                        (3, '10A', 4),
                        (4, '10A', 4),
                        (5, '10A', 5),
                        (6, '10A', 5),
                        (7, '10A', 6),
                        (8, '10A', 7),
                        (9, '58R', 3),
                        (10, '58R', 4),
                        (11, '58R', 4),
                        (12, '58R', 5),
                        (13, '58R', 5),
                        (14, '58R', 5),
                        (15, '58R', 5),
                        (16, '58R', 6)]
                       , ['Time', 'ID', 'Speed'])
a.show()

+----+---+-----+
|Time| ID|Speed|
+----+---+-----+
|   1|10A|    4|
|   2|10A|    4|
|   3|10A|    4|
|   4|10A|    4|
|   5|10A|    5|
|   6|10A|    5|
|   7|10A|    6|
|   8|10A|    7|
|   9|58R|    3|
|  10|58R|    4|
|  11|58R|    4|
|  12|58R|    5|
|  13|58R|    5|
|  14|58R|    5|
|  15|58R|    5|
|  16|58R|    6|
+----+---+-----+



In [12]:
# First time at speed 4 per ID: sort + first, then filter + min
t_init = time.time()
starts = a.filter(a["Speed"] == 4.0)\
                .sort('Time')\
                .groupBy("ID")\
                .agg(F.first("Time").alias("First_Speed_4"))                
starts.show()
print("Execution time with sort: {0} s".format(time.time()-t_init))

t_init = time.time()

starts = a.filter(a["Speed"] == 4.0)\
                .groupBy("ID")\
                .agg(F.min("Time").alias("First_Speed_4"))
starts.show()
print("Execution time without sort: {0} s".format(time.time()-t_init))

# Last time at speed 6 per ID: sort + last, then filter + max
t_init = time.time()
stops = a.filter(a["Speed"] == 6.0)\
                .sort('Time')\
                .groupBy("ID")\
                .agg(F.last("Time").alias("Last_Speed_6"))                
stops.show()
print("Execution time with sort: {0} s".format(time.time()-t_init))
t_init = time.time()
stops = a.filter(a["Speed"] == 6.0)\
                .groupBy("ID")\
                .agg(F.max("Time").alias("Last_Speed_6"))                
stops.show()
print("Execution time without sort: {0} s".format(time.time()-t_init))

+---+-------------+
| ID|First_Speed_4|
+---+-------------+
|10A|            1|
|58R|           10|
+---+-------------+

Execution time with sort: 1.59730792046 s
+---+-------------+
| ID|First_Speed_4|
+---+-------------+
|10A|            1|
|58R|           10|
+---+-------------+

Execution time without sort: 0.929090976715 s
+---+------------+
| ID|Last_Speed_6|
+---+------------+
|10A|           7|
|58R|          16|
+---+------------+

Execution time with sort: 1.48199009895 s
+---+------------+
| ID|Last_Speed_6|
+---+------------+
|10A|           7|
|58R|          16|
+---+------------+

Execution time without sort: 1.10506987572 s
