In [1]:
import requests
import time
import pandas as pd
from typing import Optional


class Flink:
    def __init__(self, host: str = "localhost", port: int = 8083):
        self.url = f"http://{host}:{port}"
        self.session_id: Optional[str] = None
        self._create_session()
    
    def _create_session(self):
        """Create a new session"""
        r = requests.post(f"{self.url}/v1/sessions", json={})
        r.raise_for_status()
        self.session_id = r.json()['sessionHandle']
    
    def sql(self, query: str, timeout: int = 60) -> pd.DataFrame:
        """Execute SQL and return pandas DataFrame"""
        # Submit query
        r = requests.post(
            f"{self.url}/v1/sessions/{self.session_id}/statements",
            json={"statement": query}
        )
        r.raise_for_status()
        op_handle = r.json()['operationHandle']
        
        # Wait for completion
        start = time.time()
        while time.time() - start < timeout:
            r = requests.get(
                f"{self.url}/v1/sessions/{self.session_id}/operations/{op_handle}/status"
            )
            r.raise_for_status()
            status = r.json()['status']
            
            if status == 'FINISHED':
                break
            elif status == 'ERROR':
                error = r.json().get('errorMessage', 'Unknown error')
                raise Exception(f"Query failed: {error}")
            
            time.sleep(0.5)
        else:
            raise TimeoutError(f"Query timeout after {timeout}s")
        
        # Fetch results
        r = requests.get(
            f"{self.url}/v1/sessions/{self.session_id}/operations/{op_handle}/result/0"
        )
        r.raise_for_status()
        result = r.json()
        
        # Convert to DataFrame
        if 'results' not in result or 'data' not in result['results']:
            print("✓ Statement executed")
            return pd.DataFrame()
        
        columns = [col['name'] for col in result['results']['columns']]
        rows = []
        for row_data in result['results']['data']:
            rows.append([field for field in row_data['fields']])
        
        return pd.DataFrame(rows, columns=columns)
    
    def close(self):
        """Close session"""
        if self.session_id:
            requests.delete(f"{self.url}/v1/sessions/{self.session_id}")
            self.session_id = None


# Global instance for convenience
_flink: Optional[Flink] = None


def sql(query: str) -> pd.DataFrame:
    """Execute SQL query (creates session if needed)"""
    global _flink
    if _flink is None:
        _flink = Flink()
    return _flink.sql(query)


def connect(host: str = "localhost", port: int = 8083):
    """Connect to Flink SQL Gateway"""
    global _flink
    if _flink:
        _flink.close()
    _flink = Flink(host, port)
    print(f"✓ Connected to {host}:{port}")


def close():
    """Close connection"""
    global _flink
    if _flink:
        _flink.close()
        _flink = None

In [2]:
connect("localhost", 8083)

✓ Connected to localhost:8083


In [7]:
sql("SHOW CATALOGS")

Unnamed: 0,catalog name
0,default_catalog
1,paimon_catalog


In [5]:
sql("""CREATE CATALOG IF NOT EXISTS paimon_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 's3://paimon-data/paimon-warehouse',
  's3.endpoint' = 'http://seaweedfs-s3:8333',
  's3.access-key' = 'paimonAdmin123',
  's3.secret-key' = 'paimonSecretKey456789abcdef',
  's3.path-style-access' = 'true'
);""")

Unnamed: 0,result
0,OK


In [None]:
sql("USE CATALOG paimon_catalog")
sql("SHOW TABLES")

Unnamed: 0,table name


In [9]:
sql("""CREATE DATABASE IF NOT EXISTS testdb;""")
sql("""USE testdb;""")

Unnamed: 0,result
0,OK


In [10]:
sql("""CREATE TEMPORARY TABLE IF NOT EXISTS mysql_users (
  id INT,
  name STRING,
  email STRING,
  age INT,
  created_at TIMESTAMP(3),
  updated_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flink123',
  'database-name' = 'testdb',
  'table-name' = 'users',
  'scan.startup.mode' = 'initial'
);""")
sql("SELECT * FROM mysql_users;")

Unnamed: 0,id,name,email,age,created_at,updated_at


In [None]:
sql("""CREATE TABLE IF NOT EXISTS users (
  id INT,
  name STRING,
  email STRING,
  age INT,
  created_at TIMESTAMP(4),
  updated_at TIMESTAMP(4),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'metadata.iceberg.storage' = 'table-location',
  'data-file.path-directory' = 'data'
);""")
sql("""INSERT INTO users SELECT * FROM mysql_users;""")

Unnamed: 0,id,name,email,age,created_at,updated_at


In [13]:
sql("""CREATE TABLE IF NOT EXISTS users (
  id INT,
  name STRING,
  email STRING,
  age INT,
  created_at TIMESTAMP(4),
  updated_at TIMESTAMP(4),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'metadata.iceberg.storage' = 'table-location',
  'data-file.path-directory' = 'data'
);""")

Unnamed: 0,result
0,OK


In [12]:
sql("SELECT * FROM users;")

Unnamed: 0,id,name,email,age,created_at,updated_at


In [14]:
close()