## Phase 2: Data Cleaning & Transformation

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum,  trim, lower, to_date, year, month, weekofyear, quarter, avg as _avg, countDistinct, to_date, upper, lit, regexp_replace
from pyspark.sql.types import DoubleType, IntegerType, DateType, StringType, LongType

#### Optimize based on configuration of cluster
- **Workers (2)**	 2 × n2-standard-4 (4 vCPUs, 15 GB Memory each)
- **Master**	 × n2-standard-4
- Total vCPUs	12 (4×3)
- Total = 8 vCPUs and ~30 GB memory across workers

In [2]:
spark = (
    SparkSession.builder
    .appName("DataCleaningAndTransformation")
    .config("spark.sql.shuffle.partitions", "80")               # Since 8 cores (2*4vCPU) , rule of thumb: 10 shuffle partitions per CPU core 
    .config("spark.executor.memory", "5g")                      # memory per executor (per node 10gb memory assigned to executor out of 15gb,giving room for overhead)
    .config("spark.driver.memory", "4g")                        # Adjust if driver needs more, can be increased to 6g
    .config("spark.executor.cores", "2")                        # Good parallelism, 4 executor across cluster which avoids overloading one executor
    .config("spark.dynamicAllocation.enabled", "true")          # Useful for Dataproc
    .getOrCreate()
)

25/06/13 14:24:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Reading stored data in parquet format

In [3]:
# Path to file stored in parquet format
bronze_path = 'gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/bronze/'

In [4]:
# Read Data from parquet

# All candidates Data
AllCand_df = spark.read.parquet(bronze_path + 'AllCand_df')

# Any transaction from one committee to another Data
TranOneComToAno_df = spark.read.parquet(bronze_path + 'TranOneComToAno_df')

# Candidate-committee linkages
CanComLink_df = spark.read.parquet(bronze_path + 'CanComLink_df')

# Contributions by individuals
ConByInd_df = spark.read.parquet(bronze_path + 'ConByInd_df')

# Contributions from committees to candidates & independent expenditure
ConFromComToCanIndExpen_df = spark.read.parquet(bronze_path + 'ConFromComToCanIndExpen_df') 

# House Senate current campaigns
HouSenCurCam_df = spark.read.parquet(bronze_path + 'HouSenCurCam_df')

# Operating expenditures
OpEx_df = spark.read.parquet(bronze_path + 'OpEx_df')

# PAC summary
PacSum_df = spark.read.parquet(bronze_path + 'PacSum_df')

# Candidate master
CandMast_df = spark.read.parquet(bronze_path + 'CandMast_df')

# Commitee master
CommMast_df = spark.read.parquet(bronze_path + 'CommMast_df')

                                                                                

## 1. Cleaning – handle missing values, removing trailing spaces etc.

### Check Null values count in each column

In [5]:
# Function to check null counts in dataframe
def null_counts (df):
    null_counts_df = df.select( [_sum( when(col(c).isNull() , 1).otherwise(0) ).alias(c) 
                              for c in df.columns] )
    return null_counts_df

### Basic cleaning function which trims spaces and replaces "", "NA", "N/A" with None

In [6]:
def base_clean(df):
    # Trim all string columns
    for c in df.columns:
        df = df.withColumn(c, when(col(c).isNotNull() & (df[c].cast("string").isNotNull()), trim(col(c))).otherwise(col(c)))
    
    # Replace "", "NA", "N/A" with None
    for c in df.columns:
        df = df.withColumn(c, when((col(c).isin('', 'NA', 'N/A')), None).otherwise(col(c)))
    
    return df

### Cleaning All candidates df
1. Fill nulls for CAND_ICI with 'U' (Unknown)
2. Fill nulls for CAND_PTY_AFFILIATION with 'UNK' 
3. Drop legacy or mostly-null columns which will not be used for analysis

In [7]:

def clean_AllCand_df(df):
    # Fill nulls for CAND_ICI with 'U' (Unknown)
    df = df.withColumn("CAND_ICI", when(col("CAND_ICI").isNull(), "U").otherwise(col("CAND_ICI")))

    # Fill nulls for CAND_PTY_AFFILIATION with 'UNK'
    df = df.withColumn("CAND_PTY_AFFILIATION", when(col("CAND_PTY_AFFILIATION").isNull(), "UNK").otherwise(col("CAND_PTY_AFFILIATION")))

    # Drop legacy or mostly-null columns
    df = df.drop("SPEC_ELECTION", "PRIM_ELECTION", "RUN_ELECTION", "GEN_ELECTION", "GEN_ELECTION_PRECENT")

    return df



### Cleaning Any transaction from one committee to another df
1. Drop legacy columns not required for further analysis
2. Handle nulls with default values or flags 


In [8]:
# Cleaning any transaction from one committee to another df
def clean_TranOneComToAno_df(df):
    
    # Drop legacy columns that have too many nulls
    cols_to_drop = ['EMPLOYER', 'OCCUPATION', 'OTHER_ID', 'MEMO_CD', 'MEMO_TEXT', 'SUB_ID']
    df = df.drop(*cols_to_drop)
    
    # Handle nulls with default values or flags
    df = df.fillna({
        "TRANSACTION_PGI": "U0000",   # Unknown election year
        "ENTITY_TP": "UNK",          # Unknown entity
        "NAME": "Unknown Name",
        "CITY": "Unknown City",
        "STATE": "NA",
        "ZIP_CODE": "00000",
        "TRANSACTION_DT": "01011900",  # Dummy old date 
        "TRAN_ID": "UNKNOWN"
    })
    
    return df


### Cleaning Candidate-committee linkage df
1. Trim all string columns
2. Drop duplicates 


In [9]:
def clean_CanComLink_df(df):
    # Trim all string columns
    for column in df.columns:
        df = df.withColumn(column, trim(col(column)))
        
    # Drop duplicates
    df = df.dropDuplicates()
    
    return df

### Cleaning Contributions by individuals df
1. Filter out rows missing truly critical information
2. Keep only individual contributions 
3. Drop legacy columns
4. Replace nulls using fillna

In [10]:
# Cleaning Contributions by individuals df
def clean_ConByInd_df(df):
    
    # Filter out rows missing truly critical information
    df = df.filter(col('TRANSACTION_AMT').isNotNull())    
    df = df.filter(col('TRANSACTION_DT').isNotNull())
    df = df.filter(col('SUB_ID').isNotNull())

    # Keep only individual contributions
    df = df.filter(col('OTHER_ID').isNull())

    # Drop legacy columns
    cols_to_drop = ['OTHER_ID', 'MEMO_CD', 'MEMO_TEXT']
    df = df.drop(*cols_to_drop)

    # Replace nulls using fillna
    df = df.fillna({
        'TRANSACTION_PGI': 'O',
        'ENTITY_TP': 'IND', # null likely indicates an unrecorded individual contributor
        'NAME': 'UNKNOWN',
        'CITY': 'UNKNOWN',
        'STATE': 'NA',
        'ZIP_CODE': '00000',
        'EMPLOYER': 'UNKNOWN',
        'OCCUPATION': 'UNKNOWN',
        'TRAN_ID': 'MISSING',
        'FILE_NUM': 0
    })

    return df


