# Data Cleaning & Locating Multivalued and Duplicate records (single csv file)
### Saksham Gakhar, DA - DKSF

Change the input CSV file as needed, look for duplicate and multivalued records, and list the devices that generally misbehave.

In [241]:
import numpy as np 
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
from collections import defaultdict
import datetime
# without mpld3
%matplotlib notebook 

In [242]:
def readCSV(dt, folder='2020-07-06-DataKind/'):
    """
        Read the CSV file into a dataframe for a YYYY-MM (dt)
        arg: dt -- string with format YYYY-MM
        arg: folder -- path prefix (including the trailing separator) that is
                       prepended to the file name; defaults to the data
                       folder this notebook was written against
        return: dataframe with `when_captured` and `service_uploaded` parsed
                to datetimes and an extra integer-like `year` column
    """
    filename = 'output-' + str(dt) + '-01T00_00_00+00_00.csv'
    df = pd.read_csv(folder + filename)
    df['when_captured'] = pd.to_datetime(df['when_captured'])

    # `service_uploaded` is stored as text in the '%b %d, %Y @ %H:%M:%S.%f'
    # layout without a zone; parse it in one vectorized call and mark it UTC.
    # Unlike a per-row strptime, this yields NaT for missing values instead
    # of raising.
    df['service_uploaded'] = pd.to_datetime(
        df['service_uploaded'], format='%b %d, %Y @ %H:%M:%S.%f', utc=True)

    #### Add a column for the year
    df['year'] = pd.DatetimeIndex(df['when_captured']).year
    df.info()
    return df

Based on the table above, keyed on (`device`, `when_captured`), let's see what these multiple values for each timestamp correspond to. Sometimes there is a negative RH, sometimes 0.0 PM (which means very clean air).

In [243]:
def findBadData(df):
    """
    Report (device_urn, device_sn, when_captured) keys that occur more than once.
    arg: df -- dataframe holding at least the three key columns
    return: dataframe of the repeated keys with their row count in `size`,
            sorted by `size` in descending order
    """
    key_cols = ['device_urn', 'device_sn', 'when_captured']
    counts = df.groupby(key_cols).size().to_frame('size').reset_index()
    counts = counts.sort_values('size', ascending=False)

    # A key is "bad" when more than one row shares it.
    is_repeated = counts['size'] > 1

    print("bad device data counts: ")
    badRecords = counts[is_repeated]
    print(badRecords)

    print("all bad device list: ")
    # Serial numbers of devices that reported several rows for one time stamp
    print(np.unique(counts[is_repeated]['device_sn'].values))

    return badRecords

## Data Cleansing based on [Protocol](https://github.com/DataKind-SF/safecast/blob/master/Solarcast_data_cleansing.md)

In [244]:
def rmInvalidTimeStamps(df):
    """
    remove invalid time stamped records

    Three filters are applied in order, each printing how many rows it affects:
      1. rows whose `when_captured` is null,
      2. rows whose `when_captured` is not later than 2000-01-19 (UTC),
      3. rows where `when_captured` and `service_uploaded` are 7 days or more apart.

    arg: df -- dataframe with timezone-aware (UTC) `when_captured` and
               `service_uploaded` columns; it is not modified
    return: an independent copy of the surviving rows, so later in-place
            edits do not act on a view of the input
    """
    # The cutoff used to be written unquoted as 2000/1/19, which Python
    # evaluates as a float division (~105.26); pandas then reads that as
    # nanoseconds after the Unix epoch, so the check never rejected anything
    # after 1970.  NOTE(review): 2000-01-19 is assumed to be the intended
    # date -- confirm against the cleansing protocol.
    earliest_valid = pd.Timestamp('2000-01-19').tz_localize('UTC')
    max_lag = pd.Timedelta(days=7)

    ## remove records with NULL `when_captured`
    print("Null date records to remove: ", df['when_captured'].isna().sum())
    df = df[df['when_captured'].notna()]
    print("df shape after remove records with NULL `when_captured` : ",df.shape)

    ## remove records where `when_captured` is an invalid
    boolean_condition = df['when_captured'] > earliest_valid
    print("Valid `when_captured`  entires: ", boolean_condition.sum())
    df = df[boolean_condition]
    print("df shape after remove records where `when_captured` is an invalid : ",df.shape)

    ## remove records where gap of `service_uploaded` and `when_captured` > 7 days
    # Compare timedeltas directly: casting to 'timedelta64[D]' is rejected by
    # pandas >= 2, and "whole days < 7" is the same test as "gap < 7 days".
    lag = (df['when_captured'] - df['service_uploaded']).abs()
    boolean_condition = lag < max_lag
    print("Lag 7 days to remove: ",df.shape[0] - (boolean_condition).sum())
    df = df[boolean_condition]
    print("df shape after records where gap of `service_uploaded` and `when_captured` > 7 days : ",df.shape)

    return df.copy()

