# Clean Data

### Description:

Filter the three main data sources (Fund Info, Returns and Holdings)
based on some parameters and save the result.

Approach:
1. Match fund summary to each portfolio/date pair
2. Filter fund/date pairs based on those infos
3. Filter returns based on final sample of holdings
4. Save different versions of the files with different timeframes

Parameters: 
- Lipper_class
- Flags

In [20]:
%matplotlib inline

import os
import sys

# add the 'src' directory as one where we can import modules
src_dir = os.path.join(os.getcwd(), os.pardir, 'src')
sys.path.append(src_dir)
from data.basic_functions import * 

import numpy as np
import pandas as pd
from scipy import sparse
import datetime

import matplotlib.pyplot as plt

# For multiprocessing
import multiprocessing
from itertools import product

### Load the data files

In [21]:
path = '../data/raw/monthly_returns.feather'
returns = feather.read_dataframe(path)   # Downloaded from wrds
returns.shape

(7273320, 3)

In [22]:
path = '../data/raw/total_summary.feather'
summary = feather.read_dataframe(path)     # Downloaded from wrds
summary.shape

(167842, 10)

In [23]:
npz_path = '../data/interim/sparse_matrix.npz'      # Preprocessed holdings data (see notebook 011)
holdings = sparse.load_npz(npz_path)
holdings.shape

(716429, 2382968)

In [24]:
path = '../data/interim/sparse_info.feather'
holdings_summary = feather.read_dataframe(path)
holdings_summary.shape

(716429, 2)

In [25]:
path = '../data/raw/portno_map.feather'
portno_map = feather.read_dataframe(path)
portno_map.shape

(75283, 4)

In [26]:
# Stock map
path = '../data/interim/stock_map.feather'
stock_map = feather.read_dataframe(path)
stock_map.shape

(2382968, 3)

## Drop duplicates in summary data

To increase speed of matching obj_codes and other fund info to portfolios

In [27]:
print('Shape of summary before cleaning: {:,} x {:,}'.format(summary.shape[0],summary.shape[1]))

Shape of summary before cleaning: 167,842 x 10


In [28]:
# Delet observations with missing portnos
summary_clean = summary.loc[summary['crsp_portno'].notna(),:]

In [29]:
# Change dtypes
summary_clean.loc[:,'crsp_fundno'] = pd.to_numeric(summary_clean['crsp_fundno'], downcast='integer')
summary_clean.loc['crsp_portno'] = pd.to_numeric(summary_clean['crsp_portno'], downcast='integer')

summary_clean.loc['first_offer_dt'] = pd.to_datetime(summary_clean['first_offer_dt']).dt.date
summary_clean.loc['begdt'] = pd.to_datetime(summary_clean['begdt']).dt.date
summary_clean.loc['enddt'] = pd.to_datetime(summary_clean['enddt']).dt.date

In [34]:
# Check if dates should be convertes
summary_clean.dtypes

crsp_fundno        float64
crsp_portno        float64
fund_name           object
first_offer_dt      object
index_fund_flag     object
et_flag             object
begdt               object
enddt               object
lipper_class        object
avrcs              float64
dtype: object

In [35]:
print('Shape of summary data after cleaning: {:,} x {:,}'.format(summary_clean.shape[0],summary_clean.shape[1]))

Shape of summary data after cleaning: 132,650 x 10


# Match obj code to portfolios

TODO: 

Fund info flags like the index_fund_flag could in theory also be different in the fund_history

Therefore a similar approach should also be used for those items

In general portno fundno map also beginning and end times

In [36]:
def port_ID_to_port_info(fund_info):  
            """
            Used to merge the right obj_code and other fund info 
            to each holdings_info row and therefore to each row of the sparse holdings matrix
            
            Input:
            - fund_info: Tuple consisting of the port_no (1st element) and the report_dt (2nd)
            
            Output:
            - Tuple of port_no, report_dt, index_fund_flag, et_flag and crsp_obj_cd. 
                NaN if value is not available
            
            Attention:
            Depends on global variable 'summary' to look up the values
            Must be renamed or changed in the function
            """
        
            port_no = fund_info[0]
            report_dt = fund_info[1].date()
            
            
            # Select based on port_no
            my_class = summary_clean[summary_clean.crsp_portno == port_no]
            my_class = my_class.loc[
                (my_class.begdt <= report_dt) & 
                (my_class.enddt >= report_dt)]
            
            try:
                lipper_class = my_class['lipper_class'].values[0]
                fund_name = my_class['fund_name'].values[0]
                index_fund_flag = my_class['index_fund_flag'].values[0]
                et_flag = my_class['et_flag'].values[0]
                avrcs = my_class['avrcs'].values[0]

            except:
                lipper_class = np.nan
                fund_name = np.nan
                index_fund_flag = np.nan
                et_flag = np.nan
                avrcs = np.nan
                
            return(port_no, report_dt, fund_name, et_flag, index_fund_flag, avrcs, lipper_class)        