### Cleaning Contributions from committees to candidates & independent expenditures df
1. Filter Columns where OTHER_ID is notNull since Null indicates contributions from individuals
2. Filter Columns where CAND_ID is notNUll 
3. Replace nulls using fillna
4. Drop rarely used or high-null columns

In [11]:
def clean_ConFromComToCanIndExpen_df(df):
    # Filter Columns where OTHER_ID is notNull since Null 
    # indicates contributions from individuals
    df = df.filter(col('OTHER_ID').isNotNull())
    
    # Filter Columns where CAND_ID is notNUll
    df = df.filter(col('CAND_ID').isNotNull())
  
    # Replace nulls using fillna
    df = df.fillna({
        'TRANSACTION_PGI': 'UKN',
        'ENTITY_TP' : 'COM', # the contributor is typically a committee here in com to cand contr
        'NAME': 'UNKNOWN',
        'CITY': 'UNKNOWN',
        'STATE': 'NA',
        'ZIP_CODE': '00000',
        'TRANSACTION_DT': '12319999',
        'TRAN_ID': 'MISSING',
    })
    
    # Drop rarely used or high-null columns
    cols_to_drop = ['EMPLOYER', 'OCCUPATION', 'OTHER_ID', 'MEMO_CD', 'MEMO_TEXT']
    df = df.drop(*cols_to_drop)
        
    return df


### Cleaning Current campaigns for House and Senate file df
1. Fill NUll value for col CAND_ICI as U (Unkown)
2. Fill nulls in CVG_END_DT with default date
3. Drop rarely used or high-null columns

In [12]:
def clean_HouSenCurCam_df(df):
    # Fill NUll value for col CAND_ICI as U (Unkown)
    df = df.fillna({'CAND_ICI': 'U'})
    
    # Fill nulls in CVG_END_DT with default date
    df = df.fillna({'CVG_END_DT': '12/31/9999'})
    
    # Drop rarely used or high-null columns
    cols_to_drop = ['SPEC_ELECTION', 'PRIM_ELECTION', 'RUN_ELECTION', 'GEN_ELECTION', 'GEN_ELECTION_PRECENT']
    df = df.drop(*cols_to_drop)
        
    return df


### Cleaning Operating expenditures df
1. Drop irrelevant or mostly-null columns
2. Handle Null values

In [13]:
def clean_OpEx_df(df):
    # Drop irrelevant or mostly-null columns
    cols_to_drop = ['MEMO_CD', 'MEMO_TEXT', 'BACK_REF_TRAN_ID', 'extra_column']
    df = df.drop(*cols_to_drop)
    
    # Handle Null values
    fill_values = {
    "NAME": "UNKNOWN",
    "CITY": "UNKNOWN",
    "STATE": "UNK",
    "ZIP_CODE": "00000",
    "TRANSACTION_DT": "12/31/9999",
    "TRANSACTION_PGI": "N/A",
    "PURPOSE": "UNSPECIFIED",
    "CATEGORY": "MISC",
    "CATEGORY_DESC": "MISCELLANEOUS",
    "ENTITY_TP": "UNKNOWN"
    }
    df = df.fillna(fill_values)

    return df

### Cleaning PAC and party summary df
1. Drop legacy columns
2. Fill nulls in numeric columns with 0.0

In [14]:
def clean_PacSum_df(df):
    # Drop legacy columns
    legacy_cols = [
        "CMTE_DSGN","CAND_CONTRIB", "CAND_LOANS", "TTL_LOANS_RECEIVED", "CAND_LOAN_REPAY",
        "OTHER_POL_CMTE_REFUNDS", "INDV_REFUNDS", "NONFED_TRANS_RECEIVED", 
        "NONFED_SHARE_EXP"
    ]
    df = df.drop(*legacy_cols)

    # Fill nulls in numeric columns with 0.0
    numeric_cols = [
        "TTL_RECEIPTS", "TRANS_FROM_AFF", "INDV_CONTRIB", "OTHER_POL_CMTE_CONTRIB",
        "TTL_DISB", "TRANF_TO_AFF", "LOAN_REPAY", "COH_BOP", "COH_COP", "DEBTS_OWED_BY",
        "CONTRIB_TO_OTHER_CMTE", "IND_EXP", "PTY_COORD_EXP"
    ]
    for col_name in numeric_cols:
        df = df.withColumn(col_name, when(col(col_name).isNull(), lit("0.0")).otherwise(col(col_name)))

    # Handle nulls in date
    df = df.withColumn("CVG_END_DT", when(col("CVG_END_DT").isNull(), lit("12/31/9999")).otherwise(col("CVG_END_DT")))

    return df

### Cleaning Candidate master df
1. Drop redundant or rarely useful columns
2. Handle nulls / empty values

In [15]:

def clean_CandMast_df(df):
    # Drop redundant or rarely useful columns
    df = df.drop("CAND_ST1", "CAND_ST2", "CAND_PCC")

    # Handle nulls / empty values
    df = df.fillna({
        "CAND_PTY_AFFILIATION": "UNK", # Unknown party
        "CAND_OFFICE_DISTRICT": "-1",  # -1 means non-district-based
        "CAND_ICI": "U",              # U = Unknown status
        "CAND_CITY": "UNKNOWN",                
        "CAND_ST": "NA",                       
        "CAND_ZIP": "00000"
    })
    
    return df


### Cleaning Committee master df
1. Drop mostly null or redundant address fields
2. Fill nulls with appropriate default values

In [16]:
def clean_CommMast_df(df):
    # Drop mostly null or redundant address fields
    df = df.drop("CMTE_ST1", "CMTE_ST2")

    # Fill nulls with appropriate default values
    df = df.fillna({
        "CMTE_NM": "UNKNOWN_COMMITTEE",
        "TRES_NM": "UNKNOWN_TREASURER",
        "CMTE_CITY": "UNKNOWN_CITY",
        "CMTE_ST": "NA",
        "CMTE_ZIP": "00000",
        "CMTE_DSGN": "U",  # Unknown designation
        "CMTE_TP": "U",    # Unknown type
        "CMTE_PTY_AFFILIATION": "OTH",  # Other
        "ORG_TP": "N/A",
        "CONNECTED_ORG_NM": "NONE",
        "CAND_ID": "UNLINKED"
    })

    return df


