In [31]:
import dask.dataframe as dd
import dask.bag as db
import pandas as pd
import json
import glob

In [4]:
# Set datapaths

dicepath = '/Users/daniellee/Jupyter/utils/test_data/dice/05'
hapath = '/Users/daniellee/Jupyter/utils/test_data/home_advisor'
drspath = '/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv'

In [431]:
import pandas as pd
import numpy as np
from glob import glob
    
class CSVCreateConfig:
    
    def __init__(self, path):
        """
        :param path:
        :parm nrows:
        
        """ 
        
        from glob import glob
        
        self.path = path
        
        self.multiplefiles = False
        filepath = glob(path)
        
        if len(filepath) > 1: 
            print('Multiple files detected...')
            self.multiplefiles = True
            
    def infer_dt(self, column, dtype):
        
        if dtype in ['int','float']:
            return False
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        
        for v in sampled_rows.values:
            if isinstance(v, (float, int)):
                v = str(v)
            
            if column.dtype == 'object':
                total += 1
            if '-' in v or ':' in v:
                total += 1
            if len(v) < 30:
                total += 1
            if len(v.split('.')) > 1:
                total += 1
            if len(v.split('-')) == 3:
                total += 1
            if len(v.split(':')) == 3:
                total += 1
                
        if total / (sample_size * 6) > 0.5:
            return True
        return False
    

    def infer_special_case(self, column, name, nunique, dtype):
        
        if dtype in ['int','float']:
            return False
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if column.dtype == 'object':
                total += 1
            if isinstance(v, (list, dict)):
                total += 1
            if nunique > 500:
                total += 1
            if len(str(v)) > 30:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
    
    def infer_cat(self, column, name, nunique):
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if isinstance(v, int) or isinstance(v, float):
                v = str(v)
            if column.dtype == 'object':
                total += 1
            if isinstance(v, (str)):
                total += 1
            if nunique < 500:
                total += 1
            if len(str(v)) < 5:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
    
    def infer_cont(self, column, name, nunique):
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if column.dtype == 'int' or column.dtype == 'float':
                total += 1
            if nunique > 500:
                total += 1
            if any(x in name for x in ['max','min','avg','mean','average','count','sum','total','computation_time']):
                total += 1
            if  len(str(v)) < 15:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
            
    def assign_impute_vartype(self, data, **params):
        """
        :param opath: 
        :param nrows: 
        
        :return:  
        """
        
        impute_dict = dict()
        for c in params['columns']:
            nunique = params['nunique'][c]
            dtype = params['dtypes'][c]
            null  = params['nulls'][c]
            
            if nunique == 1:
                impute_dict[c] = 'single_value'
            elif nunique == 2 or (nunique == 3 and null):
                impute_dict[c] = 'binary'
            elif dtype == 'datetime64[ns]' or dtype == 'Timestamp':
                impute_dict[c] = 'datetime'
            elif self.infer_dt(data[c], dtype):
                impute_dict[c] = 'datetime'
            elif nunique >= 3 and self.infer_special_case(data[c], c, nunique, dtype):
                impute_dict[c] = 'unknown'
            elif nunique >= 3 and self.infer_cat(data[c], c, nunique):
                impute_dict[c] = 'categorical'
            elif nunique >= 3 and self.infer_cont(data[c], c, nunique): 
                impute_dict[c] = 'continuous'
            else:
                impute_dict[c] = 'unknown'
            
        return impute_dict
    
    def single_file_process(self, path, skipids):
        """
        :param path:
        :param skipids:

        :return:  
        """
        
        sample = pd.read_csv(path, skiprows=skipids, low_memory=False)
        columns = sample.columns
        dtypes = sample.dtypes.to_dict()
        nulls = sample.isnull().any().to_dict()
        nunique = {k:(v + 1 if nulls[k] else v) 
                    for k, v in sample.nunique().items()}
        itypes = self.assign_impute_vartype(sample, 
                                            columns = columns,
                                            dtypes = dtypes,
                                            nulls = nulls,
                                            nunique = nunique)
    
        return [columns, itypes, dtypes, nulls]
                
    def generate(self, opath='', prows=0.10, id_vars=[]):
        """
        :param opath: 
        :param prows: 
        
        :return:  
        """
        
        filesize = len([1 for l in open(self.path)])
        rows_to_skip = np.random.choice(range(1,filesize), 
                                      size=int(filesize * (1-prows)), 
                                      replace=False)
        
        
        cols, itypes, dtypes, nulls = self.single_file_process(self.path, rows_to_skip)

        output = list()
        for c in cols:
            itype = itypes[c]
            dtype = dtypes[c]
            null = nulls[c]
            
            if c in id_vars:
                dtype = '_id'
            if itype in ['categorical']:
                dtype = 'category'
            
            output.append({'field': c, 
                           'imputetype': itype, 
                           'dtype': dtype,
                           'null': null})
        
        return output
             
