SCRIPT_NAME: data_analysis.ipynb <p>
FUNCTION: Process Itron device data <p>
AUTHOR: Nioy Chakraborty <p>
VERSION: 1.0.1

In [12]:
try:
    import pandas as pd
    from datetime import datetime
    import awswrangler as wr
    import numpy as np
    import os
    import dask.dataframe as dd
    import s3fs
except:
    !pip install awswrangler dask s3fs
    import pandas as pd
    from datetime import datetime
    import awswrangler as wr
    import numpy as np
    import os
    import dask.dataframe as dd
    import s3fs

In [None]:
# Per-house test metadata: the owner's name plus four test slots per house.
# Every date / start_time / end_time is still an empty placeholder string.
meta_json = {
    "house1": {
        "owner_name": "h.rieger",
        "test_times": {
            "test1": {"date": "", "start_time": "", "end_time": ""},
            "test2": {"date": "", "start_time": "", "end_time": ""},
            "test3": {"date": "", "start_time": "", "end_time": ""},
            "test4": {"date": "", "start_time": "", "end_time": ""},
        },
    },
    "house2": {
        "owner_name": "claus",
        "test_times": {
            "test1": {"date": "", "start_time": "", "end_time": ""},
            "test2": {"date": "", "start_time": "", "end_time": ""},
            "test3": {"date": "", "start_time": "", "end_time": ""},
            "test4": {"date": "", "start_time": "", "end_time": ""},
        },
    },
    "house3": {
        "owner_name": "juho",
        "test_times": {
            "test1": {"date": "", "start_time": "", "end_time": ""},
            "test2": {"date": "", "start_time": "", "end_time": ""},
            "test3": {"date": "", "start_time": "", "end_time": ""},
            "test4": {"date": "", "start_time": "", "end_time": ""},
        },
    },
}

In [13]:
# S3 URL of the semicolon-separated CSV dump that main() processes.
# create_features() takes the house id from the 4th '/'-separated piece of this
# URL ("h.rieger" here), so the object key must start with the house folder.
s3_bucket_url = "s3://grohe-bigdata-itron-sdcard/h.rieger/csv/dump9.csv"

In [17]:
def read_csv(FILENAME):
    """Load a semicolon-separated dump with dask and keep complete timestamps.

    Parameters
    ----------
    FILENAME : str
        Path or URL handed to ``dd.read_csv``.

    Returns
    -------
    dask DataFrame
        The rows whose ``Time`` value occurs exactly four times in the file,
        with the index reset and the ``Unnamed: 304`` column removed.

    Notes
    -----
    The per-timestamp row counts are computed eagerly (``.compute()``), so
    calling this function already triggers a pass over the data.
    """
    frame = dd.read_csv(FILENAME, sep=';')

    # Count rows per timestamp; only timestamps seen exactly 4 times are kept.
    rows_per_timestamp = frame.groupby('Time').size()
    complete_timestamps = rows_per_timestamp[rows_per_timestamp == 4].index.compute()
    complete_rows = frame[frame['Time'].isin(complete_timestamps)]

    # reset_index() moves the old index into an 'index' column, which is
    # dropped again together with 'Unnamed: 304'.
    return complete_rows.reset_index().drop(['Unnamed: 304', 'index'], axis=1)


def create_miliseconds(df):
    """Spread the rows of each second over 250 ms steps and format ``Time``.

    Rows are expected in groups of four consecutive readings per timestamp;
    the 1st..4th row of every group gets an offset of 0, 250, 500 and 750 ms.

    Parameters
    ----------
    df : dask DataFrame
        Frame with a ``Time`` column holding ``'%d/%m/%Y %H:%M:%S'`` strings.

    Returns
    -------
    dask DataFrame
        The same frame object. ``Time`` is overwritten with strings of the
        form ``'%Y-%m-%dT%H:%M:%S.%fZ'`` and a ``millisecond_offset`` column
        (zero-padded strings ``'000'``, ``'250'``, ...) is added.
    """
    # Running row number over the whole frame. The index cannot be used for
    # this: a dask index restarts at 0 in every partition, so `index % 4` is
    # wrong for any partition that begins in the middle of a 4-row group.
    row_number = df[['Time']].assign(_row=1)['_row'].cumsum() - 1

    # convert the timestamp column to datetime
    df['Time'] = dd.to_datetime(df['Time'], format='%d/%m/%Y %H:%M:%S')

    # millisecond offset of each row inside its group, zero-padded to 3 digits
    df['millisecond_offset'] = (
        ((row_number % 4) * 250)
        .astype(str)
        .str.pad(width=3, side='left', fillchar='0')
    )

    # combine the date and time components with the millisecond offset to create the new time column
    df['Time'] = df['Time'].dt.strftime('%Y-%m-%dT%H:%M:%S.') + df['millisecond_offset'] + 'Z'

    # re-parse and re-format so the fraction is rendered with six digits
    df['Time'] = dd.to_datetime(df['Time'])
    df['Time'] = df['Time'].dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    return df


