In [None]:
from sqlalchemy import create_engine, event
import pyodbc
import urllib.parse
import time
import pandas as pd
import numpy as np
import datetime as dt

# Inputs & Outputs

In [None]:
# SQL database
# Raw string: the backslash separating host and instance name must be kept literally
# ('\R' in a normal string literal is an invalid escape sequence).
server = r'CSKMA0400\RDB_Data'
db = 'JLDJobPath'
odbc_connection_string = 'DRIVER={SQL Server Native Client 11.0};SERVER='+server+';DATABASE='+db+';Trusted_Connection=yes'

# Set query date to roll-up to (format YYYYMMDD), number of episodes to include
# and number of years for the per-year summaries (0 disables them)
query_date = '20160401'
n_episodes = 5
n_years = 0

# input table
jld_sql_table = "linkedclaims_casuals_2018m04_v2"
# output table: input table name + "_flat_" + query date
flat_jld_sql_table = jld_sql_table+"_flat_" + query_date

# Data dictionary below MUST include the variables and selection for jld_sql_table above!
ddictionaryfilename = "D:/Data/linkedclaims_casuals_2018m04_v2_variables_types.csv"

# Set the below to True to extract total duration across episodes of levels of nominal variables
# Note the process is slow
calculate_total_duration_days_of_levels = True

# Set the variable, its value and the comparison operator used to select
# (or exclude) episodes in the UID query.
# Alternative configuration, kept for reference:
# variable_event_type = 'hist_lr'
# variable_event_type_value = 'EMPL'
# variable_event_type_condition = '!='
variable_event_type = 'lr_flag'
variable_event_type_value = '1'
variable_event_type_condition = '='

# Select PPSN, Flat/Transpose & SQL Upload

In [None]:
# Helper Functions

def batch(iterable, n = 1):
    """Yield successive lists of at most ``n`` items taken from ``iterable``.

    Every yielded list holds exactly ``n`` items except possibly the last one,
    which holds whatever is left over. Nothing is yielded for an empty
    iterable. Because a chunk is only emitted when its length equals ``n``,
    a value of ``n`` below 1 results in a single chunk containing every item.
    """
    chunk = []
    for element in iterable:
        chunk.append(element)
        if len(chunk) != n:
            continue
        yield chunk
        chunk = []
    # Emit the trailing, partially filled chunk (if any).
    if len(chunk) > 0:
        yield chunk

def sanitize_varchar(varchar):
    """Return ``varchar`` with spaces, commas, dots, dashes and slashes replaced by '_'.

    The result is used to build dictionary keys that later become column names.
    """
    sanitized = varchar
    for separator in (' ', ',', '.', '-', '/'):
        sanitized = sanitized.replace(separator, '_')
    return sanitized
        
def values_from_list_to_dict (values, base_key, max_n_values):
    """Spread the first ``max_n_values`` items of ``values`` over numbered keys.

    Args:
        values: indexable sequence (list, numpy array, ...).
        base_key: prefix of the generated keys.
        max_n_values: number of keys to generate.

    Returns:
        dict mapping ``base_key + '_' + str(i)`` to ``values[i]`` for
        ``i`` in ``range(max_n_values)``; positions past the end of
        ``values`` are filled with ``None``.
    """
    mydict = {}
    for i in range(max_n_values):
        key_value = base_key + '_' + str(i)
        try:
            mydict[key_value] = values[i]
        except LookupError:
            # Only a missing position is padded; any other error is a real
            # problem and must not be silently swallowed.
            mydict[key_value] = None
    return mydict

