In [38]:
import pyspark
print(pyspark.__version__)

3.5.5


In [1]:
import pandas as pd
import json
import requests
import numpy as np
import os

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Data Cleaning App") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.memory", '4g') \
    .config("spark.executor.memory", '4g') \
    .getOrCreate()


In [6]:
spark

In [4]:
spark.stop()

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [9]:
# reading candidate master
cand_master_schema = StructType([
            StructField("CAND_ID", StringType(), False),           # Not nullable
            StructField("CAND_NAME", StringType(), True),
            StructField("CAND_PTY_AFFILIATION", StringType(), True),
            StructField("CAND_ELECTION_YR", IntegerType(), True),
            StructField("CAND_OFFICE_ST", StringType(), True),
            StructField("CAND_OFFICE", StringType(), True),
            StructField("CAND_OFFICE_DISTRICT", StringType(), True),
            StructField("CAND_ICI", StringType(), True),
            StructField("CAND_STATUS", StringType(), True),
            StructField("CAND_PCC", StringType(), True),
            StructField("CAND_ST1", StringType(), True),
            StructField("CAND_ST2", StringType(), True),
            StructField("CAND_CITY", StringType(), True),
            StructField("CAND_ST", StringType(), True),
            StructField("CAND_ZIP", StringType(), True)
                ])

