# In [1]:
# db_utils

import json
import pandas as pd
import traceback
import sqlalchemy
import os
import pyodbc
import sys
# import pymssql
import numpy as np

from MySQLdb._exceptions import OperationalError
from sqlalchemy import create_engine, exc,event
from urllib.parse import quote_plus

from time import time

# Optional ODBC connection-string prefix. Code paths in DB that check this value
# connect with pyodbc.connect(connection_string + <database name>) when it is
# truthy; leave it as None to build the connection from the DB object's settings.
# Example (credentials redacted -- never commit real passwords):
#connection_string = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=192.168.15.126;UID=BRS;PWD=<redacted>;Trusted Connection=yes;DATABASE="
connection_string = None
import logging
   
# logging = Logging()

class DB(object):
    """
    Helper for running queries against a SQL Server database through pyodbc.

    Queries may be written MySQL-style (backtick-quoted identifiers and ``%s``
    placeholders); :meth:`convert_to_mssql` rewrites them to T-SQL
    (``[identifier]`` and ``?``) before execution. Connection settings are read
    from the ``HOST_IP``, ``LOCAL_DB_USER``, ``LOCAL_DB_PASSWORD`` and
    ``LOCAL_DB_PORT`` environment variables.
    """

    def __init__(self, database, host='127.0.0.1', user='root', password='', port='3306', tenant_id=None):
        """
        Initialization of database object.

        Args:
            database (str): The database to connect to.
            host (str): Accepted for backward compatibility; not used. The
                host is always taken from ``HOST_IP``.
            user (str): Not used; taken from ``LOCAL_DB_USER``.
            password (str): Not used; taken from ``LOCAL_DB_PASSWORD``.
            port (str): Not used; taken from ``LOCAL_DB_PORT``.
            tenant_id (str): Optional tenant prefix. When truthy, the database
                name becomes ``f'{tenant_id}_{database}'``.

        Raises:
            KeyError: If one of the required environment variables is unset.
        """
        # The previous per-host branches were identical, so every host resolves
        # to the same environment-provided configuration.
        self.HOST = os.environ['HOST_IP']
        self.USER = os.environ['LOCAL_DB_USER']
        self.PASSWORD = os.environ['LOCAL_DB_PASSWORD']
        self.PORT = os.environ['LOCAL_DB_PORT']
        self.DATABASE = f'{tenant_id}_{database}' if tenant_id else database

        logging.info(f'Host: {self.HOST}')
        logging.info(f'User: {self.USER}')
        # Never write the real password to the logs.
        logging.info(f'Password: {"***" if self.PASSWORD else ""}')
        logging.info(f'Port: {self.PORT}')
        logging.info(f'Database: {self.DATABASE}')

    def _connection_string(self):
        """Return the ODBC connection string used by execute() and insert()."""
        # NOTE(review): PORT is not part of SERVER, so the driver's default port
        # is used. The ODBC keyword is `Trusted_Connection`, so the
        # `Trusted Connection=yes` attribute is most likely ignored. Both are
        # kept verbatim so existing deployments keep connecting the same way.
        return (
            'DRIVER={ODBC Driver 17 for SQL Server};'
            f'SERVER={self.HOST};UID={self.USER};PWD={self.PASSWORD};'
            f'Trusted Connection=yes;DATABASE={self.DATABASE}'
        )

    def connect(self, max_retry=5):
        """
        Open a pyodbc connection, retrying on failure.

        If the module-level ``connection_string`` is truthy it is used as a
        prefix followed by the database name; otherwise the connection string
        is built from this object's settings.

        Args:
            max_retry (int): Maximum number of connection attempts.

        Returns:
            The pyodbc connection (also stored on ``self.engine``), or None if
            every attempt failed.
        """
        # Local import: the notebook later rebinds the global name `time` to the
        # module, which would make a bare `time()` call fail.
        from time import monotonic

        if connection_string:
            conn_str = connection_string + self.DATABASE
        else:
            conn_str = self._connection_string()

        start = monotonic()
        for attempt in range(1, max_retry + 1):
            try:
                self.engine = pyodbc.connect(conn_str)
            except Exception as e:
                logging.warning(f'Connection failed. Retrying... ({attempt}) [{e}]')
                continue
            logging.info(f'Connection established successfully to {self.DATABASE}! ({round(monotonic() - start, 2)} secs to connect)')
            return self.engine

        logging.error(f'Could not connect to {self.DATABASE} after {max_retry} attempt(s).')
        return None

    def convert_to_mssql(self, query):
        """
        Translate MySQL-flavoured SQL into T-SQL.

        Backticks are replaced alternately by ``[`` and ``]`` (so a
        backtick-quoted ``col`` becomes ``[col]``), and every ``%s``
        placeholder becomes ``?``.

        Args:
            query (str): The query to convert.

        Returns:
            (str) The converted query.
        """
        pieces = query.split('`')
        converted = [pieces[0]]
        # pieces[k + 1] follows the k-th backtick; even backticks open, odd close.
        for k, piece in enumerate(pieces[1:]):
            converted.append('[' if k % 2 == 0 else ']')
            converted.append(piece)
        return ''.join(converted).replace('%s', '?')

    @staticmethod
    def _to_db_param(value):
        """Convert numpy scalars (which pyodbc rejects) to native Python values."""
        if isinstance(value, (np.integer, np.floating, np.bool_)):
            return value.item()
        return value

    @staticmethod
    def _finalize_frame(data, index_col):
        """Replace nulls by None and, if ``index_col`` is given, index by it."""
        cleaned = data.where(pd.notnull(data), None)
        if index_col is None:
            return cleaned
        try:
            return cleaned.set_index(index_col)
        except KeyError:
            logging.exception(f'Failed to set {index_col} as index')
            return cleaned

    def execute(self, query, database=None, index_col='id', **kwargs):
        """
        Execute an SQL query against ``self.DATABASE``.

        Args:
            query (str): The query; may use backticks and ``%s`` placeholders.
            database (str): Unused; kept for backward compatibility. The query
                always runs in ``self.DATABASE``.
            index_col (str): Column to use as the DataFrame index, or None to
                keep the default index.
            params (list/tuple): Optional positional query parameters.

        Returns:
            (DataFrame) Rows of the result set, with nulls replaced by None,
            for statements that produce one; True for statements that do not.

        Raises:
            pyodbc.Error: Connection and execution errors propagate.
        """
        logging.debug(f'Before converting: {query}')
        query = self.convert_to_mssql(query)
        logging.debug(f'After converting: {query}')

        params = [self._to_db_param(p) for p in (kwargs.get('params') or [])]
        logging.debug(f'Params: {params}')

        data = None
        logging.debug('Connecting to DB')
        conn = pyodbc.connect(self._connection_string())
        try:
            curs = conn.cursor()
            curs.execute(query, params)
            logging.debug('Query executed.')

            if curs.description is None:
                # INSERT/UPDATE/DDL statements produce no result set.
                logging.debug('Update Query')
            else:
                columns = [column[0] for column in curs.description]
                logging.debug(f'Columns: {columns}')
                rows = [dict(zip(columns, row)) for row in curs.fetchall()]
                data = pd.DataFrame(rows) if rows else pd.DataFrame(columns=columns)
            conn.commit()
        finally:
            # Always release the connection, even when the statement fails.
            conn.close()

        if data is None:
            logging.debug('Data is not a DataFrame. Returning True.')
            return True
        return self._finalize_frame(data, index_col)

    def execute__(self, query, database=None, **kwargs):
        """
        Execute an SQL query, returning False instead of raising on failure.

        Delegates to :meth:`execute` (result indexed by ``id``).

        Args:
            query (str): The query that needs to be executed.
            database (str): Unused; kept for backward compatibility.
            params (list/tuple): List of parameters to pass to the query.

        Returns:
            (DataFrame) The result set, True for statements without one, or
            False if an error occurs.
        """
        try:
            return self.execute(query, database, **kwargs)
        except Exception:
            logging.exception('Something went wrong executing query. Check trace.')
            return False

    def execute_(self, query, database=None, **kwargs):
        """
        Executes an SQL query without setting an index on the result.

        Args:
            query (str): The query that needs to be executed.
            database (str): Unused; kept for backward compatibility.
            params (list/tuple): List of parameters to pass to the query.

        Returns:
            (DataFrame) The result set with its default index, or True for
            statements without one.
        """
        logging.debug('Executing `execute` instead of `execute_`')
        return self.execute(query, index_col=None, **kwargs)

    def insert(self, data, table, **kwargs):
        """
        Write records stored in a DataFrame to a SQL database.

        After writing, an ``id`` primary key is added on a best-effort basis.

        Args:
            data (DataFrame): The DataFrame that needs to be written.
            table (str): The table in which the records should be written.
            kwargs: Keyword arguments for pandas ``DataFrame.to_sql``
                (``chunksize`` defaults to None).

        Returns:
            (bool) True if successfully inserted, else False.
        """
        logging.info(f'Inserting into `{table}`')

        odbc_url = 'mssql+pyodbc:///?odbc_connect={}'.format(quote_plus(self._connection_string()))
        self.engine = create_engine(odbc_url)
        # Let callers override chunksize without a "multiple values" TypeError.
        kwargs.setdefault('chunksize', None)
        try:
            data.to_sql(table, self.engine, **kwargs)
        except Exception:
            logging.exception('Something went wrong inserting. Check trace.')
            return False
        finally:
            # A new engine is created per call; release its pooled connections.
            self.engine.dispose()

        try:
            self.execute(f'ALTER TABLE `{table}` ADD PRIMARY KEY (`id`);')
        except Exception as e:
            # Best effort only; e.g. the key may already exist.
            logging.debug(f'Could not add primary key to {table}: {e}')
        return True

    def insert_(self, data, table, database=None, **kwargs):
        """
        Write records stored in a DataFrame to a SQL database.

        Delegates to :meth:`insert`.

        Args:
            data (DataFrame): The DataFrame that needs to be written.
            table (str): The table in which the records should be written.
            database (str): Unused; kept for backward compatibility.
            kwargs: Keyword arguments for pandas ``DataFrame.to_sql``.

        Returns:
            (bool) True if successfully inserted, else False.
        """
        return self.insert(data, table, **kwargs)

    def insert_dict(self, data, table):
        """
        Insert a single row, given as a dictionary, into a SQL table.

        Args:
            data (dict): Mapping of column name to value.
            table (str): The table in which the record should be written.

        Returns:
            True if the insert succeeded, else False.
        """
        logging.info(f'Inserting dictionary data into {table}...')
        logging.debug(f'Data:\n{data}')

        try:
            if not data:
                logging.error('No data to insert.')
                return False

            column_names = [f'{column_name}' for column_name in data]
            params = [data[column_name] for column_name in data]
            logging.debug(f'Column names: {column_names}')
            logging.debug(f'Params: {params}')

            columns_string = ', '.join(column_names)
            param_placeholders = ', '.join(['%s'] * len(column_names))
            query = f'INSERT INTO {table} ({columns_string}) VALUES ({param_placeholders})'
            return self.execute(query, params=params)
        except Exception:
            logging.exception('Error inserting data.')
            return False

    @staticmethod
    def _build_update(table, update, where, force_update):
        """Build ``(query, params)`` for :meth:`update`, or return None if invalid."""
        if not update:
            logging.error('Update dictionary is None/empty. Must have some update clause.')
            return None

        set_clause = ', '.join(f'{column}=%s' for column in update)
        params = list(update.values())

        if where:
            where_clause = ' AND '.join(f'{column}=%s' for column in where)
            params += list(where.values())
            return f'UPDATE {table} SET {set_clause} WHERE {where_clause}', params

        if force_update:
            return f'UPDATE {table} SET {set_clause}', params

        logging.error('Where dictionary is None/empty. If you want to force update every row, pass force_update as True.')
        return None

    def update(self, table, update=None, where=None, database=None, force_update=False):
        """
        Update rows of a table.

        Args:
            table (str): Table to update.
            update (dict): Column -> new value. Must be non-empty.
            where (dict): Column -> value, combined with AND. If empty, the
                update only runs when ``force_update`` is True.
            database (str): Unused; kept for backward compatibility.
            force_update (bool): Allow updating every row when ``where`` is empty.

        Returns:
            (bool) True if the statement was executed, else False.
        """
        logging.info(f'Updating table: {table}')
        logging.info(f'Update data: {update}')
        logging.info(f'Where clause data: {where}')
        logging.info(f'Force update flag: {force_update}')

        try:
            built = self._build_update(table, update, where, force_update)
            if built is None:
                return False
            query, params = built
            self.execute(query, params=params)
            return True
        except Exception:
            logging.exception('Something went wrong updating. Check trace.')
            return False

    def get_column_names(self, table, database=None):
        """
        Get column names from an SQL table (the ``id`` index column excluded).

        Args:
            table (str): Name of the table.
            database (str): Passed through to execute (unused there).

        Returns:
            (list) List of headers. (None if an error occurs)
        """
        try:
            logging.info(f'Getting column names of table {table}')
            # TOP 0 returns the column metadata without scanning the table.
            return list(self.execute(f'SELECT TOP 0 * FROM {table}', database))
        except Exception:
            logging.exception('Something went wrong getting column names. Check trace.')
            return None

    def execute_default_index(self, query, database=None, **kwargs):
        """
        Executes an SQL query without setting an index on the result.

        Args:
            query (str): The query that needs to be executed.
            database (str): Unused; kept for backward compatibility.
            params (list/tuple): List of parameters to pass to the query.

        Returns:
            (DataFrame) The result set with its default index, or True for
            statements without one.
        """
        logging.debug('Executing `execute` instead of `execute_default_index`')
        return self.execute(query, index_col=None, **kwargs)

    def get_all(self, table, database=None, discard=None):
        """
        Get all data from an SQL table.

        Args:
            table (str): Name of the table from which data should be extracted.
            database (str): Passed through to execute (unused there).
            discard (list): Columns to be excluded while selecting all.

        Returns:
            (DataFrame) The table's data indexed by ``id`` when present.
        """
        logging.info(f'Getting all data from {table}')
        if not discard:
            return self.execute(f'SELECT * FROM {table}', database)

        logging.info(f'Discarding columns {discard}')
        # `SHOW COLUMNS` is MySQL-only; an empty result set carries the column
        # list on SQL Server too.
        all_columns = list(self.execute(f'SELECT TOP 0 * FROM {table}', database, index_col=None))
        kept = [column for column in all_columns if column not in discard]
        # Backticks are turned into [brackets] by convert_to_mssql.
        columns_str = ', '.join(f'`{column}`' for column in kept)
        return self.execute(f'SELECT {columns_str} FROM {table}', database)

    def get_latest(self, data, group_by_col, sort_col):
        """
        Group data by a column containing repeated values and get latest from it by
        taking the latest value based on another column.

        Example:
        Get the latest products
            id     product   date
            220    6647     2014-09-01
            220    6647     2014-10-16
            826    3380     2014-11-11
            826    3380     2015-05-19
            901    4555     2014-09-01
            901    4555     2014-11-01

        The function will return
            id     product   date
            220    6647     2014-10-16
            826    3380     2015-05-19
            901    4555     2014-11-01

        Args:
            data (DataFrame): Pandas DataFrame to query on; must have an ``id`` column.
            group_by_col (str): Column containing repeated values.
            sort_col (str): Column to identify the latest record.

        Returns:
            (DataFrame) Contains the latest records. (None if an error occurs)
        """
        try:
            logging.info('Grouping data...')
            logging.info(f'Data: {data}')
            logging.info(f'Group by column: {group_by_col}')
            logging.info(f'Sort column: {sort_col}')
            return data.sort_values(sort_col).set_index('id').groupby(group_by_col).tail(1)
        except KeyError as e:
            logging.error(f'Column {e.args[0]} does not exist.')
            return None
        except Exception:
            logging.exception('Something went wrong while grouping data.')
            return None

