In [1]:
import numpy as np
import pandas as pd 
import warnings
import time
warnings.filterwarnings("ignore")

## Schema

**Table 1**

**Title:** data_v2

**Fields:** company_id, ticker, date, volume, price, ref_num, source, name


**Table 2**

**Title:** reference

**Fields:** id, ticker, name



**Constraints**:  
- Every row in **data_v2** must have one and only one corresponding element from **referrence**
- Every row from **referrence** can correspond to zero or many elements in **data_v2**.
- Volumn should be non-negative
- Price should be non-negative
- Each id, ticker and name from **reference** tbale should be unique

---
Here I used Crow's Foot Notation

![Schema.jpg](Schema.jpg)

### Side Note:
For **reference** table:
- Here I assume (id, ticker) forms the prime key for the **reference** table as the name is not atomic.
- As the SEC guidelines simply require that the choice of a ticker symbol shouldbe original (i.e. not replicate another company's stock ticker symbol) thus I will drop the second row with duplicated ticker 
- For rows with duplicated name, I will drop the row without a correct corresponding ticker - name


For **check_validate** function:
- As missing numerical values can be filled by data scientist or our client according to their wills (average, mode, etc), I didn't count missing values as failed rows.

## Loads Zip Files

### Side Note:
I have manually check the stock price for both repaeted tickers towards the corresponding datetime and the price did not macthing to the real ones; thus I assume the data is synthetic and I will drop the second row for the duplicated ticker. 


    


```

for item in list(df2[df2['name'].duplicated()==True]['name']):
    print(df2[df2['name']==item],'\n')

```

In [17]:
def load_data_v2(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def load_reference(path: str)-> pd.DataFrame:
    
    # perform data cleaning for reference table to ensure there is no duplicated ticker
    # and each ticker is corresponding to the right name. Also rename id to company_id 
    
    df = pd.read_csv(path)
    df.rename(columns={"id": "company_id", "ticker": "ticker", "name": "name"},inplace = True)
    
    # drop duplicated ticker
    df = df[df['ticker'].duplicated()!=True]
    
    # drop rows with incorrect ticker - name pair
    # error_list refer to the test code above where I manually checked each ticker
    # corresponding to the right name 
    
    error_list = [431,245,691,792,820,814,906]
    
    df.drop(index = error_list,inplace = True)
    
    return df

## Check Validate

In [18]:
def check_validate(df: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    
    # Return DataFrame where error rows have been dropped, and error DataFrame which
    # contains error info
    
    # part a) 
    

    # Error output dataframe should have columns: check_performed, line_number, 
    # field, expected_value, actual_value
    
    # Based on the constarints: We should have company_id refer to id in reference, 
    # volume non-negative, price non-negative

    
    # check if company_id in reference
    id_list = list(df2['company_id'])

    reference_error = df[df['company_id'].isin(id_list)!=True]
    reference_error['check_performed'] = 'Schema Check'
    reference_error['line_number'] = reference_error.index
    reference_error['field'] = 'company_id'
    reference_error['expected_value'] = 'company_id refer to id in reference'
    reference_error['actual_value'] = reference_error['company_id']
    
    # Check volume and price if there was any negative values
    volume_error = df[df.eval('volume < 0')]
    volume_error['check_performed'] = 'Negative Value'
    volume_error['line_number'] = volume_error.index
    volume_error['field'] = 'volume'
    volume_error['expected_value'] = 'volume >= 0'
    volume_error['actual_value'] = volume_error['volume']
    
    price_error = df[df.eval('price < 0')]
    price_error['check_performed'] = 'Negative Value'
    price_error['line_number'] = price_error.index
    price_error['field'] = 'price'
    price_error['expected_value'] = 'price >= 0'
    price_error['actual_value'] = price_error['price']

    
    ## check duplicated rows if have the same date and ticker
    
    temp = df[df['company_id'].isin(id_list)] # get rows in the reference 
    duplicated_error = temp[temp.duplicated()]
    duplicated_error['check_performed'] = 'Duplicated Rows'
    duplicated_error['line_number'] = duplicated_error.index
    duplicated_error['field'] = 'Ticker and Date'
    duplicated_error['expected_value'] = 'Non Duplicates'
    duplicated_error['actual_value'] = 'Duplicates'
    
    # concate into one DataFrame
    
    error_df = pd.concat([reference_error,volume_error,price_error,duplicated_error])
    error_df = error_df[['check_performed','line_number','field','expected_value','actual_value']]
    error_df.reset_index(inplace = True,drop = True)
    
    # part b) Remove any failed rows
    
#     new_df = temp[temp[['date','ticker']].duplicated()!=True]
    temp.drop_duplicates(inplace = True)
    
    return temp, error_df

## Data Transform 

In [19]:
def data_transform(df: pd.DataFrame,df2: pd.DataFrame) -> pd.DataFrame:
    
    # fill the name column (data_v2) 
    # referring to name in reference and unnecessary columns
    
    new_df = pd.merge(df, df2, how="left", on=["company_id"])
    new_df = new_df[['company_id','ticker_y','date', 'volume', 'price', 'ref_num','name_y']]
    new_df.rename({'ticker_y':'ticker','name_y':'name'},inplace = True,axis='columns')
    
    
    # replace NaN values for ticker column (data_v2) 
    # referring to ticker in reference
    ticker_missing = new_df[new_df['ticker'].isnull()]
    index = ticker_missing.index
    result = pd.merge(ticker_missing, df2, how="left", on=["company_id"])
    result = result[['company_id','ticker_y','date', 'volume', 'price', 'ref_num'
                     ,'name_y']].set_index(index)

    for i in list(index):
        new_df.loc[i,list(new_df.keys())]=result.loc[i].values
    
    
    # convert str time to datetime time
    new_df['date'] = pd.to_datetime(new_df['date'])
    
    
    # add new column total_value to each row
    new_df['total_value'] = None
    total_value = new_df['volume']*new_df['price']
    new_df['total_value'] = total_value
    
    return new_df

## Export Data 

In [20]:
def export_clean_data(df: pd.DataFrame,output_path: str):
    return df.to_csv(output_path+'data_clean.csv',index= False)

## Report Details

In [21]:
def report_details(df: pd.DataFrame, old_df: pd.DataFrame, error_df:pd.DataFrame) -> pd.Series:
    
    # get the columns with nan values
    
    val = df.isna().sum().sort_values(ascending=False)
    print('columns with nan values:\n',val)
    
    # input row, output row, and error row

    input_row = old_df.shape[0]
    output_row = df.shape[0]
    error_row = error_df.shape[0]
    
    print(f'\nThe number of input rows is {input_row:,}\nThe number of output rows is {output_row:,}\nThe number of error rows is {error_row:,}')
    
    return val

## Main Function

In [22]:
if __name__ == '__main__' or '__file__' in globals():
    start = time.time()
    path_data = 's3://alpharoc-dev/data_challenge/etl/data_v2.csv.gz'
    path_reference = 's3://alpharoc-dev/data_challenge/etl/reference.csv.gz'
    output_path = '/home/fu_wang/'
    data_v2 = load_data_v2(path_data)
    reference = load_reference(path_reference)
    new_data_v2, error_df = check_validate(data_v2,reference)
    clean_data_v2 = data_transform(new_data_v2,reference)
    export_clean_data(clean_data_v2,output_path)
    end = time.time()
    print(f'Total time used {round(end - start,3)} in sceonds','\n')
    report_details(clean_data_v2,data_v2,error_df)

Total time used 14.818 in sceonds 

columns with nan values:
 total_value    73
price          48
volume         25
company_id      0
ticker          0
date            0
ref_num         0
name            0
dtype: int64

The number of input rows is 1,461,020
The number of output rows is 1,440,546
The number of error rows is 20,474


### Comments:
1. The numerical category has extreme values towards on the quantiles $[0,0.05]$  and $[0.95,1]$, and I suggest data scientist or clients to pay extra attention to that.

In [23]:
clean_data_v2['price'].quantile([0.1, .99])

0.10    109.96
0.99    199.01
Name: price, dtype: float64

In [24]:
clean_data_v2['volume'].quantile([0.0001, 1])

0.0001           80.0
1.0000    999999999.0
Name: volume, dtype: float64