In [1]:
!pip install apache-flink

Collecting apache-flink
  Using cached apache_flink-1.20.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (4.8 kB)
Collecting py4j==0.10.9.7 (from apache-flink)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting apache-beam<2.49.0,>=2.43.0 (from apache-flink)
  Using cached apache_beam-2.48.0-py3-none-any.whl
Collecting cloudpickle>=2.2.0 (from apache-flink)
  Using cached cloudpickle-3.1.0-py3-none-any.whl.metadata (7.0 kB)
Collecting avro-python3!=1.9.2,>=1.8.1 (from apache-flink)
  Using cached avro_python3-1.10.2-py3-none-any.whl
Collecting pytz>=2018.3 (from apache-flink)
  Using cached pytz-2024.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting fastavro!=1.8.0,>=1.1.0 (from apache-flink)
  Using cached fastavro-1.9.7-cp311-cp311-macosx_10_9_universal2.whl.metadata (5.5 kB)
Collecting requests>=2.26.0 (from apache-flink)
  Downloading requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting protobuf>=3.19.0 (from apache-flink)
  Using cached protobuf-

In [9]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.java_gateway import get_gateway
# Step 1: Initialize the streaming environment
#
# The previous version built a remote Java environment and then called
# `StreamExecutionEnvironment(j_env).get_execution_environment()`. That method is a static
# factory: it returns a fresh default environment and does not use the instance it is
# called on, so the remote environment was silently thrown away.
#
# The choice is now explicit. The default keeps the behaviour the notebook actually had
# (default environment). Set USE_REMOTE_CLUSTER = True to submit to a running Flink
# cluster instead.
USE_REMOTE_CLUSTER = False
REMOTE_HOST = "localhost"
REMOTE_PORT = 8081

if USE_REMOTE_CLUSTER:
    gateway = get_gateway()
    # createRemoteEnvironment takes (host, port, String... jarFiles); an empty Java
    # String[] is passed because no extra jars are shipped with the job.
    empty_jar_files = gateway.new_array(gateway.jvm.String, 0)
    j_stream_env_class = (
        gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    )
    j_stream_execution_environment = j_stream_env_class.createRemoteEnvironment(
        REMOTE_HOST,
        REMOTE_PORT,
        empty_jar_files,
    )
    # Wrap the Java object directly; do NOT call get_execution_environment() on it.
    env = StreamExecutionEnvironment(j_stream_execution_environment)
else:
    env = StreamExecutionEnvironment.get_execution_environment()

# Step 2: Describe where the events come from (a text file, relative path)
event_file_path = "user_interactions.csv"

# Configure the builder first, then finalize it into a source object.
source_builder = FileSource.for_record_stream_format(
    StreamFormat.text_line_format(),  # each line of the file becomes one record
    event_file_path,
)
source = source_builder.build()

# Step 3: Watermark strategy for timestamps that only ever increase
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

# Step 4: Register the source with the environment to obtain the stream of raw lines
event_stream = env.from_source(
    source,
    watermark_strategy=watermark_strategy,
    source_name="event_source",
)

# Step 5: Process the events
def process_event(event: str) -> str:
    """Convert one CSV line into a human-readable result string.

    The line is expected to hold exactly three comma-separated fields:
    ``<event_time>,<event_type>,<event_value>``. Whitespace around each field is
    ignored, so ``"t, click, 2"`` is treated the same as ``"t,click,2"``.

    Args:
        event: One raw line read from the source.

    Returns:
        * ``"Processed Click Event: <event_time>, <event_value * 1.1>"`` for ``click`` events,
        * ``"Ignored Event: <event_type>"`` for every other event type,
        * ``"Error processing: <event> | <reason>"`` when the line cannot be parsed
          (wrong number of fields, non-numeric value, ...). The function never raises;
          failures are reported in the returned string so one bad record does not
          abort the whole job.
    """
    try:
        # Strip each field so padding after the commas does not change the event type.
        event_time, event_type, event_value = [
            field.strip() for field in event.split(",")
        ]
        # The value is parsed before the type check on purpose: a non-numeric value is
        # reported as an error for every event type, not only for clicks.
        event_value = float(event_value)
        if event_type == "click":
            # 1.1 is the scaling factor applied to click values (its meaning is not
            # documented in this notebook).
            return f"Processed Click Event: {event_time}, {event_value * 1.1}"
        return f"Ignored Event: {event_type}"
    except Exception as e:
        # Deliberately broad: any failure is turned into an error record.
        return f"Error processing: {event} | {str(e)}"

# Apply process_event to every line; the result of each call is declared as a string.
processed_stream = event_stream.map(
    process_event,
    output_type=Types.STRING(),
)

# Step 6: Attach a print sink so each processed record is written to the output
processed_stream.print()

# Step 7: Run the job. Nothing above executes until this call; the returned
# JobExecutionResult is left as the cell's last expression so it is displayed.
env.execute("File Source Event Processing")


2> Processed Click Event: 2024-12-01 12:00:00, 1.6500000000000001
2> Ignored Event: scroll
2> Processed Click Event: 2024-12-01 12:00:10, 3.3000000000000003
2> Ignored Event: scroll


<pyflink.common.job_execution_result.JobExecutionResult at 0x164549c50>