# Data Cleaning Pipeline
## A data screening and cleaning pipeline for raw datasets

## 0. Sample dataset

In [1]:
# load packages
import pandas as pd
import numpy as np

import arcticdb as adb



Read datasets

In [2]:
# Location of the raw sample extract on the local machine.
file_path = '/Users/zway/Desktop/BTC_Project/raw_datasets/raw_sample_2023_07.csv'

raw_df = pd.read_csv(file_path)

# DataFrame.info() prints its report and returns None, so raw_df_info is always None.
raw_df_info = raw_df.info()

# Per-column count of missing values.
missing_summary = raw_df.isna().sum()

raw_df.head(), missing_summary

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36111 entries, 0 to 36110
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Unnamed: 0.1  36111 non-null  int64  
 1   Unnamed: 0    36111 non-null  int64  
 2   Timestamp     36111 non-null  object 
 3   Open          36111 non-null  float64
 4   High          36111 non-null  float64
 5   Low           36111 non-null  float64
 6   Close         36111 non-null  float64
 7   Volume        36111 non-null  float64
 8   Avg_price     36111 non-null  float64
dtypes: float64(6), int64(2), object(1)
memory usage: 2.5+ MB


(   Unnamed: 0.1  Unnamed: 0            Timestamp     Open     High      Low  \
 0       1675787     1675787  2023-07-01 00:00:00  30407.1  30418.2  30407.1   
 1       1675788     1675788  2023-07-01 00:01:00  30416.9  30416.9  30392.3   
 2       1675789     1675789  2023-07-01 00:02:00  30393.5  30417.3  30393.5   
 3       1675790     1675790  2023-07-01 00:03:00  30404.1  30420.8  30404.1   
 4       1675791     1675791  2023-07-01 00:04:00  30413.6  30427.2  30413.4   
 
      Close    Volume  Avg_price  
 0  30416.9  0.000027   30412.65  
 1  30393.2  0.000016   30404.60  
 2  30402.2  0.000074   30405.40  
 3  30413.4  0.000056   30412.45  
 4  30422.8  0.000039   30420.30  ,
 Unnamed: 0.1    0
 Unnamed: 0      0
 Timestamp       0
 Open            0
 High            0
 Low             0
 Close           0
 Volume          0
 Avg_price       0
 dtype: int64)

Columns 'Unnamed: 0' and 'Unnamed: 0.1' are likely row IDs and may be duplicates of each other. Check whether they are identical; if so, drop one and rename the remaining one to 'id'.

In [3]:
# Collapse the two leftover CSV index columns into a single 'id' column.
# The presence check keeps this cell re-runnable: once the rename has happened,
# the 'Unnamed: ...' columns are gone and indexing them would raise KeyError.
if not {'Unnamed: 0', 'Unnamed: 0.1'}.issubset(raw_df.columns):
    print("Columns 'Unnamed: 0' and 'Unnamed: 0.1' are not both present; nothing to consolidate.")
elif raw_df['Unnamed: 0'].equals(raw_df['Unnamed: 0.1']):
    # Identical content: keep one copy and give it a meaningful name.
    raw_df = raw_df.drop(columns=['Unnamed: 0']).rename(columns={'Unnamed: 0.1': 'id'})
    print('These two columns are identical, renamed the remaining one to id')
else:
    # Differing content: leave both columns untouched for manual inspection.
    print('These two columns are not identical.')

These two columns are identical, renamed the remaining one to id


In [4]:
# Show the column summary and the first rows of raw_df.
# info() prints its report and returns None, hence the leading None in the displayed tuple.
raw_df.info(), raw_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36111 entries, 0 to 36110
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   id         36111 non-null  int64  
 1   Timestamp  36111 non-null  object 
 2   Open       36111 non-null  float64
 3   High       36111 non-null  float64
 4   Low        36111 non-null  float64
 5   Close      36111 non-null  float64
 6   Volume     36111 non-null  float64
 7   Avg_price  36111 non-null  float64
dtypes: float64(6), int64(1), object(1)
memory usage: 2.2+ MB


(None,
         id            Timestamp     Open     High      Low    Close    Volume  \
 0  1675787  2023-07-01 00:00:00  30407.1  30418.2  30407.1  30416.9  0.000027   
 1  1675788  2023-07-01 00:01:00  30416.9  30416.9  30392.3  30393.2  0.000016   
 2  1675789  2023-07-01 00:02:00  30393.5  30417.3  30393.5  30402.2  0.000074   
 3  1675790  2023-07-01 00:03:00  30404.1  30420.8  30404.1  30413.4  0.000056   
 4  1675791  2023-07-01 00:04:00  30413.6  30427.2  30413.4  30422.8  0.000039   
 
    Avg_price  
 0   30412.65  
 1   30404.60  
 2   30405.40  
 3   30412.45  
 4   30420.30  )

