In [None]:
# default_exp prepare_whole

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
# makes sure the core library is reloaded automatically whenever it is modified
%load_ext autoreload
%autoreload 2

# Prepare_Whole_Dataset

## Basic Settings

In [None]:
# imports
from bfh_mt_hs2020_sec_data.core import get_spark_session # initialze spark
from pathlib import Path
from typing import List, Tuple, Union, Set
import urllib.request  # used to download resources from the web 
import shutil          # provides high level file operations
import time            # used to measure execution time
import os
import zipfile

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, DoubleType, BooleanType
from pyspark.sql.functions import udf, col
from pyspark.sql.dataframe import DataFrame

ModuleNotFoundError: No module named 'findspark'

In [None]:
# Basic Definitions
# NOTE: all folders must end with a "/" - the code below builds paths by plain
# string concatenation (e.g. extract_temp_folder + file name, target_csv_folder + "*").
all_zip_folder = "d:/data/sec_zips/"       # downloaded quarterly zip files
target_csv_folder = "d:/data/zip_joined/"  # one joined csv folder per zip file
extract_temp_folder = "d:/data/tmp/"       # temporary files extracted from the zips
all_parquet_folder = "d:/data/parquet/"    # target of the final parquet export

In [None]:
# Create the working folders if they do not exist yet (existing folders are left untouched).
# all_parquet_folder is not created here.
Path(all_zip_folder).mkdir(parents=True, exist_ok=True)
Path(target_csv_folder).mkdir(parents=True, exist_ok=True)
Path(extract_temp_folder).mkdir(parents=True, exist_ok=True)

In [None]:
# init Spark
spark = get_spark_session() # create the session
spark # display the most important information of the session

## 01_Download_ZIP

### Prepare download urls

In [None]:
# definitions to create download urls
# a url is built as sec_base_path + format_str.format(year, quarter)
sec_base_path = "https://www.sec.gov/files/dera/data/financial-statement-data-sets/"
start_year = 2009        # start year to download the data (inclusive)
end_year   = 2020        # end year for download (inclusive)
format_str = "{}q{}.zip" # all file names are like 2020q1.zip 

In [None]:
def create_download_urls_df():
    """
    Build a Spark DataFrame with a single string column "url" that holds all download links.

    One url is generated per quarter (1-4) for every year from start_year to end_year
    (both inclusive). One additional, hard-coded url for 2020q1 is appended at the end.
    NOTE(review): that extra url yields the same file name (2020q1.zip) as the generated
    2020q1 entry - presumably an alternative location for that quarter; confirm.
    """
    quarterly_urls = [
        sec_base_path + format_str.format(year, quarter)
        for year in range(start_year, end_year + 1)
        for quarter in range(1, 5)
    ]
    extra_url = "https://www.sec.gov/files/node/add/data_distribution/2020q1.zip"
    all_urls = quarterly_urls + [extra_url]

    # a list of plain strings becomes a one-column frame named "value"; rename it to "url"
    return spark.createDataFrame(all_urls, StringType()).withColumnRenamed("value", "url")

### download the data

In [None]:
def downloader_function(url):
    """
    Download the file behind ``url`` into ``all_zip_folder``.

    The name of the target file is the last segment of the url path. If a file with
    that name already exists in ``all_zip_folder`` nothing is downloaded.

    The data is first written to a temporary ".part" file next to the target and only
    renamed to the final name after the download has completed. This way an interrupted
    download never leaves a truncated zip file behind that a later run would report as
    "already downloaded".

    Parameters:
        url: the url of the file to download.

    Returns:
        "already downloaded" if the target file exists,
        "success" if the file was downloaded,
        "failed: <error>" if the download raised an exception.
    """
    # imported locally: the notebook only imports urllib.request at the top
    from urllib.parse import urlparse

    # From URL construct the destination path and filename.
    file_name = os.path.basename(urlparse(url).path)
    file_path = os.path.join(all_zip_folder, file_name)

    # Check if the file has already been downloaded.
    if os.path.exists(file_path):
        return "already downloaded"

    # the process id keeps the temporary names of parallel worker processes apart
    part_path = "{}.{}.part".format(file_path, os.getpid())

    # Download into the temporary file, then move it to its final name.
    try:
        with urllib.request.urlopen(url, timeout=30) as urldata, \
                open(part_path, 'wb') as out_file:
            shutil.copyfileobj(urldata, out_file)
        os.replace(part_path, file_path)
        return "success"
    except Exception as ex:
        return "failed: {}".format(ex)
    finally:
        # remove the leftovers of a failed download; after a successful rename
        # (or if the url could not be opened at all) there is nothing to remove
        try:
            os.remove(part_path)
        except OSError:
            pass

