### Helpful Flink Resources

* [Flink SQL API examples: common SQL query patterns (windows, aggregation, ...)][1]
* [AWS Online Tech Talks (YouTube): Streaming Analytics via Kinesis Data Analytics Studio][2]
* [Flink on Zeppelin: 13-part YouTube series][3]
* [Flink on Zeppelin: streaming blog post][4]
* [Flink interpreter for Apache Zeppelin (Zeppelin documentation)][5]
* [Flink SQL Documentation 1.13][6]
* [Flink Kinesis Connector][7]

[1]:https://awsfeed.com/whats-new/big-data/get-started-with-flink-sql-apis-in-amazon-kinesis-data-analytics-studio
[2]:https://www.youtube.com/watch?v=mRDst424mKY
[3]:https://www.youtube.com/watch?v=YxPo0Fosjjg&list=PL4oy12nnS7FFtg3KV1iS5vDb0pTz12VcX
[4]:https://zjffdu.medium.com/flink-on-zeppelin-part-3-streaming-5fca1e16754
[5]:http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html#streamexecutionenvironment-executionenvironment-streamtableenvir
[6]:https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/sql/
[7]:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kinesis/

### [Flink's Zeppelin Interpreters][1] 

| Name | Class | Description
| --- | ----------- | --------|
| %flink | FlinkInterpreter | Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment
| %flink.pyflink | PyFlinkInterpreter | Provides a python environment
| %flink.ipyflink | IPyFlinkInterpreter | Provides an ipython environment
| %flink.ssql | FlinkStreamSqlInterpreter | Provides a stream sql environment
| %flink.bsql | FlinkBatchSqlInterpreter | Provides a batch sql environment

[1]:http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html#overview

### Create Table in Glue Data Catalog

* [Flink Supported Data Types][1]