## 2. Typecasting – convert columns to correct data types

### Typecasting All candidates df

In [17]:

def typecast_AllCand_df(df):

    # List of columns to cast as DoubleType
    double_cols = [
        "TTL_RECEIPTS", "TRANS_FROM_AUTH", "TTL_DISB", "TRANS_TO_AUTH",
        "COH_BOP", "COH_COP", "CAND_CONTRIB", "CAND_LOANS", "OTHER_LOANS",
        "CAND_LOAN_REPAY", "OTHER_LOAN_REPAY", "DEBTS_OWED_BY",
        "TTL_INDIV_CONTRIB", "OTHER_POL_CMTE_CONTRIB", "POL_PTY_CONTRIB",
        "INDIV_REFUNDS", "CMTE_REFUNDS"
    ]

    # Cast numeric columns to Double
    for col_name in double_cols:
        df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

    # Cast CVG_END_DT to DateType (assuming MM/dd/yyyy format)
    df = df.withColumn("CVG_END_DT", to_date(col("CVG_END_DT"), "MM/dd/yyyy"))

    return df


### Typecasting Any transaction from one committee to another df

In [18]:
def typecast_TranOneComToAno_df(df):
    
    df = df.withColumn("TRANSACTION_AMT", col("TRANSACTION_AMT").cast(DoubleType()))
    df = df.withColumn("FILE_NUM", col("FILE_NUM").cast(IntegerType()))
    
    # Convert date format (assuming MMDDYYYY or similar)
    df = df.withColumn("TRANSACTION_DT", to_date(col("TRANSACTION_DT"), "MMddyyyy"))
    
    return df

### Typecasting Candidate-committee linkage df

In [19]:
def typecast_CanComLink_df(df):
    df = df.withColumn("CAND_ELECTION_YR", col("CAND_ELECTION_YR").cast(IntegerType())) \
        .withColumn("FEC_ELECTION_YR", col("FEC_ELECTION_YR").cast(IntegerType())) \
        .withColumn("LINKAGE_ID", col("LINKAGE_ID").cast(IntegerType()))    
    return df 

### Typecasting Contributions by individuals df

In [20]:
def typecast_ConByInd_df(df):
    df = df.withColumn("TRANSACTION_AMT", col("TRANSACTION_AMT").cast(DoubleType())) \
        .withColumn("ZIP_CODE", col("ZIP_CODE").cast(StringType())) \
        .withColumn("TRANSACTION_DT", to_date(col("TRANSACTION_DT"), "MMddyyyy")) \
        .withColumn("FILE_NUM", col("FILE_NUM").cast(IntegerType())) \
        .withColumn("SUB_ID", col("SUB_ID").cast(LongType()))
    return df

### Typecasting Contributions from committees to candidates and independent expenditures df

In [21]:
def typecast_ConFromComToCanIndExpen_df(df):
    df = df.withColumn("TRANSACTION_AMT", col("TRANSACTION_AMT").cast(DoubleType())) \
        .withColumn("FILE_NUM", col("FILE_NUM").cast(IntegerType())) \
        .withColumn("SUB_ID", col("SUB_ID").cast(LongType())) \
        .withColumn("TRANSACTION_DT", to_date(col("TRANSACTION_DT"), "MMddyyyy")) \
        .withColumn("ZIP_CODE", col("ZIP_CODE").cast(StringType()))
    return df

### Typecasting Current campaigns for House and Senate file df

In [22]:
def typecast_HouSenCurCam_df(df):
    # Financial fields
    money_cols = [
        "TTL_RECEIPTS", "TRANS_FROM_AUTH", "TTL_DISB", "TRANS_TO_AUTH", "COH_BOP", "COH_COP",
        "CAND_CONTRIB", "CAND_LOANS", "OTHER_LOANS", "CAND_LOAN_REPAY", "OTHER_LOAN_REPAY",
        "DEBTS_OWED_BY", "TTL_INDIV_CONTRIB", "OTHER_POL_CMTE_CONTRIB", "POL_PTY_CONTRIB",
        "INDIV_REFUNDS", "CMTE_REFUNDS"
    ]
    for col_name in money_cols:
        df = df.withColumn(col_name, df[col_name].cast(DoubleType()))
    
    df = df.withColumn("CVG_END_DT", to_date(df["CVG_END_DT"], "MM/dd/yyyy"))
    
    return df

### Typecasting Operating expenditures df

In [23]:
def typecast_OpEx_df(df):
    
    df = df.withColumn("TRANSACTION_AMT", col("TRANSACTION_AMT").cast(DoubleType())) \
        .withColumn("SUB_ID", col("SUB_ID").cast(LongType())) \
        .withColumn("FILE_NUM", col("FILE_NUM").cast(IntegerType())) \
        .withColumn("RPT_YR", col("RPT_YR").cast(IntegerType())) \
        .withColumn("TRANSACTION_DT", to_date(col("TRANSACTION_DT"), "MM/dd/yyyy")) \
        .withColumn("ZIP_CODE", col("ZIP_CODE").cast("string"))

    return df

### Typecasting PAC and party summary df

In [24]:
def typecast_PacSum_df(df):
    # numeric cols
    numeric_cols = [
        "TTL_RECEIPTS", "TRANS_FROM_AFF", "INDV_CONTRIB", "OTHER_POL_CMTE_CONTRIB",
        "TTL_DISB", "TRANF_TO_AFF", "LOAN_REPAY", "COH_BOP", "COH_COP", "DEBTS_OWED_BY",
        "CONTRIB_TO_OTHER_CMTE", "IND_EXP", "PTY_COORD_EXP"
    ]
    for col_name in numeric_cols:
        df = df.withColumn(col_name, df[col_name].cast(DoubleType()))
    
    df = df.withColumn("CVG_END_DT", to_date(df["CVG_END_DT"], "MM/dd/yyyy"))
    
    return df

### Typecasting Candidate master df

In [25]:
def typecast_CandMast_df(df):
    df = df.withColumn("CAND_ELECTION_YR", col("CAND_ELECTION_YR").cast(IntegerType())) \
        .withColumn("CAND_OFFICE_DISTRICT", col("CAND_OFFICE_DISTRICT").cast(IntegerType())) \
        .withColumn("CAND_ZIP", col("CAND_ZIP").cast(StringType()))
    return df

## 3. Feature Engineering – derive new features, aggregate, normalize


### Feature Engineering for All candidates df
1. Extract Year, Month and Quarter from end coverage date and normalise state and district
2. Adjusting TTL_RECEIPTS & TTL_DISB for intra-committee transfers
3. Creating new features for Financial Behavior like net_cash_position, cash_delta, self_sufficiency_ratio, loan_dependence_ratio etc.
4. Aggregate Data by geography like candidate state and district

