Remove this and queries.json when done

In [3]:
import os
import logging
import pandas
from grizly import QFrame, S3, Workflow
import dask
from distributed import Client, fire_and_forget
from sqlalchemy import create_engine
import cx_Oracle

@dask.delayed
def clean_s3_table(table, s3_key):
    """Delete every object listed under ``s3_key``.

    The names come from ``S3(s3_key=s3_key).list()``; each one is then
    deleted through an ``S3`` handle bound to the ``acoe-s3`` bucket.

    Args:
        table: Not used by the body; kept so existing callers keep working.
        s3_key: S3 key (prefix) whose objects are removed.
    """
    listing = S3(s3_key=s3_key)
    for object_name in listing.list():
        doomed = S3(file_name=object_name, s3_key=s3_key, bucket="acoe-s3")
        doomed.delete()

@dask.delayed
def get_downloaded_partitions(qf, logger=None):
    """List the partition ids that already have a parquet file on S3.

    Objects are listed under ``data_loads/inventory/<table>`` where
    ``<table>`` is the lower-cased ``qf.data["select"]["table"]``. A name is
    counted when its second dot-separated component is ``parquet``; the
    partition id is the second underscore-separated token of the part
    before the first dot (e.g. ``plant_0012.parquet`` -> ``"0012"``).

    Args:
        qf: QFrame whose ``data["select"]["table"]`` names the source table.
        logger: Logger used for progress messages. Optional; when omitted a
            ``distributed.worker.extract`` logger is used so the function
            can be called with ``qf`` alone.

    Returns:
        list[str]: Partition ids found on S3, in listing order.
    """
    if logger is None:
        logger = logging.getLogger("distributed.worker.extract")

    table = qf.data["select"]["table"].lower()
    logger.info("starting_ extract process")
    s3 = S3(s3_key=f"data_loads/inventory/{table}")

    downloaded_partitions = []
    for item in s3.list():
        name_parts = item.split(".")
        # Names without an extension used to raise IndexError; skip them.
        if len(name_parts) < 2 or name_parts[1] != "parquet":
            continue
        stem_parts = name_parts[0].split("_")
        # Likewise skip stems that do not follow "<column>_<partition>".
        if len(stem_parts) < 2:
            continue
        # NOTE(review): taking token [1] assumes neither the partition column
        # nor the partition value contains "_" - confirm against to_parquet.
        downloaded_partitions.append(stem_parts[1])

    logger.info("denodo  get_downloaded_partitions() done")
    return downloaded_partitions

@dask.delayed
def get_partitions(qf, downloaded_partitions, partition_column, logger=None):
    """Return the partition values that have not been downloaded yet.

    A copy of ``qf`` is reduced to the partition column, grouped on it, and
    materialised with ``to_df()``. Every value of the resulting
    ``partition_column`` column that is not in ``downloaded_partitions`` is
    returned, in DataFrame order.

    Args:
        qf: Source QFrame; it is copied, not modified.
        downloaded_partitions: Partition ids already present on S3.
        partition_column: Name of the column the data is partitioned by.
        logger: Logger for the completion message. Optional; the original
            body referenced an undefined ``logger`` name, so a
            ``distributed.worker.extract`` logger is used when omitted.

    Returns:
        list: Partition values still to be downloaded.
    """
    if logger is None:
        logger = logging.getLogger("distributed.worker.extract")

    _qf = qf.copy()
    _qf.select([partition_column])
    # The keyword is literally "partition_column": the derived column gets
    # that fixed name, which is what the DataFrame lookup below relies on.
    _qf.assign(partition_column="sq." + partition_column)
    _qf.groupby([f"sq.{partition_column}", "partition_column"])[f"sq.{partition_column}"].agg("count")
    df = _qf.to_df()

    # Build the lookup once instead of scanning the list per partition.
    already_downloaded = set(downloaded_partitions)
    # NOTE(review): ids read back from S3 file names are strings; if the
    # partition column is not a string type nothing will match - confirm.
    missing = [
        partition
        for partition in df["partition_column"]
        if partition not in already_downloaded
    ]
    logger.info("denodo  get_partitions() done")
    return missing