### Multiprocessing

In [None]:
a = holdings_summary['port_no']
b = holdings_summary['date']

fund_info = list(zip(a,b))

In [None]:
len(fund_info)

In [None]:
with multiprocessing.Pool(processes=6) as pool:
    results = pool.map(port_ID_to_port_info, fund_info)

In [None]:
#labels = ['port_no','report_dt', 'fund_name', 'index_fund_flag','et_flag', 'avrcs', 'lipper_class']
labels = ['port_no','report_dt', 'fund_name','et_flag', 'index_fund_flag', 'avrcs', 'lipper_class']
holdings_summary = pd.DataFrame.from_records(results, columns=labels)

In [None]:
print('Out of the roughly 730k portfolios, for 129k there is no fund header info available')
print(holdings_summary.shape)
holdings_summary[holdings_summary['lipper_class'].isna()].shape # -> For some portfolios there is simly no row in fund_header

#### Save result to save time

In [None]:
path = '../data/interim/holdings_summary_raw.feather'
feather.write_dataframe(holdings_summary,path)

In [37]:
path = '../data/interim/holdings_summary_raw.feather'
holdings_summary = feather.read_dataframe(path)

# Clean holdings_summary data

Do not delet rows since they match to the sparse matrix

In [38]:
# Creat new flag var 'mutual_fund' that is Y for Mutual Funds and N for other funds
# Based on variable et_flag and index_fund_flag

# Make the two flags categories and rename those categories accordingly
#holdings_summary[['et_flag','index_fund_flag']] = holdings_summary[['et_flag','index_fund_flag']].astype('category')
holdings_summary[['et_flag']] = holdings_summary[['et_flag']].astype('category')

In [39]:
et_mapper = {'F':'ETF', 'N':'ETN', np.nan:'MF'}
holdings_summary['et_flag'] = holdings_summary['et_flag'].map(et_mapper)

#index_flag_mapper = {'B':'Index-based', 'D':'Pure Index', 'E':'Index enhanced', np.nan:'MF'}
#holdings_summary['index_fund_flag'] = holdings_summary['index_fund_flag'].map(index_flag_mapper)
#holdings_summary.loc[(holdings_summary['index_fund_flag'] == 'MF') & 
#                     (holdings_summary['et_flag'] == 'MF'),'mutual_fund'] = 'Y'

holdings_summary.loc[holdings_summary['et_flag'] == 'MF','mutual_fund'] = 'Y'

holdings_summary.loc[holdings_summary['mutual_fund'].isna(),'mutual_fund'] = 'N'

holdings_summary.shape

(716429, 7)

# Choose which CRSP style codes to use

In [40]:
selected_obj_codes = (
#                     'EIEI', 'G', 
                      'LCCE', 'LCGE', 'LCVE',   # Large-cap
                      'MCCE', 'MCGE', 'MCVE',   # Mid-cap
                      'MLCE', 'MLGE', 'MLVE',   # Multi-cap
                      'SCCE', 'SCGE', 'SCVE')   # Small-cap

In [41]:
# If fund is Mutual Fund and has the right style code -> Sample == 'Y'
holdings_summary.loc[(holdings_summary['mutual_fund'] == 'Y') & 
                     (holdings_summary['lipper_class'].isin(selected_obj_codes)),'sample'] = 'Y'

# Otherwise sample == 'N'
holdings_summary.loc[holdings_summary['sample'].isna(),'sample'] = 'N'

# Make the two new variables categorical
holdings_summary[['mutual_fund','sample']] = holdings_summary[['mutual_fund','sample']].astype('category')

