## Flink SQL

For a quick demo, follow the steps in this [link](https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html).

### Tutorial

This tutorial covers the SQL training presented by Ververica.

You can find it on GitHub at this [link](https://github.com/ververica/sql-training/wiki/Introduction-to-SQL-on-Flink).

### Setting Up The Environment

1. Install Docker from this [link](https://www.docker.com/).
2. Make a directory: `mkdir Vervica`
3. Clone the training repository, which contains the Docker Compose YAML file, by running this command in the terminal:
`git clone https://github.com/ververica/sql-training`
4. Change into the repository directory: `cd sql-training`
5. Start the training environment:

    1. Mac and Linux: `docker-compose up -d`
    2. Windows: `set COMPOSE_CONVERT_WINDOWS_PATHS=1` followed by `docker-compose up -d`
6. Make sure that all the containers are up and running:
`docker ps`




### Entering the SQL CLI client

`docker-compose exec sql-client ./sql-client.sh`

### Introduction to Flink SQL

https://github.com/ververica/sql-training/wiki/Introduction-to-SQL-on-Flink


1. List all predefined tables: `SHOW TABLES;`
2. Describe a table: `DESCRIBE Rides;`

#### Rides
We will use the Rides source table for all of the exercises. It contains information about taxi rides that took place in New York City in the beginning of 2013. Each ride is represented by two event records, a start ride event and an end ride event.

The schema of the Rides table is as follows:
```
rideId: BIGINT       // the unique id of a ride (note, Rides contains two records per ride)
taxiId: BIGINT       // the unique id of the taxi
isStart: BOOLEAN     // flag for pick-up (true) or drop-off (false) event
lon: FLOAT           // the longitude of the pick-up or drop-off location
lat: FLOAT           // the latitude of the pick-up or drop-off location
rideTime: TIMESTAMP  // the time of the pick-up or drop-off event
psgCnt: INT          // the number of passengers on the ride
```

#### Fares

The Fares source table contains information about the fares paid for the taxi rides. The table contains one record for each taxi ride.

The schema of the Fares table is as follows:
```
rideId: BIGINT      // the unique id of the ride
payTime: TIMESTAMP  // the time when the payment was made (same as timestamp of ride end event in Rides table)
payMethod: VARCHAR  // the method of payment (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT          // the amount of paid tip
toll: FLOAT         // the amount of paid toll
fare: FLOAT         // the amount of paid fare
```

#### DriverChanges
The DriverChanges table contains one record for each event in which a taxi is driven by a different driver than before, i.e., when the driver of a taxi changes. This might happen when a driver starts a new shift.

The schema of the DriverChanges table is as follows:

```
taxiId: BIGINT             // the unique id of the taxi
driverId: BIGINT           // the unique id of the driver who starts using the taxi
usageStartTime: TIMESTAMP  // the time when the driver starts using the taxi
```

# Explore the data

`SELECT * FROM Rides;`

``` bash
rideId        taxiId   isStart           lon           lat                  rideTime   psgCnt
150156    2013003948     false     -73.98211      40.74796     2013-01-01 06:49:26.0        2
150538    2013003570     false    -74.004684      40.72859     2013-01-01 06:49:26.0        1
151066    2013005078      true     -73.97712     40.752007     2013-01-01 06:49:26.0        1
147794    2013010015     false     -73.87098     40.774143     2013-01-01 06:49:27.0        1
148680    2013003578     false     -73.96466     40.680794     2013-01-01 06:49:27.0        1
151067    2013002010      true    -73.992256     40.750004     2013-01-01 06:49:27.0        2
```

You can leave the result visualization mode by pressing `q`. The mode provides more functionality, such as paging through results or increasing and decreasing the update rate, as shown at the bottom of the screen.


### Explore Predefined User Functions

`SHOW FUNCTIONS;`

- To learn more about UDFs, see this [link](https://nightlies.apache.org/flink/flink-docs-release-1.4/dev/table/udfs.html).

```bash

abs
acos
and
array
as
asc
asin
at
atan
atan2
avg
between
...
```

#### User-Defined Functions
The training environment provides four user-defined functions: `isInNYC`, `toAreaId`, `toCoords`, and `Drivers`.
##### <small>`isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN`</small>
<small>- **Description**: Checks if a location is within the NYC area.</small>

##### <small>`toAreaId(lon: FLOAT, lat: FLOAT): INT`</small>
<small>- **Description**: Maps a location (longitude, latitude) to an area id that represents a cell of approximately 100x100 meters size.</small>

##### <small>`toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]`</small>
<small>- **Description**: Reverse method of `toAreaId` to compute the longitude and latitude of the center of an area cell.</small>

##### <small>`Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP)`</small>
<small>- **Description**:  
  - A **Temporal Table Function** that returns for a timestamp `ts` the most recent driver for every taxi.  
  - This function provides a table with the following columns:  
    - `taxiId: BIGINT`  
    - `driverId: BIGINT`  
    - `usageStartTime: TIMESTAMP`  
  - **Note**: This function will be discussed further in the context of joins.</small>
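
As a small sketch of how the scalar UDFs can be combined in a query, the following maps every event located in New York City to its area id. It assumes that the training environment's `Rides` table and the UDFs `isInNYC` and `toAreaId` are registered as described above; the alias `area` is only an illustrative name.

```sql
-- Sketch: keep only events located in NYC and map each one to its grid cell.
-- Assumes the training UDFs isInNYC and toAreaId are registered.
SELECT
  rideId,
  isStart,
  toAreaId(lon, lat) AS area
FROM Rides
WHERE isInNYC(lon, lat);
```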


# Exercises


### Find a Particular Ride

Write a query that outputs the start and end event of ride 123.
 


```
isStart                  rideTime
   true     2013-01-01 00:01:00.0
  false     2013-01-01 00:07:00.0
```

The query filters by the ride id.


<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT isStart, rideTime FROM Rides WHERE rideId=123;
```

----
</details>

### Cleanse the Rides

The task of this exercise is to cleanse the table of ride events by removing events that do not start or end in New York City.
Hint: Filter the events with the UDF `isInNYC`.

```
rideId           taxiId          isStart              lon              lat                  rideTime    psgCnt
     1       2013000001             true        -73.99078         40.76088     2013-01-01 00:00:00.0         1
     2       2013000002             true       -73.978325         40.77809     2013-01-01 00:00:00.0         5
     3       2013000003             true        -73.98962         40.72999     2013-01-01 00:00:00.0         1
     4       2013000004             true       -73.981575         40.76763     2013-01-01 00:00:00.0         2
     5       2013000005             true        -74.00053        40.737343     2013-01-01 00:00:00.0         4
     6       2013000006             true       -73.866135         40.77109     2013-01-01 00:00:00.0         6
     7       2013000007             true        -74.00693        40.740765     2013-01-01 00:00:00.0         6
     8       2013000008             true       -73.955925        40.781887     2013-01-01 00:00:00.0         3
     9       2013000009             true        -73.99988        40.743343     2013-01-01 00:00:00.0         1
    10       2013000010             true       -73.989845         40.75804     2013-01-01 00:00:00.0         3
    11       2013000011             true       -73.870834         40.77377     2013-01-01 00:00:00.0         1
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT * FROM Rides WHERE isInNYC(lon, lat);
```

The query filters by using the UDF `isInNYC`.

----
</details>

## Types of Operators
##### Stateless Operators
- Projection (`SELECT`)
- Filter (`WHERE`)
##### Materializing (Stateful) Operators
- Aggregation (`GROUP BY`)
- Joins
##### Temporal Operators
- Windowed Aggregations
  - (`GROUP BY`, `OVER`, window table-valued functions)
- Time-based Joins
  - (interval joins, temporal table joins)
- Pattern Matching
  - (`MATCH_RECOGNIZE`)
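
To illustrate the difference between the first two categories, here is a hedged sketch against the training's `Rides` table. The first query looks at one row at a time, while the second has to keep a running count per key. The threshold `psgCnt > 2` and the alias `eventCnt` are illustrative choices, not part of the training material.

```sql
-- Stateless: projection and filter are evaluated row by row; no state is kept.
SELECT rideId, taxiId, psgCnt
FROM Rides
WHERE psgCnt > 2;

-- Stateful: the aggregation keeps one running count per taxiId
-- and updates its result whenever a new event for that taxi arrives.
SELECT taxiId, COUNT(*) AS eventCnt
FROM Rides
GROUP BY taxiId;
```

On an unbounded stream, the state of the second query grows with the number of distinct keys, which is one reason the temporal operators listed above are often preferred.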


# Querying Dynamic Tables with SQL


### Ride Count per Number of Passengers

For this exercise you should compute the number of rides per number of passengers, i.e., determine how many rides happened with 1, 2, 3, ... passengers.

We are only interested in counts for rides that started in New York City.

Hint: Each ride is represented by two events. Filter out all end events for accurate counts.



```
psgCnt         cnt
     4        6051
     2       31063
     3       10812
     6        5408
     5       11031
     1      100367
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT
  psgCnt,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
  psgCnt;
```

----
</details>

### Ride Count per Area and Hour of Day

In this exercise we want to count the total number of arriving and departing rides per area for every hour of a day, i.e., the total number of rides per area that happen from 12am to 1am, 1am to 2am, and so on. Hence, we do not want a separate count for every day but the combined counts of all days for every hour.

We are only interested in events that start or end in New York City. To limit the size of the result, return only areas with counts that include more than 60 events.

Hints:

* Use the built-in `HOUR` function to extract the hour of a timestamp.
* Use the provided `toAreaId` to convert coordinates to an area id.



```
area       isStart     hourOfDay        cnt
49551      false       0                85
49789       true       0                64
48806       true       0                75
50044      false       0                62
52543       true       0                67
49792      false       1                77
48559       true       1               114
48808       true       1               100
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT
  toAreaId(lon, lat) AS area,
  isStart,
  HOUR(rideTime) AS hourOfDay,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  HOUR(rideTime)
HAVING COUNT(*) > 60;
```

The query is a typical `SELECT FROM GROUP BY` query. After filtering by rides in New York City, the result is grouped by area id, `isStart` flag, and hour of the day (computed by the `HOUR(timestamp)` function). By using the `HAVING` clause, we only return counts that are greater than 60.

----
</details>

# GROUP BY Window Aggregation


### Windowed Ride Count

For continuously determining the city's taxi traffic situation, count the number of arriving and departing rides per area in a window of 5 minutes.

We are only interested in events that start or end in New York City and areas with at least 5 arriving or departing rides.

Hint:

* Use the provided `toAreaId` to convert coordinates to an area id.

The `t` column represents the end of every 5-minute window.


```
area         isStart      t                             cnt
49282        true         2013-01-01 00:05:00.0           6
45881        true         2013-01-01 00:05:00.0           8
51781        true         2013-01-01 00:05:00.0           8
49551        true         2013-01-01 00:05:00.0           7
48540        true         2013-01-01 00:10:00.0           6
51795        true         2013-01-01 00:10:00.0           6
47550        true         2013-01-01 00:10:00.0           6
54285        true         2013-01-01 00:10:00.0           8
51781        true         2013-01-01 00:10:00.0          17
45548        true         2013-01-01 00:10:00.0          14
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT
  toAreaId(lon, lat) AS area,
  isStart,
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
```

The query filters out events whose location is outside New York City. It uses a tumbling window of 5 minutes on the `rideTime` time attribute. The result is grouped by the area id, the `isStart` flag, and the tumbling window. For every group, we return the area id, the `isStart` flag, the end boundary of the window, and the aggregated count. We only return counts that are equal to or greater than 5.

----
</details>
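
A tumbling window assigns each event to exactly one window. As a hedged variation that is not part of the original exercise, the same count can be computed over sliding windows with the `HOP` group window function, here with a window size of 5 minutes that advances every minute. The 1-minute slide is an illustrative choice.

```sql
-- Sketch: 5-minute windows that slide forward every 1 minute.
-- In HOP(time_attr, slide, size) the first interval is the slide,
-- the second interval is the window size.
SELECT
  toAreaId(lon, lat) AS area,
  isStart,
  HOP_END(rideTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  HOP(rideTime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
```

Because the windows overlap, each event contributes to five windows, so a new set of counts is emitted every minute instead of every five minutes.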

# Joining Dynamic Tables


### Interval Joins (Average Tip per Hour of Day)

Compute the average tip per hour of day and number of passengers, considering only payments whose `payTime` is at most 5 minutes earlier than the `rideTime` of the ride's end event.




Hints:

* The tip paid for a ride is contained in the `Fares` table.
* The payment event is expected to happen at most 5 minutes before the trip ends. Hence, the `payTime` timestamp of a `Fares` record is at most 5 minutes earlier and not later than the `rideTime` of the corresponding ride's end event.
* Use the built-in `HOUR` function to extract the hour of a timestamp. 


```
hourOfDay             avgTip
        0          0.9319049
        1           1.100541
        2          1.1744025
        3          1.2137822
        4          1.1707343
        5          1.1629586
        6          1.1505183
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT
  HOUR(r.rideTime) AS hourOfDay,
  r.psgCnt,
  AVG(f.tip) AS avgTip
FROM
  Rides r,
  Fares f
WHERE 
  r.rideId = f.rideId AND
  NOT r.isStart AND
  f.payTime BETWEEN r.rideTime - INTERVAL '5' MINUTE AND r.rideTime
GROUP BY
  HOUR(r.rideTime), r.psgCnt;
```

The query joins ride end events of the `Rides` table with payment events of the `Fares` table on the `rideId` key and their timestamps. The interval join condition joins a payment and a ride event if the `payTime` attribute is at most 5 minutes earlier than the `rideTime` attribute. After both tables are joined, the query groups by the hour of day (computed with the `HOUR` function) and the number of passengers, and computes the average tip for each group.

----
</details>
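
The same kind of interval join can also be written with explicit `JOIN ... ON` syntax, which keeps the join predicates apart from the other filters. This is a sketch under the assumption that the Flink version in the training environment accepts time-range predicates in the `ON` clause; to keep it short, it groups by hour of day only.

```sql
-- Sketch: interval join expressed with JOIN ... ON.
-- The equality predicate and the time range together form the join condition;
-- the isStart filter stays in the WHERE clause.
SELECT
  HOUR(r.rideTime) AS hourOfDay,
  AVG(f.tip) AS avgTip
FROM Rides r
JOIN Fares f
  ON r.rideId = f.rideId
  AND f.payTime BETWEEN r.rideTime - INTERVAL '5' MINUTE AND r.rideTime
WHERE NOT r.isStart
GROUP BY
  HOUR(r.rideTime);
```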

### Compute the Ride Duration

In this exercise we want to compute the duration of every taxi ride, i.e., the time between its start and end event, in minutes. This means that we need to join start event and end event based on the ride id.

We are only interested in rides that start and end in New York City and take less than two hours.

Hints:

* Filter the `Rides` table to separate start and end events.
* Use an interval join to associate the start and end events of a ride. Only join an end event if it arrives within 2 hours after a start event has arrived.
* Use the built-in function [`TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)`](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions) to compute the difference of two timestamps in minutes.



```
rideId        durationMin
 52693                 13
 46868                 24
 53226                 12
 53629                 11
 55651                  7
 43220                 31
 53082                 12
 54716                  9
 55125                  9
 57211                  4
 44795                 28
 53563                 12
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT
  s.rideId,
  TIMESTAMPDIFF(MINUTE, s.rideTime, e.rideTime) AS durationMin
FROM
  (SELECT * FROM Rides WHERE isStart AND isInNYC(lon, lat)) s,
  (SELECT * FROM Rides WHERE NOT isStart AND isInNYC(lon, lat)) e
WHERE
  s.rideId = e.rideId AND
  e.rideTime BETWEEN s.rideTime AND s.rideTime + INTERVAL '2' HOUR;
```


</details>



This query performs an interval join between two subqueries. The first subquery `s` returns the start events within New York City. The second subquery `e` returns the end events within New York City.

Both tables are joined on the `rideId` column. Additionally, the `WHERE` clause specifies a time constraint between an end event and a start event. This ensures that a start event and an end event are joined only if the end event arrives within 2 hours after the start event.

The resulting `durationMin` column contains the duration between both timestamps in minutes.


### Temporal Table Joins

A temporal table join joins a streaming table with a temporal table. A temporal table maintains the history of a table and is able to return the table's rows for a specific point in time. For each record of the streaming table, the join looks up the rows of the version of the temporal table that corresponds to the timestamp of the current record and joins them according to additional join predicates.

Due to the temporal condition, the join operator only holds the relevant history of the temporal table in state and does not store the streaming table at all.
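
As a minimal sketch of this kind of join, the following query enriches every ride start event with the driver who was using the taxi at that time. It assumes the training's temporal table function `Drivers` described earlier; the selected columns are an illustrative choice.

```sql
-- Sketch: look up, for each ride start event, the version of the Drivers
-- table that was valid at r.rideTime and join it on the taxi id.
SELECT
  r.rideId,
  r.taxiId,
  d.driverId,
  r.rideTime
FROM
  Rides r,
  LATERAL TABLE(Drivers(r.rideTime)) d
WHERE
  r.taxiId = d.taxiId AND
  r.isStart;
```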



### Identify Drivers with many Passengers

- Identify all drivers who served at least 10 passengers within 15 minutes.

- The temporal table function `Drivers` provides the driver that operated a taxi at a specific point in time.



```
  driverId                srvdPsgCnt                         t
2013000155                        12     2013-01-01 00:00:00.0
2013000233                        12     2013-01-01 00:00:00.0
2013000230                        31     2013-01-01 00:00:00.0
2013001174                        12     2013-01-01 00:00:00.0
2013000014                        12     2013-01-01 00:00:00.0
2013000595                        12     2013-01-01 00:00:00.0
2013002453                        12     2013-01-01 00:00:00.0
2013000124                        12     2013-01-01 00:00:00.0
2013000117                        18     2013-01-01 00:00:00.0
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT 
  d.driverId, 
  SUM(r.psgCnt) AS srvdPsgCnt,
  TUMBLE_START(r.rideTime, INTERVAL '15' MINUTE) AS t
FROM
  Rides r, 
  LATERAL TABLE(Drivers(r.rideTime)) d
WHERE
  r.taxiId = d.taxiId AND
  r.isStart
GROUP BY
  d.driverId,
  TUMBLE(r.rideTime, INTERVAL '15' MINUTE)
HAVING SUM(r.psgCnt) >= 10;
```
</details>


The query joins each ride start event of the `Rides` table with the temporal `Drivers` table and enriches it with the driver who was driving the taxi when the ride started. Subsequently, the query groups the enriched data by the `driverId` and a 15-minute window and computes the number of served passengers. The final `HAVING` clause removes all drivers who did not serve at least 10 passengers.

# Pattern Matching with MATCH_RECOGNIZE


### Compute the Ride Duration

In this exercise we want to compute the duration of every taxi ride, i.e., the time between its start and end event, in minutes. This means that we need to look for the pattern of a start event and end event based on the ride id.

**Note:** This is exactly the same exercise as in the join exercises, but this time we want to solve it with MATCH_RECOGNIZE.


Hints:

* We are looking for a pattern per `rideId`.
* The pattern consists of two types of events: a start event followed by an end event.
* Use the built-in function [`TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)`](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions) to compute the difference of two timestamps in minutes.

```
rideId        durationMin
 52693                 13
 46868                 24
 53226                 12
 53629                 11
 55651                  7
 43220                 31
 53082                 12
 54716                  9
 55125                  9
 57211                  4
 44795                 28
 53563                 12

```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT rideId, TIMESTAMPDIFF(MINUTE, startT, endT) AS durationMin
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId
  ORDER BY rideTime
  MEASURES 
    S.rideTime AS startT, 
    E.rideTime AS endT
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);
```

This query matches start and end events of the same ride, i.e., events that have the same ride id. By partitioning the table on `rideId`, only events with the same ride id are processed together. Events are then distinguished into a start event `S` and an end event `E`, and the pattern to match is defined as `(S E)`, i.e., exactly one `S` followed by exactly one `E`. Finally, we emit the `rideId` and the timestamps of the start and end events for each match, and compute the ride duration in the `SELECT` clause using the `TIMESTAMPDIFF` function.

----
</details>
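
The join-based version of this exercise only considered rides that take at most two hours. As a hedged sketch, a similar time bound can be added to the pattern with the `WITHIN` clause, which is a Flink extension to `MATCH_RECOGNIZE` and may not be available in older Flink versions. Unlike the join exercise, this sketch does not filter on New York City.

```sql
-- Sketch: only match a start and end event that lie within 2 hours of each other.
-- WITHIN is placed directly after the PATTERN definition.
SELECT rideId, TIMESTAMPDIFF(MINUTE, startT, endT) AS durationMin
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId
  ORDER BY rideTime
  MEASURES
    S.rideTime AS startT,
    E.rideTime AS endT
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E) WITHIN INTERVAL '2' HOUR
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);
```

A time bound also lets Flink discard partial matches that can no longer complete, which helps keep the pattern-matching state small.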



### Rides and Rest

Use the MATCH_RECOGNIZE clause to detect a pattern of:

- a start event,
- an arbitrary number of potential intermediate events for the same taxi but from different rides,
- an end event,
- and the next start event.


Compute the resting times in minutes.

Hints:

* Use an `AFTER MATCH SKIP TO LAST variable` strategy to include the last start event in the next pattern matching.
* Use the built-in `TIMESTAMPDIFF` function to calculate the difference in minutes.

```
                   taxiId                ride_start                  ride_end           next_ride_start           minutes_of_rest
                2013000002     2013-01-01 00:00:00.0     2013-01-01 00:06:00.0     2013-01-01 00:16:00.0                        10
                2013000004     2013-01-01 00:00:00.0     2013-01-01 00:08:00.0     2013-01-01 00:13:00.0                         5
                2013000032     2013-01-01 00:00:00.0     2013-01-01 00:05:00.0     2013-01-01 00:06:00.0                         1
                2013000128     2013-01-01 00:01:00.0     2013-01-01 00:04:00.0     2013-01-01 00:12:00.0                         8
                2013000256     2013-01-01 00:02:00.0     2013-01-01 00:10:00.0     2013-01-01 00:10:00.0                         0
                2013000512     2013-01-01 00:03:25.0     2013-01-01 00:04:51.0     2013-01-01 00:10:00.0                         5
                2013000512     2013-01-01 00:10:00.0     2013-01-01 00:13:31.0     2013-01-01 00:14:19.0                         0
                2013001028     2013-01-01 00:05:52.0     2013-01-01 00:13:20.0     2013-01-01 00:14:12.0                         0
                2013000258     2013-01-01 00:02:00.0     2013-01-01 00:08:00.0     2013-01-01 00:11:00.0                         3
                2013002070     2013-01-01 00:08:57.0     2013-01-01 00:12:26.0     2013-01-01 00:13:46.0                         1
```

<details>
<summary>Click to see the solution.</summary>

----
```sql
SELECT * FROM Rides
MATCH_RECOGNIZE(
  PARTITION BY taxiId
  ORDER BY rideTime
  MEASURES
    START_RIDE.rideTime AS ride_start,
    END_RIDE.rideTime AS ride_end,
    NEXT_RIDE.rideTime AS next_ride_start,
    TIMESTAMPDIFF(MINUTE, END_RIDE.rideTime, NEXT_RIDE.rideTime) AS minutes_of_rest
  AFTER MATCH SKIP TO LAST NEXT_RIDE
  PATTERN (START_RIDE M* END_RIDE NEXT_RIDE)
  DEFINE
    START_RIDE AS START_RIDE.isStart = true,
    M AS M.rideId <> START_RIDE.rideId,
    END_RIDE AS END_RIDE.isStart = false,
    NEXT_RIDE AS NEXT_RIDE.isStart = true
);
```
</details>

The query matches the pattern mentioned above. The variable `START_RIDE` detects the start event. `M` defines a greedy set of ride events that don't belong to the same ride as the start event. The `END_RIDE` variable detects the end event. The `NEXT_RIDE` variable defines the following start event for computing the resting time. Once a match has been detected, we measure the difference in minutes and return it.