Skip to content

Commit

Permalink
Merge pull request #380 from datosgobar/xlsx-dumps
Browse files Browse the repository at this point in the history
Refactor xlsx dumps
  • Loading branch information
lucaslavandeira committed Oct 12, 2018
2 parents 185678a + 9c941dc commit 4564dff
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 75 deletions.
18 changes: 8 additions & 10 deletions series_tiempo_ar_api/apps/dump/admin.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#!coding=utf8
from django.contrib import admin

from series_tiempo_ar_api.apps.dump.tasks import enqueue_csv_dump_task, enqueue_xlsx_dump_task
from django_datajsonar.admin import AbstractTaskAdmin

from series_tiempo_ar_api.apps.dump.tasks import enqueue_dump_task
from .models import GenerateDumpTask


class GenerateDumpTaskAdmin(admin.ModelAdmin):
readonly_fields = ('status', 'created', 'finished', 'logs')
class GenerateDumpTaskAdmin(AbstractTaskAdmin):
model = GenerateDumpTask

def save_model(self, request, obj: GenerateDumpTask, form, change):
super(GenerateDumpTaskAdmin, self).save_model(request, obj, form, change)
task = {
GenerateDumpTask.TYPE_CSV: enqueue_csv_dump_task,
GenerateDumpTask.TYPE_XLSX: enqueue_xlsx_dump_task,
}
task = enqueue_dump_task

task[obj.file_type](obj.id)
def save_model(self, request, obj, form, change):
enqueue_dump_task.delay(GenerateDumpTask.objects.create(file_type=obj.file_type))

def get_readonly_fields(self, request, obj=None):
if obj:
Expand Down
18 changes: 11 additions & 7 deletions series_tiempo_ar_api/apps/dump/generator/dump_csv_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,25 @@ def __init__(self, task: GenerateDumpTask, fields_data: dict, rows: Callable):
self.rows = rows

def write(self, filepath, header):
fields = Field.objects.filter(
enhanced_meta__key=meta_keys.AVAILABLE,
identifier__in=self.fields_data.keys(),
)

distribution_ids = fields.values_list('distribution', flat=True)
os.makedirs(os.path.dirname(filepath), exist_ok=True)

with open(filepath, mode='w') as f:
writer = csv.writer(f)
writer.writerow(header)

for distribution in Distribution.objects.filter(id__in=distribution_ids).order_by('identifier'):
for distribution in self.get_distributions_sorted_by_identifier():
self.write_distribution(distribution, writer)

def get_distributions_sorted_by_identifier(self):
fields = Field.objects.filter(
identifier__in=self.fields_data.keys(),
)
distribution_ids = fields.values_list('distribution', flat=True)

return Distribution.objects\
.filter(id__in=distribution_ids)\
.order_by('dataset__catalog__identifier', 'dataset__identifier', 'identifier')

def write_distribution(self, distribution: Distribution, writer: csv.writer):
# noinspection PyBroadException
try:
Expand Down
92 changes: 46 additions & 46 deletions series_tiempo_ar_api/apps/dump/generator/xlsx/generator.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,68 @@
import csv
import io
import os

from django.core.files import File
from xlsxwriter.workbook import Workbook

from series_tiempo_ar_api.apps.dump.generator.generator import remove_old_dumps
from series_tiempo_ar_api.apps.dump.generator.xlsx.workbook import DumpWorkbook
from series_tiempo_ar_api.apps.dump.models import DumpFile, GenerateDumpTask
from series_tiempo_ar_api.utils import read_file_as_csv


def read_file_as_csv(file):
ios = io.StringIO()
ios.write(file.read().decode('utf-8'))
class XLSXWriter:
multiple_sheets = {
DumpFile.FILENAME_VALUES: True,
DumpFile.FILENAME_FULL: True,
DumpFile.FILENAME_METADATA: False,
DumpFile.FILENAME_SOURCES: False,
}

ios.seek(0)
reader = csv.reader(ios)
return reader
def __init__(self, task: GenerateDumpTask, dump_file: DumpFile, workbook_class=DumpWorkbook):
self.workbook_class = workbook_class
self.task = task
self.csv_dump_file = dump_file
self.frequency_column_index = None
self.worksheets = {}