In [245]:
def imputeInaccurateRH(df):
    """
    Replace physically impossible relative-humidity readings with NaN.

    A reading counts as inaccurate when `env_humid` is below 0 or above 100.
    The dataframe is edited in place and also returned.
    """
    humidity = df['env_humid']
    out_of_range = humidity.lt(0) | humidity.gt(100)
    df.loc[out_of_range, 'env_humid'] = np.nan
    print("Inaccurate RH records imputed: ", out_of_range.sum())

    return df

In [246]:
def dropServiceUploaded(df):
    """
    Remove the 'service_uploaded' column from df itself (nothing is returned).
    """
    df.drop(columns=['service_uploaded'], inplace=True)

Drop Duplicates

In [247]:
def rmDuplicates(df):
    """
    Drop fully identical rows from df itself, keeping the first occurrence
    of each, and print how many rows were removed (nothing is returned).
    """
    rows_before = len(df.index)
    # Rows are compared on every column.
    df.drop_duplicates(subset=list(df.columns), keep='first', inplace=True)
    rows_after = len(df.index)
    print("Number of duplicative entries removed : ", rows_before - rows_after)

# #testing inplace = True and no return in function above
# df = pd.DataFrame(np.arange(12).reshape(3, 4),columns=['A', 'B', 'C', 'D'])
# df.loc[-1] = [0, 1, 2, 3] 
# df
# rmDuplicates(df)
# df

### Filtering bad row records

In [248]:
def dataAggWithKey(df):
    """
    Aggregate the df based on key: 'device_sn','when_captured'
    arg: df - incoming dataframe
    return: (merged, num_groups) where `merged` has one row per key with
            COUNTS    -- largest non-null count over the value columns
            DISTINCTS -- largest number of distinct non-null values over
                         the value columns
            and `num_groups` is the number of keys
    """
    key_cols = ['device_sn', 'when_captured']

    # STEP 1: Aggregate the dataframe based on key
    temp_df = df.groupby(key_cols).agg(['count', 'nunique'])
    num_groups = temp_df.shape[0]
    print("num_groups  is : ", num_groups)

    # STEP 2: Merge Counts and Count-Distincts to check for duplicative records and multiplicities
    # Pick the aggregate columns by label on the second column level rather
    # than by fixed even/odd positions, so any number of value columns works.
    per_column_counts = temp_df.xs('count', axis=1, level=1)
    per_column_distincts = temp_df.xs('nunique', axis=1, level=1)
    tmp_df1 = per_column_counts.max(axis=1).to_frame('COUNTS').reset_index()
    tmp_df2 = per_column_distincts.max(axis=1).to_frame('DISTINCTS').reset_index()
    print(tmp_df1.shape, tmp_df2.shape)
    merged = pd.merge(tmp_df1, tmp_df2, on=key_cols)
    return merged, num_groups

### Calculating hits: Impose mutually exclusive conditions for filtering

In [249]:
def identifyALLNanRecs(merged):
    """
        Actionable: keys flagged as carrying no distinct values at all
        (COUNTS above 1 together with DISTINCTS equal to 0)
        args: incoming dataframe with COUNTS and DISTINCTS for each key
        return : (number of flagged keys,
                  dataframe of their 'device_sn', 'when_captured' to remove later)
    """
    flagged = merged['COUNTS'].gt(1) & merged['DISTINCTS'].eq(0)
    n_flagged = flagged.sum()
    print(n_flagged)
    keys_to_drop = merged.loc[flagged, ['device_sn', 'when_captured']]
    return n_flagged, keys_to_drop

In [250]:
def identifyMultivaluedTimeStamps(merged):
    """
        Actionable: keys (`device_sn`, `when_captured`) that hold more than
        one row AND more than one distinct value, i.e. conflicting readings
        for one time stamp [must be all discarded]
        args: incoming dataframe with COUNTS and DISTINCTS for each key
        return : (number of flagged keys,
                  dataframe of their 'device_sn', 'when_captured' to remove later)
    """
    flagged = merged['COUNTS'].gt(1) & merged['DISTINCTS'].gt(1)
    n_flagged = flagged.sum()
    print(n_flagged)
    keys_to_drop = merged.loc[flagged, ['device_sn', 'when_captured']]
    return n_flagged, keys_to_drop