"""
[
{'column': 'name', 'null': True, 'dtype': <dtype>, 'imputetype': <imputetype>}
]
"""

cc = CSVCreateConfig('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv')
tempoutput=cc.generate('/Users/daniellee/Jupyter/utils/test_data/drs_Config.json')

TypeError: data type "Timestamp" not understood

In [385]:
tempoutput

[{'dtype': dtype('O'),
  'field': '__time',
  'imputetype': 'datetime',
  'null': False},
 {'dtype': dtype('float64'),
  'field': 'android_accelerometer_data',
  'imputetype': 'single_value',
  'null': True},
 {'dtype': 'category',
  'field': 'android_accelerometer_name',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_accelerometer_power',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_accelerometer_vendor',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_accelerometer_version',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_application_signature_set',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_available_browser_packages',
  'imputetype': 'categorical',
  'null': True},
 {'dtype': 'category',
  'field': 'android_available_call_packages',
  'imputetype': 'categorical'

In [359]:
t4 = pd.read_csv('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv', skiprows=rows_to_skip, low_memory=False)

In [386]:
testtype = {k['field']:k['dtype'] for k in tempoutput}

In [388]:
dd.read_csv('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv', dtype=testtype).head()

Unnamed: 0,__time,android_accelerometer_data,android_accelerometer_name,android_accelerometer_power,android_accelerometer_vendor,android_accelerometer_version,android_application_signature_set,android_available_browser_packages,android_available_call_packages,android_available_external_storage,...,user_agent_device_family,user_agent_os_family,user_agent_os_patch,user_agent_os_vendor,user_agent_type,user_cookie,user_cookie_computation_errors,user_cookie_computation_time_sum,user_session_cookie,flagged
0,2018-01-30T23:08:03.781Z,,,,,,,,,,...,Personal computer,Windows 7,,Microsoft Corporation.,BROWSER,356B171B-0270-4BB3-84CD-D0EA4BD5CF11,,0,1EDBC62F-C2FD-4D09-A212-200DDA01CBE5,1
1,2018-01-31T01:26:29.225Z,,,,,,,,,,...,Personal computer,Windows 7,,Microsoft Corporation.,BROWSER,E0861F0E-AACE-468B-91EF-09F55FD5B407,,0,7CE6A8F6-B492-405A-A38D-1EFAF3FCF9CC,1
2,2018-01-31T01:27:31.679Z,,,,,,,,,,...,Personal computer,Windows 7,,Microsoft Corporation.,BROWSER,E0861F0E-AACE-468B-91EF-09F55FD5B407,,0,7CE6A8F6-B492-405A-A38D-1EFAF3FCF9CC,1
3,2018-01-31T13:52:48.269Z,,,,,,,,,,...,Personal computer,Windows 7,,Microsoft Corporation.,BROWSER,243FD4D9-4754-409D-87C7-0E4FFB141D4A,,0,384F795E-A92D-4588-B21B-A279A5A334EC,1
4,2018-01-31T13:54:10.067Z,,,,,,,,,,...,Personal computer,Windows 7,,Microsoft Corporation.,BROWSER,243FD4D9-4754-409D-87C7-0E4FFB141D4A,,0,384F795E-A92D-4588-B21B-A279A5A334EC,1


In [443]:
import pandas as pd
import numpy as np
from glob import glob
    
class JSONCreateConfig:
    
    def __init__(self, path):
        """
        :param path:
        :parm nrows:
        
        """ 
        
        from glob import glob
        
        self.path = path
        
        self.multiplefiles = False
        self.filepaths = glob(path)
        
    def infer_dt(self, column, dtype):
        
        if dtype in ['int','float']:
            return False
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        
        for v in sampled_rows.values:
            if isinstance(v, int) or isinstance(v, float) or v == None or isinstance(v, list):
                v = str(v)
            if column.dtype == 'object':
                total += 1
            if '-' in v or ':' in v:
                total += 1
            if len(v) < 30:
                total += 1
            if len(v.split('.')) > 1:
                total += 1
            if len(v.split('-')) == 3:
                total += 1
            if len(v.split(':')) == 3:
                total += 1
                
        if total / (sample_size * 6) > 0.5:
            return True
        return False
    
    def infer_special_case(self, column, name, nunique, dtype):
        
        if dtype in ['int','float']:
            return False
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if isinstance(v, int) or isinstance(v, float) or v == None or isinstance(v, list):
                v = str(v)
            if column.dtype == 'object':
                total += 1
            if isinstance(v, (list, dict)):
                total += 1
            if nunique > 500:
                total += 1
            if len(str(v)) > 30:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
    
    def infer_cat(self, column, name, nunique):
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if isinstance(v, int) or isinstance(v, float) or v == None or isinstance(v, list):
                v = str(v)
            if column.dtype == 'object':
                total += 1
            if isinstance(v, (str)):
                total += 1
            if nunique < 500:
                total += 1
            if len(str(v)) < 5:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
    
    def infer_cont(self, column, name, nunique):
        
        size = column.shape[0]
        sample_size = int(size * 0.05)
        sampled_rows = column.sample(sample_size, replace=False)
        
        total = 0
        for v in sampled_rows.values:
            if isinstance(v, int) or isinstance(v, float) or v == None or isinstance(v, list):
                v = str(v)
            if column.dtype == 'int' or column.dtype == 'float':
                total += 1
            if nunique > 500:
                total += 1
            if any(x in name for x in ['max','min','avg','mean','average','count','sum','total','computation_time']):
                total += 1
            if  len(str(v)) < 15:
                total += 1
        
        if total / (sample_size * 4) > 0.5:
            return True
        return False
    
    def assign_impute_vartype(self, data, **params):
        """
        :param opath: 
        :param nrows: 
        
        :return:  
        """
        
        impute_dict = dict()
        for c in params['columns']:
            nunique = params['nunique'][c]
            dtype = params['dtypes'][c]
            null  = params['nulls'][c]

            if nunique == 1:
                impute_dict[c] = 'single_value'
            elif nunique == 2 or (nunique == 3 and null):
                impute_dict[c] = 'binary'
            elif dtype == 'datetime64[ns]':
                impute_dict[c] = 'datetime'
            elif self.infer_dt(data[c], dtype):
                impute_dict[c] = 'datetime'
            elif nunique >= 3 and self.infer_special_case(data[c], c, nunique, dtype):
                impute_dict[c] = 'unknown'
            elif nunique >= 3 and self.infer_cat(data[c], c, nunique):
                impute_dict[c] = 'categorical'
            elif nunique >= 3 and self.infer_cont(data[c], c, nunique): 
                impute_dict[c] = 'continuous'
            else:
                impute_dict[c] = 'unknown'
            
        return impute_dict
    
    def read_multiple_json(self, paths):
        def read_json(path):
            return pd.read_json(path, lines=True)
        return pd.concat(map(read_json, paths))
    
    def multiple_file_process(self, path):
        """
        :param path:
        :param skipids:

        :return:  
        """
        
        sample = self.read_multiple_json(path)
        columns = sample.columns
        dtypes = sample.dtypes.to_dict()
        nulls = sample.isnull().any().to_dict()
        
        nunique = dict()
        for c in columns:
            try:
                v = sample[c].nunique()
                nunique[c] = v + 1 if nulls[k] else v
            except:
                nunique[c] = sample[c].astype('str').nunique()
        itypes = self.assign_impute_vartype(sample, 
                                            columns = columns,
                                            dtypes = dtypes,
                                            nulls = nulls,
                                            nunique = nunique)
    
        return [columns, itypes, dtypes, nulls]
                
    def generate(self, opath='', pfile=0.2, id_vars=[]):
        """
        :param opath: 
        :param prows: 
        
        :return:  
        """
        
        filecount = len(self.filepaths)
        files_to_select = np.random.choice(self.filepaths, 
                                      size=int(filecount * (pfile)), 
                                      replace=False)
        cols, itypes, dtypes, nulls = self.multiple_file_process(files_to_select)

        output = list()
        for c in cols:
            itype = itypes[c]
            dtype = dtypes[c]
            null = nulls[c]
            
            if c in id_vars:
                dtype = '_id'
            if itype in ['categorical']:
                dtype = 'category'
            
            output.append({'field': c, 
                           'imputetype': itype, 
                           'dtype': str(dtype),
                           'null': null})
            
        setting = {
            "setting": {
                "continuous": [],
                "categorical": [],
                "binary": [],
                "datetime": [],
                "single_value": [],
                "unknown": []
            },
            "variables": output
        }
        
        with open(opath, 'w') as outfile:
            print(setting)
            json.dumps(setting, outfile, sort_keys=True, indent=4, separators=(',', ': '))
        
        return setting
             
"""
[
{'column': 'name', 'null': True, 'dtype': <dtype>, 'imputetype': <imputetype>}
]
"""

jcc = JSONCreateConfig('/Users/daniellee/Jupyter/utils/test_data/dice/05/01/*Realtimesuccess.json.gz')
tempoutput=jcc.generate('/Users/daniellee/Jupyter/utils/test_data/dice_config.json')

{'setting': {'continuous': [], 'categorical': [], 'binary': [], 'datetime': [], 'single_value': [], 'unknown': []}, 'variables': [{'field': 'applies_to_1st_party', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'applies_to_3rd_party', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'bare_handle_emailaddress', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'bare_handle_emailaddress_eid', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'bare_handle_emailaddress_num_bad_user_per_bare_handle_email_lifetime', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'bare_handle_emailaddress_num_user_per_bare_handle_email_lifetime', 'imputetype': 'categorical', 'dtype': 'category', 'null': True}, {'field': 'bare_handle_emailaddress_sim_created_at', 'imputetype': 'datetime', 'dtype': 'datetime64[ns]', 'null': True}, {'field': 'bare_handle_emailaddress_sim_creation_da

TypeError: dumps() takes 1 positional argument but 2 positional arguments (and 3 keyword-only arguments) were given

In [442]:
o = '/Users/daniellee/Jupyter/utils/test_data/dice_config.json'
with open(o, 'w') as outfile:
    json.dumps(setting, outfile, sort_keys=True, indent=4, separators=(',', ': '))

{'setting': {'binary': [],
  'categorical': [],
  'continuous': [],
  'datetime': [],
  'single_value': [],
  'unknown': []},
 'variables': [{'dtype': 'category',
   'field': 'applies_to_1st_party',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'category',
   'field': 'applies_to_3rd_party',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'category',
   'field': 'bare_handle_emailaddress',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'category',
   'field': 'bare_handle_emailaddress_eid',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'category',
   'field': 'bare_handle_emailaddress_num_bad_user_per_bare_handle_email_lifetime',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'category',
   'field': 'bare_handle_emailaddress_num_user_per_bare_handle_email_lifetime',
   'imputetype': 'categorical',
   'null': True},
  {'dtype': 'datetime64[ns]',
   'field': 'bare_handle_emailaddress_sim_created_at',
   'imputety

In [425]:
jcc2 = JSONCreateConfig('/Users/daniellee/Jupyter/utils/test_data/home_advisor/*Realtimesuccess.json.gz')
tempoutput2=jcc2.generate()

2
['/Users/daniellee/Jupyter/utils/test_data/home_advisor/2018-05-07T18H-Realtimesuccess.json.gz'
 '/Users/daniellee/Jupyter/utils/test_data/home_advisor/2018-05-07T19H-Realtimesuccess.json.gz']


KeyboardInterrupt: 

In [123]:
import glob
def read_json(x):
    return pd.read_json(x, lines=True)

df_json = pd.concat(map(read_json, glob.glob('/Users/daniellee/Jupyter/utils/test_data/dice/05/01/*Realtimesuccess.json.gz')))

In [139]:
df_json_dtypes = df_json.dtypes.to_dict()

In [138]:
df_json['device_id_ip_ews_country_iso_code'] = df_json['device_id_ip_ews_country_iso_code'].astype('category')

In [214]:
df_json.dtypes == 'datetime64[ns]'

applies_to_1st_party                                                    False
applies_to_3rd_party                                                    False
bare_handle_emailaddress                                                False
bare_handle_emailaddress_eid                                            False
bare_handle_emailaddress_num_bad_user_per_bare_handle_email_lifetime    False
bare_handle_emailaddress_num_user_per_bare_handle_email_lifetime        False
bare_handle_emailaddress_sim_created_at                                  True
bare_handle_emailaddress_sim_creation_date                              False
bare_handle_emailaddress_sim_frequently_used                            False
bare_handle_emailaddress_sim_updated_at                                  True
bare_handle_emailaddress_sim_updated_customer                           False
bare_handle_emailaddress_sim_updated_internal                           False
city                                                            

In [84]:
readfile = DaskReadFile()
data2 = readfile.read_csv('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv', dtype=dtype)

TypeError: read_csv() got an unexpected keyword argument 'dtype'

In [93]:
data2 = dd.read_csv('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv', dtype=dtype, assume_missing=True)

In [94]:
data2.head()

ValueError: could not convert string to float: 'MSM8974'

In [80]:
data2_csv = pd.read_csv('/Users/daniellee/Data/device_risk_score/data_v1/raw/raw.csv', nrows=10000)

  interactivity=interactivity, compiler=compiler, result=result)


In [82]:
dtype = data2.dtypes.to_dict()

In [92]:
data2_csv.apply(lambda x: 'MSM8974' in x, axis=0).sum()

0

In [209]:
data2.dtypes.unique()

array([dtype('O'), dtype('float64'), dtype('int64'), dtype('bool')],
      dtype=object)