In [None]:
# Spark UDF around downloader_function: takes the url column, returns the status text as string
downloader_udf = udf(downloader_function, StringType())

In [None]:
def download_zip_files():
    """
    Download all zip files by applying downloader_udf to every generated url.

    Prints the elapsed wall-clock time in seconds and returns the collected rows
    (a list, not a DataFrame); every row has the fields "url" and "result".
    """
    urls_df = create_download_urls_df()

    started_at = time.time()
    status_column = downloader_udf('url').alias('result')
    # collect() triggers the execution of the udf and therefore the actual downloads
    rows = urls_df.select('url', status_column).collect()
    elapsed = time.time() - started_at

    print("execution time:      ", elapsed)
    return rows

## 02_join sec data

In [None]:
# Define constants for the names of the files inside the zip file
SUB_TXT = "sub.txt"
PRE_TXT = "pre.txt"
NUM_TXT = "num.txt"
TAG_TXT = "tag.txt"  # NOTE(review): not referenced by the visible code of this notebook

In [None]:
# Paths (as strings) of all "*.zip" files inside all_zip_folder.
# The folder is listed once, at the moment this cell is executed.
all_zip_path = Path(all_zip_folder)
zip_files = list(map(str, all_zip_path.glob("*.zip")))

In [None]:
def read_csv_in_zip_into_df_extract(zip_file: str, data_file: str) -> DataFrame:
    """
    Extract one member of a zip file into extract_temp_folder and read it with Spark.

    The member is copied to disk in chunks, so it never has to fit into memory as a
    whole. The extracted file is not deleted here, it stays in extract_temp_folder.

    Parameters:
        zip_file:  path of the zip file.
        data_file: name of the member inside the zip file (e.g. "num.txt").

    Returns:
        the DataFrame created by spark.read.csv from the extracted file, read as
        tab separated data with a header line (no schema is passed).
    """
    # unique temp file per zip file and member: <extract_temp_folder><zip name>_<member>
    zip_name = Path(zip_file).name.replace(".zip", "").replace("/", "").replace("\\", "")
    temp_path = extract_temp_folder + zip_name + "_" + data_file

    with zipfile.ZipFile(zip_file, "r") as container_zip:
        with container_zip.open(data_file) as f, open(temp_path, "wb") as f_temp:
            shutil.copyfileobj(f, f_temp)

    # both handles are closed (and the data flushed) before Spark opens the file
    f_temp_dbfs = temp_path.replace("/dbfs", "")
    return spark.read.csv(f_temp_dbfs, sep='\t', header=True)

In [None]:
def join_files(zip_file: str, target_folder: str) -> str:
    """
    Join sub.txt, pre.txt and num.txt of one zip file and write the result as csv.

    num is inner-joined with sub on "adsh"; the result is left-joined with pre on
    "adsh", "tag" and "version". The joined data is written gzip-compressed and with
    a header line to <target_folder><name of the zip file without ".zip">.

    Parameters:
        zip_file:      path of the zip file.
        target_folder: folder for the result; the zip name is appended by plain string
                       concatenation, so it has to end with a path separator.

    Returns:
        the path the data was written to, or "<zip_file> :  already Joined" if the
        target path already exists (nothing is read or written in that case).
    """
    quarter_name = Path(zip_file).name.replace(".zip", "").replace("/", "").replace("\\", "")
    target_path = target_folder + quarter_name

    # the existence check uses the path as it is, before "/dbfs" is stripped below
    if os.path.exists(target_path):
        return zip_file + " : " + " already Joined"

    df_sub, df_pre, df_num = (
        read_csv_in_zip_into_df_extract(zip_file, member)
        for member in (SUB_TXT, PRE_TXT, NUM_TXT)
    )

    num_with_sub = df_num.join(df_sub, ["adsh"])
    df_joined = num_with_sub.join(df_pre, ["adsh", "tag", "version"], "left")

    spark_target_path = target_path.replace("/dbfs", "")
    df_joined.write.csv(spark_target_path, compression="gzip", header=True)

    return spark_target_path

