In [22]:
from faker import Faker
from pyflink.table import EnvironmentSettings, TableEnvironment

In [23]:
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

In [24]:
fake = Faker()

In [25]:
data = [(fake.name(), fake.city(), fake.city()) for _ in range(10)]

In [26]:
columns_name = ['name', 'city', 'state']

In [27]:
table = table_env.from_elements(data, schema = col)

In [28]:
table.execute()

<pyflink.table.table_result.TableResult at 0x7e747842ff40>

In [29]:
table_env.create_temporary_view('source_table', table)

In [30]:
table_env.execute_sql(f"SELECT * FROM source_table").print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                    Brian Kelly |                    Marissaberg |                    Edwardsfort |
|                    Donna Jones |                    Connerville |                     North John |
|                 Cynthia Obrien |                    Barbermouth |                   Clarkchester |
|                   Ashley Jones |                   East Richard |                      Fritzview |
|                   Richard Dunn |                   Colleenhaven |               South Carriestad |
|              Joseph Montgomery |                   South Alison |                       Smithton |
|                 Bonnie Camacho |                    Alvarezfort |                    Rebe

In [37]:
from pyflink.table.expressions import col
table.select(col("name"), col("city")).execute().print()

+--------------------------------+--------------------------------+
|                           name |                           city |
+--------------------------------+--------------------------------+
|                    Brian Kelly |                    Marissaberg |
|                    Donna Jones |                    Connerville |
|                 Cynthia Obrien |                    Barbermouth |
|                   Ashley Jones |                   East Richard |
|                   Richard Dunn |                   Colleenhaven |
|              Joseph Montgomery |                   South Alison |
|                 Bonnie Camacho |                    Alvarezfort |
|                  Carolyn Johns |                  West Billyton |
|                    Jeremy Reed |                    Williamtown |
|                 Jonathan Green |                    Francisport |
+--------------------------------+--------------------------------+
10 rows in set


In [40]:
table \
    .select(col("name"), col("city"), col("state")) \
    .where(col("state") == 'Fritzview') \
    .execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                   Ashley Jones |                   East Richard |                      Fritzview |
+--------------------------------+--------------------------------+--------------------------------+
1 row in set


In [39]:
table \
    .group_by(col("state")) \
    .select(col("state").alias("state"), col("name").count.alias("count")) \
    .execute().print()

+--------------------------------+----------------------+
|                          state |                count |
+--------------------------------+----------------------+
|                     North John |                    1 |
|                    Rebeccafort |                    1 |
|                   Clarkchester |                    1 |
|                       Smithton |                    1 |
|                   Fergusonport |                    1 |
|                      Ninamouth |                    1 |
|               South Carriestad |                    1 |
|                    Edwardsfort |                    1 |
|                      Fritzview |                    1 |
|                     Lake Sarah |                    1 |
+--------------------------------+----------------------+
10 rows in set


In [41]:
table_result = table_env.execute_sql(f"SELECT * FROM source_table ")

In [42]:
with table_result.collect() as results:
   for result in results:
       print(result)

<Row('Brian Kelly', 'Marissaberg', 'Edwardsfort')>
<Row('Donna Jones', 'Connerville', 'North John')>
<Row('Cynthia Obrien', 'Barbermouth', 'Clarkchester')>
<Row('Ashley Jones', 'East Richard', 'Fritzview')>
<Row('Richard Dunn', 'Colleenhaven', 'South Carriestad')>
<Row('Joseph Montgomery', 'South Alison', 'Smithton')>
<Row('Bonnie Camacho', 'Alvarezfort', 'Rebeccafort')>
<Row('Carolyn Johns', 'West Billyton', 'Fergusonport')>
<Row('Jeremy Reed', 'Williamtown', 'Lake Sarah')>
<Row('Jonathan Green', 'Francisport', 'Ninamouth')>


In [43]:
pandas_df = table.to_pandas()
pandas_df


Unnamed: 0,name,city,state
0,Brian Kelly,Marissaberg,Edwardsfort
1,Donna Jones,Connerville,North John
2,Cynthia Obrien,Barbermouth,Clarkchester
3,Ashley Jones,East Richard,Fritzview
4,Richard Dunn,Colleenhaven,South Carriestad
5,Joseph Montgomery,South Alison,Smithton
6,Bonnie Camacho,Alvarezfort,Rebeccafort
7,Carolyn Johns,West Billyton,Fergusonport
8,Jeremy Reed,Williamtown,Lake Sarah
9,Jonathan Green,Francisport,Ninamouth


In [45]:
table_temp = table_env.from_pandas(pandas_df)
table_temp.execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                    Brian Kelly |                    Marissaberg |                    Edwardsfort |
|                    Donna Jones |                    Connerville |                     North John |
|                 Cynthia Obrien |                    Barbermouth |                   Clarkchester |
|                   Ashley Jones |                   East Richard |                      Fritzview |
|                   Richard Dunn |                   Colleenhaven |               South Carriestad |
|              Joseph Montgomery |                   South Alison |                       Smithton |
|                 Bonnie Camacho |                    Alvarezfort |                    Rebe

In [46]:
table.execute().print()

+--------------------------------+--------------------------------+--------------------------------+
|                           name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+
|                    Brian Kelly |                    Marissaberg |                    Edwardsfort |
|                    Donna Jones |                    Connerville |                     North John |
|                 Cynthia Obrien |                    Barbermouth |                   Clarkchester |
|                   Ashley Jones |                   East Richard |                      Fritzview |
|                   Richard Dunn |                   Colleenhaven |               South Carriestad |
|              Joseph Montgomery |                   South Alison |                       Smithton |
|                 Bonnie Camacho |                    Alvarezfort |                    Rebe

In [51]:
# import uuid
# import functools  # Import functools

# from pyflink.table.udf import udf
# from pyflink.table.expressions import col, call
# from pyflink.table import TableEnvironment, EnvironmentSettings



# def generate_guid():
#     return str(uuid.uuid4())


# myhash = udf(functools.partial(a), result_type='STRING')

# result_table = table.select(col("city"), col("name"), call(myhash).alias("guid"))

# result_table.execute().print()
