Close spark context started via Celery task in a django app #5865

Closed
priyank-bangar opened this issue Dec 8, 2019 · 5 comments
@priyank-bangar

I am using PySpark along with Celery in a Django app. The flow of my code is as follows:

  1. A POST request is made to upload a (large) file.
  2. Django handles the request and loads the file into HDFS. This large file in HDFS is then read by PySpark and loaded into Cassandra.
  3. The whole upload is handled by Celery (from reading the file to the Cassandra write). Celery runs the process in the background and starts a Spark context to perform the upload.
  4. The data gets loaded into Cassandra, but the Spark context that was created from the Celery task does not stop, even after calling spark.stop() once the load is complete.

project -> celery.py

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

tasks.py

from django.conf import settings
from project.celery import app
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession


class uploadfile():
    def __init__(self):
        self.cluster = Cluster(getattr(settings, "CASSANDRA_IP", ""))
        self.session = self.cluster.connect()

    def start_spark(self):
        # self.jar_files_path (paths to the connector jars) is assumed to be set elsewhere
        self.spark = SparkSession.builder.master(settings.SPARK_MASTER)\
                                .appName('Load CSV to Cassandra')\
                                .config('spark.jars', self.jar_files_path)\
                                .config('spark.cassandra.connection.host', getattr(settings, 'SPARK_CASSANDRA_CONNECTION_HOST', '0.0.0.0'))\
                                .getOrCreate()

    def spark_stop(self):
        self.spark.stop()

    def file_upload(self):
        self.start_spark()
        df = self.spark.read.csv(file_from_hdfs)
        # do some operation on the dataframe
        # self.session.create_cassandra_table_if_does_not_exist
        df.write.format('org.apache.spark.sql.cassandra')\
                    .option('table', table_name)\
                    .option('keyspace', keyspace)\
                    .mode('append').save()
        self.spark_stop()  # <<<-------------------- This does not close the spark context


@app.task(name="api.tasks.uploadfile")
def csv_upload():
    # handle request.FILE and upload the file to hdfs
    spark_obj = uploadfile()
    spark_obj.file_upload()

calling_task_script.py

from tasks import csv_upload
from rest_framework.views import APIView
from rest_framework.response import Response


class post_it(APIView):
    def post(self, request):
        csv_upload.delay()
        return Response('success')
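For completeness, a defensive variant of file_upload (a sketch, not the code I am actually running) would put the stop in a finally block so an exception in the read or the Cassandra write cannot skip spark.stop() — although in my case the context stays up even when the task finishes cleanly:

# Sketch only: guarantee spark.stop() runs even if the read or write raises
def file_upload(self):
    self.start_spark()
    try:
        df = self.spark.read.csv(file_from_hdfs)
        df.write.format('org.apache.spark.sql.cassandra')\
                .option('table', table_name)\
                .option('keyspace', keyspace)\
                .mode('append').save()
    finally:
        self.spark_stop()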

auvipy commented May 16, 2020

What is the update on this?

@priyank-bangar (Author)

Still unable to do it.


auvipy commented May 19, 2020

I need time and a setup to personally debug this. Can you check whether the PySpark context and the Celery task are bound properly?
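For reference, a bound Celery task (a sketch of what binding could look like in the tasks.py above, not code taken from this issue) would be:

# Sketch: bind=True gives the task access to its own request via `self`
@app.task(bind=True, name="api.tasks.uploadfile")
def csv_upload(self):
    spark_obj = uploadfile()
    spark_obj.file_upload()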

@priyank-bangar (Author)

Yes, the PySpark context and the Celery task are bound properly; that is why I am able to submit the task to PySpark and PySpark is able to load the data. The only problem is that once everything is over, the Spark context does not stop.


auvipy commented May 20, 2020

That's a Spark issue, not a Celery one.
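One commonly tried workaround (an assumption, not something verified in this thread): spark.stop() stops the SparkContext, but the JVM gateway that PySpark launches generally lives as long as the Python process that created it — here, the Celery worker process. Recycling the worker after every task lets that JVM exit along with it. A minimal sketch, assuming the CELERY_ settings namespace from celery.py above (goes in Django settings.py):

# Assumption: recycle each worker process after one task so the JVM started
# by PySpark exits with it. With namespace='CELERY' this maps to the Celery
# setting worker_max_tasks_per_child.
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1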