def create_valid_columns():
    """Build the list of columns to keep and a field -> measurement lookup.

    Returns
    -------
    tuple (list, dict)
        * the column names: ``'Time'``, the eleven scalar sensor fields, then
          ``CaptUp0..CaptUp143`` and ``CaptDn0..CaptDn143``;
        * a dict mapping every column except ``'Time'`` to its measurement
          name: ``'signalUp'`` for ``CaptUp*``, ``'signalDown'`` for
          ``CaptDn*`` and ``'itronSensors'`` for everything else.
    """
    scalar_fields = ['Tdown/TAD(us)','Tup/TRD(us)','TransitT(us)',
                     'DT(us)','Flow(l/h)','uC temp','HSPLL Calib','Count',
                     'PGA gain','MaxUp','MaxDown']
    up_samples = ["CaptUp" + str(k) for k in range(144)]
    down_samples = ["CaptDn" + str(k) for k in range(144)]

    all_columns = ['Time', *scalar_fields, *up_samples, *down_samples]

    # Start with every non-time column under 'itronSensors', then let the two
    # capture groups override their own entries (key order is unaffected).
    field_to_measurement = dict.fromkeys(all_columns[1:], 'itronSensors')
    field_to_measurement.update(dict.fromkeys(up_samples, 'signalUp'))
    field_to_measurement.update(dict.fromkeys(down_samples, 'signalDown'))

    return all_columns, field_to_measurement


def create_features(df,mapping_dict,FILENAME):
    """Annotate the long-format readings and return them in output column order.

    The passed frame is left untouched; all new columns are added to a copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Long-format frame with columns ``'Time'``, ``'level_1'`` (field name)
        and ``0`` (value).
    mapping_dict : dict
        Field name -> measurement name; fields missing from it get NaN.
    FILENAME : str
        Source URL; the text between the 3rd and 4th ``'/'`` becomes the
        house id (``"s3://<bucket>/<house>/..."`` -> ``<house>``).

    Returns
    -------
    pandas.DataFrame
        Columns ``point, _measurement, house_id, _time, _field, _value``.
        ``point`` numbers the distinct ``Time`` values 0, 1, 2, ... in order
        of first appearance (missing ``Time`` values get -1).

    Raises
    ------
    IndexError
        If ``FILENAME`` contains fewer than three ``'/'`` characters.
    """
    # create house ID column from s3 bucket url
    house_id = FILENAME.split('/')[3]

    # One integer id per distinct timestamp, in order of first appearance.
    point_codes, _ = df['Time'].factorize()

    # assign() returns a new frame, so the caller's `df` is not modified.
    annotated = df.assign(
        _measurement=df['level_1'].map(mapping_dict),
        house_id=house_id,
        point=point_codes,
    ).rename(columns={'level_1':'_field',0:'_value','Time':'_time'})

    return annotated[['point','_measurement','house_id','_time','_field','_value']].copy()


def write_annotated_csv(df, path='annotated_file.csv'):
    """Write ``df`` as CSV preceded by ``#datatype`` and ``#group`` lines.

    The frame is streamed straight into the open file rather than first being
    rendered into one large in-memory string.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write; its six columns are described by the annotation lines.
    path : str, optional
        Output file, overwritten if it exists. Defaults to
        ``'annotated_file.csv'`` in the current working directory.
    """
    # Annotation lines placed at the beginning of the file
    annotation1 = '#datatype,long,measurement,tag,dateTime:RFC3339,string,double\n'
    annotation2 = '#group,false,false,true,false,true,false\n'

    with open(path, 'w') as f:
        f.write(annotation1)
        f.write(annotation2)
        # The frame's index is written as the first, unnamed column (pandas'
        # default), so it lines up under the '#datatype' / '#group' cells.
        # NOTE(review): confirm the consumer accepts row numbers in that
        # column; pass index=False plus a leading empty column if not.
        df.to_csv(f)

#ANOTHER WAY 2 
#     # Open a new file to write the annotated CSV data
#     with open('annotated.csv', 'w') as outfile:
#         writer = csv.writer(outfile)

#         # Write the annotations to the beginning of the file
#         for annotation in annotations:
#             writer.writerow([annotation])

#         # Write the original CSV data to the file
#         for row in df:
#             writer.writerow(row)

# ANOTHER WAY 3
# Write the DataFrame to an annotated CSV file
# specifying the line_terminator parameter as '\n' to match the InfluxDB line protocol.

# df.to_csv('data_annotated.csv', index=False, header=False, line_terminator='\n')

In [18]:
def main(FILENAME):
    """Run the whole pipeline for one dump file.

    Reads the CSV at ``FILENAME``, adds sub-second timestamps, reshapes the
    selected sensor columns to one row per (time, field) pair, annotates the
    rows, writes the annotated CSV and prints the first ten result rows.
    """
    # load from the given location and build the RFC3339 timestamps
    readings = create_miliseconds(read_csv(FILENAME))

    # columns to keep and the field -> measurement lookup
    wanted_columns, field_to_measurement = create_valid_columns()

    # select the columns, then materialise the dask frame as pandas
    wide = readings[wanted_columns].copy().compute()

    # wide -> long: one row per (Time, field) pair
    long_format = wide.set_index('Time').stack().reset_index()

    # add measurement, house id and point columns
    annotated = create_features(long_format, field_to_measurement, FILENAME)

    # prepend the annotation lines and save the file
    write_annotated_csv(annotated)

    print(annotated.head(10))

In [19]:
%timeit
if __name__ == "__main__":
    main(s3_bucket_url)

   point  _measurement  house_id                        _time         _field  \
0      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z  Tdown/TAD(us)   
1      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z    Tup/TRD(us)   
2      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z   TransitT(us)   
3      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z         DT(us)   
4      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z      Flow(l/h)   
5      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z        uC temp   
6      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z    HSPLL Calib   
7      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z          Count   
8      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z       PGA gain   
9      0  itronSensors  h.rieger  2023-02-01T13:13:01.000000Z          MaxUp   

       _value  
0   52.792760  
1   52.828160  
2    0.000000  
3    0.042942  
4  210.430400  
5    8.300000  
6    1.