In [None]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session (an existing one is reused if present).
# "local[*]" runs Spark in-process, with one worker thread per logical core.
spark = (
    SparkSession.builder
    .appName("preprocess_raw_financial_statements")
    .master("local[*]")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/01 18:14:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import os
from pyspark.sql.functions import col

# When True, only sub-directories whose name starts with "2020" are loaded.
TEST_MODE = True
# Root folder; every sub-directory is read as one data drop containing the
# tab-separated files `num.txt` and `sub.txt` (both with a header row).
RAW_DATA_DIR = "/workspace/data/raw/"

# Accumulates the joined rows of every loaded sub-directory.
union_df = None

# sorted(): os.listdir() returns entries in arbitrary order; sorting keeps the
# load order (and the progress output below) reproducible between runs.
for dirname in sorted(os.listdir(RAW_DATA_DIR)):
    quarter_dir = os.path.join(RAW_DATA_DIR, dirname)

    # Stray files in the root (README, .DS_Store, ...) are not data drops;
    # trying to read "<file>/num.txt" would make the Spark read fail.
    if not os.path.isdir(quarter_dir):
        continue
    if TEST_MODE and not dirname.startswith("2020"):
        continue

    print(f" - {dirname}")

    # Numeric facts: keep only non-null USD values reported under a "us-gaap*"
    # version, with no segment and no co-registrant, and with `qtrs` of 0 or 4.
    num_df = (
        spark.read.csv(
            path=os.path.join(quarter_dir, "num.txt"),
            sep="\t",
            header=True,
            inferSchema=True,
        )
        .filter(col("version").startswith("us-gaap"))
        .filter(col("value").isNotNull())
        .filter(col("segments").isNull())
        .filter(col("coreg").isNull())
        .filter(col("uom") == "USD")
        .filter(col("qtrs").isin([0, 4]))
        .select("adsh", "tag", "ddate", "value")
    )

    # Submissions: keep only 10-K forms whose `prevrpt` flag is 0.
    sub_df = (
        spark.read.csv(
            path=os.path.join(quarter_dir, "sub.txt"),
            sep="\t",
            header=True,
            inferSchema=True,
        )
        .filter(col("prevrpt") == 0)
        .filter(col("form") == "10-K")
        .select("adsh", "cik", "sic", "filed")
    )

    # `adsh` is only needed as the join key, so it is dropped afterwards.
    joined_df = num_df.join(sub_df, on="adsh", how="inner").drop("adsh")

    # union() matches columns by position; every joined_df is built with the
    # same join/select sequence, so the column order is identical each time.
    union_df = joined_df if union_df is None else union_df.union(joined_df)

# Fail here with a clear message instead of letting a later cell crash on
# `None.withColumn(...)` when nothing matched.
if union_df is None:
    raise RuntimeError(
        f"No data directories were loaded from {RAW_DATA_DIR!r} "
        f"(TEST_MODE={TEST_MODE})"
    )

 - 2020_Q1


                                                                                

 - 2020_Q2


                                                                                

 - 2020_Q3


                                                                                

 - 2020_Q4


                                                                                

In [6]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Keep a single row per (cik, tag, ddate): number the rows of each group by
# `filed` in descending order and retain only the row numbered 1.
window = (
    Window
    .partitionBy("cik", "tag", "ddate")
    .orderBy(desc("filed"))
)
deduped_df = (
    union_df
    .withColumn("row_num", row_number().over(window))
    .where("row_num = 1")
    .drop("row_num")
)

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def industry_mapping_func(industries_filepath: str):
    """Build a SIC-code -> industry-name lookup from a definitions file.

    The file is read once, line by line, with surrounding whitespace stripped:

    * a line ending in ":" starts a new industry whose name is the text
      before the colon;
    * any other non-empty line must be "<min>-<max>" (two integers) and adds
      that inclusive range to the industry started most recently;
    * empty lines are ignored.

    Args:
        industries_filepath: Path of the definitions file.

    Returns:
        A function that takes a SIC code and returns the name of the first
        industry (in file order) having a range that contains the code.
        It returns "Other" when no range matches or when the code is None.

    Raises:
        ValueError: If a range line is not two "-"-separated integers.
        KeyError: If a range line appears before any industry header.
    """
    from typing import Optional

    industries_map = {}

    with open(industries_filepath, "r") as industries_file:
        current_industry = None

        for line in industries_file:
            line = line.strip()

            if line.endswith(":"):
                current_industry = line[:-1]
                industries_map[current_industry] = []
            elif len(line) > 0:
                min_sic, max_sic = map(int, line.split("-"))
                industries_map[current_industry].append((min_sic, max_sic))

    def map_sic_to_industry(sic: Optional[int]) -> str:
        """Return the industry whose range contains `sic`, else "Other"."""
        # A missing code (SQL NULL reaches a Python UDF as None) cannot be
        # compared with the integer bounds -- `min_sic <= None` raises
        # TypeError -- so it goes to the same bucket as any unmatched code.
        if sic is None:
            return "Other"

        for industry, ranges in industries_map.items():
            for min_sic, max_sic in ranges:
                if min_sic <= sic <= max_sic:
                    return industry
        return "Other"

    return map_sic_to_industry

# Expose the SIC -> industry lookup as a Spark UDF producing a string column.
# The definitions file is parsed once, here, when the lookup is built.
sic_mapper_udf = udf(
    f=industry_mapping_func("/workspace/data/fama-french-industries.txt"),
    returnType=StringType(),
)

In [9]:
from pyspark.sql.functions import substring

# Add `year` (the first four characters of `ddate`) and `industry` (looked up
# from `sic`), then reduce the frame to the five output columns.
final_df = (
    deduped_df
    .withColumn("year", substring("ddate", 1, 4))
    .withColumn("industry", sic_mapper_udf("sic"))
    .select("tag", "industry", "cik", "year", "value")
)

final_df.show()



+--------------------+-------------+----+----+----------+
|                 tag|     industry| cik|year|     value|
+--------------------+-------------+----+----+----------+
|AssetImpairmentCh...|Manufacturing|1750|2020| 8100000.0|
|CashAndCashEquiva...|Manufacturing|1750|2020|   4.047E8|
|CashProvidedByUse...|Manufacturing|1750|2019| -500000.0|
|    CostsAndExpenses|Manufacturing|1750|2018|  1.6628E9|
|EffectOfExchangeR...|Manufacturing|1750|2018| -100000.0|
|            Goodwill|Manufacturing|1750|2018|   1.187E8|
|IncreaseDecreaseI...|Manufacturing|1750|2019|    3.44E7|
|IncreaseDecreaseI...|Manufacturing|1750|2018|-3400000.0|
|InvestmentIncomeI...|Manufacturing|1750|2018|  100000.0|
|PaymentsForProcee...|Manufacturing|1750|2020| 2800000.0|
|AccountsPayableTr...|   Healthcare|1800|2019|   3.252E9|
|AmortizationOfFin...|   Healthcare|1800|2017| 5000000.0|
|CashAndCashEquiva...|   Healthcare|1800|2019|    3.86E9|
|ConstructionInPro...|   Healthcare|1800|2018|    8.94E8|
|ConstructionI

                                                                                

In [12]:
# for each year/industry, sum up NetIncomeLoss
from pyspark.sql.functions import lit, sum

# NOTE: `sum` is pyspark.sql.functions.sum (imported above), which shadows
# Python's builtin sum from this cell onwards.
profit_df = (
    final_df
    .where(col("tag") == "NetIncomeLoss")
    .groupBy("year", "industry")
    .agg(sum("value").alias("value"))
    .withColumn("metric", lit("profit"))
)

profit_df.show()



+----+--------------------+----------------+------+
|year|            industry|           value|metric|
+----+--------------------+----------------+------+
|2020|   Consumer Durables|    9.64858687E8|profit|
|2018|  Telecommunications| 3.8866045969E10|profit|
|2010|          Technology|         1.969E7|profit|
|2011|          Technology|       -278000.0|profit|
|2019|       Manufacturing| 5.3254011787E10|profit|
|2018|               Other|1.28920399787E11|profit|
|2020|Consumer Nondurables|    7.70115507E9|profit|
|2019|          Healthcare| 5.5588299566E10|profit|
|2020|Retail, Wholesale...| 4.1647079114E10|profit|
|2017|Consumer Nondurables|  4.692097116E10|profit|
|2016|           Chemicals|          6.48E8|profit|
|2018|           Utilities| 4.9517532738E10|profit|
|2017|  Telecommunications|    7.1878395E10|profit|
|2019|           Chemicals| 1.6472051893E10|profit|
|2020|           Chemicals|   2.996184334E9|profit|
|2017|       Manufacturing| 3.1188103871E10|profit|
|2020|  Tele

                                                                                