def write(self):
try:
self.csv_to_xlsx()
except IOError as e:
catalog = self.csv_dump_file.node or 'global'
msg = f"Error escribiendo dump XLSX de dump {catalog} {self.csv_dump_file.file_name}: {e.__class__}: {e}"
GenerateDumpTask.info(self.task, msg)

def csv_to_xlsx(dump_file: DumpFile):
xlsx = f'{dump_file.file_name}-{dump_file.id}.xlsx'
workbook = Workbook(xlsx)
worksheet = workbook.add_worksheet()
sheet_count = 0
with dump_file.file as f:
reader = read_file_as_csv(f)
for r, row in enumerate(reader):
if sheet_count > 1000000:
worksheet = workbook.add_worksheet()
sheet_count = 0
def csv_to_xlsx(self):
"""Escribe el dump en XLSX en un archivo temporal, luego lo guarda en el storage,
por último borra el archivo temporal. Se debe hacer así porque """
xlsx = self.xlsx_file_name()
with self.csv_dump_file.file as f:
reader = read_file_as_csv(f)
header_row = next(reader)

for c, col in enumerate(row):
worksheet.write(sheet_count, c, col)
workbook = self.workbook_class(xlsx,
header_row=header_row,
split_by_frequency=self.multiple_sheets[self.csv_dump_file.file_name])

sheet_count += 1
workbook.close()
for row in reader:
workbook.write_row(row)

with open(xlsx, 'rb') as f:
dump_file.task.dumpfile_set.create(file_name=dump_file.file_name,
file_type=DumpFile.TYPE_XLSX,
node=dump_file.node,
file=File(f))
workbook.close()

os.remove(xlsx)
with open(xlsx, 'rb') as f:
self.task.dumpfile_set.create(file_name=self.csv_dump_file.file_name,
file_type=DumpFile.TYPE_XLSX,
node=self.csv_dump_file.node,
file=File(f))

os.remove(xlsx)

def generate(task: GenerateDumpTask, node: str = None):
dumps_qs = DumpFile.objects.all()
if node:
dumps_qs = dumps_qs.filter(node__catalog_id=node)
    def xlsx_file_name(self):
        """Local file name for the XLSX conversion of the wrapped CSV dump:
        ``<file_name>-<id>.<TYPE_XLSX>`` (presumably the 'xlsx' extension —
        confirm DumpFile.TYPE_XLSX in models)."""
        return f'{self.csv_dump_file.file_name}-{self.csv_dump_file.id}.{DumpFile.TYPE_XLSX}'

dumps = []
for dump_name, _ in DumpFile.FILENAME_CHOICES:
dump_file = dumps_qs.filter(file_name=dump_name, file_type=DumpFile.TYPE_CSV).last()
if dump_file is not None:
dumps.append(dump_file)

for dump in dumps:
try:
csv_to_xlsx(dump)
except Exception as e:
catalog = node or 'global'
msg = f"Error escribiendo dump XLSX de dump {catalog} {dump.file_name}: {e.__class__}: {e}"
GenerateDumpTask.info(task, msg)
def generate(task: GenerateDumpTask, node: str = None, workbook_class=DumpWorkbook):
    """Convert the latest CSV dump of each file name (optionally restricted to
    *node*) into XLSX, then prune old dump files."""
    latest_csv_dumps = DumpFile.get_last_of_type(DumpFile.TYPE_CSV, node)
    for csv_dump in latest_csv_dumps:
        writer = XLSXWriter(task, csv_dump, workbook_class)
        writer.write()

    # NOTE(review): the cleanup passes TYPE_CSV although this generator
    # produces XLSX files — confirm XLSX dumps are pruned elsewhere.
    for dump_name, _ in DumpFile.FILENAME_CHOICES:
        remove_old_dumps(dump_name, DumpFile.TYPE_CSV, node)
52 changes: 52 additions & 0 deletions series_tiempo_ar_api/apps/dump/generator/xlsx/workbook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from xlsxwriter import Workbook

from series_tiempo_ar_api.apps.dump.generator.xlsx.worksheet import DumpWorksheet, SingleWorksheet