In [42]:
# Assign class EIEI to class LCVE
holdings_summary[holdings_summary.style == 'EIEI'] = 'LCVE'
holdings_summary[holdings_summary.style == 'G'] = 'LCGE'

In [43]:
holdings_summary['cap_class'] = holdings_summary['lipper_class'].astype(str).str[0]
holdings_summary['style_class'] = holdings_summary['lipper_class'].astype(str).str[2]

### Results

In [44]:
print('How are the two variables on which MF assignemnt is based on distributed:')
#pd.crosstab(holdings_summary['et_flag'],holdings_summary['index_fund_flag'])

How are the two variables on which MF assignemnt is based on distributed:


In [45]:
print('How many funds are considered in the sample?')
print(holdings_summary['sample'].value_counts())

print('How are the selected styles distributed?')
pd.crosstab(holdings_summary['lipper_class'],holdings_summary['sample']).sort_values('Y')[-len(selected_obj_codes):]

How many funds are considered in the sample?
N    716429
Name: sample, dtype: int64
How are the selected styles distributed?


KeyError: 'Y'

# Save holdings_summary

In [47]:
path = '../data/interim/holdings_summary_total.feather'
feather.write_dataframe(holdings_summary,path)

In [48]:
path = '../data/interim/holdings_summary_total.feather'
holdings_summary = feather.read_dataframe(path)

### Add fund_no to holdings_summary

#### Fundo is not an integer for now but not that important -> TODO

In [49]:
portno_map_unique = portno_map.drop_duplicates(subset='crsp_portno')

#### Maybe must be modified since all but one associated fund_nos per portfolio are deleted 

In [50]:
holdings_summary = holdings_summary.merge(portno_map_unique[['crsp_portno','crsp_fundno']],how='left', left_on = 'port_no', right_on='crsp_portno')

In [51]:
holdings_summary.shape

(716429, 13)

In [52]:
mask = holdings_summary['crsp_fundno'].notna()
holdings_summary['crsp_fundno'] = holdings_summary.loc[mask,'crsp_fundno'].astype(int)

In [53]:
holdings_summary.head(1)

Unnamed: 0,port_no,report_dt,fund_name,et_flag,avrcs,lipper_class,mutual_fund,sample,False,cap_class,style_class,crsp_portno,crsp_fundno
0,1000001,2003-03-31,,,,,N,N,LCGE,N,n,1000001.0,4273


In [54]:
holdings_summary = holdings_summary.rename(columns={'crsp_fundno':'fund_no'}, index=str)

# Take sample according to parameter

### Filter returns

In [55]:
# Convert to date format and filter based on date
returns['date'] =  pd.to_datetime(returns['caldt'], format='%Y-%m-%d')

In [56]:
unique_portno = holdings_summary[['fund_no']].drop_duplicates()

In [57]:
mask = returns['crsp_fundno'].isin(unique_portno['fund_no'])

In [58]:
returns_s = returns[mask]

In [59]:
print('Shape of returns before filtering',
     returns.shape)

print('Shape of returns after filtering ',
     returns_s.shape)

Shape of returns before filtering (7273320, 4)
Shape of returns after filtering  (2171612, 4)


# Filter holdings

### Mask to filter out only those in the sample according to holdings_summary

In [60]:
mask = (holdings_summary['sample'] == 'Y') 
np.sum(mask)

0

In [61]:
holdings_s = holdings[mask.values]
holdings_s

<0x2382968 sparse matrix of type '<class 'numpy.float64'>'
	with 0 stored elements in Compressed Sparse Row format>

In [73]:
holdings_summary_s = holdings_summary[mask]

ValueError: Item wrong length 2382968 instead of 716429.

### Filter holdings summary

### Take only last n number of obs per fund (Not needed at the moment)

To avoid overweight of funds with many observations

last_n = 5

holdings_summary_s = holdings_summary_s.reset_index()
index = pd.DataFrame(np.arange(holdings_summary_s.shape[0]))
index = index.groupby(holdings_summary_s['port_no']).tail(last_n)
index = index.values.T.flatten()

holdings_summary_s = holdings_summary_s.reset_index().loc[index,:]
holdings_s = holdings_s[index]

print('Observations left after taking only the last {} observations per fund:'.format(last_n))
holdings_s.shape

