## Writing a PyFlink Job
A simple PyFlink job that processes a stream of text data:

In [1]:
root_path = "/Users/hamzaharunamohammed/Desktop/etiya/delta_lake/data"

In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from faker import Faker
import random

# Initialize Spark session
builder = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Generate synthetic telecom data
fake = Faker()
data = []
for _ in range(1000000):  # 1 million records
    data.append((
        fake.phone_number(),
        fake.phone_number(),
        fake.date_time_this_year().strftime("%Y-%m-%d %H:%M:%S"),
        random.randint(1, 3600),  # Call duration between 1 second and 1 hour
        random.choice(["Prepaid", "Postpaid"]),
        random.choice(["Voice", "Data", "SMS"]),
        random.uniform(0.01, 5.0)  # Call cost between 0.01 and 5.0
    ))

columns = ["caller", "receiver", "timestamp", "duration", "plan_type", "service_type", "cost"]
df = spark.createDataFrame(data, columns)

# Save as Delta table
data_table = root_path+"/deltalake/bss.delta"
df.write.format("delta").save(data_table)
spark.stop()



24/06/25 11:15:27 WARN Utils: Your hostname, Hamzas-MacBook-Pro-8665.local resolves to a loopback address: 127.0.0.1; using 10.252.128.42 instead (on interface en0)
24/06/25 11:15:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/hamzaharunamohammed/.ivy2/cache
The jars for the packages stored in: /Users/hamzaharunamohammed/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6cffd1de-9cd7-4338-b0d1-ea999f465b2a;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/Homebrew/Caskroom/miniforge/base/envs/delta_lake/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 131ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-6cffd1de-9cd7-4338-b0d1-ea999f465b2a
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/4ms)
24/06/25 11:15:27

##  Using DuckDB for Querying Locally
Read Delta Table with DuckDB

In [3]:
import duckdb


data_table = root_path+"/deltalake/bss.delta"


# Reading Delta table into DuckDB
con = duckdb.connect()
con.execute(f"CREATE TABLE cdrs AS SELECT * FROM '{data_table}/*.parquet'")

Unnamed: 0,caller,total_duration
0,+1-449-678-4458x498,2554.0
1,643-644-4767,386.0
2,+1-324-972-6318x3487,3143.0
3,+1-585-430-1732,2842.0
4,470-744-4296x8082,1965.0
...,...,...
999990,(275)392-6528x52999,1663.0
999991,001-314-674-3293x126,3289.0
999992,561.538.4155x38132,3412.0
999993,510.788.1268x89772,471.0


In [29]:
# Perform some queries
result = con.execute("SELECT caller, SUM(duration) as total_duration FROM cdrs GROUP BY caller").fetchdf()
result

Unnamed: 0,caller,total_duration
0,001-730-464-4505x87146,3179.0
1,+1-768-269-3281,2021.0
2,274-567-7841x31567,2293.0
3,8789465052,2072.0
4,(609)551-9318x49708,2314.0
...,...,...
999990,(758)889-4445,3100.0
999991,(955)589-6186x54898,461.0
999992,001-477-849-5673,3501.0
999993,252-948-3813,2050.0


In [4]:
query1 = con.execute("SELECT plan_type, COUNT(*) as total_calls FROM cdrs GROUP BY plan_type").fetchdf()
query1

Unnamed: 0,plan_type,total_calls
0,Postpaid,500111
1,Prepaid,499889


In [5]:
query2 = con.execute("SELECT caller, SUM(duration) as total_duration FROM cdrs GROUP BY caller ORDER BY total_duration DESC LIMIT 10").fetchdf()
query2

Unnamed: 0,caller,total_duration
0,729-904-4747,6835.0
1,391.226.6764,6012.0
2,576.248.7963,5777.0
3,(698)897-8131,5477.0
4,4497154046,3600.0
5,577-275-1304x23013,3600.0
6,595.933.0716x0680,3600.0
7,717-870-9010,3600.0
8,5673533947,3600.0
9,937-731-3900x64305,3600.0


In [6]:
query3 = con.execute("SELECT service_type, AVG(cost) as avg_cost FROM cdrs GROUP BY service_type").fetchdf()
query3

Unnamed: 0,service_type,avg_cost
0,SMS,2.507938
1,Voice,2.502217
2,Data,2.506711


In [26]:
query4 = con.execute("SELECT * FROM cdrs ORDER BY cost DESC LIMIT 10").fetchdf()
query4