# Default connection settings for this notebook.
# NOTE(review): credentials are hard-coded; move them to a secrets store or to
# the deployment environment.
db_config = {
    'host': '13.233.100.20',
    'port': '1433',
    'user': 'SA',
    'password': 'Akhil@Akhil1',
}
import os
# Publish the same settings through the environment variables read by
# DB.__init__, without clobbering values the deployment already provides.
os.environ.setdefault('HOST_IP', db_config['host'])
os.environ.setdefault('LOCAL_DB_USER', db_config['user'])
os.environ.setdefault('LOCAL_DB_PASSWORD', db_config['password'])
os.environ.setdefault('LOCAL_DB_PORT', db_config['port'])




# In [2]:
# helper functions
def debug_df(df, num=20):
    """Print a Spark DataFrame's schema, then display its first ``num`` rows."""
    df.printSchema()
    df.show(n=num)
    

def decrease_date(s, days):
    """
    Return the ``"%Y-%m-%d"`` date that is ``days`` days before ``s``.

    Args:
        s (str): Date formatted as ``"%Y-%m-%d"``.
        days (int): Number of days to subtract (negative values move forward).

    Returns:
        (str) The shifted date in the same format.
    """
    # Local import: the global name `datetime` is bound only in a later cell and
    # is rebound there between the class and the module.
    import datetime as _datetime

    fmt = "%Y-%m-%d"
    parsed = _datetime.datetime.strptime(s, fmt)
    return (parsed - _datetime.timedelta(days=days)).strftime(fmt)

def read_df(table, columns_to_retrieve, database):
    """
    Load the given columns of a SQL Server table into a Spark DataFrame via JDBC.

    Uses the module-level ``spark`` session. NOTE(review): the ``server``,
    ``port``, ``user`` and ``password`` globals it reads are not defined in the
    visible cells — confirm where they are set.
    """
    reader = spark.read.format("jdbc")
    select_sql = f"SELECT {','.join(columns_to_retrieve)} from {table}"
    jdbc_options = (
        ("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};"),
        ("query", select_sql),
        ("user", user),
        ("password", password),
        ("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver"),
    )
    for option_key, option_value in jdbc_options:
        reader = reader.option(option_key, option_value)
    return reader.load()

from datetime import timedelta, date
#
def daterange(start_date, end_date):
    """
    Yield each day from ``start_date`` (inclusive) to ``end_date`` (exclusive).

    Args:
        start_date (str): First day, formatted as ``"%Y-%m-%d"``.
        end_date (str): Day after the last one yielded, same format.

    Yields:
        (str) Each date formatted as ``"%Y-%m-%d"``; nothing if
        ``end_date <= start_date``.
    """
    # Local import: the global name `datetime` is bound only in a later cell and
    # is rebound there between the class and the module.
    import datetime as _datetime

    fmt = "%Y-%m-%d"
    first = _datetime.datetime.strptime(start_date, fmt)
    stop = _datetime.datetime.strptime(end_date, fmt)
    for offset in range((stop - first).days):
        yield (first + _datetime.timedelta(days=offset)).strftime(fmt)

# In [3]:
# imports
# required libraries
from pyspark import SparkContext, SparkConf #
from pyspark.sql import SparkSession # for dataframe conversions
# for type conversions
from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import col, udf, sum # col, udf (user defined functions)
from pyspark.sql.types import DateType, IntegerType # type
from pyspark.sql.types import *
from pyspark.sql.functions import trim # for trimming
from pyspark.sql.functions import collect_list, sort_array, row_number # for grouping and taking the last/first element
from pyspark.sql.functions import *
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import time
import datetime 
import multiprocessing


# Spark configuration: local mode on every core, app name "My app", and twice
# as many shuffle partitions as there are CPUs.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("My app")
    .set("spark.sql.shuffle.partitions", 2 * multiprocessing.cpu_count())
)
 
    

import pandas as pd
import numpy as np

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def populate_db_func(purred: pd.Series, units: pd.Series) -> pd.Series:
    """Return ``units`` where ``purred`` equals 'P', and 0 elsewhere, as a numeric Series.

    The result keeps ``purred``'s index so it stays aligned with the inputs.
    The original built a fresh RangeIndex, which misaligns with non-default
    indexes.

    Args:
        purred (pd.Series): Purchase/redemption flags.
        units (pd.Series): Unit values, same length as ``purred``.

    Returns:
        pd.Series: Numeric units for 'P' rows, 0 for all other rows.
    """
    values = np.where(purred == 'P', units, 0)
    return pd.to_numeric(pd.Series(values, index=purred.index))

def populate_cr_func(purred: pd.Series, units: pd.Series) -> pd.Series:
    """Return ``units`` where ``purred`` equals 'R', and 0 elsewhere, as a numeric Series.

    The result keeps ``purred``'s index so it stays aligned with the inputs.
    The original built a fresh RangeIndex, which misaligns with non-default
    indexes.

    Args:
        purred (pd.Series): Purchase/redemption flags.
        units (pd.Series): Unit values, same length as ``purred``.

    Returns:
        pd.Series: Numeric units for 'R' rows, 0 for all other rows.
    """
    values = np.where(purred == 'R', units, 0)
    return pd.to_numeric(pd.Series(values, index=purred.index))

# pandas-UDF wrappers around populate_db_func / populate_cr_func, returning
# FloatType. NOTE(review): the visible code in initialize()/dialy_job() only
# references these from commented-out lines.
populate_db = pandas_udf(populate_db_func, returnType=FloatType())
populate_cr = pandas_udf(populate_cr_func, returnType=FloatType())

# Create spark context and sparksession.
#
# NOTE(review): SparkContext.setSystemProperty() appears to start the JVM
# gateway before setting the property. If so, "spark.driver.memory" set here
# cannot change the heap of the already-launched driver JVM. Passing it via
# conf.set(...) before getOrCreate() (or --driver-memory) is the documented
# route; confirm against the Spark configuration docs.
# NOTE(review): with master local[*] the executor runs inside the driver JVM,
# so "spark.executor.memory" is expected to have no effect here.
# NOTE(review): "spark.executor.offHeap.enabled" / "spark.executor.offHeap.size"
# do not appear to be Spark settings. The documented keys are
# spark.memory.offHeap.enabled and spark.memory.offHeap.size. Check available
# RAM before switching (200g off-heap on top of a 60g heap).
SparkContext.setSystemProperty("spark.driver.memory", "60g")
SparkContext.setSystemProperty("spark.executor.memory", "60g")
SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
SparkContext.setSystemProperty("spark.executor.offHeap.size", "200g")
# Reuses an existing SparkContext if one is already running.
sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession(sc)
# Enable Arrow for Spark <-> pandas data transfer.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


In [4]:
import multiprocessing
# Shows the shuffle-partition count configured for Spark: twice the CPU count.
multiprocessing.cpu_count() * 2

16

In [5]:
# configs
import os
# NOTE(review): these assignments run unconditionally. As a result the
# os.environ.get() fallbacks below never apply to host/user/password/port in
# this notebook, and these values replace the ones set in the db_utils cell.
# SECURITY(review): credentials are hard-coded in source; load them from an
# env file / secrets store instead and rotate the exposed password.
os.environ['HOST_IP'] = '192.168.15.126'
os.environ['LOCAL_DB_USER'] = 'BRS'
os.environ['LOCAL_DB_PASSWORD'] = 'Kfintech123$'
os.environ['LOCAL_DB_PORT'] = '1433'

import os

# Fallback values, used only when the matching environment variable is unset
# (comment out when the configs come from an env file).
default_ip = '13.233.100.20'
default_user  = 'SA'
default_password = 'Akhil@Akhil1'
default_port = '1433'
# Tenant prefix used by initialize()/dialy_job() when tenant_id is empty.
default_tenant_id = 'karvy'

# Resolve connection settings: environment first, then the fallbacks above.
# read_df(), initialize() and dialy_job() read server/port/user/password as globals.
server = os.environ.get('HOST_IP', default_ip)
port = os.environ.get('LOCAL_DB_PORT', default_port)
user = os.environ.get('LOCAL_DB_USER', default_user)
password = os.environ.get('LOCAL_DB_PASSWORD', default_password)

# Keyword arguments for DB(...). DB.__init__ takes its connection details
# from the environment variables rather than from these arguments.
db_config = {
   'host': server,
   'port': port,
   'user': user,
   'password':password
}


def quote_sql_literal(value):
    """Render ``value`` as a single-quoted SQL string literal.

    The value is converted with ``str()`` and embedded single quotes are
    doubled (the SQL-standard escape). Text such as ``O'Neil`` therefore
    cannot end the literal early and break the statement.

    >>> quote_sql_literal("O'Neil")
    "'O''Neil'"
    """
    return "'" + str(value).replace("'", "''") + "'"


def save_metric(date, metric_name, metric_value, fund_name, group_level, table_name, database='IB_Comp_funds'):
    """Insert one row into the ``karvy_metrics`` table (best effort).

    Args:
        date: Metric date; inserted as a quoted string.
        metric_name (str): Metric identifier.
        metric_value: Metric value; inserted as a quoted string.
        fund_name (str): Fund the metric belongs to.
        group_level (str): Grouping level the metric was computed at.
        table_name (str): Source table the metric was computed from.
        database (str): Database passed to ``DB`` (no tenant prefix).

    Errors raised while executing the INSERT are logged and swallowed.
    Errors raised while constructing ``DB`` propagate to the caller.
    """
    db = DB(database, tenant_id='', **db_config)
    try:
        # Values are inserted positionally in this order, presumably the
        # table's column order (note it differs from this signature); verify.
        row = (date, metric_name, metric_value, table_name, group_level, fund_name)
        values_sql = ','.join(quote_sql_literal(value) for value in row)
        # NOTE(review): backticks are MySQL identifier quoting. The configured
        # port (1433) suggests SQL Server, which would need [karvy_metrics].
        query = f"INSERT INTO `karvy_metrics` VALUES ( {values_sql})"
        db.execute_(query)
    except Exception as e:
        logging.error("Unable to insert metrics data")
        logging.error(e)