In [251]:
def identifyRemainingDupl(merged):
    """
        NOT actionable, since duplicates were dropped earlier:
        keys whose rows are pure duplicates of one another
        (COUNTS above 1 with exactly one distinct value).
        args: incoming dataframe with COUNTS and DISTINCTS for each key
        return: number of such keys; also prints how many surplus rows
                they account for
    """
    pure_dupes = merged['COUNTS'].gt(1) & merged['DISTINCTS'].eq(1)
    total_rows = merged.loc[pure_dupes, 'COUNTS'].sum()
    total_distinct = merged.loc[pure_dupes, 'DISTINCTS'].sum()
    print("remaining duplicates check : " , total_rows - total_distinct)
    return pure_dupes.sum()

In [252]:
def goodTimeStamps(merged):
    """
        Count the keys that are good: exactly one row and one distinct value.
        args: incoming dataframe with COUNTS and DISTINCTS for each key
        return: number of good keys (also printed)
    """
    is_good = merged['COUNTS'].eq(1) & merged['DISTINCTS'].eq(1)
    n_good = is_good.sum()
    print('good records : ', n_good)
    return n_good

In [253]:
def writeDF(dframe, descrpt):
    """
        Write a dataframe of anomaly keys to a csv in the working directory.
        args: dframe -- dataframe to write
              descrpt -- description string embedded in the file name
        The file name also starts with the module-level `dt` (the month
        being processed), which must be defined before calling.
    """
    dframe.info()
    print("written records count : ", dframe.shape[0])
    out_name = ''.join([str(dt), '-01_anomalies_', str(descrpt), '_.csv'])
    dframe.to_csv(out_name)

### Discard bad data now from the main dataframe


In [254]:
def filterRows(toDiscard1, toDiscard3, df):
    """
        Drop every row of df whose (device_sn, when_captured) key appears in
        either discard list and return the remaining rows.

        Side effect: a helper column 'KEY_DevSN_WhenCapt' holding the key
        tuple is added to the df passed in, and is present in the result.

        args:
            toDiscard1: allNaN record keys
            toDiscard3: MultivaluedTimeStamps keys
            df: original dataframe
        return: the rows of df whose key is in neither discard list
    """
    key_col = 'KEY_DevSN_WhenCapt'

    # STEP 1 : one table with every key to throw away, plus its tuple form
    discard = pd.concat([toDiscard1, toDiscard3], ignore_index=True)
    discard[key_col] = list(zip(discard['device_sn'], discard['when_captured']))
    print(df.shape, discard.shape)

    # STEP 2 : same tuple key for every row of the main dataframe
    df[key_col] = list(zip(df['device_sn'], df['when_captured']))

    # STEP 3 : flag matching rows and keep the rest
    rows_to_discard = df[key_col].isin(discard[key_col])
    print("these many rows to discard: ", rows_to_discard.sum())

    rows_before = df.shape[0]
    kept = df[~rows_to_discard]
    print(rows_before - kept.shape[0])

    return kept

## Run cleaning algo functions

In [255]:
# 'YYYY-MM' labels for every month start from 2017-09 through 2020-07.
dates = pd.date_range('2017-09-01','2020-07-01', freq='MS').strftime("%Y-%m").tolist()
# Month handled in this run; pick another index to process a different month.
# `dt` is also read as a global by writeDF when it builds its output file name.
dt = dates[0]

