In [1]:
import findspark, pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Create the Spark session

findspark.init()
findspark.find()

spark = SparkSession\
        .builder\
        .appName("ADBFinacialReportsSecDataPreparation")\
        .config("spark.sql.shuffle.partitions",200)\
        .config("spark.driver.memory", "16G")\
        .config("spark.sql.repl.eagereval.enabled",True)\
        .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/01 20:13:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/01 20:13:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [5]:
data_dir = "../Datasets/financial-reports-sec/data/large/"
output_dir = "../Datasets/financial-reports-sec/parquet/large/"

In [6]:
def concat_sections(col_list):
  # Concatenate elements in each array into a single string for each section
  concat_array_cols = [F.concat_ws(" ", F.col(section)).alias(section) for section in col_list]
  
  return F.concat_ws(" ", *concat_array_cols).alias("report")

In [7]:
def df_transform(df):
  # List of sections to concatenate
  sections = [
    "filing.report.section_1", "filing.report.section_1A", "filing.report.section_1B",
    "filing.report.section_2", "filing.report.section_3", "filing.report.section_4",
    "filing.report.section_5", "filing.report.section_6", "filing.report.section_7",
    "filing.report.section_7A", "filing.report.section_8", "filing.report.section_9",
    "filing.report.section_9A", "filing.report.section_9B", "filing.report.section_10",
    "filing.report.section_11", "filing.report.section_12", "filing.report.section_13",
    "filing.report.section_14", "filing.report.section_15"
  ]
  # Explode the filings array to get one row per filing
  df_exploded = df.withColumn("filing", F.explode("filings")).drop("filings")

  # Concatenate all sections into one single text field
  df_exploded = df_exploded.withColumn("report", concat_sections(sections))

  # Extract labels, dates and returns fields
  df_exploded = df_exploded \
  .withColumn("labels_1d", F.col("filing.labels.1d")) \
  .withColumn("labels_30d", F.col("filing.labels.30d")) \
  .withColumn("labels_5d", F.col("filing.labels.5d")) \
  .withColumn("returns_1d_closePriceEndDate", F.col("filing.returns.1d.closePriceEndDate")) \
  .withColumn("returns_1d_closePriceStartDate", F.col("filing.returns.1d.closePriceStartDate")) \
  .withColumn("returns_1d_endDate", F.col("filing.returns.1d.endDate")) \
  .withColumn("returns_1d_ret", F.col("filing.returns.1d.ret")) \
  .withColumn("returns_1d_startDate", F.col("filing.returns.1d.startDate")) \
  .withColumn("returns_30d_closePriceEndDate", F.col("filing.returns.30d.closePriceEndDate")) \
  .withColumn("returns_30d_closePriceStartDate", F.col("filing.returns.30d.closePriceStartDate")) \
  .withColumn("returns_30d_endDate", F.col("filing.returns.30d.endDate")) \
  .withColumn("returns_30d_ret", F.col("filing.returns.30d.ret")) \
  .withColumn("returns_30d_startDate", F.col("filing.returns.30d.startDate")) \
  .withColumn("returns_5d_closePriceEndDate", F.col("filing.returns.5d.closePriceEndDate")) \
  .withColumn("returns_5d_closePriceStartDate", F.col("filing.returns.5d.closePriceStartDate")) \
  .withColumn("returns_5d_endDate", F.col("filing.returns.5d.endDate")) \
  .withColumn("returns_5d_ret", F.col("filing.returns.5d.ret")) \
  .withColumn("returns_5d_startDate", F.col("filing.returns.5d.startDate")) \
  .withColumn("acceptanceDateTime", F.col("filing.acceptanceDateTime")) \
  .withColumn("filingDate", F.col("filing.filingDate")) \
  .withColumn("reportDate", F.col("filing.reportDate")) \
  .withColumn("form", F.col("filing.form")) \
  .drop("filing")

  df_exploded = df_exploded \
  .withColumn("labels_1d_num", F.when(F.col("labels_1d") == "positive", "1").otherwise("0")) \
  .withColumn("labels_5d_num", F.when(F.col("labels_5d") == "positive", "1").otherwise("0")) \
  .withColumn("labels_30d_num", F.when(F.col("labels_30d") == "positive", "1").otherwise("0")) \
  .withColumn("labels_concat", F.concat("labels_30d_num", "labels_5d_num", "labels_1d_num")) \
  .withColumn("label", F.udf(lambda x: int(x, 2), IntegerType())("labels_concat"))

  return df_exploded.select(["name", "label", "report"])

In [8]:
df_test = spark.read.json(data_dir + "test")
print("Test Original Size: " + str(df_test.count()))

df_train = spark.read.json(data_dir + "train")
print("Train Original Size: " + str(df_train.count()))

df_validate = spark.read.json(data_dir + "validate")
print("Validate Original Size: " + str(df_validate.count()))

                                                                                

Test Original Size: 468


                                                                                

Train Original Size: 3741
Validate Original Size: 468


In [9]:
df_test.printSchema()

root
 |-- cik: string (nullable = true)
 |-- entityType: string (nullable = true)
 |-- exchanges: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- acceptanceDateTime: string (nullable = true)
 |    |    |-- filingDate: string (nullable = true)
 |    |    |-- form: string (nullable = true)
 |    |    |-- labels: struct (nullable = true)
 |    |    |    |-- 1d: string (nullable = true)
 |    |    |    |-- 30d: string (nullable = true)
 |    |    |    |-- 5d: string (nullable = true)
 |    |    |-- report: struct (nullable = true)
 |    |    |    |-- section_1: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- section_10: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- section_11: array (nullable = true)
 |    |    |    |    |-- element: string (contain

In [10]:
df_test = df_transform(df_test)
print("Test Transformed Size: " + str(df_test.count()))

df_train = df_transform(df_train)
print("Train Transformed Size: " + str(df_train.count()))

df_validate = df_transform(df_validate)
print("Validate Transformed Size: " + str(df_validate.count()))

                                                                                

Test Transformed Size: 1820


                                                                                

Train Transformed Size: 52663
Validate Transformed Size: 866


In [11]:
df_test.printSchema()

root
 |-- name: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- report: string (nullable = false)



In [12]:
df_full = df_test.union(df_train).union(df_validate)
print("Complete Dataset Size: " + str(df_full.count()))
df_full.printSchema()



Complete Dataset Size: 55349
root
 |-- name: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- report: string (nullable = false)



                                                                                

In [13]:
del df_test, df_train, df_validate

In [14]:
df_small = df_full.sampleBy(
  "label", 
  fractions={
    0: 0.5,
    1: 0.5,
    2: 0.5,
    3: 0.5,
    4: 0.5,
    5: 0.5,
    6: 0.5,
    7: 0.5,
    8: 0.5
  },
  seed = 7
)

In [15]:
df_full.write.mode("overwrite").parquet(output_dir + "full.parquet")

del df_full

                                                                                

In [16]:
df_train = df_small.sampleBy(
  "label", 
  fractions={
    0: 0.8,
    1: 0.8,
    2: 0.8,
    3: 0.8,
    4: 0.8,
    5: 0.8,
    6: 0.8,
    7: 0.8,
    8: 0.8
  },
  seed = 7
)

df_test = df_small.subtract(df_train)

In [17]:
df_small.groupBy("label").count().orderBy(F.col("count").desc()).show()
df_train.groupBy("label").count().orderBy(F.col("count").desc()).show()
df_test.groupBy("label").count().orderBy(F.col("count").desc()).show()

                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    0| 8159|
|    7| 6820|
|    4| 3740|
|    3| 2603|
|    6| 2163|
|    1| 1755|
|    5| 1250|
|    2| 1220|
+-----+-----+



                                                                                

+-----+-----+
|label|count|
+-----+-----+
|    0| 6448|
|    7| 5454|
|    4| 2956|
|    3| 2084|
|    6| 1723|
|    1| 1395|
|    5|  996|
|    2|  951|
+-----+-----+



[Stage 39:>                                                       (0 + 12) / 13]

+-----+-----+
|label|count|
+-----+-----+
|    0| 1709|
|    7| 1364|
|    4|  780|
|    3|  518|
|    6|  439|
|    1|  360|
|    2|  267|
|    5|  253|
+-----+-----+



                                                                                

In [18]:
df_small.write.mode("overwrite").parquet(output_dir + "small.parquet")
df_train.write.mode("overwrite").parquet(output_dir + "train.parquet")
df_test.write.mode("overwrite").parquet(output_dir + "test.parquet")


                                                                                

In [19]:
spark.stop()