In [225]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType

In [6]:
from pyspark.sql import SparkSession

jdbc_jar_path = "/usr/share/java/mysql-connector-j-8.0.32.jar"

# Spark settings applied to the session builder, in this order.
spark_session_options = {
    # MySQL JDBC driver jar, needed later by spark.read.jdbc
    "spark.jars": jdbc_jar_path,

    # Memory and execution resources.
    # NOTE(review): spark.driver.memory usually cannot take effect when set from an already-running
    # driver JVM (e.g. a notebook kernel in client mode) — confirm it is honoured in this environment.
    "spark.executor.memory": "6g",
    "spark.driver.memory": "4g",
    "spark.executor.cores": "3",
    "spark.driver.cores": "2",

    # Shuffle parallelism; a threshold of "-1" disables automatic broadcast joins.
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.autoBroadcastJoinThreshold": "-1",

    # Longer timeouts for JDBC reads and large jobs.
    "spark.network.timeout": "300s",
    "spark.sql.broadcastTimeout": "300",

    # Arrow-accelerated conversion between Spark and pandas.
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}

spark_builder = SparkSession.builder.appName("US Campaign Data Cleaning ")
for option_key, option_value in spark_session_options.items():
    spark_builder = spark_builder.config(option_key, option_value)
spark = spark_builder.getOrCreate()


In [7]:
# Display the active SparkSession as a quick check that it started.
spark

## Common utilities

In [164]:
import requests
from bs4 import BeautifulSoup
import pandas as pd