[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/

In [3]:
%flink.ssql

-- Source table over the yellow-cab-trip Kinesis stream.
-- The DROP makes this paragraph safe to re-run.
DROP TABLE IF EXISTS yellow_cab_1v13;

-- Physical columns are filled from the JSON record fields of the same name
-- (format = json). The last three columns are Kinesis connector metadata.
CREATE TABLE yellow_cab_1v13 (
   `VendorID` INT,
   `tpep_pickup_datetime` TIMESTAMP(3),
   `tpep_dropoff_datetime` TIMESTAMP(3),
   `passenger_count` INT,
   `trip_distance` FLOAT,
   `RatecodeID` INT,
   `store_and_fwd_flag` STRING,
   `PULocationID` INT,
   `DOLocationID` INT,
   `payment_type` INT,
   -- NOTE(review): the amount columns look like currency values stored as
   -- single-precision FLOAT. Consider DECIMAL if exact values matter.
   `fare_amount` FLOAT,
   `extra` FLOAT,
   `mta_tax` FLOAT,
   `tip_amount` FLOAT,
   `tolls_amount` FLOAT,
   `improvement_surcharge` FLOAT,
   `total_amount` FLOAT,
   `congestion_surcharge` FLOAT,
   -- Read-only (VIRTUAL) connector metadata: approximate time the record was
   -- added to the stream, the shard it came from, and its sequence number.
   `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
   `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,
   -- Event time is the drop-off timestamp. Records may arrive up to 5 seconds
   -- out of order before event-time windows are considered complete.
   WATERMARK FOR tpep_dropoff_datetime AS tpep_dropoff_datetime - INTERVAL '5' SECOND
) 
 WITH (
   'connector' = 'kinesis',
   'stream' = 'yellow-cab-trip',
   'aws.region' = 'us-east-1',
   -- LATEST: read only records that arrive after the job starts.
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'
);

In [4]:
%flink.ssql

-- Catalog exploration: uncomment any of these to list what is registered.
-- SHOW CATALOGS;
-- SHOW DATABASES;
-- SHOW TABLES;

-- Print the column names, types and watermark Flink registered for the source table.
DESCRIBE yellow_cab_1v13;

### Select all Fields from the yellow-cab-trip stream

In [6]:
%flink.ssql(type=update)

-- Every column of the source table, written out explicitly: the physical
-- JSON fields first, then the Kinesis metadata columns.
SELECT
   `VendorID`,
   `tpep_pickup_datetime`,
   `tpep_dropoff_datetime`,
   `passenger_count`,
   `trip_distance`,
   `RatecodeID`,
   `store_and_fwd_flag`,
   `PULocationID`,
   `DOLocationID`,
   `payment_type`,
   `fare_amount`,
   `extra`,
   `mta_tax`,
   `tip_amount`,
   `tolls_amount`,
   `improvement_surcharge`,
   `total_amount`,
   `congestion_surcharge`,
   `arrival_time`,
   `shard_id`,
   `sequence_number`
FROM yellow_cab_1v13;

### Filter

In [8]:
%flink.ssql(type=update)

-- Keep only trips whose trip_distance exceeds 3.0
-- (distance units are whatever the stream records report).
SELECT *
FROM yellow_cab_1v13
WHERE trip_distance > 3.0;

### User Defined Function

In [10]:
%flink.pyflink

class PythonLower(ScalarFunction):
  """Lower-cases a STRING value. Called from SQL as python_lower(...).

  A SQL NULL reaches eval() as None. It is returned unchanged (NULL) instead of
  calling .lower() on None, which would raise AttributeError and fail the job.
  """
  def eval(self, s):
    if s is None:
      return None
    return s.lower()

# %flink.ssql paragraphs run on the stream table environment (st_env), so the
# UDF must be registered there. The bt_env registration is kept so that
# %flink.bsql paragraphs can call it too.
for _t_env in (bt_env, st_env):
  _t_env.register_function(
      "python_lower", udf(PythonLower(), DataTypes.STRING(), DataTypes.STRING()))

In [11]:
%flink.ssql(type=update)

-- Show the raw flag next to its lower-cased form, produced by the
-- python_lower UDF registered in the pyflink paragraph.
SELECT
    store_and_fwd_flag,
    python_lower(store_and_fwd_flag) AS lower_store_and_fwd_flag
FROM yellow_cab_1v13


### Window

* [Window Aggregation][1]
* [Different Types of Windows][2]

[1]:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/
[2]:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/windows/


In [13]:
%flink.ssql(type=update)

-- Average trip distance per 10-second tumbling event-time window, keyed on
-- the watermarked tpep_dropoff_datetime column. The window bounds are
-- selected so each result row can be tied to the window it summarizes.
SELECT
    TUMBLE_START(tpep_dropoff_datetime, INTERVAL '10' SECOND) AS window_start,
    TUMBLE_END(tpep_dropoff_datetime, INTERVAL '10' SECOND) AS window_end,
    AVG(trip_distance) AS avg_trip_distance
FROM yellow_cab_1v13
GROUP BY TUMBLE(tpep_dropoff_datetime, INTERVAL '10' SECOND)

In [14]:
%flink.ssql(type=update)

-- Average fare per vendor per 10-second tumbling event-time window, keyed on
-- the watermarked tpep_dropoff_datetime column. The window bounds are
-- selected so each (window, vendor) row is identifiable.
SELECT
    TUMBLE_START(tpep_dropoff_datetime, INTERVAL '10' SECOND) AS window_start,
    TUMBLE_END(tpep_dropoff_datetime, INTERVAL '10' SECOND) AS window_end,
    VendorID,
    AVG(fare_amount) AS avg_fare_amount
FROM yellow_cab_1v13
GROUP BY
    TUMBLE(tpep_dropoff_datetime, INTERVAL '10' SECOND),
    VendorID

### Join

***Note:*** before running the next section, upload [vendor_ref.csv][1] to an S3 bucket you own, then change ``` 'path' = 's3://yellowcabsharkech/reference_data/vendor_ref.csv' ``` in the paragraph below to the S3 path of the [vendor_ref.csv][1] file you uploaded.

[1]:https://github.com/ev2900/Flink_Kinesis_Data_Analytics/blob/main/data/vendor_ref.csv

In [16]:
%flink.ssql

-- Static reference table mapping VendorID to a readable vendor name, read
-- from a CSV file in S3. This paragraph is DDL only, so the plain
-- %flink.ssql interpreter is used (no type=update result mode needed).
DROP TABLE IF EXISTS vendor_details_1v13;

-- Replace the path with the S3 location of your uploaded vendor_ref.csv.
-- NOTE(review): the csv format treats every line as data. If the file has a
-- header row, it will fail to parse VendorID as INT, so confirm the file layout.
CREATE TABLE vendor_details_1v13 (
    `VendorID` INT,
    `VendorName` STRING
) WITH (
   'connector'='filesystem',
   'path' = 's3://yellowcabsharkech/reference_data/vendor_ref.csv',
   'format' = 'csv'
);

In [17]:
%flink.ssql(type=update)

-- Enrich each trip with its vendor name from the static CSV reference table.
-- NOTE(review): this is a regular streaming join. Per the Flink docs, it keeps
-- rows from both inputs in state indefinitely, so state grows with the stream.
SELECT
    vendor.VendorName,
    trip.passenger_count,
    trip.trip_distance,
    trip.fare_amount
FROM yellow_cab_1v13 AS trip
INNER JOIN vendor_details_1v13 AS vendor
    ON trip.VendorID = vendor.VendorID


### Write Data to S3

In [19]:
%flink.ssql

-- S3 sink for the trip records, written as CSV. This paragraph is DDL only,
-- so the plain %flink.ssql interpreter is used (no type=update result mode).
DROP TABLE IF EXISTS S3_yellow_cab_1v13;

-- Replace the path with a bucket and prefix you own. VendorID is a partition
-- column, so each vendor is written under its own VendorID=value directory.
CREATE TABLE S3_yellow_cab_1v13 (
   `VendorID` INT,
   `tpep_pickup_datetime` TIMESTAMP,
   `tpep_dropoff_datetime` TIMESTAMP,
   `passenger_count` INT,
   `trip_distance` FLOAT,
   `RatecodeID` INT,
   `store_and_fwd_flag` STRING,
   `PULocationID` INT,
   `DOLocationID` INT,
   `payment_type` INT,
   `fare_amount` FLOAT,
   `extra` FLOAT,
   `mta_tax` FLOAT,
   `tip_amount` FLOAT,
   `tolls_amount` FLOAT,
   `improvement_surcharge` FLOAT,
   `total_amount` FLOAT,
   `congestion_surcharge` FLOAT,
   `arrival_time` TIMESTAMP(3),
   `shard_id` VARCHAR(128),
   `sequence_number` VARCHAR(128)
)
PARTITIONED BY (VendorID)
WITH (
   'connector'='filesystem',
   'path' = 's3://yellowcabsharkech/sql_output_1v13',
   'format' = 'csv',
   -- When a partition is committed, write a _SUCCESS marker file into it.
   'sink.partition-commit.policy.kind'='success-file',
   -- Do not commit a partition until at least 1 minute has passed.
   'sink.partition-commit.delay' = '1 min'
);

In [20]:
%flink.pyflink

# Checkpoint settings for the stream table environment that runs the
# %flink.ssql jobs. The filesystem (S3) sink only finalizes its output files
# when a checkpoint completes, so checkpointing must be enabled before the INSERT.
_checkpoint_options = (
    ("execution.checkpointing.mode", "EXACTLY_ONCE"),
    ("execution.checkpointing.interval", "1min"),
)
_st_configuration = st_env.get_config().get_configuration()
for _option_key, _option_value in _checkpoint_options:
    _st_configuration.set_string(_option_key, _option_value)

In [21]:
%flink.ssql(type=update)

-- Continuously copy every trip record from Kinesis into the S3 sink.
-- INSERT maps columns by position, so the source columns are listed
-- explicitly in sink order. A column added to the source table later
-- therefore cannot silently shift the mapping.
INSERT INTO S3_yellow_cab_1v13
SELECT
   `VendorID`,
   `tpep_pickup_datetime`,
   `tpep_dropoff_datetime`,
   `passenger_count`,
   `trip_distance`,
   `RatecodeID`,
   `store_and_fwd_flag`,
   `PULocationID`,
   `DOLocationID`,
   `payment_type`,
   `fare_amount`,
   `extra`,
   `mta_tax`,
   `tip_amount`,
   `tolls_amount`,
   `improvement_surcharge`,
   `total_amount`,
   `congestion_surcharge`,
   `arrival_time`,
   `shard_id`,
   `sequence_number`
FROM yellow_cab_1v13

### Start (restart) processing the yellow-cab-trip data stream from the beginning of the Kinesis data stream

To replay the stream from the oldest record it still retains, set

```
'scan.stream.initpos' = 'TRIM_HORIZON'
```

in the CREATE TABLE statement. See [start reading position][1] for more information.

[1]:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kinesis/#start-reading-position

In [23]:
%flink.ssql

-- Same Kinesis source schema as yellow_cab_1v13, but reading from the start
-- of the retained stream. The DROP makes this paragraph safe to re-run.
DROP TABLE IF EXISTS yellow_cab_1v13_Restart;

-- Physical columns are filled from the JSON record fields of the same name
-- (format = json). The last three columns are Kinesis connector metadata.
CREATE TABLE yellow_cab_1v13_Restart (
   `VendorID` INT,
   `tpep_pickup_datetime` TIMESTAMP(3),
   `tpep_dropoff_datetime` TIMESTAMP(3),
   `passenger_count` INT,
   `trip_distance` FLOAT,
   `RatecodeID` INT,
   `store_and_fwd_flag` STRING,
   `PULocationID` INT,
   `DOLocationID` INT,
   `payment_type` INT,
   `fare_amount` FLOAT,
   `extra` FLOAT,
   `mta_tax` FLOAT,
   `tip_amount` FLOAT,
   `tolls_amount` FLOAT,
   `improvement_surcharge` FLOAT,
   `total_amount` FLOAT,
   `congestion_surcharge` FLOAT,
   -- Read-only (VIRTUAL) connector metadata: approximate time the record was
   -- added to the stream, the shard it came from, and its sequence number.
   `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
   `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,
   -- Event time is the drop-off timestamp, tolerating up to 5 seconds of
   -- out-of-order records.
   WATERMARK FOR tpep_dropoff_datetime AS tpep_dropoff_datetime - INTERVAL '5' SECOND
) 
 WITH (
   'connector' = 'kinesis',
   'stream' = 'yellow-cab-trip',
   'aws.region' = 'us-east-1',
   -- TRIM_HORIZON: start from the oldest record still retained in the stream,
   -- so all retained history is replayed before new records.
   'scan.stream.initpos' = 'TRIM_HORIZON',
   'format' = 'json'
);

In [24]:
%flink.ssql(type=update)

-- Every column of the replayed stream, written out explicitly: the physical
-- JSON fields first, then the Kinesis metadata columns.
SELECT
   `VendorID`,
   `tpep_pickup_datetime`,
   `tpep_dropoff_datetime`,
   `passenger_count`,
   `trip_distance`,
   `RatecodeID`,
   `store_and_fwd_flag`,
   `PULocationID`,
   `DOLocationID`,
   `payment_type`,
   `fare_amount`,
   `extra`,
   `mta_tax`,
   `tip_amount`,
   `tolls_amount`,
   `improvement_surcharge`,
   `total_amount`,
   `congestion_surcharge`,
   `arrival_time`,
   `shard_id`,
   `sequence_number`
FROM yellow_cab_1v13_Restart;