In [None]:
# default_exp filter

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
# makes sure that the core library is reloaded whenever it is changed
%load_ext autoreload
%autoreload 2

# 01_05_Filter_And_Partition

In this notebook, we filter the lines in which we are interested for the following steps.<br> These are
* only lines from 10-K and 10-Q reports (field form is "10-K" or "10-Q")
* only lines which belong to a "primary financial statement" (field stmt is not empty)
* only lines with tags that are not custom tags (custom tags have a version beginning with '00', so these lines are excluded)
* only companies whose shares are, or have been traded at NASDAQ or NYSE

<br>
The result will be stored as a new Parquet-file. We will use our own partition definition in order to make sure, that all lines from one company are inside the same partition.

In [None]:
# imports
from bfh_cas_bgd_fs2020_sa.core import * # initialze spark

from pathlib import Path
from typing import List, Tuple, Union, Set
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col

import pandas as pd

import shutil          # provides high level file operations
import time            # used to measure execution time
import os
import sys

In [None]:
# inputs: parquet folders that are read in this notebook
tst_parquet_folder = "./tmp/parquet/"      # test dataset (only data from two zip files)
all_parquet_folder = "D:/data/parquet/"    # whole dataset as a single parquet

# outputs: folders the filtered and repartitioned data is written to
tst_filtered_folder = "./tmp/filtered/"
all_filtered_folder = "D:/data/parq_filtered"

## Init Spark

In [None]:
spark = get_spark_session() # create the session
spark # display the most important information of the session

## Load the dataset

Loading the data doesn't really do anything. It just prepares the df. But we will use the cache() method to keep the data in memory once it is loaded for the first time.

### Load the test data

In [None]:
# Only the definition of the cached DataFrame is timed here; the data
# itself is read when the first action (the count below) is executed.
start = time.time()
df_tst = spark.read.parquet(tst_parquet_folder)
df_tst = df_tst.cache()
duration = time.time() - start
print(f"duration:  {duration}")

duration:  0.17499494552612305


### Load the whole dataset

In [None]:
# Only the definition of the cached DataFrame is timed here; the data
# itself is read when the first action (the count below) is executed.
start = time.time()
df_all = spark.read.parquet(all_parquet_folder)
df_all = df_all.cache()
duration = time.time() - start
print(f"duration:  {duration}")

duration:  2.0872995853424072


### Print all the contained column names

In [None]:
# Print the column names for convenience. A single print of the joined names
# replaces the former list comprehension, which was used only for its side
# effects and bound its throw-away result to `_` (IPython's last-output name).
print(", ".join(df_all.columns))

cik, adsh, tag, version, coreg, ddate, qtrs, uom, value, footnote, name, sic, countryba, stprba, cityba, zipba, bas1, bas2, baph, countryma, stprma, cityma, zipma, mas1, mas2, countryinc, stprinc, ein, former, changed, afs, wksi, fye, form, period, fy, fp, filed, accepted, prevrpt, detail, instance, nciks, aciks, report, line, stmt, inpth, rfile, plabel, negating, ticker, name_cik_tic, exchange, 

## Loading data into memory

We just run a count on the test dataset and on the whole dataset. This ensures that the data is loaded into memory and is cached afterwards.

In [None]:
# count() is the first action on df_tst, so it loads the test dataset into memory
print("Entries in Test: ", f"{df_tst.count():_}")

Entries in Test:  5_239_639


In [None]:
# count() is the first action on df_all, so it loads the whole dataset into memory.
# The label now names the whole dataset (it was a copy of the test-dataset label).
print("Entries in All: ", "{:_}".format(df_all.count()))

Entries in Test:  109_392_813


## Filter lines for "10K" and "10Q" and "Primary Financial Statement"

In [None]:
# SQL condition shared by the test and the whole dataset
filter_string = (
    "stmt is not null"                  # line belongs to a primary financial statement
    " and version NOT LIKE '00%'"       # drop tags whose version begins with '00' (custom tags)
    " and form in ('10-K', '10-Q')"     # only 10-K and 10-Q reports
)

### Test dataset

In [None]:
# apply the shared SQL condition to the test dataset (lazy, nothing is computed yet)
df_tst_filtered = df_tst.filter(filter_string)

In [None]:
# count() is an action, so the measured time includes evaluating the filter
start = time.time()
print("after filter   : ", f"{df_tst_filtered.count():_}")
duration = time.time() - start
print(f"duration:  {duration}")

after filter   :  2_994_680
duration:  4.5779969692230225


### Whole dataset

In [None]:
# apply the shared SQL condition to the whole dataset (lazy, nothing is computed yet)
df_all_filtered = df_all.filter(filter_string)

In [None]:
# count() is an action, so the measured time includes evaluating the filter
start = time.time()
print("after filter   : ", f"{df_all_filtered.count():_}")
duration = time.time() - start
print(f"duration:  {duration}")

after filter   :  55_696_724
duration:  24.144516706466675


## Filter companies traded at NYSE and NASDAQ

A company's "being traded at" status can change during its lifetime, so we cannot simply filter by the "exchange" column. Instead, we have to create a list of all the companies (CIK number) which have been traded or are traded at NYSE or NASDAQ.
<br>
We add a column "cik_select" which receives the value 1 and which we can use to filter after the join.

### Create list with cik

#### Test dataset

In [None]:
# List of companies (cik) that have at least one line with one of the selected
# exchanges; every listed cik gets the marker column cik_select = 1.
# The list is de-duplicated on cik alone, *after* the exchange filter: a cik
# that occurs with more than one of these exchanges would otherwise be listed
# several times and multiply the rows of that company in the later left join.
df_tst_cik_exchange = (
    df_tst_filtered
    .where("exchange in ('NASDAQ','NYSE','NYSE ARCA','NYSE MKT') ")
    .select("cik")
    .distinct()
    .selectExpr("cik", "1 as cik_select")
    .cache()
)

In [None]:
# count() triggers the computation (and caching) of the cik list
start = time.time()
print("count cik_exchange   : ", f"{df_tst_cik_exchange.count():_}")
duration = time.time() - start
print(f"duration:  {duration}")

count cik_exchange   :  2_844
duration:  0.6120038032531738


#### Whole dataset

In [None]:
# List of companies (cik) that have at least one line with one of the selected
# exchanges; every listed cik gets the marker column cik_select = 1.
# The list is de-duplicated on cik alone, *after* the exchange filter: a cik
# that occurs with more than one of these exchanges would otherwise be listed
# several times and multiply the rows of that company in the later left join.
df_all_cik_exchange = (
    df_all_filtered
    .where("exchange in ('NASDAQ','NYSE','NYSE ARCA','NYSE MKT') ")
    .select("cik")
    .distinct()
    .selectExpr("cik", "1 as cik_select")
    .cache()
)

In [None]:
# count() triggers the computation (and caching) of the cik list
start = time.time()
print("count cik_exchange   : ", f"{df_all_cik_exchange.count():_}")
duration = time.time() - start
print(f"duration:  {duration}")

count cik_exchange   :  4_459
duration:  42.441001892089844


### Join the cik list with the filtered dataframe

Finally, we can left join the filtered dataframe with the cik list and use the column "cik_select" to filter the ones we are interested in.

#### Test dataset

In [None]:
# Left join with the cik list, then keep only the rows that found a match
# (cik_select is 1 for listed companies and null for all others).
df_tst_filter_complete = (
    df_tst_filtered
    .join(df_tst_cik_exchange, on='cik', how="left")
    .where("cik_select == 1")
)

In [None]:
# Number of rows left after all filters have been applied to the test dataset.
# The label now names the counted dataframe (it was copied from the cik list cell).
start = time.time()
print("count filter_complete: ", "{:_}".format(df_tst_filter_complete.count()))
duration = time.time() - start
print("duration: ", duration)

count cik_exchange   :  1_680_108
duration:  0.8090000152587891


#### Whole dataset

In [None]:
# Left join with the cik list, then keep only the rows that found a match
# (cik_select is 1 for listed companies and null for all others).
df_all_filter_complete = (
    df_all_filtered
    .join(df_all_cik_exchange, on='cik', how="left")
    .where("cik_select == 1")
)

In [None]:
# Number of rows left after all filters have been applied to the whole dataset.
# The label now names the counted dataframe (it was copied from the cik list cell).
start = time.time()
print("count filter_complete: ", "{:_}".format(df_all_filter_complete.count()))
duration = time.time() - start
print("duration: ", duration)

count cik_exchange   :  35_454_045
duration:  25.643085956573486


We were able to reduce the 110 million rows to about 35 million rows. 

 ## Save as a new Parquet file

When we think about defining the partitions, we have to think about how this will affect performance. There are mainly two reasons to use partitions. The first reason is to be able to read with several cores in parallel, so it makes sense to have at least as many partitions as there are cores in the cluster. The second reason is to reduce the amount of data that has to be read. The Parquet-reader can interpret the where-clause of a query and therefore optimize the reading. For instance, let us assume we have data with a country field and use it as the partition key. If we execute a select with the condition "country = 'Switzerland'", then the Parquet-reader will know that it only has to read the partition in which the data for Switzerland is stored.
<br>
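In our case, it makes sense that the data lines of one company (cik) are stored together. Moreover, we will use a second level of partitions which is based on the stmt column (the type of the financial statement), since these entries should also belong together. Note that `repartition(16, col("cik"), col("stmt"))` distributes the rows by a hash over the combination of both columns: all lines with the same cik and the same stmt end up in the same partition, while lines of one company with different stmt values can be placed in different partitions.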

### Test dataset

In [None]:
# distribute the rows over 16 partitions, keyed by the columns cik and stmt
df_tst_partioned = df_tst_filter_complete.repartition(16, "cik", "stmt")

In [None]:
# remove the result of a previous run (not part of the measured time)
shutil.rmtree(tst_filtered_folder, ignore_errors=True)

# write the repartitioned test dataset and measure how long it takes
start = time.time()
df_tst_partioned.write.parquet(tst_filtered_folder)
duration = time.time() - start
print(f"duration:  {duration}")

duration:  17.03229856491089


### Whole dataset

In [None]:
# distribute the rows over 16 partitions, keyed by the columns cik and stmt
df_all_partioned = df_all_filter_complete.repartition(16, "cik", "stmt")

In [None]:
# remove the result of a previous run (not part of the measured time)
shutil.rmtree(all_filtered_folder, ignore_errors=True)

# write the repartitioned whole dataset and measure how long it takes
start = time.time()
df_all_partioned.write.parquet(all_filtered_folder)
duration = time.time() - start
print(f"duration:  {duration}")

duration:  238.66271138191223


In [None]:
# shut down the Spark session at the end of the notebook
spark.stop()