def values_from_list_to_dict_as_string (values, base_key, max_n_values):
    """Spread the first ``max_n_values`` items of ``values`` over numbered keys, as strings.

    Args:
        values: indexable sequence (list, numpy array, ...).
        base_key: prefix of the generated keys.
        max_n_values: number of keys to generate.

    Returns:
        dict mapping ``base_key + '_' + str(i)`` to
        ``sanitize_varchar(str(values[i]))`` for ``i`` in
        ``range(max_n_values)``; positions past the end of ``values`` are
        filled with the string ``'NULL'``.
    """
    mydict = {}
    for i in range(max_n_values):
        key_value = base_key + '_' + str(i)
        try:
            raw_value = values[i]
        except LookupError:
            # Only a missing position is padded; errors raised while
            # converting or sanitizing a value are no longer hidden.
            mydict[key_value] = 'NULL'
        else:
            mydict[key_value] = sanitize_varchar(str(raw_value))
    return mydict

def unique_value_from_nparray(values):
    """Return the first distinct non-null entry of ``values``, or NaN if there is none."""
    distinct = pd.unique(values)
    non_null = distinct[pd.notnull(distinct)]
    return non_null[0] if len(non_null) > 0 else np.nan

def _episode_durations(frame, start_column, end_column):
    """Return the per-row episode lengths of ``frame`` as a numpy ``timedelta64[D]`` array."""
    starts = np.array(frame[start_column])
    ends = np.array(frame[end_column])
    return (ends.astype('datetime64[us]') - starts.astype('datetime64[us]')).astype('timedelta64[D]')


def _numeric_summaries(frame, variables, prefix):
    """Return sum/mean/max/min of the non-null values of each numeric variable, keyed with ``prefix``."""
    summary = {}
    for variable in variables:
        values = np.array(frame[variable])
        values = values[~pd.isnull(values)]
        name = sanitize_varchar(str(variable))
        if len(values) > 0:
            values = values.astype(float)
            stats = (np.sum(values), np.mean(values), np.max(values), np.min(values))
        else:
            stats = (np.nan, np.nan, np.nan, np.nan)
        for label, stat in zip(('sum', 'mean', 'max', 'min'), stats):
            summary[prefix + label + '_' + name] = stat
    return summary


def _level_summaries(frame, variables, prefix, start_column, end_column, include_durations):
    """Return per-level row counts (and optionally total days) of each nominal variable, keyed with ``prefix``."""
    summary = {}
    for variable in variables:
        counts = frame[variable].value_counts()
        name = sanitize_varchar(str(variable))
        for level in counts.index:
            # The literal string 'NULL' marks a missing level and is not reported
            if level == 'NULL':
                continue
            level_name = sanitize_varchar(str(level))
            summary[prefix + 'n_' + name + '_' + level_name] = counts.loc[level]

            # this is slow
            if include_durations:
                level_rows = frame[frame[variable] == level]
                level_days = np.nansum(_episode_durations(level_rows, start_column, end_column))
                summary[prefix + 'duration_days_' + name + '_' + level_name] = level_days.astype(int)
    return summary


