From 5b6f68ffe6311791fe14abd913a22fe1c469fdf1 Mon Sep 17 00:00:00 2001 From: Eno Bassey Date: Sat, 19 Dec 2020 16:15:10 +0100 Subject: [PATCH] fix app ready error refactor --- stock_maintain/admin.py | 121 ++++--------------------------------- stock_maintain/tasks.py | 111 ++++++++++++++++++++++++++++++++++ stockman_project/celery.py | 18 +++++- 3 files changed, 140 insertions(+), 110 deletions(-) diff --git a/stock_maintain/admin.py b/stock_maintain/admin.py index cbbc167..ecc5b34 100755 --- a/stock_maintain/admin.py +++ b/stock_maintain/admin.py @@ -7,7 +7,7 @@ from django.contrib import admin from django.db import transaction from django.forms import forms -from stockman_project.celery import refresh_analysis_data +from stockman_project.celery import refresh_analysis_data, process_csv_upload_task from django.shortcuts import render, redirect from django.utils.encoding import force_text @@ -214,120 +214,23 @@ def import_csv(self, request): :return: :rtype: """ - error_found = "" if request.method == "POST": csv_file = request.FILES["csv_file"] print("importing file ....") logger.info("importing file ....") date_import = request.POST["date_import"] - decoded_csv_file = io.StringIO(csv_file.read().decode("utf-8")) - reader = csv.reader(decoded_csv_file) - new_date = datetime.strptime(date_import, "%Y-%m-%d") - print(f"PRINT: finished reading file....for date {new_date}") - logger.info(f"LOG: importing file....for date {new_date}") - # Create pricelist objects from passed in data - try: - print("entering transaction mode ...") - logger.info("entering transaction mode ....") - # with transaction.atomic(): - print("PRINT: starting each data") - logger.info("LOG: starting each data") - - for line in reader: - # pdb.set_trace() - - stock_code = line[0].strip() - - print(f"PRINT: getting current stock - {stock_code}") - logger.info(f"LOG: getting current stock - {stock_code}") - uploaded_stock = PriceList.objects.filter( - sec_code=stock_code, price_date=new_date - ) - if uploaded_stock: - stock = Stock.objects.get(stock_code=stock_code) - - # pdb.set_trace() - if stock: - x_change = 0.0 - sign = "" - if line[1].strip() == "" or line[6].strip() == "": - # pdb.set_trace() - error_found = ( - f"The stock {stock_code} has no data in the pricelist. " - f"Kindly review your csv and upload correct data" - ) - raise ValueError() - if float(line[1].strip()) <= float(line[6].strip()): - sign = "+" - x_change = float(line[6]) - float(line[1]) - else: - sign = "-" - x_change = float(line[1]) - float(line[6]) - # pdb.set_trace() - price_list_object = PriceList.objects.create( - sec_code=line[0], - price_date=new_date, # line[12], - price_close=float(line[6].strip()), - x_open=float(line[2].strip()), - x_high=float(line[3].strip()), - x_low=float(line[4].strip()), - price=float(line[1].strip()), - offer_bid_sign=sign, - x_change=x_change, - num_of_deals=float(line[9].strip().replace(",", "")), - volume=float(line[10].strip().replace(",", "")), - x_value=float(line[11].strip().replace(",", "")), - stock_id=stock.id, - ) - # pdb.set_trace() - print( - f"PRINT: attempting to save current stock pric- {stock_code}" - ) - logger.info( - f"LOG: attempting current stock price- {stock_code}" - ) - price_list_object.save() - else: - error_found = f"The stock {stock_code} could not be found" - # pdb.set_trace() - raise ValueError() - else: - continue - print("finished importing") - logger.info(" finishedimporting file ....") - self.message_user(request, "Your csv file has been imported") - # trigger a cron job re-calculate temp table - print("about to send delay task") - logger.info("about to send delay task ....") - refresh_analysis_data.delay(date_import) - return redirect("..") - - except ValueError: - if error_found == "": - error_found = f"The stock {line[0].strip()} had error in its values. Pls check" - logger.info(error_found) - self.message_user(request, error_found, level=messages.ERROR) - except Stock.DoesNotExist: - logger.error( - "Stock Not Exist: A stock value does not exist in the database ...." - ) - self.message_user( - request, - " Stock Not Exist: A stock value does not exist in the database. " - "Pls enter details for this stock or update previous name", - level=messages.ERROR, - ) - except IntegrityError: - logger.error( - "Data Integritiy Error: A stock value does not exist in the database. ...." - ) - self.message_user( - request, - " Data Integritiy Error: A stock value does not exist in the database. " - "Pls enter details for this stock or update previous name", - level=messages.ERROR, - ) + import boto3 + + s3 = boto3.resource("s3") + s3.meta.client.upload_file(csv_file, "csv_uploads", f"{date_import}.csv") + + # process_csv_upload_task.delay(date_import) + self.message_user( + request, + "Your csv file is imported and currently being processed. You will be notified when its done", + ) + print("successful") logger.info("successful importing file ....") form = CsvImportForm() diff --git a/stock_maintain/tasks.py b/stock_maintain/tasks.py index fd29af1..62a4c7a 100755 --- a/stock_maintain/tasks.py +++ b/stock_maintain/tasks.py @@ -1,13 +1,23 @@ # Create your tasks here from __future__ import absolute_import, unicode_literals +import csv +import io +import logging +from datetime import datetime + from celery.schedules import crontab from celery.task import periodic_task from django.core.mail import send_mail from celery import shared_task from django.db import connection +from django.shortcuts import redirect +from psycopg2._psycopg import IntegrityError from stock_maintain.models import PriceList, GeneratedAnalysisDate +from stock_setup_info.models import Stock + +logger = logging.getLogger(__name__) @shared_task @@ -35,3 +45,104 @@ def update_price_analysis_task(): finally: cursor.close() + + +def process_csv_upload(date_import): + download_location = f"/tmp/{date_import}.csv" + error_found = "" + + import boto3 + + s3 = boto3.resource("s3") + s3.meta.client.download_file("csv_uploads", f"{date_import}.csv", download_location) + # print(open('/tmp/hello.txt').read()) + decoded_csv_file = io.StringIO(download_location.read().decode("utf-8")) + reader = csv.reader(decoded_csv_file) + new_date = datetime.strptime(date_import, "%Y-%m-%d") + print(f"PRINT: finished reading file....for date {new_date}") + logger.info(f"LOG: importing file....for date {new_date}") + # Create pricelist objects from passed in data + try: + print("entering transaction mode ...") + logger.info("entering transaction mode ....") + # with transaction.atomic(): + print("PRINT: starting each data") + logger.info("LOG: starting each data") + + for line in reader: + # pdb.set_trace() + + stock_code = line[0].strip() + + print(f"PRINT: getting current stock - {stock_code}") + logger.info(f"LOG: getting current stock - {stock_code}") + uploaded_stock = PriceList.objects.filter( + sec_code=stock_code, price_date=new_date + ) + if uploaded_stock: + stock = Stock.objects.get(stock_code=stock_code) + + # pdb.set_trace() + if stock: + x_change = 0.0 + sign = "" + if line[1].strip() == "" or line[6].strip() == "": + # pdb.set_trace() + error_found = ( + f"The stock {stock_code} has no data in the pricelist. " + f"Kindly review your csv and upload correct data" + ) + raise ValueError() + if float(line[1].strip()) <= float(line[6].strip()): + sign = "+" + x_change = float(line[6]) - float(line[1]) + else: + sign = "-" + x_change = float(line[1]) - float(line[6]) + # pdb.set_trace() + price_list_object = PriceList.objects.create( + sec_code=line[0], + price_date=new_date, # line[12], + price_close=float(line[6].strip()), + x_open=float(line[2].strip()), + x_high=float(line[3].strip()), + x_low=float(line[4].strip()), + price=float(line[1].strip()), + offer_bid_sign=sign, + x_change=x_change, + num_of_deals=float(line[9].strip().replace(",", "")), + volume=float(line[10].strip().replace(",", "")), + x_value=float(line[11].strip().replace(",", "")), + stock_id=stock.id, + ) + # pdb.set_trace() + print(f"PRINT: attempting to save current stock pric- {stock_code}") + logger.info(f"LOG: attempting current stock price- {stock_code}") + price_list_object.save() + else: + error_found = f"The stock {stock_code} could not be found" + # pdb.set_trace() + raise ValueError() + else: + continue + print("finished importing") + logger.info(" finishedimporting file ....") + # trigger a cron job re-calculate temp table + print("about to send delay task") + logger.info("about to send delay task ....") + + except ValueError: + if error_found == "": + error_found = ( + f"The stock {line[0].strip()} had error in its values. Pls check" + ) + logger.info(error_found) + except Stock.DoesNotExist: + logger.error( + "Stock Not Exist: A stock value does not exist in the database ...." + ) + + except IntegrityError: + logger.error( + "Data Integritiy Error: A stock value does not exist in the database. ...." + ) diff --git a/stockman_project/celery.py b/stockman_project/celery.py index dc8efad..ee9fd05 100755 --- a/stockman_project/celery.py +++ b/stockman_project/celery.py @@ -1,8 +1,13 @@ from __future__ import absolute_import, unicode_literals + +import logging + from django.db import connection import os from datetime import datetime, date from celery import Celery + + from utils.utils import get_first_working_day_of_month # set the default Django settings module for the 'celery' program. @@ -12,7 +17,7 @@ "stockman_project", broker=f"{REDIS_URL}", ) - +logger = logging.getLogger(__name__) # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys @@ -36,3 +41,14 @@ def refresh_analysis_data(date_data): return cursor.execute( f"call update_price_analysis('{date_data}','{year_to_date}')" ) + + +@app.task +def process_csv_upload_task(date_import): + print("entering transaction mode ....") + logger.info("entering transaction mode ....") + from stock_maintain.tasks import process_csv_upload + + process_csv_upload(date_import) + refresh_analysis_data.delay(date_import) + return True