candidate_master = spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(cand_master_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/candidate_master/cn.txt")

In [48]:
# read committe master

comm_master_schema = StructType([
    StructField("CMTE_ID", StringType(), False),                # Not nullable
    StructField("CMTE_NM", StringType(), True),
    StructField("TRES_NM", StringType(), True),
    StructField("CMTE_ST1", StringType(), True),
    StructField("CMTE_ST2", StringType(), True),
    StructField("CMTE_CITY", StringType(), True),
    StructField("CMTE_ST", StringType(), True),
    StructField("CMTE_ZIP", StringType(), True),
    StructField("CMTE_DSGN", StringType(), True),
    StructField("CMTE_TP", StringType(), True),
    StructField("CMTE_PTY_AFFILIATION", StringType(), True),
    StructField("CMTE_FILING_FREQ", StringType(), True),
    StructField("ORG_TP", StringType(), True),
    StructField("CONNECTED_ORG_NM", StringType(), True),
    StructField("CAND_ID", StringType(), True)
])

committee_master = spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(comm_master_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/committe_master/cm.txt")


In [49]:
committee_master.show(5)

+---------+--------------------+--------------------+--------------------+---------+-------------+-------+---------+---------+-------+--------------------+----------------+------+--------------------+-------+
|  CMTE_ID|             CMTE_NM|             TRES_NM|            CMTE_ST1| CMTE_ST2|    CMTE_CITY|CMTE_ST| CMTE_ZIP|CMTE_DSGN|CMTE_TP|CMTE_PTY_AFFILIATION|CMTE_FILING_FREQ|ORG_TP|    CONNECTED_ORG_NM|CAND_ID|
+---------+--------------------+--------------------+--------------------+---------+-------------+-------+---------+---------+-------+--------------------+----------------+------+--------------------+-------+
|C00000059|  HALLMARK CARDS PAC|           SARAH MOE|          2501 MCGEE|  MD #500|  KANSAS CITY|     MO|    64108|        U|      Q|                 UNK|               M|     C|                NULL|   NULL|
|C00000422|AMERICAN MEDICAL ...|   WALKER, KEVIN MR.|25 MASSACHUSETTS ...|SUITE 600|   WASHINGTON|     DC|200017400|        B|      Q|                NULL|         

In [12]:
# reading committee transaction
comm_trans_schema = StructType([
    StructField("CMTE_ID", StringType(), False),               # Not nullable
    StructField("AMNDT_IND", StringType(), True),
    StructField("RPT_TP", StringType(), True),
    StructField("TRANSACTION_PGI", StringType(), True),
    StructField("IMAGE_NUM", StringType(), True),              # VARCHAR2(11) or (18) → String
    StructField("TRANSACTION_TP", StringType(), True),
    StructField("ENTITY_TP", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("ZIP_CODE", StringType(), True),
    StructField("EMPLOYER", StringType(), True),
    StructField("OCCUPATION", StringType(), True),
    StructField("TRANSACTION_DT", DateType(), True),           # MMDDYYYY - will need date parsing
    StructField("TRANSACTION_AMT", DoubleType(), True),        # NUMBER(14,2)
    StructField("OTHER_ID", StringType(), True),
    StructField("TRAN_ID", StringType(), True),
    StructField("FILE_NUM", LongType(), True),                 # NUMBER(22) → Long
    StructField("MEMO_CD", StringType(), True),
    StructField("MEMO_TEXT", StringType(), True),
    StructField("SUB_ID", LongType(), False)                   # Not nullable
])

committee_trans = spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(comm_trans_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/committee_to_committee_transaction/itoth.txt")

In [13]:
committee_trans.show(5)

+---------+---------+------+---------------+------------------+--------------+---------+--------------------+----------+-----+---------+--------------------+----------+--------------+---------------+--------+--------------------+--------+-------+--------------------+-------------------+
|  CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|         IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|                NAME|      CITY|STATE| ZIP_CODE|            EMPLOYER|OCCUPATION|TRANSACTION_DT|TRANSACTION_AMT|OTHER_ID|             TRAN_ID|FILE_NUM|MEMO_CD|           MEMO_TEXT|             SUB_ID|
+---------+---------+------+---------------+------------------+--------------+---------+--------------------+----------+-----+---------+--------------------+----------+--------------+---------------+--------+--------------------+--------+-------+--------------------+-------------------+
|C00161810|        A|    M3|              P|202004209219753657|           10J|      ORG|THE CHICKASAW NATION|       ADA|   OK|748209255|

In [14]:
# reading operating expense
oper_exp_schema = StructType([
    StructField("CMTE_ID", StringType(), False),               # VARCHAR2(9)
    StructField("AMNDT_IND", StringType(), True),             # VARCHAR2(1)
    StructField("RPT_YR", IntegerType(), True),               # Number(4)
    StructField("RPT_TP", StringType(), True),                # VARCHAR2(3)
    StructField("IMAGE_NUM", StringType(), True),             # VARCHAR2(11/18)
    StructField("LINE_NUM", StringType(), True),
    StructField("FORM_TP_CD", StringType(), True),
    StructField("SCHED_TP_CD", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("ZIP_CODE", StringType(), True),
    StructField("TRANSACTION_DT", DateType(), True),          # DATE
    StructField("TRANSACTION_AMT", DoubleType(), True),       # NUMBER(14,2)
    StructField("TRANSACTION_PGI", StringType(), True),       # VARCHAR2(5)
    StructField("PURPOSE", StringType(), True),               # VARCHAR2(100)
    StructField("CATEGORY", StringType(), True),              # VARCHAR2(3)
    StructField("CATEGORY_DESC", StringType(), True),         # VARCHAR2(40)
    StructField("MEMO_CD", StringType(), True),               # VARCHAR2(1)
    StructField("MEMO_TEXT", StringType(), True),             # VARCHAR2(100)
    StructField("ENTITY_TP", StringType(), True),             # VARCHAR2(3)    
    StructField("SUB_ID", LongType(), False),                  # NUMBER(19)
    StructField("FILE_NUM", IntegerType(), True),             # NUMBER(7)
    StructField("TRAN_ID", StringType(), True),               # VARCHAR2(32)
    StructField("BACK_REF_TRAN_ID", StringType(), True)       # VARCHAR2(32)
])

operating_exp = spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(oper_exp_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/operating_expense/oppexp.txt")

In [15]:
operating_exp.show(5)

+---------+---------+------+------+------------------+--------+----------+-----------+--------------------+-------------+-----+---------+--------------+---------------+---------------+--------------------+--------+--------------------+-------+---------+---------+-------------------+--------+-----------+----------------+
|  CMTE_ID|AMNDT_IND|RPT_YR|RPT_TP|         IMAGE_NUM|LINE_NUM|FORM_TP_CD|SCHED_TP_CD|                NAME|         CITY|STATE| ZIP_CODE|TRANSACTION_DT|TRANSACTION_AMT|TRANSACTION_PGI|             PURPOSE|CATEGORY|       CATEGORY_DESC|MEMO_CD|MEMO_TEXT|ENTITY_TP|             SUB_ID|FILE_NUM|    TRAN_ID|BACK_REF_TRAN_ID|
+---------+---------+------+------+------------------+--------+----------+-----------+--------------------+-------------+-----+---------+--------------+---------------+---------------+--------------------+--------+--------------------+-------+---------+---------+-------------------+--------+-----------+----------------+
|C00639872|        T|  2019|   TER

In [16]:
# reading candidate committee linkage
cand_cmte_link_schema = StructType([
    StructField("CAND_ID",StringType(),False),
    StructField("CAND_ELECTION_YR",IntegerType(),False),
    StructField("FEC_ELECTION_YR",IntegerType(),False),
    StructField("CMTE_ID",StringType(),True),
    StructField("CMTE_TP",StringType(),True),
    StructField("CMTE_DSGN",StringType(),True),
    StructField("LINKAGE_ID",IntegerType(),False)

])


cand_cmte_link = spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(cand_cmte_link_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/candidate_committtee_linkage/ccl.txt")

In [17]:
cand_cmte_link.show(5)

+---------+----------------+---------------+---------+-------+---------+----------+
|  CAND_ID|CAND_ELECTION_YR|FEC_ELECTION_YR|  CMTE_ID|CMTE_TP|CMTE_DSGN|LINKAGE_ID|
+---------+----------------+---------------+---------+-------+---------+----------+
|C00713602|            2019|           2020|C00712851|      O|        U|    228963|
|H0AK00105|            2020|           2020|C00607515|      H|        P|    229250|
|H0AL01055|            2020|           2020|C00697789|      H|        P|    226125|
|H0AL01063|            2020|           2020|C00701557|      H|        P|    227053|
|H0AL01071|            2020|           2020|C00701409|      H|        P|    227054|
+---------+----------------+---------------+---------+-------+---------+----------+
only showing top 5 rows



In [18]:
pac_summary_schema = StructType([
    StructField("CMTE_ID", StringType(), False),                          # Not nullable
    StructField("CMTE_NM", StringType(), True),
    StructField("CMTE_TP", StringType(), True),
    StructField("CMTE_DSGN", StringType(), True),
    StructField("CMTE_FILING_FREQ", StringType(), True),
    StructField("TTL_RECEIPTS", DoubleType(), True),
    StructField("TRANS_FROM_AFF", DoubleType(), True),
    StructField("INDV_CONTRIB", DoubleType(), True),
    StructField("OTHER_POL_CMTE_CONTRIB", DoubleType(), True),
    StructField("CAND_CONTRIB", DoubleType(), True),
    StructField("CAND_LOANS", DoubleType(), True),
    StructField("TTL_LOANS_RECEIVED", DoubleType(), True),
    StructField("TTL_DISB", DoubleType(), True),
    StructField("TRANF_TO_AFF", DoubleType(), True),
    StructField("INDV_REFUNDS", DoubleType(), True),
    StructField("OTHER_POL_CMTE_REFUNDS", DoubleType(), True),
    StructField("CAND_LOAN_REPAY", DoubleType(), True),
    StructField("LOAN_REPAY", DoubleType(), True),
    StructField("COH_BOP", DoubleType(), True),
    StructField("COH_COP", DoubleType(), True),
    StructField("DEBTS_OWED_BY", DoubleType(), True),
    StructField("NONFED_TRANS_RECEIVED", DoubleType(), True),
    StructField("CONTRIB_TO_OTHER_CMTE", DoubleType(), True),
    StructField("IND_EXP", DoubleType(), True),
    StructField("PTY_COORD_EXP", DoubleType(), True),
    StructField("NONFED_SHARE_EXP", DoubleType(), True),
    StructField("CVG_END_DT", StringType(), True)                           # Needs conversion from MM/DD/YYYY
])

pac_summary= spark.read.format('csv')\
                .option('header','true')\
                .option("delimiter","|")\
                .schema(pac_summary_schema)\
                .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/pac_summary/webk20.txt")

In [19]:
from pyspark.sql.functions import to_date

pac_summary = pac_summary.withColumn("CVG_END_DT", to_date("CVG_END_DT", "MM/dd/yyyy"))

In [20]:
pac_summary.show(5)

+---------+--------------------+-------+---------+----------------+------------+--------------+------------+----------------------+------------+----------+------------------+----------+------------+------------+----------------------+---------------+----------+----------+----------+-------------+---------------------+---------------------+--------+-------------+----------------+----------+
|  CMTE_ID|             CMTE_NM|CMTE_TP|CMTE_DSGN|CMTE_FILING_FREQ|TTL_RECEIPTS|TRANS_FROM_AFF|INDV_CONTRIB|OTHER_POL_CMTE_CONTRIB|CAND_CONTRIB|CAND_LOANS|TTL_LOANS_RECEIVED|  TTL_DISB|TRANF_TO_AFF|INDV_REFUNDS|OTHER_POL_CMTE_REFUNDS|CAND_LOAN_REPAY|LOAN_REPAY|   COH_BOP|   COH_COP|DEBTS_OWED_BY|NONFED_TRANS_RECEIVED|CONTRIB_TO_OTHER_CMTE| IND_EXP|PTY_COORD_EXP|NONFED_SHARE_EXP|CVG_END_DT|
+---------+--------------------+-------+---------+----------------+------------+--------------+------------+----------------------+------------+----------+------------------+----------+------------+------------+---

In [21]:
# reading individual contributor
ind_contr_schema =StructType([
    StructField("CMTE_ID", StringType(), False),             # Not nullable
    StructField("AMNDT_IND", StringType(), True),
    StructField("RPT_TP", StringType(), True),
    StructField("TRANSACTION_PGI", StringType(), True),
    StructField("IMAGE_NUM", StringType(), True),            # Covers VARCHAR2(11) or (18)
    StructField("TRANSACTION_TP", StringType(), True),
    StructField("ENTITY_TP", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("ZIP_CODE", StringType(), True),
    StructField("EMPLOYER", StringType(), True),
    StructField("OCCUPATION", StringType(), True),
    StructField("TRANSACTION_DT", StringType(), True),         # Will need parsing if in string format
    StructField("TRANSACTION_AMT", DoubleType(), True),
    StructField("OTHER_ID", StringType(), True),
    StructField("TRAN_ID", StringType(), True),
    StructField("FILE_NUM", LongType(), True),
    StructField("MEMO_CD", StringType(), True),
    StructField("MEMO_TEXT", StringType(), True),
    StructField("SUB_ID", LongType(), False)                 # Not nullable
])

import pandas as pd
headers = [
    "CMTE_ID",
    "AMNDT_IND",
    "RPT_TP",
    "TRANSACTION_PGI",
    "IMAGE_NUM",
    "TRANSACTION_TP",
    "ENTITY_TP",
    "NAME",
    "CITY",
    "STATE",
    "ZIP_CODE",
    "EMPLOYER",
    "OCCUPATION",
    "TRANSACTION_DT",
    "TRANSACTION_AMT",
    "OTHER_ID",
    "TRAN_ID",
    "FILE_NUM",
    "MEMO_CD",
    "MEMO_TEXT",
    "SUB_ID"
]


from pyspark.sql.types import StructType  # Make sure you've already defined ind_contr_schema

# Read 10% sample of the data
individual_contrib = spark.read.format("csv") \
    .option("header", "false") \
    .option("delimiter", "|") \
    .schema(ind_contr_schema) \
    .load("D:/Sohail_DE_Project/fec_env/big-data-fec-project/data/raw/individual_contribution/itcont.txt") \
    .sample(withReplacement=False, fraction=0.1, seed=42)


In [22]:
from pyspark.sql.functions import to_date

individual_contrib = individual_contrib.withColumn("TRANSACTION_DT", to_date("TRANSACTION_DT", "MM/dd/yyyy"))

In [23]:
individual_contrib.rdd.getNumPartitions()

103

In [24]:
individual_contrib.show(5)

+---------+---------+------+---------------+------------------+--------------+---------+---------------+------------+-----+--------+------------------+----------+--------------+---------------+--------+-------------+--------+-------+---------+-------------------+
|  CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|         IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|           NAME|        CITY|STATE|ZIP_CODE|          EMPLOYER|OCCUPATION|TRANSACTION_DT|TRANSACTION_AMT|OTHER_ID|      TRAN_ID|FILE_NUM|MEMO_CD|MEMO_TEXT|             SUB_ID|
+---------+---------+------+---------------+------------------+--------------+---------+---------------+------------+-----+--------+------------------+----------+--------------+---------------+--------+-------------+--------+-------+---------+-------------------+
|C00618371|        A|    Q3|              P|202102099427331574|           15E|      IND| FLANNER, GREGG|     TRENTON|   NJ|   08601|FLANNER ASSOCIATES|   MFG REP|          NULL|           25.0|    NULL|SA11AI

In [25]:
individual_contrib = individual_contrib.withColumn("TRANSACTION_DT", to_date("TRANSACTION_DT", "MMddyyyy"))

In [26]:
individual_contrib_cache = individual_contrib.cache()

In [27]:
individual_contrib_cache.count()

6938128

## 1. Candidate Funding Analysis

### 1. Compare funding source across candidate

In [28]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, LongType, IntegerType

In [29]:
# Compare funding sources across candidates
funding_sources = individual_contrib_cache.join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("CAND_NAME", "CMTE_TP") \
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_RAISED")) \
    .orderBy(desc("TOTAL_RAISED"))

In [30]:
funding_sources.show(5)

+--------------------+-------+------------+
|           CAND_NAME|CMTE_TP|TOTAL_RAISED|
+--------------------+-------+------------+
|  BIDEN, JOSEPH R JR|      P| 5.2297193E7|
|    TRUMP, DONALD J.|      P|  1.761625E7|
|       SULLIVAN, DAN|      Y| 1.6793788E7|
|BLOOMBERG, MICHAE...|      P| 1.6197367E7|
|    SANDERS, BERNARD|      P|   8818298.0|
+--------------------+-------+------------+
only showing top 5 rows



### 2. Analyze the geographic distribution of support

In [31]:
# Geographic distribution of support
geo_distribution = individual_contrib_cache.join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("CAND_NAME", "STATE") \
    .agg(sum("TRANSACTION_AMT").alias("STATE_TOTAL")) \
    .orderBy("CAND_NAME", desc("STATE_TOTAL"))



In [32]:
geo_distribution.show(5)

+--------------------+-----+-----------+
|           CAND_NAME|STATE|STATE_TOTAL|
+--------------------+-----+-----------+
| WEINSTOCK, MICHA...|   NY|     5600.0|
| WEINSTOCK, MICHA...|   IL|      518.0|
| WEINSTOCK, MICHA...|   NJ|      250.0|
| WEINSTOCK, MICHA...|   SD|      250.0|
| WEINSTOCK, MICHA...|   GA|      250.0|
+--------------------+-----+-----------+
only showing top 5 rows



### 3. Track Funding momentum over time

In [33]:
# Funding momentum over time (monthly)
funding_trends = individual_contrib_cache.withColumn("MONTH", date_format(to_date("TRANSACTION_DT", "MMddyyyy"), "yyyy-MM")) \
    .join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("MONTH", "CAND_NAME") \
    .agg(sum("TRANSACTION_AMT").alias("MONTHLY_TOTAL")) \
    .orderBy("MONTH", desc("MONTHLY_TOTAL"))

In [34]:
funding_trends.show(5)

+-----+--------------------+-------------+
|MONTH|           CAND_NAME|MONTHLY_TOTAL|
+-----+--------------------+-------------+
| NULL|  BIDEN, JOSEPH R JR|  5.2297193E7|
| NULL|    TRUMP, DONALD J.|   1.761625E7|
| NULL|       SULLIVAN, DAN|  1.7356328E7|
| NULL|BLOOMBERG, MICHAE...|  1.6197367E7|
| NULL|    SANDERS, BERNARD|    9046307.0|
+-----+--------------------+-------------+
only showing top 5 rows



## 2. Donor Demographic Analysis

In [35]:
# Contribution patterns by occupation/employer
donor_analysis = individual_contrib_cache.groupBy("OCCUPATION", "EMPLOYER") \
    .agg(
        sum("TRANSACTION_AMT").alias("TOTAL_CONTRIBUTIONS"),
        count("*").alias("NUMBER_OF_CONTRIBUTIONS"),
        avg("TRANSACTION_AMT").alias("AVG_CONTRIBUTION")
    ) \
    .orderBy(desc("TOTAL_CONTRIBUTIONS"))

In [36]:
donor_analysis.show(5)

+------------+-------------------+-------------------+-----------------------+------------------+
|  OCCUPATION|           EMPLOYER|TOTAL_CONTRIBUTIONS|NUMBER_OF_CONTRIBUTIONS|  AVG_CONTRIBUTION|
+------------+-------------------+-------------------+-----------------------+------------------+
|        NULL|               NULL|       2.89435207E8|                 312696| 925.6121184792897|
|NOT EMPLOYED|       NOT EMPLOYED|       1.36888359E8|                1658364| 82.54421767476863|
|     RETIRED|            RETIRED|       1.33309015E8|                 805566|165.48490750602681|
|   PHYSICIAN|ADELSON DRUG CLINIC|          3.50084E7|                      7|         5001200.0|
|     FOUNDER|     BLOOMBERG INC.|        3.4670869E7|                    218|159040.68348623853|
+------------+-------------------+-------------------+-----------------------+------------------+
only showing top 5 rows



In [37]:
# Large vs small donors analysis
donor_size_analysis = individual_contrib_cache.withColumn("DONOR_SIZE", 
    when(col("TRANSACTION_AMT") >= 2000, "LARGE")
    .otherwise("SMALL")) \
    .groupBy("DONOR_SIZE") \
    .agg(
        sum("TRANSACTION_AMT").alias("TOTAL_AMOUNT"),
        count("*").alias("COUNT"),
        (sum("TRANSACTION_AMT")/count("*")).alias("AVG_AMOUNT")
    )


In [38]:
donor_size_analysis.show(5)

+----------+------------+-------+------------------+
|DONOR_SIZE|TOTAL_AMOUNT|  COUNT|        AVG_AMOUNT|
+----------+------------+-------+------------------+
|     SMALL|  5.895497E8|6848724| 86.08168470506331|
|     LARGE|9.22163594E8|  89404|10314.567513757775|
+----------+------------+-------+------------------+



In [39]:
# Repeat donors analysis
repeat_donors = individual_contrib_cache.groupBy("NAME", "CITY", "STATE", "ZIP_CODE") \
    .agg(count("*").alias("CONTRIBUTION_COUNT")) \
    .groupBy("CONTRIBUTION_COUNT") \
    .agg(count("*").alias("DONOR_COUNT")) \
    .orderBy("CONTRIBUTION_COUNT")

In [40]:
repeat_donors.show(5)

+------------------+-----------+
|CONTRIBUTION_COUNT|DONOR_COUNT|
+------------------+-----------+
|                 1|    1646038|
|                 2|     518404|
|                 3|     231247|
|                 4|     122230|
|                 5|      72040|
+------------------+-----------+
only showing top 5 rows



## 3. Expenditure Analysis

In [41]:
# Categorize spending by purpose
spending_categories = operating_exp.groupBy("PURPOSE") \
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_SPENT")) \
    .orderBy(desc("TOTAL_SPENT"))

In [42]:
spending_categories.show(5)

+-------------------+--------------------+
|            PURPOSE|         TOTAL_SPENT|
+-------------------+--------------------+
|          MEDIA BUY|1.3232088511499999E9|
|DIGITAL ADVERTISING| 7.658470001700001E8|
|   T.V. ADVERTISING|        4.50370127E8|
|            PAYROLL|4.2170076009000075E8|
|       PLACED MEDIA| 3.625128648999999E8|
+-------------------+--------------------+
only showing top 5 rows



In [43]:
# Media vs ground operations
media_vs_ground = operating_exp.withColumn("EXPENDITURE_TYPE",
    when(col("PURPOSE").contains("MEDIA") | 
         col("PURPOSE").contains("AD"), "MEDIA")
    .when(col("PURPOSE").contains("TRAVEL") |
          col("PURPOSE").contains("EVENT"), "GROUND_OPS")
    .otherwise("OTHER")) \
    .groupBy("EXPENDITURE_TYPE") \
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_SPENT"))


In [44]:
media_vs_ground.show(5)

+----------------+--------------------+
|EXPENDITURE_TYPE|         TOTAL_SPENT|
+----------------+--------------------+
|           MEDIA| 5.530972661310006E9|
|      GROUND_OPS|3.0263701186000365E8|
|           OTHER|   6.5644278665901E9|
+----------------+--------------------+



In [45]:
# Spending efficiency by candidate
spending_efficiency = operating_exp.join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("CAND_NAME") \
    .agg(
        sum("TRANSACTION_AMT").alias("TOTAL_SPENT"),
        countDistinct("PURPOSE").alias("SPENDING_CATEGORIES")
    ) \
    .orderBy(desc("TOTAL_SPENT"))

In [46]:
spending_efficiency.show(5)

+--------------------+--------------------+-------------------+
|           CAND_NAME|         TOTAL_SPENT|SPENDING_CATEGORIES|
+--------------------+--------------------+-------------------+
|BLOOMBERG, MICHAE...|1.1877063575199995E9|                209|
|  BIDEN, JOSEPH R JR|1.1139123115000002E9|                410|
|    TRUMP, DONALD J.| 7.209492362300001E8|               3284|
|         STEYER, TOM| 3.653411600899999E8|               1075|
|    SANDERS, BERNARD|2.6228000153999993E8|                255|
+--------------------+--------------------+-------------------+
only showing top 5 rows



## 4. Committee Network Analysis

In [50]:
# Money flow through political ecosystem
money_flow = committee_trans.join(committee_master, "CMTE_ID") \
    .groupBy("CMTE_NM", "CMTE_TP", "OTHER_ID") \
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_TRANSFERRED")) \
    .orderBy(desc("TOTAL_TRANSFERRED"))

In [51]:
money_flow.show(5)

+--------------------+-------+---------+-----------------+
|             CMTE_NM|CMTE_TP| OTHER_ID|TOTAL_TRANSFERRED|
+--------------------+-------+---------+-----------------+
|REPUBLICAN NATION...|      Y|     NULL|     3.47517739E8|
| BIDEN FOR PRESIDENT|      P|     NULL|     2.96371987E8|
| BIDEN FOR PRESIDENT|      P|C00744946|          2.375E8|
|  BIDEN VICTORY FUND|      N|C00703975|          2.375E8|
|TRUMP MAKE AMERIC...|      N|C00580100|     2.33727208E8|
+--------------------+-------+---------+-----------------+
only showing top 5 rows



In [52]:
# Key fundraising hubs
fundraising_hubs = individual_contrib_cache.groupBy("CMTE_ID") \
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_RAISED")) \
    .join(committee_master, "CMTE_ID") \
    .orderBy(desc("TOTAL_RAISED"))

In [53]:
fundraising_hubs.show(5)

+---------+------------+--------------------+-------------------+--------------------+--------+------------+-------+--------+---------+-------+--------------------+----------------+------+------------------+---------+
|  CMTE_ID|TOTAL_RAISED|             CMTE_NM|            TRES_NM|            CMTE_ST1|CMTE_ST2|   CMTE_CITY|CMTE_ST|CMTE_ZIP|CMTE_DSGN|CMTE_TP|CMTE_PTY_AFFILIATION|CMTE_FILING_FREQ|ORG_TP|  CONNECTED_ORG_NM|  CAND_ID|
+---------+------------+--------------------+-------------------+--------------------+--------+------------+-------+--------+---------+-------+--------------------+----------------+------+------------------+---------+
|C00401224|2.08328964E8|             ACTBLUE|         HILL, ERIN|     P.O. BOX 441146|    NULL|  SOMERVILLE|     MA|   02144|        U|      V|                NULL|               M|  NULL|              NULL|     NULL|
|C00694323| 9.0084281E7|              WINRED|OTTENHOFF, BENJAMIN|         PO BOX 9891|    NULL|   ARLINGTON|     VA|   22219|   

In [55]:
# Party committee influence
party_influence = (
    committee_trans
    .join(committee_master.select("CMTE_ID", "CMTE_TP", "CMTE_NM"), "CMTE_ID")
    .filter(col("CMTE_TP").contains("PARTY"))
    .groupBy("CMTE_ID", "CMTE_NM", "CMTE_TP")
    .agg(sum("TRANSACTION_AMT").alias("TOTAL_INFLUENCE"))
    .orderBy(col("TOTAL_INFLUENCE").desc())
    .select(
        "CMTE_ID",
        "CMTE_NM",
        "CMTE_TP",
        "TOTAL_INFLUENCE"
    )
)

In [56]:
party_influence.show(5)

+-------+-------+-------+---------------+
|CMTE_ID|CMTE_NM|CMTE_TP|TOTAL_INFLUENCE|
+-------+-------+-------+---------------+
+-------+-------+-------+---------------+



## 5. Temporal Trends

In [57]:
# Election cycle patterns
election_cycle = individual_contrib_cache.withColumn("MONTH", 
    date_format(to_date("TRANSACTION_DT", "MMddyyyy"), "yyyy-MM")) \
    .groupBy("MONTH") \
    .agg(sum("TRANSACTION_AMT").alias("MONTHLY_TOTAL")) \
    .orderBy("MONTH")


In [58]:
election_cycle.show(5)

+-----+-------------+
|MONTH|MONTHLY_TOTAL|
+-----+-------------+
| NULL|1.511713294E9|
+-----+-------------+



In [59]:
# Event response analysis (example)
event_response = individual_contrib_cache.filter(
    (to_date(col("TRANSACTION_DT"), "MMddyyyy") >= "2023-10-01") & 
    (to_date(col("TRANSACTION_DT"), "MMddyyyy") <= "2023-10-15")) \
    .join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("CAND_NAME") \
    .agg(sum("TRANSACTION_AMT").alias("POST_EVENT_FUNDING")) \
    .orderBy(desc("POST_EVENT_FUNDING"))

In [60]:
event_response.show(5)

+---------+------------------+
|CAND_NAME|POST_EVENT_FUNDING|
+---------+------------------+
+---------+------------------+



In [61]:
# Final push strategies (last 30 days)
final_push = individual_contrib_cache.filter(
    to_date(col("TRANSACTION_DT"), "MMddyyyy") >= date_sub(current_date(), 30)) \
    .join(cand_cmte_link, "CMTE_ID") \
    .join(candidate_master, "CAND_ID") \
    .groupBy("CAND_NAME") \
    .agg(sum("TRANSACTION_AMT").alias("FINAL_PUSH_TOTAL")) \
    .orderBy(desc("FINAL_PUSH_TOTAL"))

In [62]:
final_push.show(5)

+---------+----------------+
|CAND_NAME|FINAL_PUSH_TOTAL|
+---------+----------------+
+---------+----------------+