Unnamed: 0,caller,receiver,timestamp,duration,plan_type,service_type,cost
0,646-494-3243,486.335.2218x566,2024-05-30 03:45:33,2722,Postpaid,Data,4.999998
1,265-966-7625x2896,(443)309-0432,2024-04-02 17:09:32,735,Prepaid,Data,4.999993
2,001-623-720-3971x154,(366)801-7391x14923,2024-06-05 21:02:05,2703,Postpaid,Voice,4.999989
3,341.898.0266x32443,001-801-483-2218x2646,2024-01-02 18:54:40,1651,Prepaid,Voice,4.999989
4,337.763.3945,(838)891-6965x1919,2024-02-08 18:03:04,3261,Prepaid,SMS,4.999989
5,001-663-417-7483x066,001-669-373-7383x8344,2024-06-16 02:07:13,3214,Postpaid,Voice,4.99998
6,+1-513-511-6874x21427,(304)252-6060x4065,2024-01-18 22:06:12,2677,Prepaid,Voice,4.999975
7,818-474-7833,742.488.9753,2024-01-04 08:00:14,1617,Postpaid,SMS,4.999974
8,+1-591-243-5772x8435,+1-938-578-7824x21548,2024-05-18 19:55:15,3371,Postpaid,Data,4.999971
9,+1-240-373-5104,690-355-9258x84651,2024-02-07 13:04:15,1312,Postpaid,Data,4.999971


In [27]:
query5 = con.execute("SELECT plan_type, AVG(duration) as avg_duration FROM cdrs GROUP BY plan_type").fetchdf()
query5

Unnamed: 0,plan_type,avg_duration
0,Postpaid,1798.892382
1,Prepaid,1800.512034


In [28]:
query6 = con.execute("SELECT service_type, SUM(cost) as total_revenue FROM cdrs GROUP BY service_type").fetchdf()
query6

Unnamed: 0,service_type,total_revenue
0,Data,834795.051693
1,Voice,833948.764896
2,SMS,836878.697915


##  Integrating with Apache Flink Locally: Real-Time Processing


In [7]:
from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

In [19]:
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Set CPPFLAGS environment variable for JDK include directory
os.environ['CPPFLAGS'] = "-I/opt/homebrew/opt/openjdk@17/include"

# Define the root path and data table path
root_path = "/Users/hamzaharunamohammed/Desktop/etiya/delta_lake"
data_table = root_path + "/data/deltalake/bss.delta"

# Add the necessary JARs to the classpath
jars = [
    f"{root_path}/flink-connector-filesystem_2.12-1.11.5.jar",
    f"{root_path}/flink-parquet_2.12-1.13.2.jar"
]
os.environ['FLINK_CLASSPATH'] = ':'.join(jars)

# Initialize Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Print classpath to debug
print(f"Flink classpath: {os.environ['FLINK_CLASSPATH']}")

# Define Delta table source using SQL DDL
source_ddl = f"""
CREATE TABLE delta_source (
    caller STRING,
    receiver STRING,
    call_timestamp TIMESTAMP(3),
    duration INT,
    plan_type STRING,
    service_type STRING,
    cost DOUBLE
) WITH (
    'connector' = 'filesystem',
    'path' = '{data_table}',
    'format' = 'parquet'
)
"""
try:
    t_env.execute_sql(source_ddl)
    print("Source table created successfully.")
except Exception as e:
    print(f"Failed to execute SQL DDL statement: {e}")


Flink classpath: /Users/hamzaharunamohammed/Desktop/etiya/delta_lake/flink-connector-filesystem_2.12-1.11.5.jar:/Users/hamzaharunamohammed/Desktop/etiya/delta_lake/flink-parquet_2.12-1.13.2.jar
Source table created successfully.


In [22]:
# Define another sink for real-time analytics
real_time_sink_ddl = """
CREATE TABLE real_time_sink (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    plan_type STRING,
    call_count BIGINT,
    total_duration BIGINT,
    total_cost DOUBLE
) WITH (
    'connector' = 'print'
)
"""


try:
    t_env.execute_sql(real_time_sink_ddl)
    print("Real-time sink table created successfully.")
except Exception as e:
    print(f"Failed to execute real-time sink DDL statement: {e}")


Real-time sink table created successfully.
Failed to insert real-time query result into sink: name 'real_time_query' is not defined