In [26]:
def features_AllCand_df(df):

    # Extract Year, Month and Quarter from end coverage date and normalise state and district
    df = df.withColumn('CVG_END_DT', to_date('CVG_END_DT','MM/dd/yyyy')) \
            .withColumn("YEAR", year('CVG_END_DT')) \
            .withColumn("MONTH", month('CVG_END_DT')) \
            .withColumn("QUARTER", quarter('CVG_END_DT')) \
            .withColumn("CAND_OFFICE_ST", upper(col("CAND_OFFICE_ST"))) \
            .withColumn("CAND_OFFICE_DISTRICT", upper(col("CAND_OFFICE_DISTRICT")))
    
    # Adjusting for intra-committee transfers
    df = df.withColumn("TTL_RECEIPTS", col("TTL_RECEIPTS") - col("TRANS_FROM_AUTH")) \
                   .withColumn("TTL_DISB", col("TTL_DISB") - col("TRANS_TO_AUTH"))
    
    # Financial Behavior Feature Engineering
    df = df.withColumn("net_cash_position", col("TTL_RECEIPTS") - col("TTL_DISB")) \
    .withColumn("cash_delta", col("COH_COP") - col("COH_BOP")) \
    .withColumn("self_sufficiency_ratio", when(col("TTL_RECEIPTS") != 0, col("CAND_CONTRIB") / col("TTL_RECEIPTS")).otherwise(0)) \
    .withColumn("loan_dependence_ratio", when(col("TTL_RECEIPTS") != 0, 
        (col("CAND_LOANS") + col("OTHER_LOANS")) / col("TTL_RECEIPTS")).otherwise(0)) \
    .withColumn("refund_ratio", when(col("TTL_RECEIPTS") != 0, 
        (col("INDIV_REFUNDS") + col("CMTE_REFUNDS")) / col("TTL_RECEIPTS")).otherwise(0)) \
    .withColumn("indiv_contrib_ratio", when(col("TTL_RECEIPTS") != 0, 
        col("TTL_INDIV_CONTRIB") / col("TTL_RECEIPTS")).otherwise(0))
    return df

def geo_AllCand_df(df):
    """
    Aggregate Data by geography.
    """
    # Aggregation by candidate state and district
    geo_agg_df = df.groupBy("CAND_OFFICE_ST", "CAND_OFFICE_DISTRICT").agg(
    _sum("TTL_RECEIPTS").alias("total_receipts"),
    _sum("TTL_DISB").alias("total_disbursements"),
    _sum("CAND_CONTRIB").alias("total_candidate_contributions"),
    _sum("CAND_LOANS").alias("total_candidate_loans"),
    _sum("OTHER_LOANS").alias("total_other_loans"),
    _sum("INDIV_REFUNDS").alias("total_indiv_refunds"),
    _sum("CMTE_REFUNDS").alias("total_cmte_refunds"),
    _avg("TTL_RECEIPTS").alias("avg_receipts_per_candidate"),
    _avg("TTL_DISB").alias("avg_disbursements_per_candidate")
    )
    return geo_agg_df

### Feature Engineering for Any transaction from one committee to another df
1. Normalise state, transaction type and entity type
2. Total transaction amount by state aggreagation
3. Total contributions by entity type aggreagation


In [27]:

def features_TranOneComToAno_df(df):
    
    # Normalize 
    df = df.withColumn("TRANSACTION_TP", upper(col("TRANSACTION_TP"))) \
        .withColumn("ENTITY_TP", upper(col("ENTITY_TP"))) \
        .withColumn("STATE", upper(col("STATE")))

    # Total transaction amount by state
    agg_by_state = df.groupBy("STATE").agg(
        _sum("TRANSACTION_AMT").alias("TOTAL_AMT_STATE")
    )

    # Total contributions by entity type
    agg_by_entity = df.groupBy("ENTITY_TP").agg(
        _sum("TRANSACTION_AMT").alias("TOTAL_AMT_ENTITY"),
        countDistinct("NAME").alias("UNIQUE_CONTRIBUTORS")
    )

    return df, agg_by_state, agg_by_entity


### Feature Engineering for Candidate-committee linkage df
1. Add committee type descriptions
2. Add committee designation categories (P=Principal, A=Authorized, etc.)


In [28]:

def features_CanComLink_df(df):
    # Add committee type descriptions
    df = df.withColumn("CMTE_TP_DESC", 
                       when(col("CMTE_TP") == "P", "Presidential")
                      .when(col("CMTE_TP") == "H", "House")
                      .when(col("CMTE_TP") == "S", "Senate")
                      .when(col("CMTE_TP") == "X", "Independent Expenditure")
                      .otherwise("Other"))
    # Add committee designation categories (P=Principal, A=Authorized, etc.)
    df = df.withColumn("CMTE_DSGN_DESC", 
                       when(col("CMTE_DSGN") == "P", "Principal Campaign Committee")
                      .when(col("CMTE_DSGN") == "A", "Authorized Committee")
                      .when(col("CMTE_DSGN") == "J", "Joint Fundraiser")
                      .when(col("CMTE_DSGN") == "U", "Unauthorized")
                      .otherwise("Other"))
    
    return df


### Feature Engineering for Contributions by individuals df
1. Extract year, month, week
2. Normalize state
3. Normalize donor names by removing middle name

In [29]:
def features_ConByInd_df(df):
    
    # Extract year, month, week and normalize State
    df = df.withColumn("YEAR", year(col("TRANSACTION_DT"))) \
           .withColumn("MONTH", month(col("TRANSACTION_DT"))) \
           .withColumn("WEEK", weekofyear(col("TRANSACTION_DT"))) \
           .withColumn("STATE", upper(col("STATE")))  # Normalize state names

    # Normalize donor names by removing middle name
    df = df.withColumn("NAME", regexp_replace("NAME", r"\s+[A-Z]\.$", ""))
    
    return df

### Feature Engineering for Contributions from committees to candidates df

1. Extract year, month, weekofyear
2. Normalize state, entity and transaction type


In [30]:
def features_ConFromComToCanIndExpen_df(df):

    # Extract year, month, week and normalize State
    df = df.withColumn("YEAR", year(col("TRANSACTION_DT"))) \
        .withColumn("MONTH", month(col("TRANSACTION_DT"))) \
        .withColumn("WEEK", weekofyear(col("TRANSACTION_DT"))) \
        .withColumn("STATE", upper(col("STATE"))) \
        .withColumn("ENTITY_TP", upper(col("ENTITY_TP"))) \
        .withColumn("TRANSACTION_TP", upper(col("TRANSACTION_TP")))
    
    return df