def transpose_jld(data, dictionary, 
                  total_summary_episodes=0, 
                  total_number_of_years=0, 
                  max_date = None,
                  calculate_total_duration_days_of_levels = False):
    """Flatten episode-level data into one row per unique identifier.

    Args:
        data: DataFrame with one row per episode. Besides the columns named in
            ``dictionary`` it must contain the columns 'lr_flag' and 'hist_lr',
            which are used to order the episodes of each identifier.
        dictionary: data dictionary DataFrame. The columns read are
            ``Variable``, ``Type`` ('Num' / 'Char') and the 0/1 flag columns
            ``UID``, ``EventStart``, ``EventEnd``, ``DOB``, ``Pinfo``,
            ``TotalSummary`` and ``EpisodeSummary``.
        total_summary_episodes: number of most recent episodes for which
            per-episode columns (``<variable>_<i>``, ``duration_days_<i>``,
            ``age_start_<i>``, ``age_end_<i>``) are produced; 0 disables them.
        total_number_of_years: number of calendar years, counted backwards
            from the year of the latest episode end, for which ``year_<i>_*``
            summaries are produced; 0 disables them.
        max_date: optional roll-up date. Episodes starting on or after it are
            discarded, later or missing episode ends are replaced by it.
        calculate_total_duration_days_of_levels: when true, also compute the
            total duration in days of every level of the nominal variables
            (slow).

    Returns:
        DataFrame with one row per identifier; empty when ``data`` has no
        usable rows. ``data`` itself is not modified.
    """
    variable_uid = (dictionary[dictionary.UID == 1].Variable).tolist()[0]
    variable_event_start = (dictionary[dictionary.EventStart == 1].Variable).tolist()[0]
    variable_event_end = (dictionary[dictionary.EventEnd == 1].Variable).tolist()[0]

    # Date of birth is optional in the dictionary
    dob_variables = (dictionary[dictionary.DOB == 1].Variable).tolist()
    variable_date_of_birth = dob_variables[0] if len(dob_variables) > 0 else None
    dob_name = sanitize_varchar(variable_date_of_birth) if variable_date_of_birth is not None else None

    # Get the list of variables that are Pinfo
    pii_variables = (dictionary[dictionary.Pinfo == 1].Variable).tolist()

    # Get the list of variables that are total_summary, split by type
    total_summary = dictionary[dictionary.TotalSummary == 1]
    total_summary_variables_num = total_summary[total_summary.Type == 'Num'].Variable.tolist()
    total_summary_variables_char = total_summary[total_summary.Type == 'Char'].Variable.tolist()

    # Get the list of variables that are episode_summary
    episode_summary = dictionary[dictionary.EpisodeSummary == 1]
    episode_summary_variables = episode_summary.Variable.tolist()
    episode_summary_variables_char = episode_summary[episode_summary.Type == 'Char'].Variable.tolist()

    # Remove spurious rows and fix NaT in dates.
    # NaT are due to the previous CSV-to-SQL step when processing an original date > 2262-04-11
    # or an original date < 1677-09-21
    # see: http://pandas.pydata.org/pandas-docs/stable/timeseries.html#timeseries-timestamp-limits
    if max_date is not None:
        # Work on a copy so the caller's frame is left untouched. Rows with a
        # missing start compare False against max_date and are kept.
        data = data.loc[~(data[variable_event_start] >= max_date)].copy()
        data.loc[data[variable_event_end] > max_date, variable_event_end] = max_date
        data[variable_event_end] = data[variable_event_end].fillna(max_date)

    # Calculate year values for per-year summaries (most recent year first).
    # Skipped when not requested or when there is no data, where the latest
    # episode end would be undefined.
    year_range = []
    if total_number_of_years > 0 and len(data) > 0:
        max_year = np.max(data[variable_event_end])
        year_range = list(range(max_year.year, max_year.year - total_number_of_years, -1))

    ret = []
    for uid, group in data.groupby(variable_uid):
        # Most recent episode first; ties are broken on 'lr_flag', 'hist_lr' and the start date
        mygroup = group.sort_values([variable_event_end, 'lr_flag', 'hist_lr', variable_event_start], 
                                    ascending = [False,False,False,False])
        mydict = {}

        # save uid value if uid is not indicated in pii variables
        if variable_uid not in pii_variables:
            mydict[sanitize_varchar(variable_uid)] = uid

        # save total number of episodes
        mydict['total_n_episodes'] = float(len(mygroup))

        # save total duration of all episodes
        episode_days = _episode_durations(mygroup, variable_event_start, variable_event_end)
        mydict['total_duration_days'] = np.nansum(episode_days).astype(int)

        # save the first non-null value of every pii variable
        for elem in pii_variables:
            mydict[sanitize_varchar(elem)] = unique_value_from_nparray(np.array(mygroup[elem]))

        # Calculate Age if DOB available and max_date provided
        mydict['age'] = np.nan
        if variable_date_of_birth is not None:
            birth_date = unique_value_from_nparray(np.array(mygroup[variable_date_of_birth]))
            mydict[dob_name] = birth_date
            if max_date is not None and not pd.isnull(birth_date):
                max_date_np = np.array(max_date)
                age = (max_date_np.astype('datetime64[us]') - birth_date.astype('datetime64[us]')).astype('timedelta64[Y]')
                mydict['age'] = age.astype(int)

        # Calculate sum, mean, min and max of all numeric variables in total_summary_variables
        mydict.update(_numeric_summaries(mygroup, total_summary_variables_num, 'total_'))

        # Calculate count and duration in days of levels of all character (aka nominal) variables in total_summary_variables
        mydict.update(_level_summaries(mygroup, total_summary_variables_char, 'total_',
                                       variable_event_start, variable_event_end,
                                       calculate_total_duration_days_of_levels))

        if total_summary_episodes > 0:
            duration_saved = False
            for elem in episode_summary_variables:
                # Calculate age at beginning and end episode
                if variable_date_of_birth is not None and elem == variable_date_of_birth:
                    # The DOB was stored above under its sanitized name
                    bdate = mydict[dob_name]
                    if not pd.isnull(bdate):
                        starts = np.array(mygroup[variable_event_start])
                        ends = np.array(mygroup[variable_event_end])
                        bdates = np.array([bdate] * len(starts))
                        age_start = (starts.astype('datetime64[us]') - bdates.astype('datetime64[us]')).astype('timedelta64[Y]')
                        mydict.update(values_from_list_to_dict (age_start.astype(int), 'age_start', total_summary_episodes))

                        age_end = (ends.astype('datetime64[us]') - bdates.astype('datetime64[us]')).astype('timedelta64[Y]')
                        mydict.update(values_from_list_to_dict (age_end.astype(int), 'age_end', total_summary_episodes))
                else:
                    myname = sanitize_varchar(elem)
                    if elem in episode_summary_variables_char:
                        tmp = values_from_list_to_dict_as_string (list(mygroup[elem]), myname, total_summary_episodes)
                    else:
                        tmp = values_from_list_to_dict (np.array(mygroup[elem]), myname, total_summary_episodes)
                    mydict.update(tmp)

                    # The per-episode durations do not depend on elem: store them
                    # once, right after the first episode variable so the column
                    # order of the result is unchanged.
                    if not duration_saved:
                        mydict.update(values_from_list_to_dict (episode_days.astype(int), 'duration_days', total_summary_episodes))
                        duration_saved = True

        for year_index, year in enumerate(year_range):
            year_prefix = 'year_' + str(year_index) + '_'
            year_min_date = pd.to_datetime(dt.datetime(year,1,1))
            year_max_date = pd.to_datetime(dt.datetime(year,12,31))
            if max_date is not None:
                year_max_date = min(year_max_date, max_date)

            # Select episodes that overlap this year: they must end within or
            # after it AND start no later than its end. Without the second
            # condition, episodes lying entirely after the year would be
            # cropped to a negative duration.
            overlaps_year = ((mygroup[variable_event_end] >= year_min_date)
                             & (mygroup[variable_event_start] <= year_max_date))
            mygroup_year = mygroup.loc[overlaps_year].copy()

            # crop start and end dates of episodes that extend outside the year
            mygroup_year.loc[mygroup_year[variable_event_end] > year_max_date, variable_event_end] = year_max_date
            mygroup_year.loc[mygroup_year[variable_event_start] < year_min_date, variable_event_start] = year_min_date

            # save total number of episodes
            mydict[year_prefix + 'n_episodes'] = float(len(mygroup_year))

            # save total duration of all episodes
            year_days = _episode_durations(mygroup_year, variable_event_start, variable_event_end)
            mydict[year_prefix + 'duration_days'] = np.nansum(year_days).astype(int)

            # Calculate sum, mean, min and max of all numeric variables in total_summary_variables
            mydict.update(_numeric_summaries(mygroup_year, total_summary_variables_num, year_prefix))

            # Calculate count and duration in days of levels of all nominal variables,
            # using the episodes cropped to this year (not the full history)
            mydict.update(_level_summaries(mygroup_year, total_summary_variables_char, year_prefix,
                                           variable_event_start, variable_event_end,
                                           calculate_total_duration_days_of_levels))

        ret.append(mydict)
    ret = pd.DataFrame(ret)
    return ret

