In [4]:
from pyflink.table import (
    EnvironmentSettings,
    TableEnvironment,
    TableDescriptor,
    Schema,
    DataTypes,
    FormatDescriptor,
    EnvironmentSettings
)
from pyflink.table.window import Tumble,Slide, Over
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import expressions as expr
from pyflink.table.expressions import col,call, lit
from pyflink.table.udf import udf

In [5]:
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)    # Point to the SQL Kafka connector JAR
kafka_connector_jar = "file:///Users/gerardomartinez/DataEng/alpaca/flink-connectors/flink-sql-connector-kafka-3.3.0.jar"

t_env.get_config().set("pipeline.jars", kafka_connector_jar)

dockerC = "localhost:9092"

# # Define the Kafka source table
t_env.create_temporary_table(
    'bitcoin_table4',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('name_coin', DataTypes.STRING())
                .column('timestamp', DataTypes.TIMESTAMP(3))
                .watermark('timestamp', "`timestamp` - INTERVAL '5' SECOND")
                .column('open', DataTypes.FLOAT())
                .column('high', DataTypes.FLOAT())
                .column('low', DataTypes.FLOAT())
                .column('close', DataTypes.FLOAT()) 
                .column('volume', DataTypes.FLOAT())  
                .column('current_time', DataTypes.TIMESTAMP(3))
                .column_by_expression('start_of_hour', expr.call_sql("CAST(FLOOR(`timestamp` TO HOUR) AS TIMESTAMP(3))"))     
                .column_by_expression("proc_time", "PROCTIME()")           
                .build())
        .option('topic', 'bitcoin_price_2')
        .option('properties.bootstrap.servers', dockerC)
        .option('properties.group.id', 'transaction_group')
        .option('scan.startup.mode', 'earliest-offset')
        .format(FormatDescriptor.for_format('json')
                .option('fail-on-missing-field', 'false')
                .option('ignore-parse-errors', 'true')
                .build())
        .build())

t_env.create_temporary_table(
        'bitcoin_sink',
        TableDescriptor.for_connector('kafka')
                .schema(Schema.new_builder()
                        .column('name_coin', DataTypes.STRING())
                        .column('open', DataTypes.FLOAT())
                        .build())
                .option('topic', 'bitcoin_summary')
                .option('properties.bootstrap.servers', dockerC)
                .format(FormatDescriptor.for_format('json')
                        .build())
                .build())


source_table = t_env.from_path('bitcoin_table4') #.add_columns(expr.lit("Princeton").alias("city")).add_columns(col('current').get('temperature_2m').alias('temp'))


In [8]:
source_table.print_schema()

(
  `name_coin` STRING,
  `timestamp` TIMESTAMP(3) *ROWTIME*,
  `open` FLOAT,
  `high` FLOAT,
  `low` FLOAT,
  `close` FLOAT,
  `volume` FLOAT,
  `current_time` TIMESTAMP(3),
  `start_of_hour` TIMESTAMP(3) AS CAST(FLOOR(`timestamp` TO HOUR) AS TIMESTAMP(3)),
  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),
  WATERMARK FOR `timestamp`: TIMESTAMP(3) AS `timestamp` - INTERVAL '5' SECOND
)


In [7]:
table = t_env.from_path("bitcoin_table4")

In [9]:
table.select(col("name_coin")).execute().print()

+----+--------------------------------+
| op |                      name_coin |
+----+--------------------------------+
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |
| +I |                        BTC/USD |


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1217, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/gerardomartinez/.pyenv/versions/3.10.4/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

| +I |                        BTC/USD | 2025-02-07 09:27:00.000 |                         <NULL> |


In [5]:
t_env.create_temporary_view("aggregated_view", aggregated)

NameError: name 'aggregated' is not defined

In [7]:
aggregated = t_env.sql_query("""
    SELECT
        name_coin,
        TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS current_minute,
        AVG(`open`) AS avg_open,
        tiny_time as PROCTIME()
    FROM bitcoin_table4
    GROUP BY
        name_coin,
        TUMBLE(proc_time, INTERVAL '1' MINUTE),
        tiny_time
""")

t_env.create_temporary_view("aggregated_view", aggregated)

Py4JJavaError: An error occurred while calling o8.sqlQuery.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "(" at line 6, column 30.
Was expecting one of:
    <EOF> 
    "EXCEPT" ...
    "FETCH" ...
    "FROM" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    ";" ...
    "," ...
    
	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:1575)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "(" at line 6, column 30.
Was expecting one of:
    <EOF> 
    "EXCEPT" ...
    "FETCH" ...
    "FROM" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    ";" ...
    "," ...
    
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:539)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:292)
	at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:156)
	at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:211)
	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:76)
	... 11 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 6, column 30.