@dask.delayed
def to_parquet(qf, partition_column, partitions_to_download, downloads_path, fillna=None, logger=None):
    """Extract each missing partition to parquet and upload it to S3.

    For every partition a copy of ``qf`` is filtered with
    ``<partition_column>='<partition>'``, materialised with ``to_df()``,
    written to ``<downloads_path>/<partition_column>_<partition>.parquet``
    and uploaded under ``data_loads/inventory/<table>``.

    Args:
        qf: Source QFrame; a fresh copy is filtered per partition.
        partition_column: Column the data is partitioned by.
        partitions_to_download: Partition values to extract (strings).
        downloads_path: Local directory for the intermediate parquet files.
        fillna: Mapping passed to ``DataFrame.fillna`` before writing.
            Defaults to an empty mapping (no shared mutable default).
        logger: Logger for progress messages. Optional; the original body
            referenced an undefined ``logger`` name, so a
            ``distributed.worker.extract`` logger is used when omitted.

    Returns:
        ``"done"`` when there is nothing to download, otherwise ``None``.
    """
    if logger is None:
        logger = logging.getLogger("distributed.worker.extract")
    fillna = {} if fillna is None else fillna

    table = qf.data["select"]["table"].lower()
    s3_key = f"data_loads/inventory/{table}"

    if not partitions_to_download:
        logger.info("Done. No more partitions to download")
        return "done"

    for partition in partitions_to_download:
        _qf = qf.copy()
        logger.info(f"extracting table {table} for {partition_column} {partition} in {s3_key}")
        # Quotes are stripped for the file name only; the filter uses the
        # raw value.
        partition_id = partition.replace("'", "")
        file_name = f"{partition_column}_{partition_id}.parquet"
        # NOTE(review): the filter is built by string interpolation; a
        # partition value containing a quote would break the SQL.
        _qf.query(f"{partition_column}='{partition}'")
        logger.info(_qf.get_sql())
        df = _qf.to_df()

        if df.empty:
            # Previously an empty parquet file was still written locally and
            # then never uploaded; skip the write so nothing is left behind.
            logger.info(df)
            continue

        df.fillna(fillna).to_parquet(os.path.join(downloads_path, file_name))
        logger.info(f'data_loads/inventory/{table}')
        S3(file_name=file_name, s3_key=s3_key, file_dir=downloads_path).from_file(keep_file=False)

    logger.info(f'all loaded data_loads/inventory/{table}')

In [4]:
import logging
import json
from distributed import Client, fire_and_forget, Future

from grizly import QFrame, Workflow

def run_job(json_path, subquery, reload=False):
    """Build and submit the extract workflow described in a JSON file.

    The entry ``subquery`` of the JSON file at ``json_path`` supplies the
    scheduler address (``workflow.ip``), the source engine
    (``origin.engine``) and the destination settings. A QFrame is loaded
    from the same file, the ``get_downloaded_partitions`` task is wrapped in
    a ``Workflow`` and submitted through a ``distributed.Client``.

    Args:
        json_path: Path to the JSON file holding the job definitions.
        subquery: Key of the job inside that file.
        reload: When true, a ``clean_s3_table`` task is built for the
            destination key. It is NOT part of the submitted workflow, so a
            warning is logged instead of silently doing nothing.
    """
    with open(json_path, "r") as f:
        job = json.load(f)[subquery]

    logger = logging.getLogger("distributed.worker.extract." + subquery)
    workflow_cfg = job["workflow"]
    client = Client(workflow_cfg["ip"])

    qf = QFrame(engine=job["origin"]["engine"]).from_json(json_path, subquery)
    # Computed but not yet passed to any task in this function.
    fillna = {field: "NoValue" for field in qf.get_fields()}

    destination = job["destination"]
    table = destination["table"]
    # NOTE(review): the config printed in this notebook carries "s3_key"
    # rather than "s3_root" under "destination"; fall back to it instead of
    # raising KeyError. Confirm which key is canonical.
    s3_root = destination["s3_root"] if "s3_root" in destination else destination["s3_key"]
    s3_key = s3_root + table

    if reload:
        # `table` used to be an undefined name here (NameError).
        reloaded = clean_s3_table(table, s3_key)
        logger.warning(
            "reload requested for %s, but the cleanup task is not part of "
            "the submitted workflow",
            s3_key,
        )
    else:
        reloaded = ""

    downloaded_partitions = get_downloaded_partitions(qf, logger)
    uploads = [downloaded_partitions]

    wf = Workflow(
        subquery,
        # Prefer the addresses from the job config; the previous hard-coded
        # values remain the defaults.
        owner_email=workflow_cfg.get("owner_email", "acivitillo@te.com"),
        backup_email=workflow_cfg.get("backup_email", "marcin.socha@te.com"),
        tasks=uploads,
        priority=0,
    )
    wf.submit(client)
    