# Select PPSN, Flat/Transpose & SQL Upload

## Select PPSN
# Load the data dictionary and look up the names of the key columns
datadictionary = pd.read_csv(ddictionaryfilename)

variable_event_start = (datadictionary[datadictionary.EventStart == 1].Variable).tolist()[0]
variable_event_end = (datadictionary[datadictionary.EventEnd == 1].Variable).tolist()[0]
variable_uid = (datadictionary[datadictionary.UID == 1].Variable).tolist()[0]

print ('Select %s in date %s' %(variable_uid , query_date))
start_time = time.time()

# Select UIDs (PPSNs) that have an episode open on query_date whose variable_event_type
# satisfies variable_event_type_condition against variable_event_type_value
# (e.g. lr_flag = '1', or hist_lr != 'EMPL'). Without a query date or event type
# filter, every UID in the table is selected.
sql_query_string = "SELECT DISTINCT " + variable_uid + " FROM " + jld_sql_table 
if query_date not in ['', None] and variable_event_type not in ['',None]  and variable_event_type_value not in ['',None]:
    sql_query_string = sql_query_string + " WHERE " + variable_event_start + " < '" + query_date + "'"
    sql_query_string = sql_query_string + " and " + variable_event_end + " > '" + query_date + "'"
    # A space after the operator keeps the statement readable in the printout
    sql_query_string = sql_query_string + " and " + variable_event_type + " " + variable_event_type_condition + " '" + str(variable_event_type_value) + "'"