In [256]:
# Load the selected month's CSV, then report the keys
# (device_urn, device_sn, when_captured) that occur on more than one row.
df = readCSV(dt)
findBadData(df)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 64655 entries, 0 to 64654
Data columns (total 16 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   service_uploaded  64655 non-null  datetime64[ns, UTC]
 1   when_captured     61839 non-null  datetime64[ns, UTC]
 2   device_urn        64655 non-null  object             
 3   device_sn         64655 non-null  object             
 4   device            64655 non-null  int64              
 5   loc_lat           64655 non-null  float64            
 6   loc_lon           64655 non-null  float64            
 7   env_temp          36580 non-null  float64            
 8   env_humid         36580 non-null  float64            
 9   pms_pm01_0        36899 non-null  object             
 10  pms_pm02_5        36898 non-null  object             
 11  pms_pm10_0        36897 non-null  object             
 12  lnd_7318c         55230 non-null  float64            
 13  l

Unnamed: 0,device_urn,device_sn,when_captured,size
5634,safecast:114699387,Solarcast #30023,2017-09-11 08:30:04+00:00,5
5639,safecast:114699387,Solarcast #30023,2017-09-11 14:45:03+00:00,5
5635,safecast:114699387,Solarcast #30023,2017-09-11 09:45:04+00:00,5
5636,safecast:114699387,Solarcast #30023,2017-09-11 11:00:03+00:00,5
5637,safecast:114699387,Solarcast #30023,2017-09-11 12:15:03+00:00,5
5638,safecast:114699387,Solarcast #30023,2017-09-11 13:30:03+00:00,5
5633,safecast:114699387,Solarcast #30023,2017-09-11 06:00:04+00:00,5
5641,safecast:114699387,Solarcast #30023,2017-09-11 16:00:03+00:00,3
31971,safecast:3714913954,Solarcast #30027,2017-09-29 09:22:50+00:00,2
32368,safecast:3714913954,Solarcast #30027,2017-09-30 13:52:50+00:00,2


In [257]:
# Drop rows with invalid time stamps; the function returns the filtered
# frame, so rebind `df` to it.
df = rmInvalidTimeStamps(df)
print("new df: ", df.shape)

Null date records to remove:  2816
df shape after remove records with NULL `when_captured` :  (61839, 16)
Valid `when_captured`  entires:  61839
df shape after remove records where `when_captured` is an invalid :  (61839, 16)
Lag 7 days to remove:  0
df shape after records where gap of `service_uploaded` and `when_captured` > 7 days :  (61839, 16)
new df:  (61839, 16)


In [258]:
# Blank out (NaN) humidity readings outside 0-100; the row count is unchanged.
df = imputeInaccurateRH(df)
print("new df: ", df.shape)

Inaccurate RH records imputed:  9102
new df:  (61839, 16)


In [259]:
# Removes the 'service_uploaded' column from `df` in place (no return value).
dropServiceUploaded(df)
print("new df: ", df.shape)

new df:  (61839, 15)


In [260]:
# Drops exact duplicate rows from `df` in place (no return value).
rmDuplicates(df)
print("new df: ", df.shape)

Number of duplicative entries removed :  8
new df:  (61831, 15)


In [261]:
# One row per (device_sn, when_captured) key with its COUNTS and DISTINCTS,
# plus the total number of keys.
merged,num_groups = dataAggWithKey(df)
print("merged: ", merged.shape)
print("num_groups : ", num_groups)

num_groups  is :  61801
(61801, 3) (61801, 3)
merged:  (61801, 4)
num_groups :  61801


In [262]:
# Classify every key into one of four categories and count each:
#   sum1 / toDiscard1 -- COUNTS > 1 and DISTINCTS == 0 (keys to discard)
#   sum3 / toDiscard3 -- COUNTS > 1 and DISTINCTS > 1  (keys to discard)
#   sum2              -- COUNTS > 1 and DISTINCTS == 1 (pure duplicates)
#   sum4              -- COUNTS == 1 and DISTINCTS == 1 (good keys)
sum1, toDiscard1 = identifyALLNanRecs(merged)
sum3, toDiscard3 = identifyMultivaluedTimeStamps(merged)
sum2 = identifyRemainingDupl(merged)
sum4 = goodTimeStamps(merged)
print("toDiscard1 shape: ",toDiscard1.shape)
print("toDiscard3 shape: ",toDiscard3.shape)

0
8
remaining duplicates check :  0
good records :  61793
toDiscard1 shape:  (0, 2)
toDiscard3 shape:  (8, 2)


In [263]:
# Sanity check: the four category counts must add up to the number of keys,
# i.e. every key was matched by exactly one of the four conditions.
assert(num_groups == sum1+sum2+sum3+sum4)

In [264]:
# Save the multivalued-timestamp keys to a csv named after the current month `dt`.
writeDF(toDiscard3, 'MultivaluedTimeStamps')

<class 'pandas.core.frame.DataFrame'>
Int64Index: 8 entries, 40839 to 40847
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype              
---  ------         --------------  -----              
 0   device_sn      8 non-null      object             
 1   when_captured  8 non-null      datetime64[ns, UTC]
dtypes: datetime64[ns, UTC](1), object(1)
memory usage: 192.0+ bytes
written records count :  8


In [265]:
# Remove all rows belonging to the discarded keys; filterRows returns the
# kept rows (with an extra 'KEY_DevSN_WhenCapt' column), so rebind `df`.
df = filterRows(toDiscard1, toDiscard3, df)
print("final df shape: ", df.shape)

(61831, 15) (8, 3)
these many rows to discard:  38
38
final df shape:  (61793, 16)


### Now check to make sure no garbage data is left

In [266]:
# Re-run the repeated-key report on the cleaned data; it must come back empty.
badRecordsLeft = findBadData(df)
badRecordsLeft
assert(badRecordsLeft.empty)

bad device data counts: 
Empty DataFrame
Columns: [device_urn, device_sn, when_captured, size]
Index: []
all bad device list: 
[]