class DumpWorkbook:
    """Wrapper around an xlsxwriter ``Workbook`` that writes CSV dump rows,
    optionally routing each row to a sheet named after its frequency.
    """

    # Name of the CSV column that carries the series frequency.
    frequency_col_name = 'indice_tiempo_frecuencia'

    # Sheet names for each supported ISO-8601 repeating-interval frequency.
    # Hoisted to a class constant instead of being rebuilt on every call.
    frequency_sheet_names = {
        'R/P1Y': 'anual',
        'R/P6M': 'semestral',
        'R/P3M': 'trimestral',
        'R/P1M': 'mensual',
        'R/P1D': 'diaria',
    }

    def __init__(self, filename: str, header_row: list, split_by_frequency=False):
        self.workbook = Workbook(filename)
        self.split_by_frequency = split_by_frequency
        self.header_row = header_row

        # Index of the frequency column in header_row. Initialized to None so
        # the attribute always exists even when the header lacks that column
        # (previously it was left unset, causing AttributeError later).
        self.frequency_column_index = None
        for i, col_name in enumerate(self.header_row):
            if col_name == self.frequency_col_name:
                self.frequency_column_index = i
                break

        self.sheets = {}  # frequency -> DumpWorksheet
        self.single_sheet = None  # lazily created when not splitting

    def add_worksheet(self, sheet_name, frequency):
        """Create the worksheet for *frequency* and write the header row to it."""
        worksheet = DumpWorksheet(self.workbook, sheet_name)
        worksheet.write_row(self.header_row)
        self.sheets[frequency] = worksheet

    def write_row(self, row):
        """Write one data row, routed by its frequency value when splitting."""
        if self.split_by_frequency:
            frequency = row[self.frequency_column_index]
            if frequency not in self.sheets:
                self.init_worksheet(frequency)
            self.sheets[frequency].write_row(row)
            return

        if not self.single_sheet:
            self.single_sheet = SingleWorksheet(self.workbook)
            self.single_sheet.write_row(self.header_row)

        self.single_sheet.write_row(row)

    def close(self):
        """Finalize the underlying xlsxwriter workbook."""
        self.workbook.close()

    def init_worksheet(self, frequency: str):
        """Create the sheet for *frequency*; raises KeyError for unknown frequencies."""
        sheet_name = self.frequency_sheet_names[frequency]
        self.add_worksheet(sheet_name, frequency)
36 changes: 36 additions & 0 deletions series_tiempo_ar_api/apps/dump/generator/xlsx/worksheet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from xlsxwriter import Workbook


class DumpWorksheet:
    """A logical worksheet that spills into a fresh physical sheet once
    ``MAX_ROWS_PER_SHEET`` rows have been written (XLSX sheets have a hard
    row limit of 1,048,576).
    """

    MAX_ROWS_PER_SHEET = 1000000

    def __init__(self, workbook: 'Workbook', name: str):
        self.name = name
        self.workbook = workbook
        self.sheet_count = 0  # number of physical sheets opened so far
        self.current_row = 0  # next row index within the current physical sheet
        self.current_sheet = None

        self.init_worksheet()

    def write_row(self, row: list):
        """Append *row*, rolling over to a new sheet when the cap is exceeded."""
        self.current_sheet.write_row(self.current_row, 0, row)
        self.current_row += 1

        if self.current_row > self.MAX_ROWS_PER_SHEET:
            self.init_worksheet()

    def init_worksheet(self):
        """Open the next physical sheet ('<name>-<n>') and restart the row counter."""
        self.sheet_count += 1
        sheet_name = f'{self.name}-{self.sheet_count}'
        self.current_sheet = self.workbook.add_worksheet(sheet_name)
        # Bug fix: the counter was never reset, so after the first rollover
        # every subsequent write_row exceeded the cap and opened a new sheet
        # per row (the pre-refactor code reset its counter on rollover).
        self.current_row = 0


class SingleWorksheet:
    """Thin cursor over a single xlsxwriter worksheet, tracking the next row index."""

    def __init__(self, workbook: Workbook, name: str = None):
        self.current_sheet = workbook.add_worksheet(name)
        self.current_row = 0

    def write_row(self, row: list):
        """Append *row* at the current position and advance the cursor."""
        row_index = self.current_row
        self.current_sheet.write_row(row_index, 0, row)
        self.current_row = row_index + 1
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#! coding: utf-8
from django.core.management import BaseCommand

from series_tiempo_ar_api.apps.dump.tasks import enqueue_csv_dump_task
from series_tiempo_ar_api.apps.dump.models import GenerateDumpTask
from series_tiempo_ar_api.apps.dump.tasks import enqueue_dump_task