### Delet columns (stocks) with little to no information
Delet all colums / stocks which occur no more than 'min_observations_per_stock' times

In [62]:
min_observations_per_stock = 50

In [63]:
holdings_b = sparse.csr_matrix(holdings_s, copy=True)
holdings_b.data = np.ones(len(holdings_s.data))

col_sums = pd.DataFrame(holdings_b.sum(0)).values.flatten()

In [64]:
print('Total number of securities:               {:>10,d}'.format(len(col_sums)))
print('Total number of securities with >1:       {:>10,d}'.format(sum(col_sums > 1)))
print('Total number of securities with >10:      {:>10,d}'.format(sum(col_sums > 10)))
print('Total number of securities with >50:      {:>10,d}'.format(sum(col_sums > 50)))

Total number of securities:                2,382,968
Total number of securities with >1:                0
Total number of securities with >10:               0
Total number of securities with >50:               0


In [65]:
# generate mask to delet some stocks
mask = col_sums > min_observations_per_stock

In [66]:
holdings_s = holdings_s.tocsc()
holdings_s = holdings_s[:,mask]
holdings_s = holdings_s.tocsr()
holdings_s

<0x0 sparse matrix of type '<class 'numpy.float64'>'
	with 0 stored elements in Compressed Sparse Row format>

In [67]:
holdings_b = holdings_b.tocsc()
holdings_b = holdings_b[:,mask]
holdings_b = holdings_b.tocsr()
holdings_b

<0x0 sparse matrix of type '<class 'numpy.float64'>'
	with 0 stored elements in Compressed Sparse Row format>

In [68]:
stock_map = stock_map[mask.T]

### Save final cleaned and filtered data

#### Sparse matrix

In [69]:
path = '../data/processed/holdings_s'
sparse.save_npz(path, holdings_s)

In [70]:
path = '../data/processed/holdings_b'
sparse.save_npz(path, holdings_b)

#### Sparse info

In [72]:
holdings_summary = holdings_summary.drop(columns=['index_fund_flag','et_flag','mutual_fund','sample','crsp_portno'])

KeyError: "['index_fund_flag'] not found in axis"

In [None]:
holdings_summary_s['report_dt'] = pd.to_datetime(holdings_summary_s['report_dt'], format='%Y-%m-%d').dt.date

In [None]:
path = '../data/processed/holdings_summary_s.feather'
feather.write_dataframe(holdings_summary_s,path)

#### Returns

In [None]:
# Convert to date format and filter based on date
begin_date = datetime.date.fromisoformat('2003-01-01')
returns['caldt'] =  pd.to_datetime(returns['caldt'], format='%Y-%m-%d').dt.date
returns_s = returns[returns['caldt'] > begin_date]

In [None]:
path = '../data/processed/returns_s.feather'
feather.write_dataframe(returns_s,path)

#### Stock Map

In [None]:
path = '../data/processed/stock_map.feather'
feather.write_dataframe(stock_map,path)

## Take smaller sub_sub sample (Everything before specified year)
Makes processing faster

In [None]:
start_date = datetime.date.fromisoformat('2015-01-01')
end_date = datetime.date.fromisoformat('2018-01-01')

#### Holdings & holdings_summary

In [None]:
mask = (holdings_summary_s['report_dt'] > start_date) & (holdings_summary_s['report_dt'] < end_date)

In [None]:
holdings_s_s = holdings_s[mask.values]
holdings_b_b = holdings_b[mask.values]

In [None]:
holdings_summary_s_s = holdings_summary_s[mask]

#### Test

In [None]:
holdings_s_s.shape

In [None]:
holdings_summary_s_s.shape

#### Returns

In [None]:
mask = (returns_s['caldt'] > start_date) & (returns_s['caldt'] < end_date)
returns_s_s = returns_s[mask]

### Save final cleaned and filtered data

#### Sparse matrix

In [None]:
path = '../data/processed/holdings_s_s'
sparse.save_npz(path, holdings_s_s)

path = '../data/processed/holdings_b_b'
sparse.save_npz(path, holdings_b_b)

#### Sparse info

In [None]:
path = '../data/processed/holdings_summary_s_s.feather'
feather.write_dataframe(holdings_summary_s_s,path)

#### Returns

In [None]:
path = '../data/processed/returns_s_s.feather'
feather.write_dataframe(returns_s_s,path)