def kill_job(subquery):
    """Cancel the running workflow graph of ``subquery``.

    The scheduler address is read from the ``subquery`` entry of the JSON
    file named by the module-level ``json_path``. The future keyed
    ``"<subquery>_graph"`` is cancelled with ``force=True`` and the client
    is closed afterwards, even if the cancel raises.

    Args:
        subquery: Key of the job inside the JSON file.
    """
    # `json_path` is a module-level name, not a parameter.
    with open(json_path, "r") as config_file:
        job = json.load(config_file)[subquery]

    client = Client(job["workflow"]["ip"])
    try:
        graph_future = Future(f"{subquery}_graph", client=client)
        graph_future.cancel(force=True)
    finally:
        # Release the scheduler connection on the error path as well.
        client.close()

        
# Job definition file and the entry to run; both names stay at module level
# because kill_job() and later cells read them.
json_path = "/home/analyst/grizly/notebooks/queries.json"
subquery = "planned_indp"
run_job(json_path=json_path, subquery=subquery)

In [None]:
# Cancel the workflow graph submitted for the current `subquery`.
kill_job(subquery=subquery)

In [4]:

# Manual run of the full extract pipeline for one subquery.
client = Client("acoe.connect.te.com:8786")

subquery = "planned_indp"
partition_column = "plant"
fillna = {"indicator_active_version": "NoValue"}
# get_downloaded_partitions() requires a logger argument; it was previously
# called without one.
logger = logging.getLogger("distributed.worker.extract." + subquery)

root = "/home/analyst/inventory"
downloads_path = "/home/analyst/data_loads"
qf = QFrame(engine="mssql+pyodbc://denodo_dev").from_json(
    os.path.join(root, "queries.json"), subquery=subquery
)
table = qf.data["select"]["table"].lower()
s3_key = f"data_loads/inventory/{table}"

# Build the lazy task graph; nothing is submitted until wf.submit() below.
done = clean_s3_table(table, s3_key)
downloaded_partitions = get_downloaded_partitions(qf, logger)
partitions_to_download = get_partitions(qf, downloaded_partitions, partition_column)
parquets = to_parquet(qf, partition_column, partitions_to_download, downloads_path, fillna)

# Only the parquet task is submitted. To run the cleanup task instead, use
# `uploads = [done]`.
uploads = [parquets]

wf = Workflow(
    subquery,
    owner_email="acivitillo@te.com",
    backup_email="marcin.socha@te.com",
    tasks=uploads,
    priority=0,
)
wf.submit(client)

{'planned_indp': {'destination': {'download_path': '/home/analyst/data_loads',
   's3_key': 'ac_testing/loads/',
   'table': 'table',
   'schema': 'schema'},
  'origin': {'engine': 'mssql+pyodbc://denodo_dev', 'type': 'odbc'},
  'workflow': {'ip': 'acoe.connect.te.com:8786',
   'partition_column': 'plant',
   'owner_email': 'acivitillo@te.com',
   'backup_email': 'marcin.socha@te.com'},
  'select': {'table': 'PLANNED_INDP_REQR_FORECASTS_V',
   'fields': {'material_number': {'type': 'dim',
     'as': '',
     'group_by': '',
     'order_by': '',
     'expression': '',
     'select': '',
     'custom_type': ''},
    'plant': {'type': 'dim',
     'as': '',
     'group_by': '',
     'order_by': '',
     'expression': '',
     'select': '',
     'custom_type': ''},
    'requirements_type': {'type': 'dim',
     'as': '',
     'group_by': '',
     'order_by': '',
     'expression': '',
     'select': '',
     'custom_type': ''},
    'requirements_version': {'type': 'dim',
     'as': '',
     