### Feature Engineering for Current campaigns for House and Senate df

1. Create Feature Total loans, repay, contributions and net reciepts


In [31]:
def features_HouSenCurCam_df(df):
    
    # Create Feature Total loans, repay, contributions and net reciepts
    df = df.withColumn("TOTAL_LOANS", col("CAND_LOANS") + col("OTHER_LOANS")) \
        .withColumn("TOTAL_REPAY", col("CAND_LOAN_REPAY") + col("OTHER_LOAN_REPAY")) \
        .withColumn("NET_RECEIPTS", col("TTL_RECEIPTS") - col("TTL_DISB")) \
        .withColumn("TOTAL_CONTRIB", col("TTL_INDIV_CONTRIB") + col("OTHER_POL_CMTE_CONTRIB") + col("POL_PTY_CONTRIB"))
    
    return df

### Feature Engineering for Operating expenditures df
1. Extract features YEAR and MONTH and normalise STATE , CATEGORY, 
   CATEGORY_DESC, ENTITY_TP in upper case


In [32]:
def features_OpEx_df(df):
    
    # Extract features YEAR and MONTH and normalise STATE , CATEGORY, 
    # CATEGORY_DESC, ENTITY_TP in upper case
    df = df.withColumn("YEAR", year(col("TRANSACTION_DT"))) \
        .withColumn("MONTH", month(col("TRANSACTION_DT"))) \
        .withColumn("STATE", upper(col("STATE"))) \
        .withColumn("CATEGORY", upper(col("CATEGORY"))) \
        .withColumn("CATEGORY_DESC", upper(col("CATEGORY_DESC"))) \
        .withColumn("ENTITY_TP", upper(col("ENTITY_TP")))
    return df

### Feature Engineering for PAC and party summary df
1. Calculating net cash flow, Contribution ratios, Running balances, Year extraction


In [33]:
def features_PacSum_df(df):
    
        # calculating net cash flow, Contribution ratios, Running balances, Year extraction
        df = df.withColumn("NET_CASH_FLOW", col("TTL_RECEIPTS") - col("TTL_DISB"))
        df = df.withColumn(
            "INDV_CONTRIB_RATIO",
            when( col("TTL_RECEIPTS") !=0, col("INDV_CONTRIB")/col("TTL_RECEIPTS") ).otherwise(0)
        )
        df = df.withColumn("YEAR", year(col("CVG_END_DT")))
        return df

### Feature Engineering for Candidate master df
1. Create flags and normalized versions 
2. Map CAND_ICI (Incumbent/Challenger/Open Seat) 


In [34]:
def features_CandMast_df(df):

    # Create flags and normalized versions
    # Map CAND_ICI (Incumbent/Challenger/Open Seat) 
    df = df .withColumn("CAND_OFFICE", upper(col("CAND_OFFICE"))) \
        .withColumn("CAND_PTY_AFFILIATION", upper(col("CAND_PTY_AFFILIATION"))) \
        .withColumn("IS_INCUMBENT", when(col("CAND_ICI") == "I", 1).otherwise(0)) \
        .withColumn("IS_CHALLENGER", when(col("CAND_ICI") == "C", 1).otherwise(0)) \
        .withColumn("IS_OPEN_SEAT", when(col("CAND_ICI") == "O", 1).otherwise(0)) 
    return df

### Feature Engineering for Committee master df
1. Flags for committee type and designation
2. Normalize party names
3. Flag candidate-linked committees


In [35]:
def features_CommMast_df(df):
    
    # Flags for committee type and designation
    # Normalize party names
    # Flag candidate-linked committees
    df = df.withColumn("CMTE_TP", upper(col("CMTE_TP"))) \
        .withColumn("CMTE_PTY_AFFILIATION", upper(col("CMTE_PTY_AFFILIATION"))) \
        .withColumn("IS_CAND_LINKED", when(col("CAND_ID").isNotNull(), 1).otherwise(0)) \
        .withColumn("IS_AUTH_CMTE", when(col("CMTE_DSGN") == "A", 1).otherwise(0)) \
        .withColumn("IS_PAC", when(col("CMTE_TP").isin("Q", "N", "O", "U"), 1).otherwise(0)) \
        .withColumn("IS_PARTY_CMTE", when(col("CMTE_TP") == "Y", 1).otherwise(0))
    return df


## 4. Write processed data to parquet format in processed directory

base_path = "gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver"