In [None]:
def join_zip_content():
    """
    Join the content of every zip file inside all_zip_folder into target_csv_folder.

    Calls join_files for each "*.zip" file (in sorted order) and prints its result.
    A failure for one file is printed and does not stop the processing of the
    remaining files. At the end the total duration in seconds is printed.
    """
    # List the folder now instead of using the module-level zip_files list: that list
    # is created when its cell is executed, which in a fresh run happens before the
    # zip files are downloaded - it would be empty and nothing would be joined.
    current_zip_files = sorted(str(path) for path in Path(all_zip_folder).glob("*.zip"))

    start = time.time()
    for file in current_zip_files:
        try:
            print(join_files(file, target_csv_folder))
        except Exception as ex:
            print("failed: ", file, str(ex))
    duration = time.time() - start
    print("duration: ", duration)

## 03_Merge to single Parquet

In [None]:
# Names of all sub folders of target_csv_folder.
# The folder is listed once, at the moment this cell is executed.
all_csv_path = Path(target_csv_folder)
all_csv_path_list = [sub_dir.name for sub_dir in filter(Path.is_dir, all_csv_path.iterdir())]

In [None]:
# Schema of the joined csv files. The fields are listed in the column order produced by
# join_files: the join keys (adsh, tag, version), the remaining num.txt columns, the
# sub.txt columns and finally the remaining pre.txt columns.
# Date columns are declared as DateType and are parsed with the dateFormat given to the reader.
schema = StructType([
    # num.txt
    StructField("adsh",       StringType(),  True),
    StructField("tag",        StringType(),  True),
    StructField("version",    StringType(),  True),
    # coreg holds the name of a co-registrant, i.e. text - it cannot be parsed as integer
    StructField("coreg",      StringType(),  True),
    StructField("ddate",      DateType(),    True),  # date
    StructField("qtrs",       StringType(),  True),
    StructField("uom",        StringType(),  True),
    StructField("value",      DoubleType(),  True),
    StructField("footnote",   StringType(),  True),
    # sub.txt
    StructField("cik",        IntegerType(), True),
    StructField("name",       StringType(),  True),
    StructField("sic",        IntegerType(), True),
    StructField("countryba",  StringType(),  True),
    StructField("stprba",     StringType(),  True),
    StructField("cityba",     StringType(),  True),
    StructField("zipba",      StringType(),  True),
    StructField("bas1",       StringType(),  True),
    StructField("bas2",       StringType(),  True),
    StructField("baph",       StringType(),  True),
    StructField("countryma",  StringType(),  True),
    StructField("stprma",     StringType(),  True),
    StructField("cityma",     StringType(),  True),
    StructField("zipma",      StringType(),  True),
    StructField("mas1",       StringType(),  True),
    StructField("mas2",       StringType(),  True),
    StructField("countryinc", StringType(),  True),
    StructField("stprinc",    StringType(),  True),
    StructField("ein",        IntegerType(), True),
    StructField("former",     StringType(),  True),
    StructField("changed",    StringType(),  True),
    StructField("afs",        StringType(),  True),
    StructField("wksi",       IntegerType(), True),
    StructField("fye",        StringType(),  True),
    StructField("form",       StringType(),  True),
    StructField("period",     DateType(),    True),  # date
    StructField("fy",         IntegerType(), True),
    StructField("fp",         StringType(),  True),
    StructField("filed",      DateType(),    True),  # date
    StructField("accepted",   StringType(),  True),  # datetime
    StructField("prevrpt",    IntegerType(), True),
    StructField("detail",     IntegerType(), True),
    StructField("instance",   StringType(),  True),
    StructField("nciks",      IntegerType(), True),
    StructField("aciks",      StringType(),  True),
    # pre.txt
    StructField("report",     IntegerType(), True),
    StructField("line",       IntegerType(), True),
    StructField("stmt",       StringType(),  True),
    StructField("inpth",      IntegerType(), True),
    StructField("rfile",      StringType(),  True),
    StructField("plabel",     StringType(),  True),
    StructField("negating",   StringType(),  True),
])