class Command(BaseCommand):
Expand All @@ -10,4 +11,5 @@ def add_arguments(self, parser):
parser.add_argument('--index', type=str, default=None)

def handle(self, *args, **options):
enqueue_csv_dump_task()
task = GenerateDumpTask.objects.create(file_type=GenerateDumpTask.TYPE_CSV)
enqueue_dump_task(task)
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#! coding: utf-8
from django.core.management import BaseCommand

from series_tiempo_ar_api.apps.dump.tasks import enqueue_xlsx_dump_task
from series_tiempo_ar_api.apps.dump.models import GenerateDumpTask
from series_tiempo_ar_api.apps.dump.tasks import enqueue_dump_task


class Command(BaseCommand):
def handle(self, *args, **options):
enqueue_xlsx_dump_task()
task = GenerateDumpTask.objects.create(file_type=GenerateDumpTask.TYPE_XLSX)
enqueue_dump_task(task)
18 changes: 18 additions & 0 deletions series_tiempo_ar_api/apps/dump/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,21 @@ def get_from_path(cls, filename: str, node: str = None) -> 'DumpFile':
if dump is None:
raise cls.DoesNotExist
return dump

@classmethod
def get_last_of_type(cls, file_type: str, node: str = None) -> list:
"""Devuelve el último dump generado del formato file_type especificado.
Si se pasa un parámetro node, devuelve los últimos dumps para ese node.
"""

dumps_qs = cls.objects.all()
if node:
dumps_qs = dumps_qs.filter(node__catalog_id=node)

dumps = []
for dump_name, _ in cls.FILENAME_CHOICES:
dump_file = dumps_qs.filter(file_name=dump_name, file_type=file_type).last()
if dump_file is not None:
dumps.append(dump_file)

return dumps
20 changes: 12 additions & 8 deletions series_tiempo_ar_api/apps/dump/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from django_rq import job

from series_tiempo_ar_api.apps.dump.models import GenerateDumpTask
from series_tiempo_ar_api.apps.dump.writer import Writer
from .generator.generator import DumpGenerator
from .generator.xlsx import generator
Expand All @@ -10,6 +11,17 @@
logger = logging.Logger(__name__)


@job('default', timeout='2h')
def enqueue_dump_task(task: GenerateDumpTask):
    """Dispatch a dump-generation task to the writer matching its file type.

    Raises KeyError for file types other than CSV or XLSX, matching the
    previous dict-lookup behavior.
    """
    # Reload the task's current state from the database before dispatching.
    task.refresh_from_db()

    if task.file_type == GenerateDumpTask.TYPE_CSV:
        writer = write_csv
    elif task.file_type == GenerateDumpTask.TYPE_XLSX:
        writer = write_xlsx
    else:
        raise KeyError(task.file_type)

    writer(task.id)


@job('default', timeout='2h')
def write_csv(task_id, catalog=None):
Writer('CSV', lambda task, catalog_id: DumpGenerator(task, catalog_id).generate(),
Expand All @@ -18,14 +30,6 @@ def write_csv(task_id, catalog=None):
catalog).write()


def enqueue_csv_dump_task(task=None):
write_csv.delay(task)


@job('default', timeout='2h')
def write_xlsx(task_id, catalog=None):
Writer('XLSX', generator.generate, write_xlsx, task_id, catalog).write()


def enqueue_xlsx_dump_task(task=None):
write_xlsx.delay(task)
11 changes: 11 additions & 0 deletions series_tiempo_ar_api/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!coding=utf8
import csv
import io
import json

from django_datajsonar.models import Metadata, Node, ReadDataJsonTask, \
Expand Down Expand Up @@ -36,3 +38,12 @@ def index_catalog(catalog_id, catalog_path, index, node=None):
for distribution in Distribution.objects.filter(dataset__catalog__identifier=catalog_id):
DistributionIndexer(index=index).run(distribution)
ElasticInstance.get().indices.forcemerge(index=index)


def read_file_as_csv(file):
    """Decode a binary file-like object as UTF-8 and return a csv.reader over it.

    The whole file is read into memory, so the returned reader remains usable
    after *file* is closed.
    """
    # StringIO(initial_value) starts positioned at 0, so no explicit
    # write/seek dance is needed.
    buffer = io.StringIO(file.read().decode('utf-8'))
    return csv.reader(buffer)

0 comments on commit 4564dff

Please sign in to comment.