def extract_table_data(url, table_class=None, timeout=30):
    """Download ``url`` and extract one HTML table as plain text.

    Args:
        url: page to fetch.
        table_class: optional CSS class used to select the table; when omitted
            the first ``<table>`` on the page is used.
        timeout: seconds to wait for the server, passed to ``requests.get``.
            Without a timeout a stalled server would block the notebook forever.

    Returns:
        ``(headers, rows)``: ``headers`` is a list of header-cell strings and
        ``rows`` is a list of lists of cell strings, one per non-empty data row.
        Returns ``None`` (after printing a message) when the page cannot be
        fetched, no table is found, or the table has no header row.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching the page: {e}")
        return None

    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', class_=table_class) if table_class else soup.find('table')
    if not table:
        print("Table not found on the page.")
        return None

    # Prefer the first row of <thead>; otherwise treat the table's first row as the header.
    thead = table.find('thead')
    header_row = thead.find('tr') if thead else table.find('tr')
    if header_row is None:
        # e.g. a <thead> with no <tr>; previously this crashed with AttributeError.
        print("Table has no header row.")
        return None
    headers = [th.text.strip() for th in header_row.find_all(['th', 'td'])]

    # Skip the first <tr> in document order, which is normally the header row selected above.
    rows = []
    for row in table.find_all('tr')[1:]:
        cols = row.find_all(['td', 'th'])
        if cols:
            rows.append([col.text.strip() for col in cols])

    return headers, rows


In [165]:
## Count null-like values per column: real nulls, NaN, empty strings, and the placeholder tokens 'None' / 'NULL'
def get_null_like_counts(df, null_tokens=('None', 'NULL')):
    """Count null-like values in every column of a Spark DataFrame.

    A value is counted as null-like when any of the following holds:

    * it is a real SQL null;
    * it equals the empty string ``''``;
    * it *contains* one of ``null_tokens`` as a substring (so both ``'NULL'``
      and ``'xNULLx'`` match);
    * it is NaN. This check is applied only to numeric and string columns.
      String values are implicitly cast to double, so strings such as ``'nan'``
      are counted. Other types (dates, timestamps, booleans, ...) cannot be
      passed to ``isnan`` and are skipped rather than failing the query.

    Args:
        df: Spark DataFrame to profile.
        null_tokens: placeholder strings treated as missing. The default keeps
            the original 'None' / 'NULL' behaviour.

    Returns:
        A one-row Spark DataFrame with the same column names as ``df``, each
        holding that column's null-like count.
    """
    from pyspark.sql.types import NumericType, StringType

    def null_like(field):
        value = F.col(field.name)
        condition = value.isNull() | (value == '')
        for token in null_tokens:
            condition = condition | value.contains(token)
        if isinstance(field.dataType, (NumericType, StringType)):
            condition = condition | F.isnan(value)
        return condition

    # count() ignores nulls, so emitting a literal only when the condition holds counts matching rows.
    return df.select([
        F.count(F.when(null_like(field), F.lit(1))).alias(field.name)
        for field in df.schema.fields
    ])

In [166]:
url = "https://www.fec.gov/campaign-finance-data/party-code-descriptions/"

# extract_table_data returns None on failure; stop here with a clear message instead of
# an opaque "cannot unpack NoneType" TypeError.
party_table = extract_table_data(url)
if party_table is None:
    raise RuntimeError(f"Could not extract the party code table from {url}")
headers, data = party_table

df = pd.DataFrame(data, columns=headers)

# The 'Notes' column is not needed for the code -> description lookup.
if 'Notes' in df.columns:
    df = df.drop('Notes', axis=1)

# Lookup from FEC party code to its description (used by the map_party_code UDF).
party_code_description_dict = dict(zip(df['Party code'], df['Party code description']))

# Map the literal string 'None' to 'Unknown'. A real null code is not covered by this key:
# dict.get(None) still returns None.
party_code_description_dict['None'] = 'Unknown'

---

## Cleaning and Enriching All Candidates Dataset

In [167]:
# Layer folders inside the bucket; paths are built as f"{gcs_bucket_path}/{layer}/...".
gcs_input_path = "bronze-layer"
gcs_output_path = "silver-layer"
# No trailing slash: paths are joined with "/", and a trailing slash here produced "gs://bucket//layer/...".
gcs_bucket_path = "gs://us-campaign-finance-fec-2019-2020"

In [168]:
# Schema for the pipe-delimited weball20.txt file, in file column order.
# Numeric amounts are DoubleType; codes, names, and CVG_END_DT (converted to a date later) are StringType.
# Every field is nullable except CAND_ID.
all_candidates_schema = (
    StructType()
    .add('CAND_ID', StringType(), False)
    .add('CAND_NAME', StringType())
    .add('CAND_ICI', StringType())
    .add('PTY_CD', StringType())
    .add('CAND_PTY_AFFILIATION', StringType())
    .add('TTL_RECEIPTS', DoubleType())
    .add('TRANS_FROM_AUTH', DoubleType())
    .add('TTL_DISB', DoubleType())
    .add('TRANS_TO_AUTH', DoubleType())
    .add('COH_BOP', DoubleType())
    .add('COH_COP', DoubleType())
    .add('CAND_CONTRIB', DoubleType())
    .add('CAND_LOANS', DoubleType())
    .add('OTHER_LOANS', DoubleType())
    .add('CAND_LOAN_REPAY', DoubleType())
    .add('OTHER_LOAN_REPAY', DoubleType())
    .add('DEBTS_OWED_BY', DoubleType())
    .add('TTL_INDIV_CONTRIB', DoubleType())
    .add('CAND_OFFICE_ST', StringType())
    .add('CAND_OFFICE_DISTRICT', StringType())
    .add('SPEC_ELECTION', StringType())
    .add('PRIM_ELECTION', StringType())
    .add('RUN_ELECTION', StringType())
    .add('GEN_ELECTION', StringType())
    .add('GEN_ELECTION_PRECENT', DoubleType())
    .add('OTHER_POL_CMTE_CONTRIB', DoubleType())
    .add('POL_PTY_CONTRIB', DoubleType())
    .add('CVG_END_DT', StringType())
    .add('INDIV_REFUNDS', DoubleType())
    .add('CMTE_REFUNDS', DoubleType())
)

In [169]:
all_candidates_file_path = f"{gcs_bucket_path}/{gcs_input_path}/all-candidates/weball20.txt"

# Pipe-delimited file without a header row (Spark's CSV default); names and types come from the explicit schema.
all_candidates_df = (
    spark.read
    .schema(all_candidates_schema)
    .option("sep", "|")
    .csv(all_candidates_file_path)
)

In [170]:
# Check the schema that was applied. The output reports every field as nullable, including CAND_ID,
# which the schema declares non-nullable: the CSV reader does not keep that constraint.
all_candidates_df.printSchema()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- PTY_CD: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- TTL_RECEIPTS: double (nullable = true)
 |-- TRANS_FROM_AUTH: double (nullable = true)
 |-- TTL_DISB: double (nullable = true)
 |-- TRANS_TO_AUTH: double (nullable = true)
 |-- COH_BOP: double (nullable = true)
 |-- COH_COP: double (nullable = true)
 |-- CAND_CONTRIB: double (nullable = true)
 |-- CAND_LOANS: double (nullable = true)
 |-- OTHER_LOANS: double (nullable = true)
 |-- CAND_LOAN_REPAY: double (nullable = true)
 |-- OTHER_LOAN_REPAY: double (nullable = true)
 |-- DEBTS_OWED_BY: double (nullable = true)
 |-- TTL_INDIV_CONTRIB: double (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: string (nullable = true)
 |-- SPEC_ELECTION: string (nullable = true)
 |-- PRIM_ELECTION: string (nullable = true)
 |-- RUN_ELECTION: string

In [171]:
# Preview the first 5 rows of the raw all-candidates data.
all_candidates_df.show(5)

[Stage 258:>                                                        (0 + 1) / 1]

+---------+-------------------+--------+------+--------------------+------------+---------------+----------+-------------+---------+---------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--------------------+-------------+-------------+------------+------------+--------------------+----------------------+---------------+----------+-------------+------------+
|  CAND_ID|          CAND_NAME|CAND_ICI|PTY_CD|CAND_PTY_AFFILIATION|TTL_RECEIPTS|TRANS_FROM_AUTH|  TTL_DISB|TRANS_TO_AUTH|  COH_BOP|  COH_COP|CAND_CONTRIB|CAND_LOANS|OTHER_LOANS|CAND_LOAN_REPAY|OTHER_LOAN_REPAY|DEBTS_OWED_BY|TTL_INDIV_CONTRIB|CAND_OFFICE_ST|CAND_OFFICE_DISTRICT|SPEC_ELECTION|PRIM_ELECTION|RUN_ELECTION|GEN_ELECTION|GEN_ELECTION_PRECENT|OTHER_POL_CMTE_CONTRIB|POL_PTY_CONTRIB|CVG_END_DT|INDIV_REFUNDS|CMTE_REFUNDS|
+---------+-------------------+--------+------+--------------------+------------+---------------+----------+-------------+---------+----

                                                                                

In [172]:
# Profile null-like values per column of the freshly loaded all-candidates data.
all_candidate_null_count = get_null_like_counts(all_candidates_df)
all_candidate_null_count.show()

[Stage 259:>                                                        (0 + 1) / 1]

+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--------------------+-------------+-------------+------------+------------+--------------------+----------------------+---------------+----------+-------------+------------+
|CAND_ID|CAND_NAME|CAND_ICI|PTY_CD|CAND_PTY_AFFILIATION|TTL_RECEIPTS|TRANS_FROM_AUTH|TTL_DISB|TRANS_TO_AUTH|COH_BOP|COH_COP|CAND_CONTRIB|CAND_LOANS|OTHER_LOANS|CAND_LOAN_REPAY|OTHER_LOAN_REPAY|DEBTS_OWED_BY|TTL_INDIV_CONTRIB|CAND_OFFICE_ST|CAND_OFFICE_DISTRICT|SPEC_ELECTION|PRIM_ELECTION|RUN_ELECTION|GEN_ELECTION|GEN_ELECTION_PRECENT|OTHER_POL_CMTE_CONTRIB|POL_PTY_CONTRIB|CVG_END_DT|INDIV_REFUNDS|CMTE_REFUNDS|
+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+------------

                                                                                

In [173]:
### Drop SPEC_ELECTION, PRIM_ELECTION, RUN_ELECTION, GEN_ELECTION and GEN_ELECTION_PRECENT:
### per the author's review of the FEC data, these columns have not been populated since 2006.

all_candidates_columns_to_drop = [
    'SPEC_ELECTION',
    'PRIM_ELECTION',
    'RUN_ELECTION',
    'GEN_ELECTION',
    'GEN_ELECTION_PRECENT',
]

# DataFrame.drop takes several names at once and ignores names that are already gone,
# so re-running this cell is harmless.
all_candidates_df = all_candidates_df.drop(*all_candidates_columns_to_drop)


In [174]:
## Enrich CAND_ICI: expand the incumbent / challenger / open-seat code into a readable label
ici_code = F.col('CAND_ICI')
ici_label = (
    F.when(ici_code == 'I', 'Incumbent')
    .when(ici_code == 'C', 'Challenger')
    .when(ici_code == 'O', 'Open seat')
    .otherwise('Unknown')  # any other code, or a missing one
)
all_candidates_df = all_candidates_df.withColumn('CAND_ICI_FULL', ici_label)

In [175]:
def map_party_code(party_code):
    """Return the party description for ``party_code`` from the FEC lookup, or None if it is not listed."""
    return party_code_description_dict.get(party_code)

# Wrap as a string-returning Spark UDF (equivalent to decorating with @F.udf(StringType())).
map_party_code = F.udf(map_party_code, StringType())

In [176]:
# Attach the human-readable party description next to the party code.
party_description = map_party_code(F.col('CAND_PTY_AFFILIATION'))
all_candidates_df = all_candidates_df.withColumn("CAND_PTY_AFFILIATION_FULL", party_description)

In [177]:
# Re-profile after adding the CAND_ICI_FULL and CAND_PTY_AFFILIATION_FULL columns.
all_candidate_null_count = get_null_like_counts(all_candidates_df)
all_candidate_null_count.show()

[Stage 262:>                                                        (0 + 1) / 1]

+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--------------------+----------------------+---------------+----------+-------------+------------+-------------+-------------------------+
|CAND_ID|CAND_NAME|CAND_ICI|PTY_CD|CAND_PTY_AFFILIATION|TTL_RECEIPTS|TRANS_FROM_AUTH|TTL_DISB|TRANS_TO_AUTH|COH_BOP|COH_COP|CAND_CONTRIB|CAND_LOANS|OTHER_LOANS|CAND_LOAN_REPAY|OTHER_LOAN_REPAY|DEBTS_OWED_BY|TTL_INDIV_CONTRIB|CAND_OFFICE_ST|CAND_OFFICE_DISTRICT|OTHER_POL_CMTE_CONTRIB|POL_PTY_CONTRIB|CVG_END_DT|INDIV_REFUNDS|CMTE_REFUNDS|CAND_ICI_FULL|CAND_PTY_AFFILIATION_FULL|
+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--

                                                                                

In [178]:
## Replace missing values with 'Unknown' for CAND_ICI, CAND_PTY_AFFILIATION and CAND_PTY_AFFILIATION_FULL
all_candidates_df = all_candidates_df.fillna({'CAND_ICI': 'Unknown', 'CAND_PTY_AFFILIATION': 'Unknown'})

# The party description is null when the code is missing from the lookup (dict.get returns None),
# and may also hold the literal string 'None'; both become 'Unknown'.
party_full = F.col('CAND_PTY_AFFILIATION_FULL')
all_candidates_df = all_candidates_df.withColumn(
    'CAND_PTY_AFFILIATION_FULL',
    F.when(party_full.isNull() | (party_full == 'None'), 'Unknown').otherwise(party_full)
)

In [179]:
# Re-profile to check the effect of the 'Unknown' replacements.
all_candidate_null_count = get_null_like_counts(all_candidates_df)
all_candidate_null_count.show()

[Stage 265:>                                                        (0 + 1) / 1]

+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--------------------+----------------------+---------------+----------+-------------+------------+-------------+-------------------------+
|CAND_ID|CAND_NAME|CAND_ICI|PTY_CD|CAND_PTY_AFFILIATION|TTL_RECEIPTS|TRANS_FROM_AUTH|TTL_DISB|TRANS_TO_AUTH|COH_BOP|COH_COP|CAND_CONTRIB|CAND_LOANS|OTHER_LOANS|CAND_LOAN_REPAY|OTHER_LOAN_REPAY|DEBTS_OWED_BY|TTL_INDIV_CONTRIB|CAND_OFFICE_ST|CAND_OFFICE_DISTRICT|OTHER_POL_CMTE_CONTRIB|POL_PTY_CONTRIB|CVG_END_DT|INDIV_REFUNDS|CMTE_REFUNDS|CAND_ICI_FULL|CAND_PTY_AFFILIATION_FULL|
+-------+---------+--------+------+--------------------+------------+---------------+--------+-------------+-------+-------+------------+----------+-----------+---------------+----------------+-------------+-----------------+--------------+--

                                                                                

In [180]:
## Trim leading and trailing whitespace from every string column; other columns pass through unchanged

all_candidates_df = all_candidates_df.select([
    F.trim(F.col(field.name)).alias(field.name)
    if isinstance(field.dataType, StringType)
    else F.col(field.name)
    for field in all_candidates_df.schema.fields
])

In [181]:
# Adjusted totals: receipts net of TRANS_FROM_AUTH, disbursements net of TRANS_TO_AUTH.
adjusted_totals = {
    'ADJ_RECEIPTS': F.col('TTL_RECEIPTS') - F.col('TRANS_FROM_AUTH'),
    'ADJ_DISBURSEMENTS': F.col('TTL_DISB') - F.col('TRANS_TO_AUTH'),
}
for new_column, difference in adjusted_totals.items():
    all_candidates_df = all_candidates_df.withColumn(new_column, difference)

In [182]:
# Convert the coverage end date from a string into a DateType column.
# NOTE(review): assumes every value matches MM/dd/yyyy; values that do not parse presumably become null
# (or raise under ANSI mode) — confirm against the source file.
all_candidates_df = all_candidates_df.withColumn('CVG_END_DT', F.to_date(F.col('CVG_END_DT'), 'MM/dd/yyyy'))

In [183]:
# Persist the cleaned all-candidates data to the silver layer as Parquet, replacing any previous output.
all_candidates_df.write.mode('overwrite').parquet(f"{gcs_bucket_path}/{gcs_output_path}/all_candidates/")

                                                                                

---

## Cleaning Candidate Master Dataset 

In [None]:
import os

# Connection settings come from environment variables so credentials are never stored in the notebook.
# The fallbacks are the original placeholders: set FEC_MYSQL_JDBC_URL, FEC_MYSQL_USER and
# FEC_MYSQL_PASSWORD (or edit the fallbacks locally) before running the JDBC read.
jdbc_url = os.environ.get("FEC_MYSQL_JDBC_URL", "jdbc:mysql://<Hostname>:3306/<dbname>")

table_name = "candidate_master"
properties = {
    "user": os.environ.get("FEC_MYSQL_USER", "Username Here"),
    "password": os.environ.get("FEC_MYSQL_PASSWORD", "Password Here"),
}

In [184]:
# Read the candidate_master table over JDBC; the connection properties become reader options.
candidate_master_df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table_name)
    .options(**properties)
    .load()
)

candidate_master_df.show(5)

[Stage 269:>                                                        (0 + 1) / 1]

+---------+--------------------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+---------+--------------------+--------+------------+-------+--------+
|  CAND_ID|           CAND_NAME|CAND_PTY_AFFILIATION|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE|CAND_OFFICE_DISTRICT|CAND_ICI|CAND_STATUS| CAND_PCC|            CAND_ST1|CAND_ST2|   CAND_CITY|CAND_ST|CAND_ZIP|
+---------+--------------------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+---------+--------------------+--------+------------+-------+--------+
|H0AK00105|        LAMB, THOMAS|                 NNE|            2020|            AK|          H|                 0.0|       C|          N|C00607515|1861 W LAKE LUCIL...|     nan|     WASILLA|     AK| 99654.0|
|H0AK00113|   TUGATUK, RAY SEAN|                 DEM|            2020|            AK|          H|                 0.0|       C|          N|      nan|          P

                                                                                

In [185]:
# Profile null-like values across the full candidate_master table (before restricting it to all_candidates).
candidate_master_null_count = get_null_like_counts(candidate_master_df)

candidate_master_null_count.show(truncate=False)

[Stage 270:>                                                        (0 + 1) / 1]

+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------+---------+-------+--------+
|CAND_ID|CAND_NAME|CAND_PTY_AFFILIATION|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE|CAND_OFFICE_DISTRICT|CAND_ICI|CAND_STATUS|CAND_PCC|CAND_ST1|CAND_ST2|CAND_CITY|CAND_ST|CAND_ZIP|
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------+---------+-------+--------+
|0      |0        |7                   |0               |0             |0          |7                   |322     |0          |1421    |189     |6844    |3        |179    |190     |
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------+---------+-------+--------+



                                                                                

In [186]:
# Inspect the JDBC schema: every column arrives as a string, including CAND_ELECTION_YR and CAND_ZIP.
candidate_master_df.printSchema()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- CAND_ELECTION_YR: string (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: string (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- CAND_STATUS: string (nullable = true)
 |-- CAND_PCC: string (nullable = true)
 |-- CAND_ST1: string (nullable = true)
 |-- CAND_ST2: string (nullable = true)
 |-- CAND_CITY: string (nullable = true)
 |-- CAND_ST: string (nullable = true)
 |-- CAND_ZIP: string (nullable = true)



In [187]:
# Total rows in candidate_master, for comparison with the all_candidates table.
candidate_master_df.count()

                                                                                

7758

Because this analysis covers the 2019–2020 cycle and the all_candidates table contains 3,980 candidates, I restrict the candidate_master table (7,758 rows) to those same 3,980 candidates with an inner join on CAND_ID, so that the two datasets stay consistent.

In [213]:
# Keep only candidate_master rows whose CAND_ID also appears in all_candidates (inner join).
# NOTE(review): if CAND_ID is not unique on either side, the join emits one row per matching pair — verify uniqueness.
candidates_joined_df = candidate_master_df.join(all_candidates_df, on='CAND_ID', how='inner')

# Every candidate_master column plus the two descriptive columns derived earlier on all_candidates.
candidates_result_df = candidates_joined_df.select(
    candidate_master_df['*'],
    all_candidates_df["CAND_ICI_FULL"],
    all_candidates_df["CAND_PTY_AFFILIATION_FULL"]
)


In [214]:
# Re-profile null-like values after restricting to the all_candidates CAND_IDs.
candidate_master_null_count = get_null_like_counts(candidates_result_df)

candidate_master_null_count.show(truncate=False)

[Stage 758:>                                                        (0 + 1) / 1]

+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------+---------+-------+--------+-------------+-------------------------+
|CAND_ID|CAND_NAME|CAND_PTY_AFFILIATION|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE|CAND_OFFICE_DISTRICT|CAND_ICI|CAND_STATUS|CAND_PCC|CAND_ST1|CAND_ST2|CAND_CITY|CAND_ST|CAND_ZIP|CAND_ICI_FULL|CAND_PTY_AFFILIATION_FULL|
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------+---------+-------+--------+-------------+-------------------------+
|0      |0        |1                   |0               |0             |0          |0                   |78      |0          |6       |176     |3521    |1        |169    |176     |0            |0                        |
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+--

                                                                                

In [215]:
# CAND_ST2 (presumably the second address line) is mostly null-like (3521 in the profile above), so drop it.
candidates_result_df = candidates_result_df.drop("CAND_ST2")

In [216]:
# candidate_master columns whose null-like values are replaced with 'Unknown' in the next cell.
columns_to_clean = [
    "CAND_PTY_AFFILIATION",
    "CAND_ICI",
    "CAND_PCC",
    "CAND_ST1",
    "CAND_CITY",
    "CAND_ST",
    "CAND_ZIP",
]

In [219]:
# Replace null-like values (null, NaN, '', 'None', 'NULL') in the selected columns with 'Unknown'.
# String comparisons use the trimmed value: whitespace-only strings (or padded 'None'/'NULL') would
# otherwise slip through here and turn into '' once the later trimming cell runs.
for col_name in columns_to_clean:
    raw_value = F.col(col_name)
    is_null_like = (
        raw_value.isNull()
        | F.trim(raw_value).isin("", "None", "NULL")
        | F.isnan(raw_value)
    )
    candidates_result_df = candidates_result_df.withColumn(
        col_name,
        F.when(is_null_like, "Unknown").otherwise(raw_value)
    )

In [220]:
# Check that the cleaned columns no longer contain null-like values.
candidate_master_null_count = get_null_like_counts(candidates_result_df)

candidate_master_null_count.show(truncate=False)

[Stage 776:>                                                        (0 + 1) / 1]

+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+---------+-------+--------+-------------+-------------------------+
|CAND_ID|CAND_NAME|CAND_PTY_AFFILIATION|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE|CAND_OFFICE_DISTRICT|CAND_ICI|CAND_STATUS|CAND_PCC|CAND_ST1|CAND_CITY|CAND_ST|CAND_ZIP|CAND_ICI_FULL|CAND_PTY_AFFILIATION_FULL|
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+---------+-------+--------+-------------+-------------------------+
|0      |0        |0                   |0               |0             |0          |0                   |0       |0          |0       |0       |0        |0      |0       |0            |0                        |
+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+--------+--------+--------

                                                                                

`CAND_STATUS` codes:

- C = Statutory candidate
- F = Statutory candidate for future election
- N = Not yet a statutory candidate
- P = Statutory candidate in prior cycle

In [221]:
# Expand the one-letter CAND_STATUS code into its description; unmatched or missing codes become 'Unknown'.
cand_status_labels = {
    "C": "Statutory candidate",
    "F": "Statutory candidate for future election",
    "N": "Not yet a statutory candidate",
    "P": "Statutory candidate in prior cycle",
}

cand_status_expr = None
for status_code, status_label in cand_status_labels.items():
    matches = F.col("CAND_STATUS") == status_code
    if cand_status_expr is None:
        cand_status_expr = F.when(matches, status_label)
    else:
        cand_status_expr = cand_status_expr.when(matches, status_label)

candidates_result_df = candidates_result_df.withColumn(
    "CAND_STATUS_FULL", cand_status_expr.otherwise('Unknown')
)

In [222]:
# Expand the office code (H / P / S) into the office name; anything else becomes 'Unknown'.
office_code = F.col('CAND_OFFICE')
office_name = (
    F.when(office_code == 'H', "House")
    .when(office_code == 'P', "President")
    .when(office_code == 'S', "Senate")
    .otherwise('Unknown')
)
candidates_result_df = candidates_result_df.withColumn("CAND_OFFICE_FULL", office_name)

In [224]:
## Trim leading and trailing whitespace from every string column; other columns pass through unchanged

candidates_result_df = candidates_result_df.select([
    F.trim(F.col(field.name)).alias(field.name)
    if isinstance(field.dataType, StringType)
    else F.col(field.name)
    for field in candidates_result_df.schema.fields
])

In [226]:
# CAND_ELECTION_YR arrives from MySQL as a string; store it as an integer year.
candidates_result_df = candidates_result_df.withColumn(
    'CAND_ELECTION_YR',
    F.col('CAND_ELECTION_YR').cast(IntegerType()),
)

In [227]:
# Persist the cleaned candidate master data to the silver layer as Parquet, replacing any previous output.
candidates_result_df.write.mode('overwrite').parquet(f"{gcs_bucket_path}/{gcs_output_path}/candidates_master/")

                                                                                

---