In [None]:
def merge_to_single_parquet():
    """
    Merge all joined csv data into one parquet dataset enriched with ticker information.

    Reads everything below target_csv_folder with the predefined schema (dates in the
    format yyyyMMdd), left-joins the ticker data of "./data/cik_ticker.csv" on "cik"
    and writes the result as parquet to all_parquet_folder. No write mode is passed
    to the parquet writer. Prints the duration in seconds.
    """
    started_at = time.time()

    joined_csv_df = spark.read.csv(target_csv_folder + "*", header=True, dateFormat="yyyyMMdd", schema=schema)

    # prepare cik_ticker data: keep four columns, rename three of them ...
    raw_ticker_df = spark.read.csv("./data/cik_ticker.csv", sep="|", header=True)
    ticker_df = raw_ticker_df.select('CIK', 'Ticker', 'Name', 'Exchange')
    for old_name, new_name in (('Name', "name_cik_tic"), ('Ticker', "ticker"), ('Exchange', "exchange")):
        ticker_df = ticker_df.withColumnRenamed(old_name, new_name)
    # ... and provide the join key "cik" as integer (no schema is given when reading the file)
    ticker_df = ticker_df.withColumn("cik", col("CIK").cast(IntegerType()))

    enriched_df = joined_csv_df.join(ticker_df, ["cik"], "left")
    enriched_df.write.parquet(all_parquet_folder)

    print("duration: ", time.time() - started_at)

## 99_Execution

In [None]:
# Step 1: download all zip files; the result holds one row (url, result) per url
download_result = download_zip_files()
print(download_result)

execution time:       11.214993715286255
[Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q1.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q2.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q3.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2009q4.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q1.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q2.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q3.zip', result='already downloaded'), Row(url='https://www.sec.gov/files/dera/data/financial-statement-data-sets/2010q4.zip', result='already downloaded'), Row(url='https

In [None]:
# Step 2: join the sub/pre/num data of every zip file into target_csv_folder
join_zip_content()

d:\data\sec_zips\2009q1.zip :  already Joined
d:\data\sec_zips\2009q2.zip :  already Joined
d:\data\sec_zips\2009q3.zip :  already Joined
d:\data\sec_zips\2009q4.zip :  already Joined
d:\data\sec_zips\2010q1.zip :  already Joined
d:\data\sec_zips\2010q2.zip :  already Joined
d:\data\sec_zips\2010q3.zip :  already Joined
d:\data\sec_zips\2010q4.zip :  already Joined
d:\data\sec_zips\2011q1.zip :  already Joined
d:\data\sec_zips\2011q2.zip :  already Joined
d:\data\sec_zips\2011q3.zip :  already Joined
d:\data\sec_zips\2011q4.zip :  already Joined
d:\data\sec_zips\2012q1.zip :  already Joined
d:\data\sec_zips\2012q2.zip :  already Joined
d:\data\sec_zips\2012q3.zip :  already Joined
d:\data\sec_zips\2012q4.zip :  already Joined
d:\data\sec_zips\2013q1.zip :  already Joined
d:\data\sec_zips\2013q2.zip :  already Joined
d:\data\sec_zips\2013q3.zip :  already Joined
d:\data\sec_zips\2013q4.zip :  already Joined
d:\data\sec_zips\2014q1.zip :  already Joined
d:\data\sec_zips\2014q2.zip :  alr

In [None]:
# Helper Code to clear the extract_temp_folder
# (removes the folder including all files extracted from the zips, then recreates it empty)
shutil.rmtree(extract_temp_folder)
Path(extract_temp_folder).mkdir(parents=True, exist_ok=True) # create directory after it was deleted

In [None]:
# Step 3: write the single parquet dataset.
# A previous export is deleted first; a missing folder is not an error (ignore_errors=True).
shutil.rmtree(all_parquet_folder,  ignore_errors=True) # make sure the target folder is empty
merge_to_single_parquet()

duration:  981.0835425853729


In [None]:
# stop the Spark session created at the top of the notebook
spark.stop()