Run python process failed when running tumbling-window.py on KDA #4
Comments
Hi @flo-mair, thank you for the CloudWatch logs. It appears that the Python program itself throws a user exception, and the application cannot start because of it. These code samples were developed to run on a local machine, and I've created supplemental code to support running in a distributed KDA environment. I'm working to get that merged in; in the meantime, see if the code in this repository works better for you. |
I got the same issue when running the official KDA guide. I re-uploaded with modified code (the S3Sink example) and got the errors below:
It seems to be a connection issue with the Kinesis data stream. |
Hi, can you paste the entire stack trace that is in CloudWatch? Also can you double check your IAM Permissions? |
Thanks for the help~ Please refer to the CloudWatch logs and IAM policy below. Thanks.
|
Still the same error with this code. |
Are you supplying the Kinesis connector when bundling and deploying your application, and referencing it in the [...]? I see the error lists the table options, but unfortunately the output is cut off before the options for table sources. The Kinesis source needs to be passed in as a jar file, which can be downloaded here and bundled with your application: |
Yes, I've zipped the jar file and specified the jarfile in kinesis.analytics.flink.run.options. |
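Editor's note: a minimal sketch of how the packaging described above could be sanity-checked before uploading. The group ID `kinesis.analytics.flink.run.options` and the `jarfile` key come from this thread; the `python` key follows the KDA PyFlink documentation as I understand it. The helper name `check_run_options` and the file names in the usage note are hypothetical.

```python
import zipfile


def check_run_options(zip_path, property_groups):
    """Return a list of problems; an empty list means both paths exist in the zip."""
    # Find the run-options group among the application's property groups.
    run_opts = next(
        (g["PropertyMap"] for g in property_groups
         if g["PropertyGroupId"] == "kinesis.analytics.flink.run.options"),
        None,
    )
    if run_opts is None:
        return ["missing kinesis.analytics.flink.run.options group"]

    with zipfile.ZipFile(zip_path) as zf:
        names = set(zf.namelist())

    problems = []
    for key in ("python", "jarfile"):
        value = run_opts.get(key)
        if value is None:
            problems.append(f"'{key}' is not set")
        elif value not in names:
            problems.append(f"'{key}' points to {value}, which is not in the zip")
    return problems
```

For example, calling it with a property map of `{"python": "main.py", "jarfile": "lib/my-connector.jar"}` (both names made up) against a zip that only contains `main.py` would report the missing jar. This only checks that the paths resolve inside the archive; it says nothing about whether the jar itself has the right contents.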
Tried again and still got the same error. Going through the error logs, is it something related to the Jackson package? |
We've identified the root cause: the jar provided on Maven Central does not include the shaded dependencies required to run the Flink Kinesis Connector. Short term: you can build an uber jar manually for use within your PyFlink application by building the project here and bundling the required dependencies. Longer term: we will publish the same jar somewhere so it is easy to download. I'll leave the issue open until the short-term solution is verified or the longer-term solution is implemented. |
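Editor's note: a jar is a zip archive, so one rough way to check whether a candidate jar actually bundles a given dependency is to list the class entries under a package path. This is only a sketch; the helper name `classes_in_jar` is hypothetical, and because shaded jars often relocate packages, the prefix to look for in a real connector jar is something you would need to confirm yourself.

```python
import zipfile


def classes_in_jar(jar_path, prefix):
    """Return the sorted .class entries in the jar whose path starts with prefix."""
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name for name in jar.namelist()
            if name.startswith(prefix) and name.endswith(".class")
        )
```

An empty result for a prefix such as `com/fasterxml/jackson/` (an assumed, illustrative prefix) would suggest that the dependency is not bundled under that path, though it may still be present under a relocated package name.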
Thanks for the info, @jeremyber-aws. I will try this. |
Hi, I was facing the same problem, and it seems to be resolved by adding the dependency and packaging as @jeremyber-aws said.
My PyFlink job started on KDA 🎉 However, the job FAILED soon after, and the following exception appeared on the Flink dashboard.
My full code (including CDK) is here. |
Glad the fix is working for you, @sambaiz! In speaking with the development engineers, they included the following dependencies:
Please see if you have any issues including these extra dependencies. |
Thanks @jeremyber-aws! I succeeded in running my job by adding some dependencies.
|
When running the sample code from your repo, @jeremyber-aws, I still get an error in CloudWatch:
`{
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxxxxxxxxxxxx:application/sample-app",
"applicationVersionId": 5,
"message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 
6 more\nCaused by: org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",
"messageType": "ERROR",
"messageSchemaVersion": "1",
"errorCode": "CodeError.InvalidApplicationCode"
}` |
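Editor's note: the `message` field in a record like the one above is a single string with embedded newlines, which makes the chain of causes hard to read. A small sketch for pulling out the "Caused by:" lines from such a record follows; the function name `caused_by_lines` is hypothetical, and it assumes the record is valid JSON with a `message` key as shown in the log.

```python
import json


def caused_by_lines(record_json):
    """Return the text following each 'Caused by:' marker in the record's message."""
    record = json.loads(record_json)
    marker = "Caused by:"
    causes = []
    for line in record["message"].splitlines():
        line = line.strip()
        if line.startswith(marker):
            causes.append(line[len(marker):].strip())
    return causes
```

In the record above, the innermost cause is a `ProgramAbortException` raised from `PythonDriver.main`, which appears to indicate only that the Python process exited abnormally; the actual Python error would have to be found in other log entries.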
Hi @flo-mair, without more context on the error (are there any more logs that would be helpful?), I am wondering whether you provided the environment variables required to run the application. Search CloudWatch for errors containing |
Hi all, thank you for your patience with this issue. The Amazon Kinesis SQL Connector for Flink (v2.0.3) has been indexed into Maven and is recommended for use within your PyFlink applications reading from Kinesis. You can find the jar here. I will mark this issue as closed, as including this jar will resolve any issues within the thread. |
@jeremyber-aws Hi~ I've tried the new Amazon Kinesis SQL Connector for Flink (v2.0.3) and the job ran successfully. However, when I changed the format from 'csv' to 'parquet', the job failed. Following the Flink guide, I also tried combining the jars, but still got the same errors. Is there any extra configuration needed to make the parquet format available? Thanks~ |
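Editor's note: for readers unfamiliar with what "changing the format" means here, the sketch below builds a Flink SQL `CREATE TABLE` statement for a filesystem sink where only the `'format'` option differs between the two cases. The `'connector'`, `'path'`, and `'format'` option names are standard Flink filesystem connector options; the helper name, table name, columns, and S3 path are hypothetical, and this is not the poster's actual table definition.

```python
def filesystem_sink_ddl(table_name, columns, path, fmt):
    """Build a CREATE TABLE statement for a filesystem sink with the given format."""
    cols = ",\n  ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {table_name} (\n  {cols}\n) WITH (\n"
        f"  'connector' = 'filesystem',\n"
        f"  'path' = '{path}',\n"
        f"  'format' = '{fmt}'\n"
        f")"
    )


# Hypothetical table and bucket names, for illustration only.
csv_ddl = filesystem_sink_ddl(
    "output_table", [("ticker", "VARCHAR(6)"), ("price", "DOUBLE")],
    "s3a://my-bucket/output/", "csv")
parquet_ddl = filesystem_sink_ddl(
    "output_table", [("ticker", "VARCHAR(6)"), ("price", "DOUBLE")],
    "s3a://my-bucket/output/", "parquet")
```

The DDL change itself is one word. In Flink the Parquet format ships as a separate dependency, which is presumably why the jar had to be combined with additional artifacts before the `'parquet'` variant could run.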
@davidshtian how did you combine the jar? You can include it in your application configuration properties at the |
@jeremyber-aws Hi~ As I need to set jarfile to amazon-kinesis-sql-connector-flink-2.0.3.jar, I've tried
Not sure if I did it right, but the jobs still failed. Thanks~ |
Please see the following link on how to create uber jars. The jar does not need to be named anything in particular. |
@jeremyber-aws I really appreciate your help~ Following the link you provided, I tried many combinations to get a PyFlink KDA job running for "S3 + Parquet", and finally got it working with the pom.xml below. Thanks~
|
I did the same thing but got the error below: |
When running the tumbling window sample (Python), the application does not start in KDA. I followed the instructions in the docs. I see the following logs in CloudWatch: