In [None]:
!pip install neo4j yfiles_jupyter_graphs_for_neo4j
!pip install pyspark

Collecting neo4j
  Downloading neo4j-5.28.1-py3-none-any.whl.metadata (5.9 kB)
Collecting yfiles_jupyter_graphs_for_neo4j
  Downloading yfiles_jupyter_graphs_for_neo4j-1.6.0-py3-none-any.whl.metadata (17 kB)
Collecting yfiles_jupyter_graphs>=1.9.0 (from yfiles_jupyter_graphs_for_neo4j)
  Downloading yfiles_jupyter_graphs-1.10.1-py3-none-any.whl.metadata (19 kB)
Collecting ipywidgets>=8.0.0 (from yfiles_jupyter_graphs>=1.9.0->yfiles_jupyter_graphs_for_neo4j)
  Downloading ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting comm>=0.1.3 (from ipywidgets>=8.0.0->yfiles_jupyter_graphs>=1.9.0->yfiles_jupyter_graphs_for_neo4j)
  Downloading comm-0.2.2-py3-none-any.whl.metadata (3.7 kB)
Collecting widgetsnbextension~=4.0.12 (from ipywidgets>=8.0.0->yfiles_jupyter_graphs>=1.9.0->yfiles_jupyter_graphs_for_neo4j)
  Downloading widgetsnbextension-4.0.13-py3-none-any.whl.metadata (1.6 kB)
Collecting jedi>=0.16 (from ipython>=6.1.0->ipywidgets>=8.0.0->yfiles_jupyter_graphs>=1.9.0->yfiles_

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType

# Initializing Spark session
spark = SparkSession.builder \
    .appName("Loading Data") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .getOrCreate()

In [None]:
# Schema Definition
sub_schema = StructType([
    StructField("adsh", StringType(), nullable=False),
    StructField("cik", StringType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("sic", StringType(), nullable=True),
    StructField("countryba", StringType(), nullable=True),
    StructField("stprba", StringType(), nullable=True),
    StructField("cityba", StringType(), nullable=True),
    StructField("zipba", StringType(), nullable=True),
    StructField("bas1", StringType(), nullable=True),
    StructField("bas2", StringType(), nullable=True),
    StructField("baph", StringType(), nullable=True),
    StructField("countryma", StringType(), nullable=True),
    StructField("stprma", StringType(), nullable=True),
    StructField("cityma", StringType(), nullable=True),
    StructField("zipma", StringType(), nullable=True),
    StructField("mas1", StringType(), nullable=True),
    StructField("mas2", StringType(), nullable=True),
    StructField("countryinc", StringType(), nullable=True),
    StructField("stprinc", StringType(), nullable=True),
    StructField("ein", StringType(), nullable=True),
    StructField("former", StringType(), nullable=True),
    StructField("changed", StringType(), nullable=True),
    StructField("afs", StringType(), nullable=True),
    StructField("wksi", StringType(), nullable=False),
    StructField("fye", StringType(), nullable=True),
    StructField("form", StringType(), nullable=False),
    StructField("period", StringType(), nullable=False),
    StructField("fy", StringType(), nullable=True),
    StructField("fp", StringType(), nullable=True),
    StructField("filed", StringType(), nullable=False),
    StructField("accepted", StringType(), nullable=False),
    StructField("prevrpt", StringType(), nullable=False),
    StructField("detail", StringType(), nullable=False),
    StructField("instance", StringType(), nullable=False),
    StructField("nciks", StringType(), nullable=False),
    StructField("aciks", StringType(), nullable=True)])

# Function to load data for a specific quarter
def load_sub_data(quarter):
    sub_df = spark.read.option("delimiter", "\t") \
        .schema(sub_schema) \
        .csv(f"/content/drive/Shareddrives/DATS6450-Project2/{quarter}/sub.txt")

    sub_df = sub_df.replace("", None)
    sub_df = sub_df.filter(sub_df.adsh != sub_df.first()[0])

    return sub_df

# Combinging all quarters into a single DataFrame
sub_data = (
    load_sub_data("2016q1")
    .union(load_sub_data("2016q2"))
    .union(load_sub_data("2016q3"))
    .union(load_sub_data("2016q4")))

sub_data.show(5)

+--------------------+-------+--------------------+----+---------+------+-------------------+----------+--------------------+--------------------+---------------+---------+------+-------------------+-------+--------------------+--------------------+----------+-------+---------+--------------------+--------+-----+----+----+------+--------+----+---+--------+--------------------+-------+------+-----------------+-----+-----+
|                adsh|    cik|                name| sic|countryba|stprba|             cityba|     zipba|                bas1|                bas2|           baph|countryma|stprma|             cityma|  zipma|                mas1|                mas2|countryinc|stprinc|      ein|              former| changed|  afs|wksi| fye|  form|  period|  fy| fp|   filed|            accepted|prevrpt|detail|         instance|nciks|aciks|
+--------------------+-------+--------------------+----+---------+------+-------------------+----------+--------------------+--------------------+----

In [None]:
# Defining Schema
num_schema = StructType([
    StructField("adsh", StringType(), nullable=False),
    StructField("tag", StringType(), nullable=False),
    StructField("version", StringType(), nullable=False),
    StructField("ddate", DateType(), nullable=False),
    StructField("qtrs", IntegerType(), nullable=False),
    StructField("uom", StringType(), nullable=False),
    StructField("segments", StringType(), nullable=True),
    StructField("coreg", StringType(), nullable=True),
    StructField("value", FloatType(), nullable=True),
    StructField("footnote", StringType(), nullable=True)
])

# Function to load data for a specific quarter
def load_num_data(quarter):
    num_df = spark.read.option("delimiter", "\t") \
        .schema(num_schema) \
        .csv(f"/content/drive/Shareddrives/DATS6450-Project2/{quarter}/num.txt")

    num_df = num_df.replace("", None)
    num_df = num_df.filter(num_df.adsh != num_df.first()[0])

    return num_df

# Combining all quarters into a single DataFrame
num_data = (
    load_num_data("2016q1")
    .union(load_num_data("2016q2"))
    .union(load_num_data("2016q3"))
    .union(load_num_data("2016q4")))

num_data.show(5)

+--------------------+--------------------+------------+-----+----+---+--------------------+-----+----------+--------+
|                adsh|                 tag|     version|ddate|qtrs|uom|            segments|coreg|     value|footnote|
+--------------------+--------------------+------------+-----+----+---+--------------------+-----+----------+--------+
|0000919574-16-012035|AccountsPayableCu...|us-gaap/2015| NULL|   0|USD|                NULL| NULL| 1807000.0|    NULL|
|0001477932-16-009072|AccumulatedDeprec...|us-gaap/2014| NULL|   0|USD|                NULL| NULL|   70484.0|    NULL|
|0001628280-16-010235|AccruedLiabilitie...|us-gaap/2015| NULL|   0|USD|                NULL| NULL|  4.8368E7|    NULL|
|0000919574-16-012194|DueToRelatedParti...|us-gaap/2015| NULL|   0|USD|RelatedPartyTrans...| NULL|       0.0|    NULL|
|0001140361-16-059398|       NetIncomeLoss|us-gaap/2015| NULL|   4|USD|PartnerCapitalCom...| NULL|-5438000.0|    NULL|
+--------------------+--------------------+-----

In [None]:
# Defining Schema
pre_schema = StructType([
    StructField("adsh", StringType(), nullable=False),
    StructField("report", IntegerType(), nullable=False),
    StructField("line", IntegerType(), nullable=False),
    StructField("stmt", StringType(), nullable=False),
    StructField("inpth", StringType(), nullable=False),
    StructField("rfile", StringType(), nullable=False),
    StructField("tag", StringType(), nullable=False),
    StructField("version", StringType(), nullable=False),
    StructField("plabel", StringType(), nullable=False),
    StructField("negating", StringType(), nullable=False)
])

# Loading data for all quarters
def load_pre_data(quarter):
    pre_df = spark.read.option("delimiter", "\t") \
        .schema(pre_schema) \
        .csv(f"/content/drive/Shareddrives/DATS6450-Project2/{quarter}/pre.txt")

    pre_df = pre_df.replace("", None)
    pre_df = pre_df.filter(pre_df.adsh != pre_df.first()[0])
    return pre_df

# Combining all quarters into a single DataFrame
pre_data = (
    load_pre_data("2016q1")
    .union(load_pre_data("2016q2"))
    .union(load_pre_data("2016q3"))
    .union(load_pre_data("2016q4")))

pre_data.show(5)

+--------------------+------+----+----+-----+-----+--------------------+------------+--------------------+--------+
|                adsh|report|line|stmt|inpth|rfile|                 tag|     version|              plabel|negating|
+--------------------+------+----+----+-----+-----+--------------------+------------+--------------------+--------+
|0000002178-16-000064|     2|   3|  BS|    0|    H|CashAndCashEquiva...|us-gaap/2015|Cash and cash equ...|       0|
|0000002178-16-000064|     2|   4|  BS|    0|    H|AccountsReceivabl...|us-gaap/2015|Accounts receivab...|       0|
|0000002178-16-000064|     2|   5|  BS|    0|    H|EnergyRelatedInve...|us-gaap/2015|         Inventories|       0|
|0000002178-16-000064|     2|   6|  BS|    0|    H|EnergyMarketingCo...|us-gaap/2015|Fair value contracts|       0|
|0000002178-16-000064|     2|   7|  BS|    0|    H|IncomeTaxesReceiv...|us-gaap/2015|Income tax receiv...|       0|
+--------------------+------+----+----+-----+-----+--------------------+

In [None]:
# Define Schema
tag_schema = StructType([
    StructField("tag", StringType(), nullable=False),
    StructField("version", StringType(), nullable=False),
    StructField("custom", StringType(), nullable=False),
    StructField("abstract", StringType(), nullable=False),
    StructField("datatype", StringType(), nullable=True),
    StructField("iord", StringType(), nullable=False),
    StructField("crdr", StringType(), nullable=True),
    StructField("tlabel", StringType(), nullable=True),
    StructField("doc", StringType(), nullable=True)
])

# Loading data for all quarters
def load_tag_data(quarter):
    tag_df = spark.read.option("delimiter", "\t") \
        .schema(tag_schema) \
        .csv(f"/content/drive/Shareddrives/DATS6450-Project2/{quarter}/tag.txt")

    tag_df = tag_df.replace("", None)
    tag_df = tag_df.filter(tag_df.tag != tag_df.first()[0])
    return tag_df

# Combining all quarters into a single DataFrame
tag_data = (
    load_tag_data("2016q1")
    .union(load_tag_data("2016q2"))
    .union(load_tag_data("2016q3"))
    .union(load_tag_data("2016q4")))

tag_data.show(5)

+--------------------+------------+------+--------+--------+----+----+--------------------+--------------------+
|                 tag|     version|custom|abstract|datatype|iord|crdr|              tlabel|                 doc|
+--------------------+------------+------+--------+--------+----+----+--------------------+--------------------+
|AccountsPayableCu...|us-gaap/2014|     0|       0|monetary|   I|   C|Accounts Payable,...|Carrying value as...|
|AccountsPayableRe...|us-gaap/2014|     0|       0|monetary|   I|   C|Accounts Payable,...|Amount for accoun...|
|AccountsReceivabl...|us-gaap/2014|     0|       0|monetary|   I|   D|Accounts Receivab...|Amount, after all...|
|AccruedLiabilitie...|us-gaap/2014|     0|       0|monetary|   I|   C|Accrued Liabiliti...|Carrying value as...|
|AccumulatedDeprec...|us-gaap/2014|     0|       0|monetary|   I|   C|Accumulated Depre...|Amount of accumul...|
+--------------------+------------+------+--------+--------+----+----+--------------------+-----

In [None]:
num_data = num_data.select(
    "adsh", "tag", "version", "qtrs", "uom", "value")

num_data.show(5)
sub_data = sub_data.select(
    "adsh", "cik", "name", "countryba", "stprba", "countryma", "stprma",
    "ein", "former", "changed", "afs", "fye", "form", "period", "fy",
    "fp", "filed", "nciks")

sub_data.show(5)
tag_data = tag_data.select("tag", "version", "abstract", "iord", "crdr", "tlabel")

tag_data.show()
pre_data = pre_data.select("adsh", "report", "line", "stmt", "inpth", "tag", "version", "plabel")

pre_data.show(5)

+--------------------+--------------------+------------+----+---+----------+
|                adsh|                 tag|     version|qtrs|uom|     value|
+--------------------+--------------------+------------+----+---+----------+
|0000919574-16-012035|AccountsPayableCu...|us-gaap/2015|   0|USD| 1807000.0|
|0001477932-16-009072|AccumulatedDeprec...|us-gaap/2014|   0|USD|   70484.0|
|0001628280-16-010235|AccruedLiabilitie...|us-gaap/2015|   0|USD|  4.8368E7|
|0000919574-16-012194|DueToRelatedParti...|us-gaap/2015|   0|USD|       0.0|
|0001140361-16-059398|       NetIncomeLoss|us-gaap/2015|   4|USD|-5438000.0|
+--------------------+--------------------+------------+----+---+----------+
only showing top 5 rows

+--------------------+-------+--------------------+---------+------+---------+------+---------+--------------------+--------+-----+----+------+--------+----+---+--------+-----+
|                adsh|    cik|                name|countryba|stprba|countryma|stprma|      ein|          

In [None]:
sub_data = sub_data.dropna()
num_data = num_data.dropna()
tag_data = tag_data.dropna()
pre_data = pre_data.dropna()

num_data.show(5)
tag_data.show(5)
pre_data.show(5)
sub_data.show(5)

+--------------------+--------------------+------------+----+---+----------+
|                adsh|                 tag|     version|qtrs|uom|     value|
+--------------------+--------------------+------------+----+---+----------+
|0000919574-16-012035|AccountsPayableCu...|us-gaap/2015|   0|USD| 1807000.0|
|0001477932-16-009072|AccumulatedDeprec...|us-gaap/2014|   0|USD|   70484.0|
|0001628280-16-010235|AccruedLiabilitie...|us-gaap/2015|   0|USD|  4.8368E7|
|0000919574-16-012194|DueToRelatedParti...|us-gaap/2015|   0|USD|       0.0|
|0001140361-16-059398|       NetIncomeLoss|us-gaap/2015|   4|USD|-5438000.0|
+--------------------+--------------------+------------+----+---+----------+
only showing top 5 rows

+--------------------+------------+--------+----+----+--------------------+
|                 tag|     version|abstract|iord|crdr|              tlabel|
+--------------------+------------+--------+----+----+--------------------+
|AccountsPayableCu...|us-gaap/2014|       0|   I|   C|

In [None]:
total_rows = sub_data.count()
total_columns = len(sub_data.columns)

print(f"\nTotal shape of sub_data DataFrame: ({total_rows}, {total_columns})")

total_rows = num_data.count()
total_columns = len(num_data.columns)

print(f"\nTotal shape of num_data DataFrame: ({total_rows}, {total_columns})")

total_rows = tag_data.count()
total_columns = len(tag_data.columns)

print(f"\nTotal shape of tag_data DataFrame: ({total_rows}, {total_columns})")

total_rows = pre_data.count()
total_columns = len(pre_data.columns)

print(f"\nTotal shape of num_data DataFrame: ({total_rows}, {total_columns})")


Total shape of sub_data DataFrame: (12681, 18)

Total shape of num_data DataFrame: (10290627, 6)

Total shape of tag_data DataFrame: (216448, 6)

Total shape of num_data DataFrame: (2734585, 8)


In [None]:
# Filter PRE data for specific stmt values
stmt_values = ["BS", "IS", "CF", "EQ", "CI", "SI"]
filtered_pre_data = pre_data.filter(pre_data.stmt.isin(stmt_values))

# Join SUB and PRE on adsh
sub_pre_joined = sub_data.join(filtered_pre_data, on="adsh", how="inner")

# Join NUM and SUB and PRE on adsh, version, and tag
num_pre_joined = sub_pre_joined.join(num_data, on=["adsh", "version", "tag"], how="inner")

num_pre_joined = num_pre_joined.dropDuplicates()

num_pre_joined.show(5)
print(num_pre_joined.count())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

num_pre_joined = num_pre_joined.withColumn("value", F.col("value").cast(FloatType()))
crdr_mapping = {row.tag: row.crdr for row in tag_data.select("tag", "crdr").collect()}

# Broadcast the dictionary to optimize access
crdr_broadcast = spark.sparkContext.broadcast(crdr_mapping)

# Define a UDF to map the crdr value
def get_crdr(tag):
    return crdr_broadcast.value.get(tag, None)

# Register the UDF
get_crdr_udf = F.udf(get_crdr)
num_pre_joined = num_pre_joined.withColumn("crdr", get_crdr_udf(F.col("tag")))

category_rules = {
    "revenue": "D",
    "income": "D",
    "assets": "D",
    "debt": "C",
    "liability": "C"
}

agg_exprs = [
    F.sum(F.when(F.col("crdr") == crdr_value, F.col("value")).otherwise(0)).alias(category)
    for category, crdr_value in category_rules.items()
]

tag_sums = num_pre_joined.groupBy("tag").agg(*agg_exprs)

zero_sum_tags = tag_sums.filter(
    F.greatest(*[F.col(category) for category in category_rules.keys()]) == 0
).select("tag")

zero_sum_tag_list = [row.tag for row in zero_sum_tags.collect()]

num_pre_joined = num_pre_joined.filter(~F.col("tag").isin(zero_sum_tag_list))

In [None]:
from pyspark.sql import functions as F

# Add the new tag category column
num_pre_joined = num_pre_joined.withColumn(
    "tag_category",
    F.when((F.lower(F.col("tag")).rlike(".*income.*")) & (F.col("crdr") == 'C'), "income")  # Includes any tag with 'income'
      .when((F.lower(F.col("tag")).rlike(".*revenue.*")) & (F.col("crdr") == 'C'), "revenue")  # Includes any tag with 'revenue'
      .when((F.lower(F.col("tag")).rlike(".*asset.*")) & (F.col("crdr") == 'C'), "asset")      # Includes any tag with 'asset'
      .when((F.lower(F.col("tag")).rlike(".*debt.*")) & (F.col("crdr") == 'D'), "debt")        # Includes any tag with 'debt'
      .when((F.lower(F.col("tag")).rlike(".*liability.*")) & (F.col("crdr") == 'D'), "liability")  # Includes any tag with 'liability'
      .otherwise("other")  # Default case
)
num_pre_joined.show(5)


In [None]:
from neo4j import GraphDatabase
from pyspark.sql import functions as F

# Neo4j Connection
NEO4J_URI = "neo4j+s://22e0ea28.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "7L6AUSP97VOZFcl5dEeSIFCwacdVwgttCA5ZPsm3UgQ"

def load_company_nodes(df,batch_id):
    """
    Loads unique `cik` values as nodes and merges selected financial properties.
    - `name`, `tag`, `fp`, `stmt`, and `fye` are collected as sets.
    - `value` is summed for each `cik`.
    """

    # Aggregate specified properties and sum `value` for each `cik`
    df_grouped = df.groupBy("cik").agg(
        F.collect_set("name").alias("names"),  # Collect unique names
        F.collect_set("tag").alias("tags"),    # Collect unique tags
        F.collect_set("fp").alias("fps"),      # Collect unique fp values
        F.collect_set("stmt").alias("stmts"),   # Collect unique stmt values
        F.collect_set("fye").alias("fyes"),     # Collect unique fye values
        F.sum("value").alias("total_value")     # Sum of values
    )

    def process_partition(partition):
        driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
        try:
            with driver.session() as session:
                for row in partition:
                    query = """
                    MERGE (c:Company {cik: $cik})
                    SET c.names = $names,
                        c.tags = $tags,
                        c.fps = $fps,
                        c.stmts = $stmts,
                        c.fyes = $fyes,
                        c.total_value = $total_value
                    """
                    session.run(query,
                                cik=row["cik"],
                                names=list(row["names"]),   # Convert collected set to list
                                tags=list(row["tags"]),
                                fps=list(row["fps"]),
                                stmts=list(row["stmts"]),
                                fyes=list(row["fyes"]),
                                total_value=row["total_value"])
        finally:
            driver.close()

    df_grouped.foreachPartition(process_partition)

# Load data from num_pre_joined
load_company_nodes(num_pre_joined, batch_id='main_graph')


In [None]:
import pandas as pd
def fetch_grouped_data():
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    query = """
    MATCH (c:Company)
    WITH c.cik AS cik, c.names AS names, c.tags AS tags, c.fps AS fps, c.total_value AS total_value
    UNWIND names AS name
    UNWIND tags AS tag
    UNWIND fps AS fp
    // Categorize tags based on keywords
    WITH
        cik,
        name,
        CASE
            WHEN toLower(tag) CONTAINS 'revenue' THEN 'Revenue'
            WHEN toLower(tag) CONTAINS 'income' THEN 'Income'
            WHEN toLower(tag) CONTAINS 'debt' THEN 'Debt'
            WHEN toLower(tag) CONTAINS 'assets' THEN 'Assets'
            WHEN toLower(tag) CONTAINS 'liabilities' THEN 'Liabilities'
            ELSE 'Other'  // Handle tags that don't match any condition
        END AS tag_category,
        fp,
        total_value
    // Group by cik, name, tag_category, and fp, and sum total_value
    RETURN
        cik,
        name,
        tag_category,
        fp,
        SUM(total_value) AS total_value
    ORDER BY cik, name, tag_category, fp
    """

    with driver.session() as session:
        result = session.run(query)
        output_data = [record.data() for record in result]

    driver.close()
    return output_data

# Fetch grouped data
grouped_data = fetch_grouped_data()

# Create a DataFrame from the fetched data
df_grouped_data = pd.DataFrame(grouped_data)

print(df_grouped_data)

total_rows = len(df_grouped_data)
total_columns = len(df_grouped_data.columns)
print(f"\nTotal shape of grouped data: ({total_rows}, {total_columns})")

            cik                    name tag_category  fp   total_value
0       1000045  NICHOLAS FINANCIAL INC       Assets  FY  1.868280e+12
1       1000045  NICHOLAS FINANCIAL INC       Assets  Q1  1.868280e+12
2       1000045  NICHOLAS FINANCIAL INC       Assets  Q2  1.868280e+12
3       1000045  NICHOLAS FINANCIAL INC       Assets  Q3  1.868280e+12
4       1000045  NICHOLAS FINANCIAL INC         Debt  FY  2.668971e+11
...         ...                     ...          ...  ..           ...
150002     9984        BARNES GROUP INC        Other  Q3  1.008689e+13
150003     9984        BARNES GROUP INC      Revenue  FY  3.879575e+11
150004     9984        BARNES GROUP INC      Revenue  Q1  3.879575e+11
150005     9984        BARNES GROUP INC      Revenue  Q2  3.879575e+11
150006     9984        BARNES GROUP INC      Revenue  Q3  3.879575e+11

[150007 rows x 5 columns]

Total shape of grouped data: (150007, 5)


In [None]:
categories = ["Revenue", "Debt", "Assets", "Liabilities", "Income"]  # Specified categories

filtered_rows = []

# Loop through each category and get 20 rows for each
for category in categories:
    category_rows = df_grouped_data[df_grouped_data['tag_category'] == category].head(20)
    filtered_rows.append(category_rows)

filtered_df = pd.concat(filtered_rows)
print(filtered_df)


         cik                          name tag_category  fp   total_value
20   1000045        NICHOLAS FINANCIAL INC      Revenue  FY  5.337942e+11
21   1000045        NICHOLAS FINANCIAL INC      Revenue  Q1  5.337942e+11
22   1000045        NICHOLAS FINANCIAL INC      Revenue  Q2  5.337942e+11
23   1000045        NICHOLAS FINANCIAL INC      Revenue  Q3  5.337942e+11
29   1000177   NORDIC AMERICAN TANKERS LTD      Revenue  FY  1.677138e+11
..       ...                           ...          ...  ..           ...
96   1000230            OPTICAL CABLE CORP       Income  FY  1.578437e+11
97   1000230            OPTICAL CABLE CORP       Income  Q1  1.578437e+11
98   1000230            OPTICAL CABLE CORP       Income  Q2  1.578437e+11
116  1000232  KENTUCKY BANCSHARES INC /KY/       Income  FY  3.802694e+12
117  1000232  KENTUCKY BANCSHARES INC /KY/       Income  Q1  3.802694e+12

[100 rows x 5 columns]


In [None]:
! pip install sec-api



In [None]:
import pandas as pd
from pyspark.sql.functions import col, lit, when
import requests
import time
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from neo4j import GraphDatabase
import matplotlib.pyplot as plt

sub_data_10K = sub_data.select("cik", "name")
sub_data_10K = sub_data_10K.toPandas()
sub_data_10K.head()

Unnamed: 0,cik,name
0,1527541,"WHEELER REAL ESTATE INVESTMENT TRUST, INC."
1,1169264,SYNIVERSE HOLDINGS INC
2,712534,FIRST MERCHANTS CORP
3,1112372,MEDICAL INTERNATIONAL TECHNOLOGY INC
4,1590565,"ASIA EQUITY EXCHANGE GROUP, INC."


In [None]:
distinct_cik = sub_data_10K["cik"].unique()
distinct_cik

array(['1527541', '1169264', '712534', ..., '1679817', '1393935',
       '1512228'], dtype=object)

In [None]:
distinct_cik_1 = pd.Series(distinct_cik)
distinct_cik_1.head()

Unnamed: 0,0
0,1527541
1,1169264
2,712534
3,1112372
4,1590565


We imported requests to fetch data from SEC API and beautifulsoup to scrape and parse the relevenat data points from HTML documents. We construced the API requests using company CIK identifiers. We had to extract executive names from the signature section in the filings that list executives and board members.

In [None]:
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from datetime import datetime
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---------------- CONFIGURATION ----------------
USER_AGENT = "aishitha.pacipala@gwmail.gwu.edu"
TIMEOUT = 10
HEADERS = {"User-Agent": USER_AGENT}

# ---------------- FETCHING FUNCTIONS ----------------
def fetch_submission(cik):
    cik = str(cik).zfill(10)
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    response = requests.get(url, headers=HEADERS, timeout=TIMEOUT)
    response.raise_for_status()
    return response.json()

def get_filing_url(submissions, form_type, year):
    filings = submissions["filings"]["recent"]
    for form, date_str, acc, primary in zip(
        filings["form"], filings["filingDate"], filings["accessionNumber"], filings["primaryDocument"]
    ):
        if form.upper() == form_type.upper():
            filing_date = datetime.strptime(date_str, "%Y-%m-%d")
            if filing_date.year == year:
                acc_no = acc.replace("-", "")
                if primary and primary.strip():
                    return f"https://www.sec.gov/Archives/edgar/data/{int(submissions['cik'])}/{acc_no}/{primary}"
                else:
                    return f"https://www.sec.gov/Archives/edgar/data/{int(submissions['cik'])}/{acc_no}.txt"
    return None

def fetch_filing_content(filing_url):
    response = requests.get(filing_url, headers=HEADERS, timeout=TIMEOUT)
    response.raise_for_status()
    return response.text

# ---------------- EXTRACTION FUNCTIONS ----------------

def extract_signature_section(html):
    """
    Attempt to locate the signature section in an SEC filing, even if:
      - 'SIGNATURES' appears multiple times
      - It's sometimes in <b> or with a bold style
      - It's near the end, or in the middle
    We'll do a fallback approach: first look for <b> 'SIGNATURES', else fallback
    to the last text occurrence of 'SIGNATURES' or 'SIGNATURE'.
    Then end at the next known boundary like 'EXHIBIT', '</DOCUMENT>', etc.
    """
    # -- Step 1: Collapse all whitespace for easier regex searching --
    collapsed = re.sub(r"\s+", " ", html)

    # -- Step 2: Attempt to locate bolded SIGNATURES if it exists --
    #    e.g., <b>SIGNATURES</b> or <tag style="font-weight:bold">SIGNATURES</tag>
    bold_pattern = re.compile(
        r"""(?i)            # ignore case
        (?:<b[^>]*>\s*SIGNATURES\s*</b>) |
        (?:<[a-zA-Z]+\s+[^>]*style\s*=\s*["'][^"']*font-weight\s*:\s*bold;?[^"']*["'][^>]*>\s*SIGNATURES\s*</[a-zA-Z]+>)
        """,
        re.VERBOSE
    )
    bold_matches = list(bold_pattern.finditer(collapsed))

    if bold_matches:
        # If we find bold SIGNATURES, pick the last occurrence
        last_bold = bold_matches[-1]
        start_idx = last_bold.start()
    else:
        # -- Step 3: Fallback to last occurrence of SIGNATURE(S) ignoring case --
        #    We also allow for singular SIGNATURE
        generic_pattern = re.compile(r"SIGNATURES?", re.IGNORECASE)
        matches = list(generic_pattern.finditer(collapsed))
        if not matches:
            return None  # No signature reference found at all
        # pick the last occurrence
        last_match = matches[-1]
        start_idx = last_match.start()

    # -- Step 4: Define an end boundary for the signature section
    #    e.g., next "EXHIBIT", closing tags, or end of file
    end_pattern = re.compile(r"(EXHIBIT\s|</HTML>|</DOCUMENT>|</SEC-HEADER>|$)", re.IGNORECASE)
    end_match = end_pattern.search(collapsed, start_idx)
    end_idx = end_match.start() if end_match else len(collapsed)

    # -- Step 5: Extract the substring
    signature_section = collapsed[start_idx:end_idx].strip()

    return signature_section


# ---------------- HELPER FUNCTIONS ----------------
def clean_line(line):
    return line.strip()

def is_date(text):
    return bool(re.search(r"(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}", text, re.IGNORECASE))

def guess_position(line):
    keywords = ["director", "officer", "president", "chief", "executive", "senior", "vice president", "title"]
    return any(kw in line.lower() for kw in keywords)

def parse_signature_entry(entry, cik):
    """
    Parse a signature block.
    If the entry contains newline characters, use the line-by-line heuristic:
      - First non-empty line is assumed to be the Name.
      - If >=3 lines and the second line looks like a date, use the third line as Position;
        otherwise, use the second line.
    If there are no newline characters, split on two or more spaces.
    """
    entry = entry.strip()
    name = ""
    position = ""
    # Check if entry contains newline characters
    if "\n" in entry:
        lines = [clean_line(l) for l in entry.splitlines() if l.strip()]
        if not lines:
            return None
        name = lines[0]

    return {"CIK": cik, "Name": name}



def is_valid_entry(entry):
    """
    Returns False if the entry's name is missing, too short, or matches a known heading.
    """
    name = entry.get("Name", "").strip().lower()
    if not name or len(name) < 3 or len(name) > 40:
        return False
    if re.search(r"\b(signatures?)\b", name, re.IGNORECASE):
        return False
    if re.search(r"(LLC|LLP)\s*[\.,]?$", name, re.IGNORECASE):
        return False
    return True

def parse_signatures_from_text(text, cik):
    """
    Split the entire text using the /s/ marker (with following whitespace) and parse each block.
    """
    entries = re.split(r"(?i)/s/\s+", text)
    results = []
    for entry in entries:
        entry = entry.strip()
        if not entry:
            continue
        parsed = parse_signature_entry(entry, cik)
        if parsed and is_valid_entry(parsed):
            results.append(parsed)
    return results


def parse_directors_for_cik(cik, form_type, year=2017):
    submissions = fetch_submission(cik)
    filing_url = get_filing_url(submissions, form_type, year)
    print(filing_url)
    if not filing_url:
        print(f"CIK {cik}: No filing URL found.")
        return pd.DataFrame(columns=["CIK", "Name"])
    html_content = fetch_filing_content(filing_url)
    sig_section = extract_signature_section(html_content)
    if not sig_section:
        print(f"CIK {cik}: No SIGNATURES section found.")
        return pd.DataFrame(columns=["CIK", "Name"])

    # plain text extraction.
    soup = BeautifulSoup(sig_section, "html.parser")
    text = soup.get_text(separator="\n").strip()
    text_results = parse_signatures_from_text(text, cik)
    return pd.DataFrame(text_results, columns=["CIK", "Name"])

# ---------------- MAIN EXECUTION ----------------
if __name__ == "__main__":
    pd.set_option('display.max_colwidth', None)
    final_df = pd.DataFrame(columns=["CIK", "Name"])

    for i, cik in enumerate(distinct_cik_1):
        try:
            df = parse_directors_for_cik(cik, "10-K", year=2016)
            print(f'CIK:{cik} Progess: {i}/{len(distinct_cik_1)}')
            final_df = pd.concat([final_df, df], ignore_index=True)
        except requests.exceptions.ReadTimeout:
            print(f"Read timeout for CIK {cik}. Retrying in 5 seconds...")
            # Try again after a delay:
            try:
                df = parse_directors_for_cik(cik, "10-K", year=2016)
                print(f'CIK:{cik} Progess: {i}/{len(distinct_cik_1)}')
                print(df)
                final_df = pd.concat([final_df, df], ignore_index=True)
            except requests.exceptions.ReadTimeout:
                print(f"Second read timeout for CIK {cik}. Skipping...")

    print("Finish!")
    print(final_df)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
None
CIK 813828: No filing URL found.
CIK:813828 Progess: 5018/7213
https://www.sec.gov/Archives/edgar/data/1516973/000151697316000057/mtge20151231form10k.htm
CIK:1516973 Progess: 5019/7213
https://www.sec.gov/Archives/edgar/data/1637757/000163775716000016/yieldllc1231201510-k.htm
CIK:1637757 Progess: 5020/7213
None
CIK 1371285: No filing URL found.
CIK:1371285 Progess: 5021/7213
https://www.sec.gov/Archives/edgar/data/1526689/000164033416001762/vend_10k.htm
CIK:1526689 Progess: 5022/7213
https://www.sec.gov/Archives/edgar/data/1221554/000114420416112910/v444107_10k.htm
CIK:1221554 Progess: 5023/7213
https://www.sec.gov/Archives/edgar/data/907254/000090725416000153/bfs-12312015x10k.htm
CIK:907254 Progess: 5024/7213
https://www.sec.gov/Archives/edgar/data/63908/000006390816000103/mcd-12312015x10k.htm
CIK:63908 Progess: 5025/7213
https://www.sec.gov/Archives/edgar/data/928022/000092802216000132/cpe-20151231x10k.htm
CIK:9280

We next extraced ticker's of companies and combined and cleaned the executives names, cik and tickers.

In [None]:
sec_url = "https://www.sec.gov/files/company_tickers.json"
headers = {"User-Agent": "YourName/YourCompany your_email@example.com"}
response = requests.get(sec_url, headers=headers)

if response.status_code == 200:
    cik_to_ticker = {str(v["cik_str"]).zfill(10): v["ticker"] for v in response.json().values()}
else:
    cik_to_ticker = {}

sub_data_10K['cik'] = sub_data_10K['cik'].astype(str).str.zfill(10)
sub_data_10K['ticker'] = sub_data_10K['cik'].map(cik_to_ticker).fillna('N/A')
sub_data_10K.head()

Unnamed: 0,cik,name,ticker
0,1527541,"WHEELER REAL ESTATE INVESTMENT TRUST, INC.",WHLRL
1,1169264,SYNIVERSE HOLDINGS INC,
2,712534,FIRST MERCHANTS CORP,FRMEP
3,1112372,MEDICAL INTERNATIONAL TECHNOLOGY INC,
4,1590565,"ASIA EQUITY EXCHANGE GROUP, INC.",


In [None]:
final_df.head()

Unnamed: 0,CIK,Name
0,1169264,STEPHEN C. GRAY
1,1169264,ROBERT F. REICH
2,1169264,"JAMES A. ATTWOOD, JR."
3,1169264,TONY G. HOLCOMBE
4,1169264,KRISTEN ANKERBRANDT


In [None]:
final_df['CIK'] = final_df['CIK'].astype(str).str.zfill(10)
final_df['Ticker'] = final_df['CIK'].map(cik_to_ticker).fillna('N/A')
final_df.head()

Unnamed: 0,CIK,Name,Ticker
0,1169264,STEPHEN C. GRAY,
1,1169264,ROBERT F. REICH,
2,1169264,"JAMES A. ATTWOOD, JR.",
3,1169264,TONY G. HOLCOMBE,
4,1169264,KRISTEN ANKERBRANDT,


In [None]:
# Remove rows where Ticker is 'N/A'
final_df_2 = final_df[final_df['Ticker'] != 'N/A'].reset_index(drop=True)
final_df_2

Unnamed: 0,CIK,Name,Ticker
0,0000861884,David H,RS
1,0000861884,Gregg J. Mollins,RS
2,0000861884,Karla R. Lewis,RS
3,0000861884,Sarah J. Anderson,RS
4,0000861884,John G. Figueroa,RS
...,...,...,...
19061,0001674862,J. Kevin Willis,ASH
19062,0001674862,J. William Heitman,ASH
19063,0001674862,Peter J. Ganz,ASH
19064,0001674862,William A. Wulfsohn,ASH


In [None]:
final_df_2.to_csv('/content/drive/My Drive/final_data.csv', index=False)
from google.colab import files
files.download('/content/drive/My Drive/final_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import pandas as pd

final_df_2 = pd.read_csv('/content/drive/My Drive/final_data.csv')
final_df_2.head()
final_df_3 = final_df_2.head(10)

In [None]:
from neo4j import GraphDatabase

# Neo4j Connection details
NEO4J_URI = "neo4j+s://254c3ff0.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "BJcjt95hPsvkUSb_7yBZbEEjTftd_EoMS_aoxmeYqYk"

def delete_all_nodes():
    """
    Deletes all nodes and relationships from Neo4j.
    """
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as session:
            session.run("MATCH (n) DETACH DELETE n;")
        print("All nodes and relationships deleted successfully.")
    finally:
        driver.close()

# Call the function to delete all nodes
delete_all_nodes()


All nodes and relationships deleted successfully.


Created graph with executives and companies nodes

In [None]:
from neo4j import GraphDatabase
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Neo4j Upload") \
    .getOrCreate()

# Convert Pandas DataFrame to Spark DataFrame if needed
final_df_2_spark = spark.createDataFrame(final_df_3)

# Neo4j Connection details
NEO4J_URI = "neo4j+s://254c3ff0.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "BJcjt95hPsvkUSb_7yBZbEEjTftd_EoMS_aoxmeYqYk"

def load_executive_nodes(df):
    """
    Loads unique `Name` values as nodes and connects them to Company nodes.
    """
    def process_partition(partition):
        # Initialize Neo4j driver within the partition function
        driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
        try:
            with driver.session() as session:
                for row in partition:
                    cik = row["CIK"]
                    ticker = row["Ticker"]
                    name = row["Name"]

                    # Create Company node with explicit CIK property
                    session.run("""
                    MERGE (c:Company {cik: $cik})
                    SET c.cik = $cik,
                        c.ticker = $ticker

                    MERGE (e:Executive {name: $name})
                    SET e.name = $name

                    MERGE (e)-[:HAS_ROLE]->(c)
                    """, cik=cik, ticker=ticker, name=name)
        finally:
            driver.close()  # Ensure the driver is closed after processing the partition

    df.foreachPartition(process_partition)

load_executive_nodes(final_df_2_spark)


Fetched top 10 Highly Influential Executives by Degree Centrality and created graph.

In [None]:
from neo4j import GraphDatabase

# Neo4j Connection
NEO4J_URI = "neo4j+s://254c3ff0.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "BJcjt95hPsvkUSb_7yBZbEEjTftd_EoMS_aoxmeYqYk"

def get_top_companies_by_weighted_degree():
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as session:
            result = session.run("""
            MATCH (e:Executive)-[:HAS_ROLE]->(c:Company)
            WITH e, c, COUNT(c) AS exec_degree
            SET e.degree = exec_degree

            WITH c, SUM(e.degree) AS weighted_degree
            SET c.weighted_degree = weighted_degree
            RETURN c.cik AS cik, c.ticker AS ticker, c.weighted_degree AS weighted_degree
            ORDER BY weighted_degree DESC
            LIMIT 10;
            """)

            print("Top 10 Companies by Weighted Executive Degree Centrality:")
            for record in result:
                print(f"CIK: {record['cik']}, Ticker: {record['ticker']}, Weighted Degree: {record['weighted_degree']}")
    finally:
        driver.close()

# Run the function
get_top_companies_by_weighted_degree()


Top 10 Companies by Weighted Executive Degree Centrality:
CIK: 898171, Ticker: UWHR, Weighted Degree: 21
CIK: 1604028, Ticker: WMS, Weighted Degree: 20
CIK: 1265131, Ticker: HTH, Weighted Degree: 19
CIK: 27904, Ticker: DAL, Weighted Degree: 19
CIK: 14693, Ticker: BF-B, Weighted Degree: 18
CIK: 845877, Ticker: AGM-PG, Weighted Degree: 18
CIK: 1214816, Ticker: AXS-PE, Weighted Degree: 17
CIK: 1163609, Ticker: SDSYA, Weighted Degree: 17
CIK: 1506307, Ticker: EP-PC, Weighted Degree: 17
CIK: 354707, Ticker: HE, Weighted Degree: 17


In [None]:
from neo4j import GraphDatabase

# Neo4j Connection
NEO4J_URI = "neo4j+s://254c3ff0.databases.neo4j.io"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "BJcjt95hPsvkUSb_7yBZbEEjTftd_EoMS_aoxmeYqYk"

def get_highly_influential_executives():
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as session:
            # Run the degree centrality query for executives
            result = session.run("""
            MATCH (e:Executive)-[:HAS_ROLE]->(c:Company)
            WITH e, COUNT(c) AS degree
            SET e.degree = degree

            RETURN e.name AS name, e.degree AS degree
            ORDER BY e.degree DESC
            LIMIT 10;
            """)

            print("Top 10 Highly Influential Executives by Degree Centrality:")
            for record in result:
                print(f"Name: {record['name']}, Degree: {record['degree']}")
    finally:
        driver.close()

# Run the function
get_highly_influential_executives()


Top 10 Highly Influential Executives by Degree Centrality:
Name: Peter M. Robinson, Degree: 6
Name: Malcolm R. Fobes III, Degree: 6
Name: Melinda Gerber, Degree: 6
Name: Nicholas D. Gerber, Degree: 6
Name: Gordon L. Ellis, Degree: 6
Name: Andrew Ngim, Degree: 5
Name: Stuart Crumbaugh, Degree: 5
Name: John P. Love, Degree: 5
Name: Robert L. Nguyen, Degree: 5
Name: Graham Tuckwell, Degree: 5


In [None]:
from yfiles_jupyter_graphs_for_neo4j import Neo4jGraphWidget
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

g = Neo4jGraphWidget(driver)

def show_graph(driver):
    query = """
    MATCH (c)<-[r:HAS_ROLE]-(e)
    RETURN c, r, e
    LIMIT 100
    """
    g.show_cypher(query)

show_graph(driver)

GraphWidget(layout=Layout(height='800px', width='100%'))

In [None]:
from google.colab import output
output.enable_custom_widget_manager()

Support for third party widgets will remain active for the duration of the session. To disable support:

In [None]:
from google.colab import output
output.disable_custom_widget_manager()