In [36]:
def write_df_to_parquet(df, file_name, base_path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver"):
    """
    Writes a Spark DataFrame to the specified GCS path in Parquet format.

    Args:
        df (DataFrame): Spark DataFrame to write.
        file_name (str): Folder name (like table name) for Parquet output.
        base_path (str): Base GCS path where data should be stored.
    """
    output_path = f"{base_path}/{file_name}"
    # write file in parquet in overwrite mode
    # Snappy compression for efficient storage 
    # and faster read/write with Parquet   
    df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(output_path) 
    
    print(f"Data written to: {output_path}")


## Run cleaning and transformation pipeline and save data in processed layer

### For All candidates df

In [37]:
# All candidates

# Clean data
AllCand_df = base_clean(AllCand_df)
AllCand_df = clean_AllCand_df(AllCand_df)

# Type cast
AllCand_df = typecast_AllCand_df(AllCand_df)

# Feature Engineering
AllCand_df = features_AllCand_df(AllCand_df)

# Cache since AllCand_df reused in aggregation
AllCand_df = AllCand_df.cache()
print("AllCand_df cached")

# Derived AllCand_geoAgg_df using cached df above
AllCand_geoAgg_df = geo_AllCand_df(AllCand_df)

# Save Data in processed Layer 
write_df_to_parquet(AllCand_df,'AllCand_df')
write_df_to_parquet(AllCand_geoAgg_df,'AllCand_geoAgg_df')

# Uncache after writing to free memory
AllCand_df.unpersist()
print("AllCand_df uncached")


25/06/13 14:25:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


AllCand_df cached


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/AllCand_df


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/AllCand_geoAgg_df
AllCand_df uncached


In [38]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/AllCand_df"
AllCand_df = spark.read.parquet(path)

# Check Schema for correct dtypes
AllCand_df.printSchema()

# Check for null values
null_counts(AllCand_df).show()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- PTY_CD: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- TTL_RECEIPTS: double (nullable = true)
 |-- TRANS_FROM_AUTH: double (nullable = true)
 |-- TTL_DISB: double (nullable = true)
 |-- TRANS_TO_AUTH: double (nullable = true)
 |-- COH_BOP: double (nullable = true)
 |-- COH_COP: double (nullable = true)
 |-- CAND_CONTRIB: double (nullable = true)
 |-- CAND_LOANS: double (nullable = true)
 |-- OTHER_LOANS: double (nullable = true)
 |-- CAND_LOAN_REPAY: double (nullable = true)
 |-- OTHER_LOAN_REPAY: double (nullable = true)
 |-- DEBTS_OWED_BY: double (nullable = true)
 |-- TTL_INDIV_CONTRIB: double (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: string (nullable = true)
 |-- OTHER_POL_CMTE_CONTRIB: double (nullable = true)
 |-- POL_PTY_CONTRIB: double (nullable = true)
 |-- CVG_END_D

### For Any transaction from one committee to another df

In [39]:
# Any transaction from one committee to another

# Clean data
TranOneComToAno_df = base_clean(TranOneComToAno_df)
TranOneComToAno_df = clean_TranOneComToAno_df(TranOneComToAno_df)

#Type cast
TranOneComToAno_df = typecast_TranOneComToAno_df(TranOneComToAno_df)

# Feature Engineering
TranOneComToAno_df, agg_by_state, agg_by_entity = features_TranOneComToAno_df(TranOneComToAno_df)

# Save Data in processed Layer 
write_df_to_parquet(TranOneComToAno_df,'TranOneComToAno_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/TranOneComToAno_df


In [40]:
# Print result of aggregation by state and entity
agg_by_state.show(5)
agg_by_entity.show(5)

                                                                                

+-----+---------------+
|STATE|TOTAL_AMT_STATE|
+-----+---------------+
|   IL|   3.22336839E8|
|   AZ|   1.32964588E8|
|   KS|    6.3710138E7|
|   LA|    8.1493663E7|
|   KY|    6.1464421E7|
+-----+---------------+
only showing top 5 rows





+---------+----------------+-------------------+
|ENTITY_TP|TOTAL_AMT_ENTITY|UNIQUE_CONTRIBUTORS|
+---------+----------------+-------------------+
|      CCM|   1.561849791E9|               4896|
|      UNK|     2.5480294E7|               1352|
|      PAC|   1.638461975E9|              14283|
|      IND|   1.550589473E9|            1154734|
|      ORG|   4.202744081E9|               7070|
+---------+----------------+-------------------+
only showing top 5 rows



                                                                                

In [41]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/TranOneComToAno_df"
TranOneComToAno_df = spark.read.parquet(path)

# Check Schema for correct dtypes
TranOneComToAno_df.printSchema()

# Check for null values
null_counts(TranOneComToAno_df).show()


root
 |-- CMTE_ID: string (nullable = true)
 |-- AMNDT_IND: string (nullable = true)
 |-- RPT_TP: string (nullable = true)
 |-- TRANSACTION_PGI: string (nullable = true)
 |-- IMAGE_NUM: string (nullable = true)
 |-- TRANSACTION_TP: string (nullable = true)
 |-- ENTITY_TP: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP_CODE: string (nullable = true)
 |-- TRANSACTION_DT: date (nullable = true)
 |-- TRANSACTION_AMT: double (nullable = true)
 |-- TRAN_ID: string (nullable = true)
 |-- FILE_NUM: integer (nullable = true)





+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+--------+
|CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|NAME|CITY|STATE|ZIP_CODE|TRANSACTION_DT|TRANSACTION_AMT|TRAN_ID|FILE_NUM|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+--------+
|      0|        0|     0|              0|        0|             0|        0|   0|   0|    0|       0|             0|              0|      0|       0|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+--------+



                                                                                

### For Candidate-committee linkages df

In [42]:
# Candidate-committee linkages

# Clean data
CanComLink_df = base_clean(CanComLink_df)
CanComLink_df = clean_CanComLink_df(CanComLink_df)

#Type cast
CanComLink_df = typecast_CanComLink_df(CanComLink_df)

# Feature Engineering
CanComLink_df = features_CanComLink_df(CanComLink_df)

# Save Data in processed Layer 
write_df_to_parquet(CanComLink_df,'CanComLink_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CanComLink_df


In [43]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CanComLink_df"
CanComLink_df = spark.read.parquet(path)

# Check Schema for correct dtypes
CanComLink_df.printSchema()

# Check for null values
null_counts(CanComLink_df).show()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_ELECTION_YR: integer (nullable = true)
 |-- FEC_ELECTION_YR: integer (nullable = true)
 |-- CMTE_ID: string (nullable = true)
 |-- CMTE_TP: string (nullable = true)
 |-- CMTE_DSGN: string (nullable = true)
 |-- LINKAGE_ID: integer (nullable = true)
 |-- CMTE_TP_DESC: string (nullable = true)
 |-- CMTE_DSGN_DESC: string (nullable = true)

+-------+----------------+---------------+-------+-------+---------+----------+------------+--------------+
|CAND_ID|CAND_ELECTION_YR|FEC_ELECTION_YR|CMTE_ID|CMTE_TP|CMTE_DSGN|LINKAGE_ID|CMTE_TP_DESC|CMTE_DSGN_DESC|
+-------+----------------+---------------+-------+-------+---------+----------+------------+--------------+
|      0|               0|              0|      0|      0|        0|         0|           0|             0|
+-------+----------------+---------------+-------+-------+---------+----------+------------+--------------+



### For Contributions by individuals df

In [44]:
# Contributions by individuals

# Clean data
ConByInd_df = base_clean(ConByInd_df)
ConByInd_df = clean_ConByInd_df(ConByInd_df)

#Type cast
ConByInd_df = typecast_ConByInd_df(ConByInd_df)

# Feature Engineering
ConByInd_df = features_ConByInd_df(ConByInd_df)

# Save Data in processed Layer 
write_df_to_parquet(ConByInd_df,'ConByInd_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/ConByInd_df


In [45]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/ConByInd_df"
ConByInd_df = spark.read.parquet(path)

# Check Schema for correct dtypes
ConByInd_df.printSchema()

# Check for null values
null_counts(ConByInd_df).show()

root
 |-- CMTE_ID: string (nullable = true)
 |-- AMNDT_IND: string (nullable = true)
 |-- RPT_TP: string (nullable = true)
 |-- TRANSACTION_PGI: string (nullable = true)
 |-- IMAGE_NUM: string (nullable = true)
 |-- TRANSACTION_TP: string (nullable = true)
 |-- ENTITY_TP: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP_CODE: string (nullable = true)
 |-- EMPLOYER: string (nullable = true)
 |-- OCCUPATION: string (nullable = true)
 |-- TRANSACTION_DT: date (nullable = true)
 |-- TRANSACTION_AMT: double (nullable = true)
 |-- TRAN_ID: string (nullable = true)
 |-- FILE_NUM: integer (nullable = true)
 |-- SUB_ID: long (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- WEEK: integer (nullable = true)





+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------+----------+--------------+---------------+-------+--------+------+----+-----+----+
|CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|NAME|CITY|STATE|ZIP_CODE|EMPLOYER|OCCUPATION|TRANSACTION_DT|TRANSACTION_AMT|TRAN_ID|FILE_NUM|SUB_ID|YEAR|MONTH|WEEK|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------+----------+--------------+---------------+-------+--------+------+----+-----+----+
|      0|        0|     0|              0|        0|             0|        0|   0|   0|    0|       0|       0|         0|             0|              0|      0|       0|     0|   0|    0|   0|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------+----------+--------------+---------------+-------+--------+------+----+-----+----+



                                                                                

### For Contributions from committees to candidates & independent expenditure df

In [46]:
# Contributions from committees to candidates & independent expenditure

# Clean data
ConFromComToCanIndExpen_df = base_clean(ConFromComToCanIndExpen_df)
ConFromComToCanIndExpen_df = clean_ConFromComToCanIndExpen_df(ConFromComToCanIndExpen_df)

#Type cast
ConFromComToCanIndExpen_df = typecast_ConFromComToCanIndExpen_df(ConFromComToCanIndExpen_df)

# Feature Engineering
ConFromComToCanIndExpen_df = features_ConFromComToCanIndExpen_df(ConFromComToCanIndExpen_df)

# Save Data in processed Layer 
write_df_to_parquet(ConFromComToCanIndExpen_df,'ConFromComToCanIndExpen_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/ConFromComToCanIndExpen_df


In [47]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/ConFromComToCanIndExpen_df"
ConFromComToCanIndExpen_df = spark.read.parquet(path)

# Check Schema for correct dtypes
ConFromComToCanIndExpen_df.printSchema()

# Check for null values
null_counts(ConFromComToCanIndExpen_df).show()

root
 |-- CMTE_ID: string (nullable = true)
 |-- AMNDT_IND: string (nullable = true)
 |-- RPT_TP: string (nullable = true)
 |-- TRANSACTION_PGI: string (nullable = true)
 |-- IMAGE_NUM: string (nullable = true)
 |-- TRANSACTION_TP: string (nullable = true)
 |-- ENTITY_TP: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP_CODE: string (nullable = true)
 |-- TRANSACTION_DT: date (nullable = true)
 |-- TRANSACTION_AMT: double (nullable = true)
 |-- CAND_ID: string (nullable = true)
 |-- TRAN_ID: string (nullable = true)
 |-- FILE_NUM: integer (nullable = true)
 |-- SUB_ID: long (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- WEEK: integer (nullable = true)





+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+-------+--------+------+----+-----+----+
|CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|NAME|CITY|STATE|ZIP_CODE|TRANSACTION_DT|TRANSACTION_AMT|CAND_ID|TRAN_ID|FILE_NUM|SUB_ID|YEAR|MONTH|WEEK|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+-------+--------+------+----+-----+----+
|      0|        0|     0|              0|        0|             0|        0|   0|   0|    0|       0|             0|              0|      0|      0|       0|     0|   0|    0|   0|
+-------+---------+------+---------------+---------+--------------+---------+----+----+-----+--------+--------------+---------------+-------+-------+--------+------+----+-----+----+



                                                                                

### For House Senate current campaigns df

In [48]:
# House Senate current campaigns

# Clean data
HouSenCurCam_df = base_clean(HouSenCurCam_df)
HouSenCurCam_df = clean_HouSenCurCam_df(HouSenCurCam_df)

#Type cast
HouSenCurCam_df = typecast_HouSenCurCam_df(HouSenCurCam_df)

# Feature Engineering
HouSenCurCam_df = features_HouSenCurCam_df(HouSenCurCam_df)

# Save Data in processed Layer 
write_df_to_parquet(HouSenCurCam_df,'HouSenCurCam_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/HouSenCurCam_df


In [49]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/HouSenCurCam_df"
HouSenCurCam_df = spark.read.parquet(path)

# Check Schema for correct dtypes
HouSenCurCam_df.printSchema()

# Check for null values
null_counts(HouSenCurCam_df).show()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- PTY_CD: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- TTL_RECEIPTS: double (nullable = true)
 |-- TRANS_FROM_AUTH: double (nullable = true)
 |-- TTL_DISB: double (nullable = true)
 |-- TRANS_TO_AUTH: double (nullable = true)
 |-- COH_BOP: double (nullable = true)
 |-- COH_COP: double (nullable = true)
 |-- CAND_CONTRIB: double (nullable = true)
 |-- CAND_LOANS: double (nullable = true)
 |-- OTHER_LOANS: double (nullable = true)
 |-- CAND_LOAN_REPAY: double (nullable = true)
 |-- OTHER_LOAN_REPAY: double (nullable = true)
 |-- DEBTS_OWED_BY: double (nullable = true)
 |-- TTL_INDIV_CONTRIB: double (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: string (nullable = true)
 |-- OTHER_POL_CMTE_CONTRIB: double (nullable = true)
 |-- POL_PTY_CONTRIB: double (nullable = true)
 |-- CVG_END_D

### For Operating expenditures df

In [50]:
# Operating expenditures

# Clean data
OpEx_df = base_clean(OpEx_df)
OpEx_df = clean_OpEx_df(OpEx_df)

#Type cast
OpEx_df = typecast_OpEx_df(OpEx_df)

# Feature Engineering
OpEx_df = features_OpEx_df(OpEx_df)

# Save Data in processed Layer 
write_df_to_parquet(OpEx_df,'OpEx_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/OpEx_df


In [51]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/OpEx_df"
OpEx_df = spark.read.parquet(path)

# Check Schema for correct dtypes
OpEx_df.printSchema()

# Check for null values
null_counts(OpEx_df).show()

root
 |-- CMTE_ID: string (nullable = true)
 |-- AMNDT_IND: string (nullable = true)
 |-- RPT_YR: integer (nullable = true)
 |-- RPT_TP: string (nullable = true)
 |-- IMAGE_NUM: string (nullable = true)
 |-- LINE_NUM: string (nullable = true)
 |-- FORM_TP_CD: string (nullable = true)
 |-- SCHED_TP_CD: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP_CODE: string (nullable = true)
 |-- TRANSACTION_DT: date (nullable = true)
 |-- TRANSACTION_AMT: double (nullable = true)
 |-- TRANSACTION_PGI: string (nullable = true)
 |-- PURPOSE: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- CATEGORY_DESC: string (nullable = true)
 |-- ENTITY_TP: string (nullable = true)
 |-- SUB_ID: long (nullable = true)
 |-- FILE_NUM: integer (nullable = true)
 |-- TRAN_ID: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)



[Stage 57:>                                                         (0 + 4) / 4]

+-------+---------+------+------+---------+--------+----------+-----------+----+----+-----+--------+--------------+---------------+---------------+-------+--------+-------------+---------+------+--------+-------+----+-----+
|CMTE_ID|AMNDT_IND|RPT_YR|RPT_TP|IMAGE_NUM|LINE_NUM|FORM_TP_CD|SCHED_TP_CD|NAME|CITY|STATE|ZIP_CODE|TRANSACTION_DT|TRANSACTION_AMT|TRANSACTION_PGI|PURPOSE|CATEGORY|CATEGORY_DESC|ENTITY_TP|SUB_ID|FILE_NUM|TRAN_ID|YEAR|MONTH|
+-------+---------+------+------+---------+--------+----------+-----------+----+----+-----+--------+--------------+---------------+---------------+-------+--------+-------------+---------+------+--------+-------+----+-----+
|      0|        0|     0|     0|        0|       0|         0|          0|   0|   0|    0|       0|             0|              0|              0|      0|       0|            0|        0|     0|       0|      0|   0|    0|
+-------+---------+------+------+---------+--------+----------+-----------+----+----+-----+--------+----

                                                                                

### For PAC summary df

In [52]:
# PAC summary df

# Clean data
PacSum_df = base_clean(PacSum_df)
PacSum_df = clean_PacSum_df(PacSum_df)

#Type cast
PacSum_df = typecast_PacSum_df(PacSum_df)

# Feature Engineering
PacSum_df = features_PacSum_df(PacSum_df)

# Save Data in processed Layer 
write_df_to_parquet(PacSum_df,'PacSum_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/PacSum_df


In [53]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/PacSum_df"
PacSum_df = spark.read.parquet(path)

# Check Schema for correct dtypes
PacSum_df.printSchema()

# Check for null values
null_counts(PacSum_df).show()

root
 |-- CMTE_ID: string (nullable = true)
 |-- CMTE_NM: string (nullable = true)
 |-- CMTE_TP: string (nullable = true)
 |-- CMTE_FILING_FREQ: string (nullable = true)
 |-- TTL_RECEIPTS: double (nullable = true)
 |-- TRANS_FROM_AFF: double (nullable = true)
 |-- INDV_CONTRIB: double (nullable = true)
 |-- OTHER_POL_CMTE_CONTRIB: double (nullable = true)
 |-- TTL_DISB: double (nullable = true)
 |-- TRANF_TO_AFF: double (nullable = true)
 |-- LOAN_REPAY: double (nullable = true)
 |-- COH_BOP: double (nullable = true)
 |-- COH_COP: double (nullable = true)
 |-- DEBTS_OWED_BY: double (nullable = true)
 |-- CONTRIB_TO_OTHER_CMTE: double (nullable = true)
 |-- IND_EXP: double (nullable = true)
 |-- PTY_COORD_EXP: double (nullable = true)
 |-- CVG_END_DT: date (nullable = true)
 |-- NET_CASH_FLOW: double (nullable = true)
 |-- INDV_CONTRIB_RATIO: double (nullable = true)
 |-- YEAR: integer (nullable = true)

+-------+-------+-------+----------------+------------+--------------+------------+

### For Candidate master df

In [54]:
# Candidate master df

# Clean data
CandMast_df = base_clean(CandMast_df)
CandMast_df = clean_CandMast_df(CandMast_df)

#Type cast
CandMast_df = typecast_CandMast_df(CandMast_df)

# Feature Engineering
CandMast_df = features_CandMast_df(CandMast_df)

# Save Data in processed Layer 
write_df_to_parquet(CandMast_df,'CandMast_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CandMast_df


In [55]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CandMast_df"
CandMast_df = spark.read.parquet(path)

# Check Schema for correct dtypes
CandMast_df.printSchema()

# Check for null values
null_counts(CandMast_df).show()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- CAND_ELECTION_YR: integer (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: integer (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- CAND_STATUS: string (nullable = true)
 |-- CAND_CITY: string (nullable = true)
 |-- CAND_ST: string (nullable = true)
 |-- CAND_ZIP: string (nullable = true)
 |-- IS_INCUMBENT: integer (nullable = true)
 |-- IS_CHALLENGER: integer (nullable = true)
 |-- IS_OPEN_SEAT: integer (nullable = true)

+-------+---------+--------------------+----------------+--------------+-----------+--------------------+--------+-----------+---------+-------+--------+------------+-------------+------------+
|CAND_ID|CAND_NAME|CAND_PTY_AFFILIATION|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE|CAND_OFFICE_DISTRICT|CAND_ICI|CAND_STATUS|CAND_CITY|CA

### For Commitee master df

In [56]:
# Commitee master df

# Clean data
CommMast_df = base_clean(CommMast_df)
CommMast_df = clean_CommMast_df(CommMast_df)

# Feature Engineering
CommMast_df = features_CommMast_df(CommMast_df)

# Save Data in processed Layer 
write_df_to_parquet(CommMast_df,'CommMast_df')


                                                                                

Data written to: gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CommMast_df


In [57]:
# Check data by reading back
path="gs://dataproc-staging-us-central1-40371648517-ndvgfbwp/notebooks/jupyter/FEC-Campaign-Analysis/FEC-Data/silver/CommMast_df"
CommMast_df = spark.read.parquet(path)

# Check Schema for correct dtypes
CommMast_df.printSchema()

# Check for null values
null_counts(CommMast_df).show()

root
 |-- CMTE_ID: string (nullable = true)
 |-- CMTE_NM: string (nullable = true)
 |-- TRES_NM: string (nullable = true)
 |-- CMTE_CITY: string (nullable = true)
 |-- CMTE_ST: string (nullable = true)
 |-- CMTE_ZIP: string (nullable = true)
 |-- CMTE_DSGN: string (nullable = true)
 |-- CMTE_TP: string (nullable = true)
 |-- CMTE_PTY_AFFILIATION: string (nullable = true)
 |-- CMTE_FILING_FREQ: string (nullable = true)
 |-- ORG_TP: string (nullable = true)
 |-- CONNECTED_ORG_NM: string (nullable = true)
 |-- CAND_ID: string (nullable = true)
 |-- IS_CAND_LINKED: integer (nullable = true)
 |-- IS_AUTH_CMTE: integer (nullable = true)
 |-- IS_PAC: integer (nullable = true)
 |-- IS_PARTY_CMTE: integer (nullable = true)

+-------+-------+-------+---------+-------+--------+---------+-------+--------------------+----------------+------+----------------+-------+--------------+------------+------+-------------+
|CMTE_ID|CMTE_NM|TRES_NM|CMTE_CITY|CMTE_ST|CMTE_ZIP|CMTE_DSGN|CMTE_TP|CMTE_PTY_AFFILI

#### Stop the spark session

In [58]:
spark.stop()