Was expecting one of:
    <EOF> 
    "EXCEPT" ...
    "FETCH" ...
    "FROM" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    ";" ...
    "," ...
    
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:50082)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:49890)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:3706)
	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:348)
	at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:209)
	... 12 more


In [6]:
t_env.from_path("aggregated_view").print_schema()

(
  `name_coin` STRING,
  `current_minute` TIMESTAMP(3) NOT NULL,
  `avg_open` FLOAT,
  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)


In [8]:
result = t_env.execute_sql("""
    SELECT 
        name_coin,
        current_minute,
        avg_open,
        LAG(avg_open, 1) OVER (
            PARTITION BY name_coin 
            ORDER BY proc_time
        ) AS prev_avg_open,
        (avg_open - LAG(avg_open, 1) OVER (
            PARTITION BY name_coin 
            ORDER BY proc_time
        )) AS diff_avg_open
    FROM aggregated_view
""")

# Collect or print the result
result_table = result.print_schema()
result_table.print()

TableException: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:175)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [12]:
revenue = source_table \
    .window(Tumble.over(expr.lit(1).minute).on(col("proc_time")).alias("w"))\
    .group_by(col('name_coin'),col('w'))\
    .select(
        col('name_coin'),
        col('w').start.alias('rounded_minute'), 
        call("avg", col("open")).alias('avg_open')
)

In [13]:
t_env.list_tables()

['bitcoin_sink', 'bitcoin_table4']

In [13]:
t_env.execute_sql(
    """
      SELECT 
        name_coin,
        `timestamp`,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE) AS rounded_minute,
        AVG(`open`) AS avg_open
    FROM bitcoin_table4
    GROUP BY 
        name_coin,
        `timestamp`,
        TUMBLE(`timestamp`, INTERVAL '1' MINUTE)
    """
).print()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1217, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/gerardomartinez/.pyenv/versions/3.10.4/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [11]:
t_env.execute_sql("""
WITH aggregated AS (
    SELECT 
        name_coin,
        `timestamp`,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE) AS rounded_minute,
        AVG(`open`) AS avg_open
    FROM bitcoin_table4
    GROUP BY 
        name_coin,
        `timestamp`,
        TUMBLE(`timestamp`, INTERVAL '1' MINUTE)
)
SELECT 
    name_coin,
    rounded_minute,
    avg_open,
    LAG(avg_open, 1) OVER (
        PARTITION BY name_coin
        ORDER BY `timestamp`
    ) AS prev_avg_open,
    avg_open - LAG(avg_open, 1) OVER (
        PARTITION BY name_coin
        ORDER BY `timestamp`
    ) AS diff_avg_open
FROM aggregated

""")

TableException: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:175)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [None]:
t_env.execute_sql("""
    SELECT 
        name_coin,
        `timestamp`,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE) AS rounded_minute,
        AVG(`open`) AS avg_open
    FROM bitcoin_table4
    GROUP BY 
        name_coin,
        `timestamp`,
        TUMBLE(`timestamp`, INTERVAL '1' MINUTE)
                  """).print()

In [3]:
t_env.execute_sql("""
   SELECT 
        name_coin, 
        TUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS rounded_minute,
        AVG(`open`) AS avg_open
    FROM bitcoin_table4
    GROUP BY name_coin, TUMBLE(proc_time, INTERVAL '1' MINUTE)
""").print()


+----+--------------------------------+-------------------------+--------------------------------+
| op |                      name_coin |          rounded_minute |                       avg_open |
+----+--------------------------------+-------------------------+--------------------------------+
| +I |                        BTC/USD | 2025-02-04 10:39:00.000 |                         <NULL> |


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1217, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/gerardomartinez/.pyenv/versions/3.10.4/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

| +I |                        BTC/USD | 2025-02-04 10:40:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:41:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:42:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:43:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:44:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:45:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:46:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:47:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:48:00.000 |                         <NULL> |
| +I |                        BTC/USD | 2025-02-04 10:49:00.000 |                         <NULL> |
| +I |    

In [5]:
revenue.execute().print()

+----+--------------------------------+-------------------------+--------------------------------+
| op |                      name_coin |          rounded_minute |                       avg_open |
+----+--------------------------------+-------------------------+--------------------------------+
| +I |                        BTC/USD | 2025-02-03 10:15:00.000 |                      105216.72 |
| +I |                        BTC/USD | 2025-02-03 10:16:00.000 |                       96869.15 |


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/gerardomartinez/DataEng/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1217, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/gerardomartinez/.pyenv/versions/3.10.4/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 