Check for duplicate records and inconsistent datetime formats

In [5]:
# Rows that exactly repeat an earlier row in every column
# (the first occurrence of each row is not flagged).
duplicate_rows = raw_df.loc[raw_df.duplicated(keep='first')]

duplicate_rows.info(), duplicate_rows.head()


<class 'pandas.core.frame.DataFrame'>
Index: 0 entries
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   id         0 non-null      int64  
 1   Timestamp  0 non-null      object 
 2   Open       0 non-null      float64
 3   High       0 non-null      float64
 4   Low        0 non-null      float64
 5   Close      0 non-null      float64
 6   Volume     0 non-null      float64
 7   Avg_price  0 non-null      float64
dtypes: float64(6), int64(1), object(1)
memory usage: 0.0+ bytes


(None,
 Empty DataFrame
 Columns: [id, Timestamp, Open, High, Low, Close, Volume, Avg_price]
 Index: [])

In [6]:
# True where the entry is a string containing at least one non-whitespace character.
timestamp_formats = raw_df["Timestamp"].apply(
    lambda value: isinstance(value, str) and bool(value.strip())
)
# Rows that failed the check above: non-string or blank timestamps.
non_string_timestamps = raw_df.loc[~timestamp_formats]

non_string_timestamps.info(), non_string_timestamps.head()

<class 'pandas.core.frame.DataFrame'>
Index: 0 entries
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   id         0 non-null      int64  
 1   Timestamp  0 non-null      object 
 2   Open       0 non-null      float64
 3   High       0 non-null      float64
 4   Low        0 non-null      float64
 5   Close      0 non-null      float64
 6   Volume     0 non-null      float64
 7   Avg_price  0 non-null      float64
dtypes: float64(6), int64(1), object(1)
memory usage: 0.0+ bytes


(None,
 Empty DataFrame
 Columns: [id, Timestamp, Open, High, Low, Close, Volume, Avg_price]
 Index: [])

In [7]:
# Parse the Timestamp strings with pandas and overwrite the column on raw_df itself.
raw_df['Timestamp'] = pd.to_datetime(raw_df['Timestamp'])

In [8]:
# Take an independent copy so clean_df is not merely a second name for raw_df;
# a later change to either frame can then no longer silently alter the other.
clean_df = raw_df.copy()

# info() prints its report and returns None, hence the leading None in the displayed tuple.
clean_df.info(), clean_df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36111 entries, 0 to 36110
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   id         36111 non-null  int64         
 1   Timestamp  36111 non-null  datetime64[ns]
 2   Open       36111 non-null  float64       
 3   High       36111 non-null  float64       
 4   Low        36111 non-null  float64       
 5   Close      36111 non-null  float64       
 6   Volume     36111 non-null  float64       
 7   Avg_price  36111 non-null  float64       
dtypes: datetime64[ns](1), float64(6), int64(1)
memory usage: 2.2 MB


(None,
         id           Timestamp     Open     High      Low    Close    Volume  \
 0  1675787 2023-07-01 00:00:00  30407.1  30418.2  30407.1  30416.9  0.000027   
 1  1675788 2023-07-01 00:01:00  30416.9  30416.9  30392.3  30393.2  0.000016   
 2  1675789 2023-07-01 00:02:00  30393.5  30417.3  30393.5  30402.2  0.000074   
 3  1675790 2023-07-01 00:03:00  30404.1  30420.8  30404.1  30413.4  0.000056   
 4  1675791 2023-07-01 00:04:00  30413.6  30427.2  30413.4  30422.8  0.000039   
 
    Avg_price  
 0   30412.65  
 1   30404.60  
 2   30405.40  
 3   30412.45  
 4   30420.30  )

# Save the cleaned dataset to an ArcticDB library

Set up ArcticDB path and libraries

In [9]:
# Filesystem location passed to ArcticDB as the lmdb:// storage path.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
DB_PATH = '/Users/zway/Desktop/BTC_Project/DB'

In [10]:
# Connect to the ArcticDB instance addressed by the lmdb:// URI built from DB_PATH.
ac = adb.Arctic("lmdb://" + DB_PATH)

# Create the library for cleaned frames only when it does not exist yet.
if not ac.has_library("cleaned_data"):
    ac.create_library("cleaned_data")


In [11]:
# Persist clean_df in the 'cleaned_data' library under the symbol 'btc_cleaned_sample'.
ac["cleaned_data"].write("btc_cleaned_sample", clean_df)

VersionedItem(symbol='btc_cleaned_sample', library='cleaned_data', data=n/a, version=0, metadata=None, host='LMDB(path=/Users/zway/Desktop/BTC_Project/DB)', timestamp=1747954783591906000)