print ('\n%s\n' %(sql_query_string))

# Connect to SQL, run the query and release the connection pool afterwards,
# even if the query fails
params = urllib.parse.quote_plus(odbc_connection_string)
engine = create_engine('mssql+pyodbc:///?odbc_connect=%s' % params)
try:
    selected_uids = pd.read_sql_query(sql_query_string, engine)
finally:
    engine.dispose()

# The query returns exactly one column (the UID); take it by position so the
# code does not depend on the identifier being called 'ppsn'
selected_uids = selected_uids.iloc[:, 0].tolist()
print ('Selected %d in date %s' %(len(selected_uids), query_date))
elapsed_time = time.time() - start_time
print ('Elapsed time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

# Transpose/Flat
# Collect every column referenced by the data dictionary (order preserved,
# duplicates and missing names removed)
dictionary_flags = ['EventStart', 'EventEnd', 'DOB', 'UID', 'Pinfo', 'TotalSummary', 'EpisodeSummary']
selected_variables = pd.concat([datadictionary.loc[datadictionary[flag] == 1, 'Variable']
                                for flag in dictionary_flags])
selected_variables = [x for x in dict.fromkeys(selected_variables.tolist()) if pd.notnull(x)]

# Add variables used to sort episodes, if not present already
if 'hist_lr' not in  selected_variables:
    selected_variables.append('hist_lr')
if 'lr_flag' not in  selected_variables:
    selected_variables.append('lr_flag')

selected_variables = ','.join(selected_variables)


def sql_literal(value):
    """Render ``value`` as a SQL literal: strings are quoted (embedded quotes doubled), anything else uses ``str``."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


# Roughly 50 batches, but never a batch size of zero when there are few UIDs
batch_size = max(1, len(selected_uids) // 50)
count = 0
processed = 0
flat_frames = []
start_time = time.time()

# The roll-up date is the same for every batch
mydate =  None
if query_date not in ['', None]:
    mydate = dt.datetime.strptime(query_date, '%Y%m%d')

# One engine for all batches; it is disposed once the loop ends or fails
params = urllib.parse.quote_plus(odbc_connection_string)
engine = create_engine('mssql+pyodbc:///?odbc_connect=%s' % params)
try:
    for batch_uids in batch(selected_uids, n=batch_size):
        looptime = time.time()
        # Explicit IN list: str(tuple) would produce "('x',)" for a single UID,
        # which is not valid SQL
        uids = "(" + ", ".join(sql_literal(uid) for uid in batch_uids) + ")"

        count = count + 1
        print ('\nIteration %d, processing %d %s' %(count, len(batch_uids), variable_uid))

        # Retrieve data for selected UIDs
        # Only rows with start_date < end_date and start_date < query_date are kept
        print ("\tQuery Data")
        proctime = time.time()
        sql_query_string = "SELECT "+ selected_variables + " FROM " + jld_sql_table 
        sql_query_string = sql_query_string + " WHERE " + variable_uid + " IN " + uids
        if query_date not in ['', None]:
            sql_query_string = sql_query_string + " and " + variable_event_start + " < '" + query_date + "'"
        sql_query_string = sql_query_string + " and " + variable_event_start + " < " + variable_event_end 

        data = pd.read_sql_query(sql_query_string, engine)

        elapsed_time = time.time() - proctime
        print ('\t\tProcedure time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

        # Flat/Transpose JLD
        print ("\tTranspose Data")
        proctime = time.time()
        flat_df =  transpose_jld(data = data, 
                                 dictionary= datadictionary, 
                                 total_summary_episodes= n_episodes, 
                                 total_number_of_years = n_years,
                                 max_date = mydate,
                                 calculate_total_duration_days_of_levels = calculate_total_duration_days_of_levels)
        elapsed_time = time.time() - proctime
        print ('\t\tProcedure time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

        # Append data: frames are collected and concatenated once after the
        # loop, instead of growing a DataFrame on every iteration
        print ("\tAppend Data")
        proctime = time.time()
        flat_frames.append(flat_df)
        elapsed_time = time.time() - proctime
        print ('\t\tProcedure time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

        # Print info
        processed = processed + len(batch_uids)
        print ('Processed %d/%d %s' %(processed, len(selected_uids), variable_uid))
        elapsed_time = time.time() - looptime
        print ('Iteration time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
finally:
    engine.dispose()

# pd.concat refuses an empty list, so fall back to an empty frame when no UID was selected
flat_jld = pd.concat(flat_frames, ignore_index=True) if flat_frames else pd.DataFrame()

elapsed_time = time.time() - start_time
print ('Transpose/Flat Total Elapsed time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

# Load into SQL
# A single engine serves both the table drop and the upload; it is disposed
# at the end even when one of the steps fails.
mytime = time.time()
params = urllib.parse.quote_plus(odbc_connection_string)
engine = create_engine('mssql+pyodbc:///?odbc_connect=%s' % params)

# SpeedUp: enable pyodbc fast_executemany for multi-row statements
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

try:
    # Drop table if exists, through a raw DBAPI connection that is always
    # handed back to the pool
    sql_string_drop = "IF OBJECT_ID('"+ flat_jld_sql_table + "', 'U') IS NOT NULL" +'\n'+ "DROP TABLE " + flat_jld_sql_table
    conn = engine.raw_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql_string_drop)
        conn.commit()
    finally:
        conn.close()

    # upload data; to_sql obtains its own connection from the engine
    print('\nUploading to SQL')
    sql_chunksize = 10000
    flat_jld.to_sql(flat_jld_sql_table, engine, if_exists='append', index=False, chunksize=sql_chunksize)
finally:
    # Close SQL connections
    engine.dispose()

elapsed_time = time.time() - mytime
print ('Upload time: '+ time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

print ('\nALL DONE')