#### initialize

In [6]:
%%time
def initialize(date_str, table, database='funds', date_column='BatchCloseDate', tenant_id='karvy',
               transaction_status='Active', purred='Purred', transaction_type='TransactionType',
               folio='Folio', purchase_units='DB_Units', redemption_units='Cr_Units', scheme='SchemeCode',
               plan='PlanCode', groupby_level='SP', ter_flag='TerFlag', direct_db=None):
    """Compute opening balances up to ``date_str`` and write them to parquet.

    Reads ``table`` from SQL Server over JDBC and keeps active purchase ('P')
    and redemption ('R') rows dated on or before ``date_str``. It then sums the
    units per group and day and accumulates running purchase/redemption
    balances per group. The most recent row of every group is written to
    ``{table}_latest/data_{groupby_level}_{date_str}.parquet``.

    This version has no broker handling. A broker-aware ``initialize`` is
    defined later in the notebook and replaces this one once that cell runs.

    Args:
        date_str (str): Cut-off date ``yyyy-MM-dd`` (e.g. 2020-04-30), inclusive.
        table (str): Source table; also the prefix of the output directory.
        database (str): Database suffix, combined as ``{tenant_id}_{database}``.
        date_column (str): Column holding the batch close date.
        tenant_id (str): Tenant prefix; ``default_tenant_id`` is used when empty.
        transaction_status, purred, transaction_type, folio, purchase_units,
        redemption_units, scheme, plan, ter_flag (str): Source column names.
            These are now honoured; previously purchase_units, redemption_units,
            scheme and plan were silently overwritten with their defaults.
        groupby_level (str): One of 'SP', 'SPT', 'SPF', 'SPFT', 'SPFTTer'.
        direct_db (str | None): Full database name; overrides tenant/database.

    Returns:
        int: Number of rows in ``table`` before any filtering.

    Raises:
        ValueError: If ``groupby_level`` is unknown, or is a broker level
            ('SPFB', 'SPFTB') that this version cannot compute.
    """
    # Names of the columns this job derives.
    batch_close_date = 'batch_close_date'
    db_units = 'purchase_units'
    cr_units = 'redemption_units'
    balance_units = 'balance_units'
    day_purchase_units = 'day_pu'
    day_redemption_units = 'day_ru'
    balance_purchase_units = 'balance_pu'
    balance_redemption_units = 'balance_ru'
    calculated_date = 'calculated_date'

    # Balances run per window partition; the daily roll-up also groups by date.
    partitions_by_level = {
        'SP': [scheme, plan],
        'SPT': [scheme, plan, transaction_type],
        'SPF': [scheme, plan, folio],
        'SPFT': [scheme, plan, folio, transaction_type],
        'SPFTTer': [scheme, plan, folio, transaction_type, ter_flag],
    }
    if groupby_level in ('SPFB', 'SPFTB'):
        # These levels group by a derived broker code that this version never
        # builds (it used to fail with a NameError on ``broker``).
        raise ValueError(f"groupby_level {groupby_level!r} requires broker codes, "
                         "which this version of initialize() does not derive")
    if groupby_level not in partitions_by_level:
        raise ValueError(f"Unknown groupby_level {groupby_level!r}; "
                         f"expected one of {sorted(partitions_by_level)}")
    window_partition = partitions_by_level[groupby_level]
    group_by_cols = window_partition + [batch_close_date]

    database = direct_db or f'{tenant_id or default_tenant_id}_{database}'

    # Read the whole source table over JDBC.
    data = (spark.read.format("jdbc")
            .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};")
            .option("dbtable", table)
            .option("user", user)
            .option("password", password)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .load())
    total_count = data.count()

    # Normalise the text columns used for filtering and grouping.
    for column_name in (transaction_status, purred, folio, scheme, plan, transaction_type):
        data = data.withColumn(column_name, upper(trim(col(column_name))))

    # Only the date part matters. Keep rows dated on or before date_str (inclusive).
    data = data.withColumn(date_column, col(date_column).cast('date'))
    data = data.filter(col(date_column) <= date_str)
    data = data.filter(col(date_column).isNotNull())
    data = data.filter(~trim(col(date_column)).cast("string").eqNullSafe(''))

    # Mandatory rules: active rows that are purchases or redemptions. The
    # columns are already upper-cased and trimmed above.
    data = data.filter(col(transaction_status) == "Y")
    data = data.filter(col(purred).isin("P", "R"))

    data = data.withColumn(batch_close_date, data[date_column])

    # Purchase units count only on 'P' rows; redemption units only on 'R' rows.
    data = data.withColumn(db_units, when(col(purred) == "P", col(purchase_units)).otherwise(0))
    data = data.withColumn(cr_units, when(col(purred) == "R", col(redemption_units)).otherwise(0))

    if groupby_level == 'SP':
        # At scheme/plan level these transaction types are excluded.
        ignored_tr_types = ['CNI', 'CNO', 'TRMI',
                            'TRMO', 'TRFI', 'TRFO', 'PLDO',
                            'UPLO', 'DMT', 'RMT', 'CNIR', 'CNOR', 'TRMIR', 'TRMOR',
                            'TRFIR', 'TRFOR', 'PLDOR', 'UPLOR', 'DMTR', 'RMTR']
        data = data.filter(~col(transaction_type).isin(ignored_tr_types))

    # Daily totals per group, then running balances ordered by date.
    rolledup_data = (data.groupBy(group_by_cols)
                     .agg({db_units: 'sum', cr_units: 'sum'})
                     .withColumnRenamed(f"sum({db_units})", day_purchase_units)
                     .withColumnRenamed(f"sum({cr_units})", day_redemption_units))
    running_total = (Window.partitionBy(window_partition)
                     .orderBy(batch_close_date)
                     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    rolledup_data = rolledup_data.withColumn(balance_purchase_units,
                                             sum(col(day_purchase_units)).over(running_total))
    rolledup_data = rolledup_data.withColumn(balance_redemption_units,
                                             sum(col(day_redemption_units)).over(running_total))
    rolledup_data = rolledup_data.withColumn(balance_units,
                                             col(balance_purchase_units) - col(balance_redemption_units))

    # Keep only the most recent row per group, stamped with the calculation date.
    latest_data = rolledup_data.filter(col(batch_close_date) <= date_str)
    newest_first = Window.partitionBy(window_partition).orderBy(col(batch_close_date).desc())
    latest_data = (latest_data.withColumn("rrn", row_number().over(newest_first))
                   .where(col("rrn") == 1)
                   .drop("rrn"))
    latest_data = latest_data.withColumn(calculated_date, lit(date_str).cast('date'))

    # Parquet keeps the output compact; one directory per level and date.
    latest_data.write.parquet(f"{table}_latest/data_{groupby_level}_{date_str}.parquet", mode='overwrite')

    print(f'inital file on date {date_str} written')
    return total_count


CPU times: user 12 µs, sys: 0 ns, total: 12 µs
Wall time: 19.1 µs


In [76]:
%%time
def initialize(date_str, table, database='funds', date_column='BatchCloseDate', tenant_id='karvy',
               transaction_status='Active', purred='Purred', transaction_type='TransactionType',
               folio='Folio', purchase_units='DB_Units', redemption_units='Cr_Units', scheme='SchemeCode',
               plan='PlanCode', groupby_level='SP', ter_flag='TerFlag', direct_db=None,
               broker_column='BrokerARN', transaction_no='TransactionNo',
               purchase_transaction_no='PurchaseTransactionNo', trans_table='Trans_116'):
    """Compute opening balances up to ``date_str`` and write them to parquet.

    Reads ``table`` from SQL Server over JDBC and keeps active purchase ('P')
    and redemption ('R') rows dated on or before ``date_str``. It then sums the
    units per group and day and accumulates running purchase/redemption
    balances per group. The most recent row of every group is written to
    ``{table}_latest/data_{groupby_level}_{date_str}.parquet``.

    For broker levels (any ``groupby_level`` containing 'B') a
    ``broker_code`` column is derived:

    - Folios with a single distinct broker keep it.
    - In multi-broker folios, purchases keep their own broker.
    - Redemptions are joined with ``trans_table`` and take the broker of the
      purchase referenced by ``purchase_transaction_no``.

    Args:
        date_str (str): Cut-off date ``yyyy-MM-dd`` (e.g. 2020-04-30), inclusive.
        table (str): Source table; also the prefix of the output directory.
        database (str): Database suffix, combined as ``{tenant_id}_{database}``.
        date_column (str): Column holding the batch close date.
        tenant_id (str): Tenant prefix; ``default_tenant_id`` is used when empty.
        transaction_status, purred, transaction_type, folio, purchase_units,
        redemption_units, scheme, plan, ter_flag, broker_column,
        transaction_no, purchase_transaction_no (str): Source column names.
            These are now honoured; previously purchase_units, redemption_units,
            scheme and plan were silently overwritten with their defaults.
        groupby_level (str): One of 'SP', 'SPT', 'SPF', 'SPFB', 'SPFT',
            'SPFTTer', 'SPFTBTer', 'SPFTB'.
        direct_db (str | None): Full database name; overrides tenant/database.
        trans_table (str): Table used to re-attribute redemptions in
            multi-broker folios.

    Returns:
        int: Number of rows in ``table`` before any filtering.

    Raises:
        ValueError: If ``groupby_level`` is not one of the supported levels.
    """
    # Names of the columns this job derives.
    batch_close_date = 'batch_close_date'
    broker = 'broker_code'
    db_units = 'purchase_units'
    cr_units = 'redemption_units'
    balance_units = 'balance_units'
    day_purchase_units = 'day_pu'
    day_redemption_units = 'day_ru'
    balance_purchase_units = 'balance_pu'
    balance_redemption_units = 'balance_ru'
    calculated_date = 'calculated_date'

    # Balances run per window partition; the daily roll-up also groups by date.
    partitions_by_level = {
        'SP': [scheme, plan],
        'SPT': [scheme, plan, transaction_type],
        'SPF': [scheme, plan, folio],
        'SPFB': [scheme, plan, folio, broker],
        'SPFT': [scheme, plan, folio, transaction_type],
        'SPFTTer': [scheme, plan, folio, transaction_type, ter_flag],
        'SPFTBTer': [scheme, plan, folio, transaction_type, ter_flag, broker],
        'SPFTB': [scheme, plan, folio, transaction_type, broker],
    }
    if groupby_level not in partitions_by_level:
        raise ValueError(f"Unknown groupby_level {groupby_level!r}; "
                         f"expected one of {sorted(partitions_by_level)}")
    window_partition = partitions_by_level[groupby_level]
    group_by_cols = window_partition + [batch_close_date]

    database = direct_db or f'{tenant_id or default_tenant_id}_{database}'
    jdbc_url = f"jdbc:sqlserver://{server}:{port};databaseName={database};"

    # Read the whole source table over JDBC.
    data = (spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", table)
            .option("user", user)
            .option("password", password)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .load())
    total_count = data.count()

    # Normalise the text columns used for filtering and grouping.
    for column_name in (transaction_status, purred, folio, scheme, plan, broker_column, transaction_type):
        data = data.withColumn(column_name, upper(trim(col(column_name))))

    # Only the date part matters. Keep rows dated on or before date_str (inclusive).
    data = data.withColumn(date_column, col(date_column).cast('date'))
    data = data.filter(col(date_column) <= date_str)
    data = data.filter(col(date_column).isNotNull())
    data = data.filter(~trim(col(date_column)).cast("string").eqNullSafe(''))

    # Mandatory rules: active rows that are purchases or redemptions. The
    # columns are already upper-cased and trimmed above.
    data = data.filter(col(transaction_status) == "Y")
    data = data.filter(col(purred).isin("P", "R"))

    data = data.withColumn(batch_close_date, data[date_column])

    cached = None  # DataFrame cached by the broker path; released in `finally`
    try:
        if 'B' in groupby_level:
            data = data.fillna({broker_column: 'EMPTY_BROKER'}).cache()
            cached = data

            # Folios (per scheme/plan) that carry more than one distinct broker.
            # The count is aliased explicitly: Spark's auto-generated name for
            # countDistinct is version-dependent (e.g. "count(DISTINCT x)").
            folio_key = [folio, scheme, plan]
            multi_broker_folio = (data.groupby(folio_key)
                                  .agg(countDistinct(broker_column).alias('distinct_brokers'))
                                  .filter(col('distinct_brokers') > 1)
                                  .select(folio_key))

            # Single-broker folios keep their broker code as-is.
            single_broker_data = (data.join(multi_broker_folio, on=folio_key, how='left_anti')
                                  .withColumn(broker, col(broker_column)))
            multi_broker_folio_data = data.join(multi_broker_folio, on=folio_key, how='left_semi')

            # Purchases in multi-broker folios keep their own broker.
            data_p = (multi_broker_folio_data.filter(col(purred) == 'P')
                      .withColumn(broker, col(broker_column)))
            # Redemptions are re-attributed through the transaction table.
            data_r = multi_broker_folio_data.filter(col(purred) == 'R')
            print(data_r.count())

            trans_data = (spark.read.format("jdbc")
                          .option("url", jdbc_url)
                          .option("query", f"SELECT * from {trans_table}")
                          .option("user", user)
                          .option("password", password)
                          .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                          .load())
            # Keep only the trans rows belonging to these redemptions.
            trans_data = trans_data.join(data_r, on=[scheme, plan, folio, transaction_no], how='left_semi')
            print('total trans', trans_data.count())
            trans_data = trans_data.drop(date_column)

            # NOTE(review): only the purchase-units column is taken from
            # trans_table here. Each redemption is therefore repeated once per
            # matching trans row while keeping its full redemption_units, which
            # over-counts whenever a redemption matches several trans rows.
            # dialy_job() instead takes redemption_units from trans_table.
            # Confirm which column holds the per-row redeemed units and align
            # the two.
            data_r = data_r.drop(purchase_units)
            trans_data = (trans_data
                          .select(scheme, plan, folio, transaction_no, purchase_transaction_no, purchase_units)
                          .join(data_r, on=[scheme, plan, folio, transaction_no])
                          .drop(transaction_no))
            # Each redemption row takes the broker of the purchase it references.
            trans_data = (trans_data
                          .join(data_p.select(transaction_no, broker),
                                trans_data[purchase_transaction_no] == data_p[transaction_no],
                                how='left')
                          .drop(purchase_transaction_no))

            columns = single_broker_data.columns
            data = (single_broker_data
                    .union(data_p.select(columns))
                    .union(trans_data.select(columns)))

        # Purchase units count only on 'P' rows; redemption units only on 'R' rows.
        data = data.withColumn(db_units, when(col(purred) == "P", col(purchase_units)).otherwise(0))
        data = data.withColumn(cr_units, when(col(purred) == "R", col(redemption_units)).otherwise(0))

        if groupby_level == 'SP':
            # At scheme/plan level these transaction types are excluded.
            ignored_tr_types = ['CNI', 'CNO', 'TRMI',
                                'TRMO', 'TRFI', 'TRFO', 'PLDO',
                                'UPLO', 'DMT', 'RMT', 'CNIR', 'CNOR', 'TRMIR', 'TRMOR',
                                'TRFIR', 'TRFOR', 'PLDOR', 'UPLOR', 'DMTR', 'RMTR']
            data = data.filter(~col(transaction_type).isin(ignored_tr_types))
        elif groupby_level in ('SPFTTer', 'SPFTBTer'):
            # Missing TER flags form their own group instead of a null key.
            data = data.fillna({ter_flag: 'EMPTY_TER'})

        # Daily totals per group, then running balances ordered by date.
        rolledup_data = (data.groupBy(group_by_cols)
                         .agg({db_units: 'sum', cr_units: 'sum'})
                         .withColumnRenamed(f"sum({db_units})", day_purchase_units)
                         .withColumnRenamed(f"sum({cr_units})", day_redemption_units))
        running_total = (Window.partitionBy(window_partition)
                         .orderBy(batch_close_date)
                         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
        rolledup_data = rolledup_data.withColumn(balance_purchase_units,
                                                 sum(col(day_purchase_units)).over(running_total))
        rolledup_data = rolledup_data.withColumn(balance_redemption_units,
                                                 sum(col(day_redemption_units)).over(running_total))
        rolledup_data = rolledup_data.withColumn(balance_units,
                                                 col(balance_purchase_units) - col(balance_redemption_units))

        # Keep only the most recent row per group, stamped with the calculation date.
        latest_data = rolledup_data.filter(col(batch_close_date) <= date_str)
        newest_first = Window.partitionBy(window_partition).orderBy(col(batch_close_date).desc())
        latest_data = (latest_data.withColumn("rrn", row_number().over(newest_first))
                       .where(col("rrn") == 1)
                       .drop("rrn"))
        latest_data = latest_data.withColumn(calculated_date, lit(date_str).cast('date'))

        # Parquet keeps the output compact; one directory per level and date.
        latest_data.write.parquet(f"{table}_latest/data_{groupby_level}_{date_str}.parquet", mode='overwrite')
    finally:
        # Release the broker-path cache; it was previously never unpersisted.
        if cached is not None:
            cached.unpersist()

    print(f'inital file on date {date_str} written')
    return total_count

# Run the opening-balance initialisation for code 116 / name AXA.
code, name = '116', 'AXA'
print(code, name)
start = time.time()

# Source table and the transaction table used for broker attribution.
table = f'm_Trans_{code}'
trans_table = f'Trans_{code}'

groupby_level = 'SPFTBTer'
init_date = '2020-08-31'
start_date, end_date = '2020-04-01', '2020-09-02'
# mcr_month_date = '2020-05-01'

records = initialize(
    init_date,
    table=table,
    direct_db='BankRecon',
    groupby_level=groupby_level,
    trans_table=trans_table,
)

116 AXA
43784
total trans 170998
inital file on date 2020-08-31 written
CPU times: user 78.2 ms, sys: 27.3 ms, total: 106 ms
Wall time: 2min 17s


#### daily job

In [72]:
%%time
import time
def dialy_job(date_str, table='trans116', database='funds', date_column='BatchCloseDate', tenant_id='karvy',
              transaction_status='Active', purred = 'Purred', transaction_type = 'TransactionType',
              folio = 'Folio', purchase_units = 'DB_Units', redemption_units = 'Cr_Units',scheme = 'SchemeCode',
               plan = 'PlanCode',groupby_level='SP', direct_db=None, fn_fromdt = 'fn_fromdt',fn_fromdt_format = 'dd/MM/yyyy',
              fn_scheme = 'fn_scheme',fn_plan = 'fn_plan', fn_nav = 'fn_nav', nav_table='nav_master', 
              scheme_table='scheme_master',scheme_code = 'scheme_code', 
              plan_code = 'plan_code', category = 'SebiSchemeCategory',
              subcategory = 'SebiSchemeSubCategory',nature = 'nature', newmcrid='NewMCRId', ter_flag='TerFlag',
              broker_column = 'BrokerARN',transaction_no = 'TransactionNo',purchase_transaction_no = 'PurchaseTransactionNo',
              trans_table='Trans_116'
             ):
    """Dialy run this and store the latest data and aum data too"""
    
    # inflow outflow
    inflow_db_trtypes = ['NEW', 'ADD', 'IPO', 'SIN', 'LTIN', 'LTIA', 'STPN', 'STPA', 'STPI','DIR', 'DSPI', 'SWIN','SWIA']
    inflow_cr_trtypes = ['NEWR', 'ADDR', 'IPOR', 'SINR', 'LTINR', 'LTIAR', 'STPNR', 
                         'STPAR', 'STPIR','DIRR', 'DSPIR', 'SWINR','SWIAR']
    
    outflow_db_trtypes = ['FULR', 'REDR', 'LTOFR', 'LTOPR', 'STPOR', 'SWDR', 'TRGR', 'SWOPR', 'SWOFR']
    outflow_cr_trtypes = ['FUL', 'RED', 'SWD','TRG','LTOF', 'LTOP','STPO', 'SWOP', 'SWOF']
    
    
    
    # configurations we use
    batch_close_date = 'batch_close_date'
    db_units = 'purchase_units'
    cr_units = 'redemption_units'
    balance_units = 'balance_units'
    day_purchase_units = 'day_pu'
    day_redemption_units = 'day_ru'
    balance_purchase_units = 'balance_pu'
    balance_redemption_units = 'balance_ru'
    calculated_date = 'calculated_date'
    today_pu = 'today_pu'
    today_ru = 'today_ru'
    effective_nav = 'effective_nav'
    aum = 'aum'
    aaum = 'aaum'
    inflow = 'inflow'
    outflow = 'outflow'
    inflow_db_units = 'inflow_purchase_units'
    inflow_cr_units = 'inflow_redemption_units'
    outflow_db_units = 'outflow_purchase_units'
    outflow_cr_units = 'outflow_redemption_units'
    inflow_units = 'inflow_units'
    outflow_units = 'outflow_units'
    
    # get the latest data from the previously stored file
    date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
    day_num = date_obj.day
    previous_day = date_obj - datetime.timedelta(1)
    previous_day_str = previous_day.strftime('%Y-%m-%d')
    latest_data = spark.read.parquet(f"{table}_latest/data_{groupby_level}_{previous_day_str}.parquet")

    # debug_df(latest_data)
    
    # get  the todays data
    database = direct_db or (f'{tenant_id or default_tenant_id}_{database}')
    query = f"SELECT * from {table} where CAST({date_column} AS DATE)='{date_str}'"
    data  = spark.read.format("jdbc") \
            .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") \
            .option("query", query) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .load()
#     data = data.filter(col(scheme) == 'TF')
    
    data = data.cache()
    
#     data = data.filter((col(scheme)=='IC'))
    # latest_data = latest_data.filter((col(scheme) == 'OV') & (col(plan) == 'RG'))
    # data = data.filter((col(scheme) == 'OV') & (col(plan) == 'RG'))
    # print (data.count())
    # debug_df(data)
    
    # calculate all the steps as in initialization
    # some preprocessings in the data, additional trimmings etc
    data = data.withColumn(transaction_status, upper(trim(col(transaction_status))))
    data = data.withColumn(purred, upper(trim(col(purred))))
    data = data.withColumn(folio, upper(trim(col(folio))))
    data = data.withColumn(scheme, upper(trim(col(scheme))))
    data = data.withColumn(plan, upper(trim(col(plan))))
    data = data.withColumn(transaction_type, upper(trim(col(transaction_type))))
    
    
    day_count = data.count()
#     data = data.filter((col(scheme).isin('IC','HC')))
    

    # cast the date column into dates, as we are concerned only with dates now
    data = data.withColumn(date_column, col(date_column).cast('date'))
    
   
    # filter the date till the batch_close_date (inclusive)
    # data = data.filter(col(date_column) <= date_str)
    
        
    # filter the data according to rules
    data = data.filter((col(date_column).isNotNull()) )
    # data = data.filter((col(date_column) != '') ) # this will not work for few types
    data = data.filter( ~trim(col(date_column)).cast("string").eqNullSafe(''))
    
    # do must be rules
    data = data.filter( (trim(upper((col(transaction_status))))) == "Y")
    data = data.filter( (trim(upper(col(purred))) == "P") | (trim(upper(col(purred))) == "R") )
    data = data.withColumn(transaction_type, upper(trim(col(transaction_type))))
    
    # our configurations
    data = data.withColumn(batch_close_date, data[date_column])
    

    if 'B' in groupby_level:
    #     print (data.count())
        broker = 'broker_code'
    #     data.groupby(folio, scheme, plan).agg(count(broker_column), countDistinct(broker_column)).show()
        data = data.fillna({broker_column:'EMPTY_BROKER'})

        # folios having single broker code will remain same
        multi_broker_folio = data.groupby(folio, scheme, plan).agg(countDistinct(broker_column)).filter(col(f'count({broker_column})') > 1)

        single_broker_folio_data = data.join(multi_broker_folio.select(folio, scheme, plan), on=[folio, scheme, plan], how='left_anti')
        single_broker_data = single_broker_folio_data.withColumn(broker, col(broker_column))

        multi_broker_folio_data = data.join(multi_broker_folio.select(folio, scheme, plan), on=[folio, scheme, plan], how='left_semi')
        # bring in correct broker codes
        # for purchase p they will be same
        data_p = multi_broker_folio_data.filter(col(purred) == 'P')
        data_p = data_p.withColumn(broker, col(broker_column))

        # for redemptions 
        data_r = multi_broker_folio_data.filter(col(purred) == 'R')
        # bring in the redemptions 
        query = f"SELECT * from {trans_table}"
        trans_data  = spark.read.format("jdbc") \
                .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") \
                .option("query", query) \
                .option("user", user) \
                .option("password", password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load()

        # get the redemptions
        trans_data = trans_data.join(data_r, on=[scheme, plan, folio, transaction_no], how='left_semi')

        grouped = data_p.groupby([transaction_no, broker_column]).count()
        trans_data = trans_data.drop(date_column)

        data_r = data_r.drop(redemption_units)
        trans_data = trans_data.select(scheme, plan, folio, transaction_no, purchase_transaction_no, redemption_units).join(data_r,
                                     on=[scheme, plan, folio, transaction_no]).drop(transaction_no)

        trans_data = trans_data.join(data_p.select(transaction_no, broker), trans_data[purchase_transaction_no] == data_p[transaction_no], how='left').drop(purchase_transaction_no)

        data = single_broker_data.union(data_p.select(single_broker_data.columns)).union(trans_data.select(single_broker_data.columns))
    
    
    
#     print ('pur, redem')
    # bring in purchase and redemption units
    data = data.withColumn(db_units, when((col(purred) == "P"), col(purchase_units)).otherwise(0))
    data = data.withColumn(cr_units, when((col(purred) == "R"), col(redemption_units)).otherwise(0))
#     data = data.withColumn(db_units, populate_db(col(purred), col(purchase_units)))
#     data = data.withColumn(cr_units, populate_cr(col(purred), col(redemption_units)))

    
    
#     print ('1')
    # get the group by and window partitions based on partitions
    group_by_cols = []
    window_partition = []
    if groupby_level == 'SP':
        window_partition = [scheme, plan]
        group_by_cols = [scheme, plan, batch_close_date]
        # scheme_plan wise we might need to filter out some transaction types
        ignored_tr_types = ['CNI', 'CNO', 'TRMI', 
                            'TRMO', 'TRFI', 'TRFO', 'PLDO',
                            'UPLO', 'DMT', 'RMT', 'CNIR', 'CNOR', 'TRMIR', 'TRMOR',
                            'TRFIR', 'TRFOR', 'PLDOR', 'UPLOR', 'DMTR', 'RMTR']
        data = data.filter( ~(col(transaction_type).isin(ignored_tr_types)) )
    elif groupby_level == 'SPT':
        window_partition = [scheme, plan, transaction_type]
        group_by_cols  = [scheme, plan, transaction_type, batch_close_date]
    elif groupby_level == 'SPF':
        window_partition = [scheme, plan, folio]
        group_by_cols  = [scheme, plan, folio, batch_close_date]
    elif groupby_level == 'SPFB':
            window_partition = [scheme, plan, folio, broker]
            group_by_cols  = [scheme, plan, folio, broker, batch_close_date]
    elif groupby_level == 'SPFT':
        window_partition = [scheme, plan, folio, transaction_type]
        group_by_cols = [scheme, plan, folio, transaction_type, batch_close_date]
    elif groupby_level == 'SPFTTer':
        data = data.fillna({ter_flag:'EMPTY_TER'})
        window_partition = [scheme, plan, folio, transaction_type, ter_flag]
        group_by_cols = [scheme, plan, folio, transaction_type, ter_flag, batch_close_date]
    elif groupby_level == 'SPFTBTer':
        window_partition = [scheme, plan, folio, transaction_type, ter_flag, broker]
        group_by_cols = [scheme, plan, folio, transaction_type, ter_flag, broker, batch_close_date]
    elif groupby_level == 'SPFTB':
        window_partition = [scheme, plan, folio, transaction_type, broker]
        group_by_cols = [scheme, plan, folio, transaction_type, broker, batch_close_date]
   
    
    
    # roll up the data
    rolledup_data = data.groupBy(group_by_cols)
    rolledup_data = rolledup_data.agg({db_units:'sum', cr_units:'sum'})
        
    rolledup_data = rolledup_data.withColumnRenamed(f"sum({db_units})", day_purchase_units).withColumnRenamed(f"sum({cr_units})", day_redemption_units)

    # inflow outflow units
    
    
    latest_data = latest_data.drop(day_purchase_units, day_redemption_units, batch_close_date, balance_units)
    latest_data = latest_data.withColumnRenamed(balance_purchase_units, day_purchase_units)
    latest_data = latest_data.withColumnRenamed(balance_redemption_units, day_redemption_units)
    latest_data = latest_data.withColumnRenamed(calculated_date, batch_close_date)
    
    # debug_df(rolledup_data)
    # debug_df(latest_data)
    combined_data = latest_data.union(rolledup_data.select(latest_data.columns))
    combined_data = combined_data.cache()
    combined_count = combined_data.count()
    # debug_df(rolledup_data)
    # debug_df(latest_data)
    # debug_df(combined_data)
    combined_data = combined_data.withColumn(balance_purchase_units, sum(col(day_purchase_units)).over(Window.partitionBy(window_partition).orderBy(batch_close_date).rowsBetween(-sys.maxsize, 0)))
    combined_data = combined_data.withColumn(balance_redemption_units, sum(col(day_redemption_units)).over(Window.partitionBy(window_partition).orderBy(batch_close_date).rowsBetween(-sys.maxsize, 0)))
    combined_data = combined_data.withColumn(balance_units, (col(balance_purchase_units) - col(balance_redemption_units)))
    # debug_df(combined_data)
    
    
    # store the latest day data again
    # get the latest data
    combined_data = combined_data.filter(col(batch_close_date) <= date_str)
    w = Window.partitionBy(window_partition).orderBy(col(batch_close_date).desc())
    combined_data = combined_data.withColumn("rrn", row_number().over(w)).where(col("rrn") == 1).drop("rrn")
    
    
    # maintained the calculated date (the latest data upto the calculated date)
    combined_data = combined_data.withColumn(calculated_date, lit(date_str).cast('date'))
    
    # debug_df(combined_data)
    # store in parquet file for optimization of space and only one file and multi partitions
    # but write now store in csv and maintain date wise and colesce one
    combined_data = combined_data.cache()
    # combined_data.coalesce(1).write.csv(f"{table}_latest/data_{groupby_level}_{date_str}.csv",header=True, mode='overwrite')
    combined_data.write.parquet(f"{table}_latest/data_{groupby_level}_{date_str}.parquet", mode='overwrite')
    
    
    # join the nav, scheme_master data
    nav_data = read_df(nav_table, '*', database)
    nav_data = nav_data.withColumnRenamed(fn_scheme, scheme)
    nav_data = nav_data.withColumnRenamed(fn_plan, plan)
    nav_data = nav_data.withColumn(fn_fromdt, col(fn_fromdt).cast('date'))

    scheme_master = read_df(scheme_table, [scheme_code, plan_code, nature, category, subcategory, newmcrid], database)
    scheme_master = scheme_master.withColumnRenamed(scheme_code, scheme)
    scheme_master = scheme_master.withColumnRenamed(plan_code, plan)

    scheme_master_ = scheme_master.dropDuplicates([scheme, plan])
    nav_scheme = nav_data.join(scheme_master_, on=[scheme, plan], how='left')
    nav_data = nav_scheme
    
    # debug_df(nav_data)
    
    # calculate the aum 
    combined_data = combined_data.withColumn(effective_nav, date_sub(col(calculated_date), 1))
    combined_data = combined_data.withColumn(today_pu, when((col(calculated_date) == col(batch_close_date)), col(day_purchase_units)).otherwise(0))
    combined_data = combined_data.withColumn(today_ru, when((col(calculated_date) == col(batch_close_date)), col(day_redemption_units)).otherwise(0))
    
    # inflow outflow addition
    inflow_db_condition = col(transaction_type).isin(inflow_db_trtypes)
    inflow_cr_condition = col(transaction_type).isin(inflow_cr_trtypes)
    combined_data = combined_data.withColumn(inflow_db_units, when(inflow_db_condition, col(today_pu)).otherwise(0))
    combined_data = combined_data.withColumn(inflow_cr_units, when(inflow_cr_condition, col(today_ru)).otherwise(0))
    
    outflow_db_condition = col(transaction_type).isin(outflow_db_trtypes)
    outflow_cr_condition = col(transaction_type).isin(outflow_cr_trtypes)
    combined_data = combined_data.withColumn(outflow_db_units, when(outflow_db_condition, col(today_pu)).otherwise(0))
    combined_data = combined_data.withColumn(outflow_cr_units, when(outflow_cr_condition, col(today_ru)).otherwise(0))
    
    combined_data = combined_data.withColumn(inflow_units, col(inflow_db_units) - col(inflow_cr_units))
    combined_data = combined_data.withColumn(outflow_units, col(outflow_cr_units) - col(outflow_db_units))
    
    # debug_df(combined_data)
    
    nav_filteredFT = nav_data.filter(col(fn_fromdt) < date_str)
    navw = Window.partitionBy([scheme, plan]).orderBy(col(fn_fromdt).desc())
    nav_populate = nav_filteredFT.withColumn("rrn", row_number().over(navw)).where(col("rrn") == 1).drop("rrn")
    nav_populate = nav_populate.withColumn(calculated_date, lit(date_str))
    nav_populate = nav_populate.select([scheme, plan, fn_fromdt, fn_nav, calculated_date, category, subcategory, nature, newmcrid])
    
#     debug_df(nav_populate)
    
    joined = combined_data.join(nav_populate, on=[scheme, plan, calculated_date], how='left')
    joined = joined.withColumn(aum, col(fn_nav) * col(balance_units))
    joined = joined.withColumn(inflow, col(fn_nav) * col(inflow_units))
    joined = joined.withColumn(outflow, col(fn_nav) * col(outflow_units))
    
#     aum_dummy = f'{aum}_d'
#     final_joined = joined.withColumn(aum_dummy, col(aum))
#     final_joined = final_joined.fillna({aum_dummy: 0})
    
#     # moving average logic
#     print (day_num)
#     if day_num == 1:
#         final_joined = final_joined.withColumn(f'pre_{aum_dummy}', lit(0))
#         final_joined = final_joined.withColumn(aaum, col(aum_dummy))
#     elif day_num == 2:
#         final_joined = final_joined.withColumn(f'pre_{aum_dummy}', lit(0))
#         final_joined = final_joined.withColumn(aaum, col(aum_dummy))
#     else:
#         previous_day_aum = spark.read.parquet(f"{table}_dialy/aaum_data_{groupby_level}_{previous_day_str}.parquet")
#         previous_day_aum.withColumnRenamed(aum_dummy, f'pre_{aum_dummy}')
#         final_joined = final_joined.join(previous_day_aum.select(window_partition + [f'pre_{aum_dummy}']), on=window_partition, how='left')
#         final_joined = final_joined.fillna({f'pre_{aum_dummy}': 0})
#         final_joined = final_joined.withColumn(aaum, (col(aum_dummy) + col(f'pre_{aum_dummy}') / day_num))
        
    
#     debug_df(joined)
    #joined = joined.cache()
    # store the data in the files
#     joined.coalesce(1).write.csv(f"{table}_dialy/data_{groupby_level}_{date_str}.csv",header=True, mode='overwrite')
    joined.write.parquet(f"{table}_dialy/data_{groupby_level}_{date_str}.parquet", mode='overwrite')
    
#     final_joined = final_joined.cache()
#     final_joined.coalesce(1).write.csv(f"{table}_dialy/aaum_data_{groupby_level}_{date_str}.csv",header=True, mode='overwrite')
#     final_joined.coalesce(1).write.parquet(f"{table}_dialy/aaum_data_{groupby_level}_{date_str}.parquet", mode='overwrite')
    
    
    # upload the data if needed
    # Done
    print (f'dialy file on date {date_str} generated')
    return day_count, combined_count

# dialy_job('2020-05-01', groupby_level='SPFT', table='Trans_116', direct_db='BankRecon', nav_table='fund_navreg_AXA_29072020',
#         scheme_table='Fund_Master_AXA_29072020', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory'
#         )
# dialy_job('2020-06-01', groupby_level='SP', table='Trans_128', direct_db='BankRecon', nav_table='fund_navreg_axismf',
#         scheme_table='fund_master_axismf', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory'
#         )
# dialy_job('2020-06-01', groupby_level='SP', table='Trans_120', direct_db='BankRecon', nav_table='fund_navreg_invesco',
#         scheme_table='fund_master_INVESCO', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory'
#         )
# dialy_job('2020-06-01', groupby_level='SP', table='m_Trans_116', direct_db='BankRecon', nav_table='fund_navreg_AXA_29072020',
#         scheme_table='Fund_Master_AXA_29072020', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory', newmcrid='fm_NewMCRId'
#         )

# dialy_job('2020-06-01', groupby_level='SP', table='m_Trans_117', direct_db='BankRecon', nav_table='fund_navreg_MIRAE',
#         scheme_table='fund_master_MIRAE', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory', newmcrid='fm_NewMCRId'
#         )
# print ('done')
# import time
# for i,ele in enumerate(list(daterange('2020-05-01', '2020-06-02'))):
#     s = time.time()
#     dialy_job(ele, groupby_level='SPFT', table='m_Trans_117', direct_db='BankRecon', nav_table='fund_navreg_MIRAE',
#         scheme_table='fund_master_MIRAE', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#          category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory', newmcrid='fm_NewMCRId'
#         )
#     generate_mcr_report(table=f'm_Trans_{code}', groupby_level='SP', start_date = '2020-05-02', end_date = '2020-06-02')

#     print (i, ele, time.time() - s)

# day_records, combined_records = dialy_job('2020-04-02', groupby_level=groupby_level, table=table, direct_db='BankRecon', nav_table=f'fund_navreg_{name}',
#             scheme_table=f'fund_master_{name}', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#              category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory', newmcrid='fm_NewMCRId'
#             )

CPU times: user 22 µs, sys: 0 ns, total: 22 µs
Wall time: 29.6 µs


#### run script

In [73]:
#
# Batch driver: for each (code, name) in `table_codes`, call `initialize` for
# `init_date` and then `dialy_job` for every date yielded by
# `daterange(start_date, end_date)`.
# NOTE: each `table_codes = ...` line below overwrites the previous one, so only
# the LAST uncommented assignment is iterated. `table_codes` and `table_codess`
# (double "s") are separate variables; this cell does not read `table_codess`.
table_codes = {117: 'MIRAE'}

table_codes = {"116": "AXA","117": "MIRAE","107": "BOB","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codes = {"117": "MIRAE","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codess ={130:"peerless", 120:"INVESCO", }

table_codes = {125: 'IBMF', 152: "ITI", 123: "Quantum",}

table_codes = {  107: "BOB",135: "IDBIMF", 178: "BNPMF", 103: "PMF"}

table_codes = { 103:"peerless", 118:"edelwwise"}
table_codes = {"129": "DLFPramerica","120": "INVESCO"}
table_codes = {"RMF": "Reliance" }
table_codes = {"116": "AXA"}
# table_codes = {"135": 'IDBIMF'}
# table_codes = {118:"edelwwise"}
# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort: drop DataFrames cached for the previous fund and ignore
        # any failure. Catch `Exception`, not a bare except, so Ctrl-C still
        # interrupts the batch.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        # This cell does not restart Spark per fund or per day.

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        trans_table = f'Trans_{code}'
        groupby_level = 'SPFTBTer'
        init_date = '2020-08-31'
        start_date = '2020-09-01'
        end_date = '2020-09-02'
#         mcr_month_date = '2020-05-01'

        records = initialize(init_date, table=table, direct_db='BankRecon',
                             groupby_level=groupby_level, trans_table=trans_table)
#         save_metric(init_date, 'records_processed', records, name, groupby_level, table)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level, table)
        job_start = time.time()

        for i, ele in enumerate(daterange(start_date, end_date)):
            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}_{code}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId', trans_table=trans_table,
            )
#             save_metric(ele, 'day_records', day_records, name, groupby_level, table)
#             save_metric(ele, 'combined_records', combined_records, name, groupby_level, table)
#             save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level, table)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() writes the traceback itself and returns None. Wrapping it
        # in print() only added a spurious "None" line.
        traceback.print_exc()
        print(str(e))

116 AXA
inital file on date 2020-08-31 written
initialization time 111.50154161453247
dialy file on date 2020-09-01 generated
     0 2020-09-01 21.588369369506836
job time is 21.589236736297607
overall time is 133.0912094116211



In [None]:
#
# Batch driver with a full Spark restart per fund and per day. It runs
# `dialy_job` for each date in `daterange(start_date, end_date)` and records
# metrics with `save_metric`. `initialize` is commented out here.
# NOTE: each `table_codes = ...` line overwrites the previous one, so only the
# LAST uncommented assignment is iterated. This cell does not read `table_codess`.
table_codes = {117: 'MIRAE'}

table_codes = {"116": "AXA","117": "MIRAE","107": "BOB","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codes = {"117": "MIRAE","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codess ={130:"peerless", 120:"INVESCO", }

table_codes = {125: 'IBMF', 152: "ITI", 123: "Quantum",}

table_codes = {  107: "BOB",135: "IDBIMF", 178: "BNPMF", 103: "PMF"}

table_codes = { 103:"peerless", 118:"edelwwise"}
table_codes = {"129": "DLFPramerica","120": "INVESCO"}
table_codes = {"RMF": "Reliance" }
# table_codes = {"116": "AXA"}
# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown of the previous session. Catch `Exception`, not a
        # bare except, so Ctrl-C still stops the batch.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Re-initialise Spark. PySpark requires SparkContext.setSystemProperty
        # to run BEFORE the SparkContext is instantiated. The SparkConf is built
        # afterwards too, so it is created once the properties are in place.
        # NOTE(review): spark.driver.memory generally cannot change once the
        # driver JVM is running (client mode); set it at launch if it matters.
        SparkContext.setSystemProperty("spark.driver.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
        SparkContext.setSystemProperty("spark.executor.offHeap.size", "100g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPFTTer'
        init_date = '2020-03-31'
        start_date = '2020-04-07'
        end_date = '2020-09-02'
#         mcr_month_date = '2020-05-01'

#         records = initialize(init_date, table=table,direct_db='BankRecon', groupby_level=groupby_level)
#         save_metric(init_date, 'records_processed', records, name, groupby_level, table)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level, table)
        job_start = time.time()

        for i, ele in enumerate(daterange(start_date, end_date)):
            # Restart Spark for every day, with the same ordering rule as above.
            try:
                spark.catalog.clearCache()
            except Exception:
                pass
            try:
                sc.stop()
            except Exception:
                print("error no sc")
            SparkContext.setSystemProperty("spark.driver.memory", "120g")
            SparkContext.setSystemProperty("spark.executor.memory", "120g")
            SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
            SparkContext.setSystemProperty("spark.executor.offHeap.size", "100g")
            conf = SparkConf()
            conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)
            sc = SparkContext.getOrCreate(conf=conf)
            spark = SparkSession(sc)

            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}_{code}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
            save_metric(ele, 'day_records', day_records, name, groupby_level, table)
            save_metric(ele, 'combined_records', combined_records, name, groupby_level, table)
            save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level, table)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() prints the traceback and returns None. Do not wrap it in
        # print(), which would add a stray "None" line.
        traceback.print_exc()
        print(str(e))

RMF Reliance
initialization time 4.76837158203125e-06


In [None]:
#
# Batch driver: for each fund, restart Spark, call `initialize` for `init_date`,
# then run `dialy_job` for each date in `daterange(start_date, end_date)`,
# restarting Spark before every day.
# NOTE: each `table_codes = ...` line overwrites the previous one, so only the
# LAST assignment is iterated. This cell does not read `table_codess`.
table_codes = {117: 'MIRAE'}

table_codes = {"116": "AXA","117": "MIRAE","107": "BOB","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codes = {"117": "MIRAE","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codess ={130:"peerless", 120:"INVESCO", }

table_codes = {125: 'IBMF', 152: "ITI", 123: "Quantum",}

table_codes = {  107: "BOB",135: "IDBIMF", 178: "BNPMF", 103: "PMF"}

table_codes = { 103:"peerless", 118:"edelwwise"}
table_codes = {"129": "DLFPramerica","120": "INVESCO"}
table_codes = {"RMF": "Reliance" }
table_codes = {"116": "AXA"}
# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown. Catch `Exception`, not a bare except, so Ctrl-C
        # still stops the batch.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Re-initialise Spark. PySpark requires SparkContext.setSystemProperty
        # to run BEFORE the SparkContext is instantiated. The SparkConf is built
        # after the properties are set for the same reason.
        # NOTE(review): spark.driver.memory generally cannot change once the
        # driver JVM is running (client mode); set it at launch if it matters.
        SparkContext.setSystemProperty("spark.driver.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
        SparkContext.setSystemProperty("spark.executor.offHeap.size", "100g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPFTTer'
        init_date = '2020-03-31'
        start_date = '2020-04-01'
        end_date = '2020-09-02'
#         mcr_month_date = '2020-05-01'

        records = initialize(init_date, table=table, direct_db='BankRecon', groupby_level=groupby_level)
        #save_metric(init_date, 'records_processed', records, name, groupby_level, table)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
        job_start = time.time()

        for i, ele in enumerate(daterange(start_date, end_date)):
            # Restart Spark for every day, with the same ordering rule as above.
            try:
                spark.catalog.clearCache()
            except Exception:
                pass
            try:
                sc.stop()
            except Exception:
                print("error no sc")
            SparkContext.setSystemProperty("spark.driver.memory", "120g")
            SparkContext.setSystemProperty("spark.executor.memory", "120g")
            SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
            SparkContext.setSystemProperty("spark.executor.offHeap.size", "100g")
            conf = SparkConf()
            conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)
            sc = SparkContext.getOrCreate(conf=conf)
            spark = SparkSession(sc)

            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}_{code}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
            #save_metric(ele, 'day_records', day_records, name, groupby_level, table)
            #save_metric(ele, 'combined_records', combined_records, name, groupby_level, table)
            #save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level, table)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() prints the traceback and returns None. Do not wrap it in
        # print(), which would add a stray "None" line.
        traceback.print_exc()
        print(str(e))

In [None]:
#
# Batch driver (SPFT grouping): for each fund, restart Spark, then run
# `dialy_job` for each date in `daterange(start_date, end_date)`, restarting
# Spark before every day and recording metrics with `save_metric`.
# NOTE: each `table_codes = ...` line overwrites the previous one, so only the
# LAST assignment is iterated.
table_codes = {117: 'MIRAE'}

table_codes = {"116": "AXA","117": "MIRAE","107": "BOB","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codes = {"117": "MIRAE","120": "INVESCO","RMF": "Reliance",
"118": "Edelweiss","135": "IDBIMF","125": "IBMF","128": "AXISMF","178": "BNPMF","152": "ITI",
"105": "JMMF","103": "PMF","166": "Quant","130": "PeerlessMF","104": "TAURUS","108": "UTI",
"123": "Quantum","127": "MOTILAL","102": "LIC","176": "SundaramMF","101": "canrobeco","129": "DLFPramerica"}

table_codes = {125: 'IBMF', 152: "ITI", 123: "Quantum",}
table_codes = {125:'IBMF', 104:'TARUS', 103:"peerless", 123:"Quantum", 118:"edelwwise"}

table_codes = {"RMF": "Reliance" }

# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown. Catch `Exception`, not a bare except, so Ctrl-C
        # still stops the batch.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Re-initialise Spark. System properties must be set BEFORE the
        # SparkContext is instantiated (PySpark requirement). The SparkConf is
        # also created only after they are set.
        # NOTE(review): spark.driver.memory generally cannot change once the
        # driver JVM is running (client mode); set it at launch if it matters.
        SparkContext.setSystemProperty("spark.driver.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.memory", "60g")
        SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
        SparkContext.setSystemProperty("spark.executor.offHeap.size", "200g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPFT'
        init_date = '2020-03-31'
        start_date = '2020-06-05'
        end_date = '2020-08-02'
#         mcr_month_date = '2020-05-01'

        #records = initialize(init_date, table=table,direct_db='BankRecon', groupby_level=groupby_level)
        #save_metric(init_date, 'records_processed', records, name, groupby_level, table)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
        job_start = time.time()

        for i, ele in enumerate(daterange(start_date, end_date)):
            # Restart Spark for every day, with the same ordering rule as above.
            try:
                spark.catalog.clearCache()
            except Exception:
                pass
            try:
                sc.stop()
            except Exception:
                print("error no sc")
            SparkContext.setSystemProperty("spark.driver.memory", "60g")
            SparkContext.setSystemProperty("spark.executor.memory", "60g")
            SparkContext.setSystemProperty("spark.executor.offHeap.enabled", "true")
            SparkContext.setSystemProperty("spark.executor.offHeap.size", "200g")
            conf = SparkConf()
            conf.setMaster("local[*]").setAppName("My app").set("spark.sql.shuffle.partitions", 16)
            sc = SparkContext.getOrCreate(conf=conf)
            spark = SparkSession(sc)

            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}_{code}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
            save_metric(ele, 'day_records', day_records, name, groupby_level, table)
            save_metric(ele, 'combined_records', combined_records, name, groupby_level, table)
            save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level, table)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() prints the traceback and returns None. Do not wrap it in
        # print(), which would add a stray "None" line.
        traceback.print_exc()
        print(str(e))

In [None]:
#
# Run `initialize` and `dialy_job` directly against the tables in the
# `kfintech_funds` database (no per-fund loop, no Spark restarts).
# The config block was previously repeated verbatim; one copy is enough.
init_date = '2020-04-01'
groupby_level = 'SPFT'
table = 'trans116'
direct_db = 'kfintech_funds'
nav_table = 'nav_master'
scheme_table = 'scheme_master'

initialize(init_date, table=table, direct_db=direct_db, groupby_level=groupby_level)
# init_date = '2020-04-30'
# initialize(init_date, table=table,direct_db=direct_db, groupby_level=groupby_level)


for ele in daterange('2020-04-02', '2020-04-03'):
    dialy_job(ele, groupby_level=groupby_level, table=table, direct_db=direct_db, nav_table=nav_table,
              scheme_table=scheme_table, scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
              category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
              newmcrid='fm_NewMCRId')

In [54]:
# Total `aum` per NewMCRId for m_Trans_116 on 2020-09-01, SPFTTer output first,
# then SPFT output (paths are plain literals; the old f-strings only embedded constants).
latest_data = spark.read.parquet("m_Trans_116_dialy/data_SPFTTer_2020-09-01.parquet")
latest_data_ = spark.read.parquet("m_Trans_116_dialy/data_SPFT_2020-09-01.parquet")
# latest_data.coalesce(1).write.csv('axa_calculations_ter.csv', header=True,mode='overwrite')
newmcrid = 'fm_NewMCRId'
latest_data.groupBy(newmcrid).agg(sum('aum')).show()
latest_data_.groupBy(newmcrid).agg(sum('aum')).show()

+-----------+--------------------+
|fm_NewMCRId|            sum(aum)|
+-----------+--------------------+
|    AIII_29| 9.175777190211585E8|
|     AII_25|3.7345910768308026E8|
|     AII_26| 4.441361186929758E9|
|      AI_12| 3.734928158315652E8|
|     AII_21| 7.246340465773715E8|
|       AI_1| 6.771047297224059E8|
|    AIII_31| 3.005910526386833E8|
|    AIII_28| 3.176874591605646E9|
|       AI_3| 2.529782207578232E9|
|       null|                null|
|     AII_19| 1.711511444015332E9|
|     AII_17|   4.0515296491536E8|
|       AI_2| 2.331079486276221E9|
|       AI_6|3.2928791216110605E8|
|    AIII_27| 8.683341086366962E8|
+-----------+--------------------+

+-----------+--------------------+
|fm_NewMCRId|            sum(aum)|
+-----------+--------------------+
|    AIII_29| 9.175777190211582E8|
|     AII_25|3.7345910768307984E8|
|     AII_26| 4.441361186929739E9|
|      AI_12|3.7349281583156604E8|
|     AII_21|  7.24634046577371E8|
|       AI_1| 6.771047297224042E8|
|    AIII_31| 3.005

In [55]:
# Re-read the SPFTTer output for 2020-09-01 and total `aum` per NewMCRId.
# Uses `newmcrid` as assigned in an earlier cell.
llatest_data = spark.read.parquet("m_Trans_116_dialy/data_SPFTTer_2020-09-01.parquet")
llatest_data.groupBy(newmcrid).agg(sum('aum')).show()

+-----------+--------------------+
|fm_NewMCRId|            sum(aum)|
+-----------+--------------------+
|    AIII_29| 9.175777190211585E8|
|     AII_25|3.7345910768308026E8|
|     AII_26| 4.441361186929758E9|
|      AI_12| 3.734928158315652E8|
|     AII_21| 7.246340465773715E8|
|       AI_1| 6.771047297224059E8|
|    AIII_31| 3.005910526386833E8|
|    AIII_28| 3.176874591605646E9|
|       AI_3| 2.529782207578232E9|
|       null|                null|
|     AII_19| 1.711511444015332E9|
|     AII_17|   4.0515296491536E8|
|       AI_2| 2.331079486276221E9|
|       AI_6|3.2928791216110605E8|
|    AIII_27| 8.683341086366962E8|
+-----------+--------------------+



In [74]:
# Total `aum` per NewMCRId for the SPFTBTer output on 2020-09-01.
# NOTE(review): the captured output shows negative totals for several
# categories, whereas the SPFTTer/SPFT outputs are positive. Worth checking
# the SPFTBTer path in dialy_job.
blatest_data = spark.read.parquet("m_Trans_116_dialy/data_SPFTBTer_2020-09-01.parquet")
blatest_data.groupBy(newmcrid).agg(sum('aum')).show()

+-----------+--------------------+
|fm_NewMCRId|            sum(aum)|
+-----------+--------------------+
|    AIII_29| 7.691624059313667E8|
|     AII_25|-1.23559474364505...|
|     AII_26| 4.130736985815619E9|
|      AI_12|-1.98018786825583...|
|     AII_21| 7.586508808656341E8|
|       AI_1| 6.771047297223973E8|
|    AIII_31| 2.874654285550581E7|
|    AIII_28| 3.038452141457969E9|
|       AI_3|-1.17023161735102...|
|       null|                null|
|     AII_19|-3.06165551048728...|
|     AII_17|4.0515296491536057E8|
|       AI_2|-7.02685621425754...|
|       AI_6|-1.24080189235766...|
|    AIII_27| 6.237194375079607E8|
+-----------+--------------------+



In [None]:
def get_data(start_date, end_date, groupby_level='SPT', table='m_Trans_116'):
    """Union the daily parquet snapshots for every date yielded by ``daterange``.

    Each day is read with the global ``spark`` session from
    ``{table}_dialy/data_{groupby_level}_{date}.parquet`` and the frames are
    combined with ``DataFrame.union`` (columns matched by position) in the
    order ``daterange`` yields the dates.

    Args:
        start_date: First bound passed to ``daterange``. Presumably a
            'YYYY-MM-DD' string, as it ends up in the file names.
        end_date: Second bound passed to ``daterange``.
        groupby_level: Grouping-level tag embedded in the file names.
        table: Source table name used as the directory prefix.

    Returns:
        The unioned Spark DataFrame, or ``None`` if ``daterange`` yields no dates.
    """
    final_data = None
    for date in daterange(start_date, end_date):
        latest_data = spark.read.parquet(f"{table}_dialy/data_{groupby_level}_{date}.parquet")
        # Compare against None explicitly rather than relying on the
        # truthiness of a Spark DataFrame.
        final_data = latest_data if final_data is None else final_data.union(latest_data)
    return final_data

# Transaction types excluded from the MCR period data by default.
_DEFAULT_IGNORED_TR_TYPES = ('CNI', 'CNO', 'TRMI', 'TRMO',
                             'TRFI', 'TRFO', 'PLDO', 'UPLO', 'DMT',
                             'RMT', 'CNIR', 'CNOR', 'TRMIR', 'TRMOR',
                             'TRFIR', 'TRFOR', 'PLDOR', 'UPLOR', 'DMTR', 'RMTR')
# Transaction types whose final-day units are backed out for the 'A1b' category.
_DEFAULT_LIQUID_FUND_TR_TYPES = ('NEW', 'ADD', 'IPO', 'SIN', 'NEWR', 'ADDR', 'IPOR', 'SINR')


def generate_mcr_report(table='m_Trans_116', ignored_tr_types=None,
                        liquid_fund_tr_types=None,
                        start_date='2020-05-02', end_date='2020-06-02', groupby_level='SPT',
                        transaction_type='TransactionType', folio='Folio', folio_ignore_types=None,
                        fn_nav='fn_nav', newmcrid='fm_NewMCRId', today_pu='today_pu',
                        today_ru='today_ru', scheme='SchemeCode', aum='aum', plan='PlanCode',
                        inflow_start_date='2020-04-02'):
    """Build the MCR summary for one fund table, show it and write it to CSV.

    The period is every date yielded by ``daterange(start_date, end_date)``
    (at least two are required). All but the last are read as daily parquet
    snapshots. The "final day" is the day after the last of those. It is read
    separately so that, for rows with ``newmcrid == 'A1b'`` whose calculated and
    batch-close dates equal the final day and whose transaction type is in
    ``liquid_fund_tr_types``, that day's units are removed from the balances
    and ``aum`` is recomputed as ``balance_units * fn_nav``.

    Per ``newmcrid`` the report contains:
      * distinct scheme and plan counts over the period;
      * ``sum(aum)`` of the (adjusted) final day;
      * ``avg_aum``: summed ``aum`` over the period divided by the number of dates;
      * distinct folios whose per (folio, scheme, plan) ``aum`` sum exceeds 0.1;
      * summed ``inflow`` / ``outflow`` over ``get_data(inflow_start_date, end_date)``.

    Args:
        table: Source table name, used as the parquet/CSV directory prefix.
        ignored_tr_types: Transaction types dropped from the period data
            (defaults to ``_DEFAULT_IGNORED_TR_TYPES`` as a list).
        liquid_fund_tr_types: Transaction types subject to the final-day
            liquid adjustment (defaults to ``_DEFAULT_LIQUID_FUND_TR_TYPES``).
        start_date, end_date: Bounds passed to ``daterange``.
        groupby_level: Grouping-level tag embedded in the file names.
        transaction_type, folio, fn_nav, newmcrid, today_pu, today_ru,
        scheme, aum, plan: Column names in the daily snapshots.
        folio_ignore_types: Accepted for backward compatibility; currently unused.
        inflow_start_date: First date of the inflow/outflow window
            (previously hard-coded as '2020-04-02').

    Returns:
        Tuple ``(mcr, avg_data, sp_inf_ouf_data, all_data)``: the report, the
        period rows after the liquid adjustment, the period rows before it,
        and the rows summed for the inflow/outflow totals.

    Raises:
        ValueError: if ``daterange`` yields fewer than two dates.
    """
    # Resolve defaults here instead of using shared mutable list defaults.
    if ignored_tr_types is None:
        ignored_tr_types = list(_DEFAULT_IGNORED_TR_TYPES)
    if liquid_fund_tr_types is None:
        liquid_fund_tr_types = list(_DEFAULT_LIQUID_FUND_TR_TYPES)

    inflow = 'inflow'
    outflow = 'outflow'
    calculated_date = 'calculated_date'
    batch_close_date = 'batch_close_date'
    balance_pu = 'balance_pu'
    balance_ru = 'balance_ru'
    balance_units = 'balance_units'

    dates_list = list(daterange(start_date, end_date))
    if len(dates_list) < 2:
        # Previously this fell through to an undefined loop variable / None.filter.
        raise ValueError(
            f"generate_mcr_report needs at least two dates between {start_date} "
            f"and {end_date}, got {len(dates_list)}")

    # Period data for every date except the last one.
    till_but_one_day_data = None
    for day in dates_list[:-1]:
        day_data = spark.read.parquet(f"{table}_dialy/data_{groupby_level}_{day}.parquet")
        till_but_one_day_data = (day_data if till_but_one_day_data is None
                                 else till_but_one_day_data.union(day_data))

    # The final day is the day after the last date loaded above.
    last_loaded_day = datetime.datetime.strptime(dates_list[-2], '%Y-%m-%d')
    final_day_str = (last_loaded_day + datetime.timedelta(1)).strftime('%Y-%m-%d')
    final_day_data = spark.read.parquet(f"{table}_dialy/data_{groupby_level}_{final_day_str}.parquet")

    fill_values = {today_pu: 0, today_ru: 0, aum: 0, newmcrid: 'Others'}
    not_ignored = ~(col(transaction_type).isin(ignored_tr_types))
    till_but_one_day_data = till_but_one_day_data.filter(not_ignored).fillna(fill_values)
    final_day_data = final_day_data.filter(not_ignored).fillna(fill_values)

    # Period rows before the liquid adjustment (returned to the caller).
    sp_inf_ouf_data = till_but_one_day_data.union(final_day_data)

    liquid_condition = ((col(newmcrid) == 'A1b')
                        & (col(calculated_date) == final_day_str)
                        & (col(batch_close_date) == final_day_str)
                        & (col(transaction_type).isin(liquid_fund_tr_types)))

    # Order matters: balance_units uses the adjusted balances, aum the adjusted units.
    final_day_data = final_day_data.withColumn(balance_pu,    when(liquid_condition, col(balance_pu) - col(today_pu)).otherwise(col(balance_pu)))
    final_day_data = final_day_data.withColumn(balance_ru,    when(liquid_condition, col(balance_ru) - col(today_ru)).otherwise(col(balance_ru)))
    final_day_data = final_day_data.withColumn(balance_units, when(liquid_condition, col(balance_pu) - col(balance_ru)).otherwise(col(balance_units)))
    final_day_data = final_day_data.withColumn(aum,           when(liquid_condition, col(balance_units) * col(fn_nav)).otherwise(col(aum)))

    net_aum = final_day_data.groupby([newmcrid]).agg(sum(aum))
    avg_data = till_but_one_day_data.union(final_day_data)

    spinout = avg_data.groupby([newmcrid]).agg(countDistinct(scheme), countDistinct(plan))

    # Use the `aum` parameter rather than a hard-coded 'aum' column name.
    folio_count = (avg_data.groupby(folio, scheme, plan, newmcrid)
                   .agg(sum(aum))
                   .filter(col(f'sum({aum})') > 0.1)
                   .groupby(newmcrid)
                   .agg(countDistinct(folio)))

    avg_aum = avg_data.groupby([newmcrid]).agg(sum(aum))
    avg_aum = avg_aum.withColumn('avg_aum', col(f'sum({aum})') / len(dates_list)).drop(f'sum({aum})')

    mcr = spinout.join(net_aum, on=[newmcrid], how='left')
    mcr = mcr.join(avg_aum, on=[newmcrid], how='left')
    mcr = mcr.join(folio_count, on=[newmcrid], how='left')

    # Inflow / outflow over the wider window. Unlike the period data these rows
    # are not filtered by ignored_tr_types. A NULL liquid condition (e.g. null
    # newmcrid) is treated as "not liquid" and kept, matching the
    # when(...).otherwise(...) adjustment above. A bare ~condition would drop it.
    all_data = get_data(inflow_start_date, end_date, groupby_level=groupby_level, table=table)
    all_data = all_data.filter(~liquid_condition | liquid_condition.isNull())
    all_data = all_data.fillna({today_pu: 0, today_ru: 0, aum: 0, newmcrid: 'Others', inflow: 0, outflow: 0})
    inout = all_data.groupby([newmcrid]).agg(sum(inflow), sum(outflow))
    mcr = mcr.join(inout, on=[newmcrid], how='left')

    mcr.show(1000)

    mcr.coalesce(1).write.csv(f"{table}_mcr/mcr_{groupby_level}_{final_day_str}.csv", header=True, mode='overwrite')

    return mcr, avg_data, sp_inf_ouf_data, all_data
 
    
# Generate the SPFT-level MCR report for m_Trans_116 over one monthly window.
# (A dead `table = 'trans116'` assignment that was immediately overwritten was removed.)
table = 'm_Trans_116'
groupby_level = 'SPFT'
start_date = '2020-04-02'
end_date = '2020-05-02'
mcr, _, _, _ = generate_mcr_report(table=table, groupby_level=groupby_level,
                                   start_date=start_date, end_date=end_date)




In [None]:
table_codes = {117: 'MIRAE'}

# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown of the previous session: `spark` / `sc` may not
        # be defined yet (NameError) or may already be stopped.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Initialize spark again. PySpark requires setSystemProperty to be called
        # before the SparkContext is instantiated, so it now comes first.
        # NOTE(review): the driver heap is fixed once the JVM is running, so
        # "spark.driver.memory" likely only matters on the very first launch.
        SparkContext.setSystemProperty("spark.driver.memory", "40g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app")

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPT'
        init_date = '2020-06-30'
        mcr_month_date = '2020-05-01'

        records = initialize(init_date, table=table, direct_db='BankRecon', groupby_level=groupby_level)
#         save_metric(init_date, 'records_processed', records, name, groupby_level)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
        job_start = time.time()

        # Run dialy_job once per date in the window.
        for i, ele in enumerate(daterange('2020-07-01', '2020-08-02')):
            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
#             save_metric(ele, 'day_records', day_records, name, groupby_level)
#             save_metric(ele, 'combined_records', combined_records, name, groupby_level)
#             save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() writes the traceback itself and returns None; wrapping it
        # in print() only emitted a stray "None" line.
        traceback.print_exc()
        print(str(e))

In [None]:
table_codes = {'RMF': 'Reliance'}

for code, name in table_codes.items():
    try:
        # Best-effort teardown of the previous session: `spark` / `sc` may not
        # be defined yet (NameError) or may already be stopped.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Initialize spark again. PySpark requires setSystemProperty to be called
        # before the SparkContext is instantiated, so it now comes first.
        # NOTE(review): the driver heap is fixed once the JVM is running, so
        # "spark.driver.memory" likely only matters on the very first launch.
        SparkContext.setSystemProperty("spark.driver.memory", "40g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app")

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPT'
        init_date = '2020-06-30'
        mcr_month_date = '2020-05-01'

        records = initialize(init_date, table=table, direct_db='BankRecon', groupby_level=groupby_level)
#         save_metric(init_date, 'records_processed', records, name, groupby_level)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
        job_start = time.time()

        # Run dialy_job once per date in the window.
        for i, ele in enumerate(daterange('2020-07-01', '2020-08-02')):
            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
#             save_metric(ele, 'day_records', day_records, name, groupby_level)
#             save_metric(ele, 'combined_records', combined_records, name, groupby_level)
#             save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() writes the traceback itself and returns None; wrapping it
        # in print() only emitted a stray "None" line.
        traceback.print_exc()
        print(str(e))

In [None]:
import datetime

# Scratch check: parse a sample date, then build 2 April of that same calendar
# year as a 'YYYY-MM-DD' string (the cell's displayed result).
date_obj = datetime.datetime.strptime('2020-08-05', '%Y-%m-%d')
datetime.datetime(year=date_obj.year, month=4, day=2).strftime('%Y-%m-%d')

In [None]:
# Reference mapping of fund table code (as a string) -> fund name; the cell
# only displays it.
{
    "116": "AXA",
    "117": "MIRAE",
    "107": "BOB",
    "120": "INVESCO",
    "RMF": "Reliance",
    "118": "Edelweiss",
    "135": "IDBIMF",
    "125": "IBMF",
    "128": "AXISMF",
    "178": "BNPMF",
    "152": "ITI",
    "105": "JMMF",
    "103": "PMF",
    "166": "Quant",
    "130": "PeerlessMF",
    "104": "TAURUS",
    "108": "UTI",
    "123": "Quantum",
    "127": "MOTILAL",
    "102": "LIC",
    "176": "SundaramMF",
    "101": "canrobeco",
    "129": "DLFPramerica",
}

In [None]:
#### all the script execution for all the funds
# NOTE: only the last `table_codes` assignment below takes effect; the earlier
# dicts are kept as a record of fund lists. The third one repeats keys 118 and
# 130 (the later value wins) and mixes int and str codes (129 vs '129').
table_codes = {
               102:'LIC',103:'pmf',104:'tarus',105:'JMMF',107:'BOB',108:'uti',
               116:'AXA',117:'mirae',118:'edelwwise',120:'invesco',123:'quantum'
               ,125:'IBMF',127:'motilal',128:'axismf',130:'peerless',135:'IDBIMF',152:'ITI',166:'quant',
               176:'sundaram',178:'BNPMF', 'RMF':'reliance'}
table_codes = {104:'taurus',105:'JMMF',107:'BOB',108:'uti',
               116:'AXA',117:'mirae',118:'edelwwise',120:'invesco',123:'quantum'
               ,125:'IBMF',127:'motilal',128:'axismf',130:'peerless',135:'IDBIMF',152:'ITI',166:'quant',
               176:'sundaram',178:'BNPMF', 'RMF':'reliance'}

table_codes = {101:'canrobeco',
               102:'LIC',103:'pmf',104:'tarus',105:'JMMF',107:'BOB',108:'uti',
               116:'AXA',117:'mirae',118:'edelwwise',120:'invesco',123:'quantum'
               ,125:'IBMF',127:'motilal',128:'axismf',129:'pgim',130:'peerless',135:'IDBIMF',152:'ITI',166:'quant'
               ,178:'BNPMF', 'RMF':'reliance', '129': 'dlfpramerica', 118:'edelweiss', 130:'peerlessMF', '176':'sundaramMF'}
table_codes = {116: 'AXA'}

# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown of the previous session: `spark` / `sc` may not
        # be defined yet (NameError) or may already be stopped.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Initialize spark again. PySpark requires setSystemProperty to be called
        # before the SparkContext is instantiated, so it now comes first.
        # NOTE(review): the driver heap is fixed once the JVM is running, so
        # "spark.driver.memory" likely only matters on the very first launch.
        SparkContext.setSystemProperty("spark.driver.memory", "40g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app")

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPFT'
        init_date = '2020-04-30'
        mcr_month_date = '2020-05-01'

        records = initialize(init_date, table=table, direct_db='BankRecon', groupby_level=groupby_level)
#         save_metric(init_date, 'records_processed', records, name, groupby_level)

        print(f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
        job_start = time.time()

        # Run dialy_job once per date in the window and record per-day metrics.
        for i, ele in enumerate(daterange('2020-05-01', '2020-06-02')):
            s = time.time()
            day_records, combined_records = dialy_job(
                ele, groupby_level=groupby_level, table=table, direct_db='BankRecon',
                nav_table=f'fund_navreg_{name}', scheme_table=f'fund_master_{name}',
                scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
                category='fm_SebiSchemeCategory', subcategory='fm_SebiSchemeSubCategory',
                newmcrid='fm_NewMCRId',
            )
            save_metric(ele, 'day_records', day_records, name, groupby_level)
            save_metric(ele, 'combined_records', combined_records, name, groupby_level)
            save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level)
            print("    ", i, ele, time.time() - s)

        s = time.time()
#         generate_mcr_report(table=table, groupby_level=groupby_level, start_date = '2020-06-02', end_date = '2020-07-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

        print(f'job time is {time.time() - job_start}')
        print(f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() writes the traceback itself and returns None; wrapping it
        # in print() only emitted a stray "None" line.
        traceback.print_exc()
        print(str(e))


In [None]:
#### generate mcr for august for all the funds
# table_codes = { '129': 'dlfpramerica', 118:'edelweiss', 130:'peerlessMF', '176':'sundaramMF'}

# NOTE(review): keys 118 and 130 appear twice (the later value wins). 129 is
# present both as int and as str, so generate_mcr_report runs twice for
# m_Trans_129 (the second run overwrites the same CSV).
table_codes = {101:'canrobeco',
               102:'LIC',103:'pmf',104:'tarus',105:'JMMF',107:'BOB',108:'uti',
               116:'AXA',117:'mirae',118:'edelwwise',120:'invesco',123:'quantum'
               ,125:'IBMF',127:'motilal',128:'axismf',129:'pgim',130:'peerless',135:'IDBIMF',152:'ITI',166:'quant'
               ,178:'BNPMF', 'RMF':'reliance', '129': 'dlfpramerica', 118:'edelweiss', 130:'peerlessMF', '176':'sundaramMF'}
# exception taurus
for code, name in table_codes.items():
    try:
        # Best-effort teardown of the previous session: `spark` / `sc` may not
        # be defined yet (NameError) or may already be stopped.
        try:
            spark.catalog.clearCache()
        except Exception:
            pass
        try:
            sc.stop()
        except Exception:
            print("error no sc")

        # Initialize spark again. PySpark requires setSystemProperty to be called
        # before the SparkContext is instantiated, so it now comes first.
        # NOTE(review): the driver heap is fixed once the JVM is running, so
        # "spark.driver.memory" likely only matters on the very first launch.
        SparkContext.setSystemProperty("spark.driver.memory", "40g")
        conf = SparkConf()
        conf.setMaster("local[*]").setAppName("My app")

        # Create spark context and sparksession
        sc = SparkContext.getOrCreate(conf=conf)
        spark = SparkSession(sc)

        print(code, name)
        start = time.time()
        table = f'm_Trans_{code}'
        groupby_level = 'SPT'
        init_date = '2020-07-01'
        mcr_month_date = '2020-07-01'

#         records = initialize(init_date, table=table,direct_db='BankRecon', groupby_level=groupby_level)
#         save_metric(init_date, 'records_processed', records, name, groupby_level)

#         print (f'initialization time {time.time() - start}')
#         save_metric(init_date, 'intialization_time', time.time() - start, name, groupby_level)
#         job_start = time.time()


#         for i,ele in enumerate(list(daterange('2020-07-02', '2020-08-02'))):
#             s = time.time()
#             day_records, combined_records = dialy_job(ele, groupby_level=groupby_level, table=table, direct_db='BankRecon', nav_table=f'fund_navreg_{name}',
#             scheme_table=f'fund_master_{name}', scheme_code='fm_scheme', plan_code='fm_plan', nature='fm_nature',
#              category = 'fm_SebiSchemeCategory',subcategory = 'fm_SebiSchemeSubCategory', newmcrid='fm_NewMCRId'
#             )
#             save_metric(ele, 'day_records', day_records, name, groupby_level)
#             save_metric(ele, 'combined_records', combined_records, name, groupby_level)
#             save_metric(ele, 'dialy_job_time', time.time() - s, name, groupby_level)
#             print ("    ",i, ele, time.time() - s)

#         s = time.time() 
        generate_mcr_report(table=table, groupby_level=groupby_level, start_date='2020-07-02', end_date='2020-08-02')
#         save_metric(mcr_month_date, 'mcr_generate_time', time.time() - s, name, groupby_level)

#         print (f'job time is {time.time() - job_start}')
#         print (f'overall time is {time.time() - start}')
#         save_metric(mcr_month_date, 'overall_time', time.time() - start, name, groupby_level)

        print()
    except Exception as e:
        # print_exc() writes the traceback itself and returns None; wrapping it
        # in print() only emitted a stray "None" line.
        traceback.print_exc()
        print(str(e))
