In [5]:
import getpass
import os

import pandas as pd
from neo4j import GraphDatabase
from pyspark.sql import SparkSession
from tqdm.notebook import tqdm

# Local Spark session; FECParser.load_individual_contributions uses it to
# pre-filter the individual-contributions file before converting to pandas.
SPARK_CONF = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.driver.memory": "10g",
    "spark.sql.shuffle.partitions": "10",
}

_spark_builder = SparkSession.builder
for _conf_key, _conf_value in SPARK_CONF.items():
    _spark_builder = _spark_builder.config(_conf_key, _conf_value)
spark = _spark_builder.getOrCreate()

In [51]:
class Neo4jConnection:
    """Small wrapper that owns a Neo4j driver for this notebook.

    Usage: create it, set ``uri`` (and ``password`` when the server needs
    authentication), then call :meth:`connect_to_graph`.  ``driver`` stays
    ``None`` until a connection has been verified.
    """

    def __init__(self):
        self.uri = None
        self.username = 'neo4j'
        self.password = None
        # Initialised here so close()/query() can check it even when
        # connect_to_graph() was never called or failed.
        self.driver = None

    def connect_to_graph(self):
        """Create the driver and verify that the server is reachable.

        Errors are printed rather than raised; on failure ``self.driver`` is
        left unchanged and the half-built driver is closed.
        """
        driver = None
        try:
            if self.password is None:
                driver = GraphDatabase.driver(self.uri)
            else:
                driver = GraphDatabase.driver(
                    self.uri, auth=(self.username, self.password), max_connection_lifetime=200
                )
            # Building a driver does not contact the server, so check explicitly
            # before reporting success.
            driver.verify_connectivity()
        except Exception as e:
            if driver is not None:
                driver.close()
            print("Failed to establish connection", e)
            return
        self.driver = driver
        print("Connected to Neo4j")

    def close(self):
        """Close the driver if one exists; safe to call more than once."""
        if self.driver is not None:
            self.driver.close()
            self.driver = None

    def query(self, query, parameters=None, db=None):
        """Run ``query`` in a fresh session and return all records as a list.

        Args:
            query: Cypher text.
            parameters: optional dict of Cypher parameters.
            db: optional database name; the driver's default is used when None.

        Returns:
            The list of records, or None if the query failed (the error is
            printed; "equivalent constraint already exists" gets a friendlier
            message).
        """
        assert self.driver is not None, "Driver not initialized!"
        session_kwargs = {} if db is None else {'database': db}
        try:
            with self.driver.session(**session_kwargs) as session:
                return list(session.run(query, parameters))
        except Exception as e:
            if "equivalent constraint already exists" in str(e):
                print("Uniqueness constraint already exists")
            else:
                print("Query failed:", e)
        return None

class FECParser:
    """Load FEC bulk-data files and write them into a Neo4j graph.

    ``load_*`` methods read pipe-delimited FEC bulk files (paths configured in
    ``__init__``) into DataFrame attributes.  ``write_*`` methods turn those
    DataFrames into Cypher statements run through ``self.conn.driver``, so
    ``conn`` (e.g. a ``Neo4jConnection``) must be assigned before writing.
    Values are sent as Cypher query parameters, and statements are committed
    in batches of ``batch_size``.  Public methods return ``self`` so calls can
    be chained (see :meth:`run_inserts`).

    NOTE(review): relationship writers MERGE on the (source, target) node pair
    only.  Several transactions between the same pair therefore collapse into
    one relationship holding the last row's properties — confirm that is
    intended.
    """

    # FEC CAND_OFFICE code -> label / property value stored on candidate nodes.
    OFFICE_LABELS = {'H': 'house', 'P': 'president', 'S': 'senate'}

    # CONNECTED_ORG_NM values treated as "no connected organization";
    # 'Not Included in Data' is the fill value load_committees() uses for blanks.
    NO_ORGANIZATION = ('Not Included in Data', 'NONE')

    def __init__(self):
        self.conn = None
        self.expenditure_data = 'raw_data/oppexp.txt'
        self.minimum_transaction = 5000
        self.candidate_list = 'raw_data/cn.txt'
        self.committee_list = 'raw_data/cm.txt'
        self.committee_contributions_filepath = 'raw_data/itpas2.txt'
        self.candidate_committee_linkages_filepath = 'raw_data/ccl.txt'
        self.committee_transactions_filepath = 'raw_data/itoth.txt'
        self.individual_contributions_filepath = 'raw_data/itcont.txt'
        # Number of statements committed per Neo4j transaction.
        self.batch_size = 100
        self.committees = []
        self.entities = []
        self.DISBURSES_TO = []
        self.candidates = []

    # ------------------------------------------------------------- helpers

    @staticmethod
    def _clean(value):
        """Stringify ``value`` and drop double quotes (the normalisation applied to names/free text)."""
        return str(value).replace('"', '')

    @staticmethod
    def _label(name):
        """Backtick-quote ``name`` for use as a Cypher label (labels cannot be query parameters)."""
        return '`' + str(name).replace('`', '``') + '`'

    @staticmethod
    def _rows(frame):
        """Lazily yield each row of ``frame`` as a ``{column: value}`` dict.

        ``itertuples`` yields plain Python scalars for NumPy-backed columns,
        which the Neo4j driver accepts as parameter values.
        """
        columns = list(frame.columns)
        for values in frame.itertuples(index=False, name=None):
            yield dict(zip(columns, values))

    @staticmethod
    def _read_pipe_file(path, columns):
        """Read a header-less, pipe-delimited FEC bulk file and name its columns."""
        frame = pd.read_csv(path, sep='|', header=None)
        frame.columns = columns
        return frame

    @classmethod
    def _occupation_and_employer(cls, row):
        """Return ``(occupation, employer)`` for a transaction row.

        Individuals (ENTITY_TP == 'IND') get their cleaned values, or
        'None Listed' when a value is missing; every other entity type gets
        'N/A' for both.
        """
        if row['ENTITY_TP'] != 'IND':
            return 'N/A', 'N/A'
        # pd.notna treats both None (Spark nulls) and NaN (pandas blanks) as missing.
        occupation = cls._clean(row['OCCUPATION']) if pd.notna(row['OCCUPATION']) else 'None Listed'
        employer = cls._clean(row['EMPLOYER']) if pd.notna(row['EMPLOYER']) else 'None Listed'
        return occupation, employer

    def _write_batched(self, statements, desc, total=None):
        """Run ``(cypher, parameters)`` pairs in transactions of ``batch_size`` statements.

        The final partial batch is committed too.  If a statement or commit
        fails, the open transaction is closed (rolled back) and the error
        propagates; earlier batches stay committed.

        Returns:
            self, so writers can ``return self._write_batched(...)``.
        """
        with self.conn.driver.session() as session:
            tx = session.begin_transaction()
            pending = 0
            try:
                for cypher, parameters in tqdm(statements, total=total, desc=desc):
                    tx.run(cypher, parameters)
                    pending += 1
                    if pending >= self.batch_size:
                        tx.commit()
                        tx = session.begin_transaction()
                        pending = 0
                if pending:
                    tx.commit()
            finally:
                if not tx.closed():
                    tx.close()
        return self

    # ------------------------------------------------ operating expenditures

    def load_expenditures(self):
        """Read the operating-expenditures file (``expenditure_data``) into ``operating_expenditures``."""
        # Columns are labelled by hand for simplicity; they were derived from the
        # header file FEC publishes alongside each bulk file.  'placeholder' is an
        # extra trailing column (presumably from a trailing delimiter — TODO confirm).
        self.operating_expenditures = self._read_pipe_file(self.expenditure_data, [
            'CMTE_ID',
            'AMNDT_IND',
            'RPT_YR',
            'RPT_TP',
            'IMAGE_NUM',
            'LINE_NUM',
            'FORM_TP_CD',
            'SCHED_TP_CD',
            'NAME',
            'CITY',
            'STATE',
            'ZIP_CODE',
            'TRANSACTION_DT',
            'TRANSACTION_AMT',
            'TRANSACTION_PGI',
            'PURPOSE',
            'CATEGORY',
            'CATEGORY_DESC',
            'MEMO_CD',
            'MEMO_TEXT',
            'ENTITY_TP',
            'SUB_ID',
            'FILE_NUM',
            'TRAN_ID',
            'BACK_REF_TRAN_ID',
            'placeholder'])
        return self

    def filter_by_transaction_size(self):
        """Keep only expenditures whose TRANSACTION_AMT is at least ``minimum_transaction``."""
        expenditures = self.operating_expenditures
        self.operating_expenditures = expenditures.loc[
            expenditures['TRANSACTION_AMT'] >= self.minimum_transaction
        ]
        return self

    # ---------------------------------------------------------- candidates

    def load_candidates(self):
        """Read the candidate master file (``candidate_list``) into ``candidate_data``."""
        self.candidate_data = self._read_pipe_file(self.candidate_list, [
            'CAND_ID',
            'CAND_NAME',
            'CAND_PTY_AFFILIATION',
            'CAND_ELECTION_YR',
            'CAND_OFFICE_ST',
            'CAND_OFFICE',
            'CAND_OFFICE_DISTRICT',
            'CAND_ICI',
            'CAND_STATUS',
            'CAND_PCC',
            'CAND_ST1',
            'CAND_ST2',
            'CAND_CITY',
            'CAND_ST',
            'CAND_ZIP'])
        return self

    def write_candidates(self):
        """MERGE one ``candidate`` node per row of ``candidate_data``.

        Each node also carries an office label (house/president/senate,
        expanded from CAND_OFFICE for readability); properties are set only
        when the node is created.
        """
        def statements():
            for row in self._rows(self.candidate_data):
                office = self._clean(row['CAND_OFFICE'])
                office = self.OFFICE_LABELS.get(office, office)
                cypher = (
                    f"MERGE (can:candidate:{self._label(office)} {{candidate_id: $candidate_id}}) "
                    "ON CREATE SET can.candidate_name = $candidate_name, "
                    "can.candidate_party = $candidate_party, "
                    "can.candidate_office_state = $candidate_office_state, "
                    "can.candidate_district = $candidate_district, "
                    "can.candidate_office = $candidate_office, "
                    "can.candidate_ici = $candidate_ici, "
                    "can.candidate_status = $candidate_status"
                )
                yield cypher, {
                    'candidate_id': str(row['CAND_ID']),
                    'candidate_name': self._clean(row['CAND_NAME']),
                    'candidate_party': self._clean(row['CAND_PTY_AFFILIATION']),
                    'candidate_office_state': self._clean(row['CAND_OFFICE_ST']),
                    'candidate_district': self._clean(row['CAND_OFFICE_DISTRICT']),
                    'candidate_office': office,
                    'candidate_ici': self._clean(row['CAND_ICI']),
                    'candidate_status': self._clean(row['CAND_STATUS']),
                }

        return self._write_batched(statements(), 'Inserting candidates', len(self.candidate_data))

    # ---------------------------------------------------------- committees

    def load_committees(self):
        """Read the committee master file (``committee_list``) into ``committees_data``.

        Missing values are filled with 'Not Included in Data'.
        """
        committees = self._read_pipe_file(self.committee_list, [
            'CMTE_ID',
            'CMTE_NM',
            'TRES_NM',
            'CMTE_ST1',
            'CMTE_ST2',
            'CMTE_CITY',
            'CMTE_ST',
            'CMTE_ZIP',
            'CMTE_DSGN',
            'CMTE_TP',
            'CMTE_PTY_AFFILIATION',
            'CMTE_FILING_FREQ',
            'ORG_TP',
            'CONNECTED_ORG_NM',
            'CAND_ID'])
        self.committees_data = committees.fillna('Not Included in Data')
        return self

    def write_committees(self):
        """MERGE one ``committee`` node per row of ``committees_data`` (properties set on create only)."""
        cypher = (
            "MERGE (c:committee {committee_id: $committee_id}) "
            "ON CREATE SET c.committee_name = $committee_name, "
            "c.committee_party = $committee_party, "
            "c.committee_type = $committee_type, "
            "c.committee_designation = $committee_designation"
        )
        statements = (
            (cypher, {
                'committee_id': str(row['CMTE_ID']),
                'committee_name': self._clean(row['CMTE_NM']),
                'committee_party': str(row['CMTE_PTY_AFFILIATION']),
                'committee_type': str(row['ORG_TP']),
                'committee_designation': self._clean(row['CMTE_DSGN']),
            })
            for row in self._rows(self.committees_data)
        )
        return self._write_batched(statements, 'Writing Committees', len(self.committees_data))

    def _committees_with_organization(self):
        """Rows of ``committees_data`` whose CONNECTED_ORG_NM names a real organization."""
        names = self.committees_data['CONNECTED_ORG_NM'].astype(str)
        return self.committees_data[~names.isin(self.NO_ORGANIZATION)]

    def write_affiliated_organizations(self):
        """MERGE an ``organization`` node for every connected organization named in ``committees_data``."""
        committees = self._committees_with_organization()
        statements = (
            ("MERGE (o:organization {organization_name: $organization_name})",
             {'organization_name': self._clean(row['CONNECTED_ORG_NM'])})
            for row in self._rows(committees)
        )
        return self._write_batched(statements, 'Writing affiliated organizations', len(committees))

    def write_AFFILIATED_WITH(self):
        """MERGE (committee)-[:AFFILIATED_WITH]->(organization) for committees with a connected organization."""
        committees = self._committees_with_organization()
        cypher = (
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MATCH (o:organization {organization_name: $organization_name}) "
            "MERGE (c)-[:AFFILIATED_WITH]->(o)"
        )
        statements = (
            (cypher, {
                'committee_id': str(row['CMTE_ID']),
                'organization_name': self._clean(row['CONNECTED_ORG_NM']),
            })
            for row in self._rows(committees)
        )
        return self._write_batched(statements, 'Writing committee affiliations', len(committees))

    def write_PRINCIPAL_COMMITTEE_OF(self):
        """MERGE (committee)-[:PRINCIPAL_COMMITTEE_OF]->(candidate) using each candidate's CAND_PCC."""
        cypher = (
            "MATCH (can:candidate {candidate_id: $candidate_id}) "
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MERGE (c)-[:PRINCIPAL_COMMITTEE_OF]->(can)"
        )
        statements = (
            (cypher, {
                'candidate_id': str(row['CAND_ID']),
                'committee_id': self._clean(row['CAND_PCC']),
            })
            for row in self._rows(self.candidate_data)
        )
        return self._write_batched(
            statements, 'Inserting primary committee relationships', len(self.candidate_data)
        )

    # -------------------------------------------- expenditure entities/edges

    def parse_expenditures(self):
        """Split ``operating_expenditures`` into the ``committees``, ``entities`` and ``DISBURSES_TO`` lists.

        Appends to the existing lists, so calling it twice duplicates entries.
        """
        for row in tqdm(self._rows(self.operating_expenditures),
                        total=len(self.operating_expenditures), desc='parsing data'):
            committee_id = row['CMTE_ID']
            self.committees.append(committee_id)
            self.entities.append({
                'entity_name': row['NAME'],
                'entity_type': row['ENTITY_TP'],
            })
            self.DISBURSES_TO.append({
                'committee_id': committee_id,
                'entity_name': row['NAME'],
                'amount': row['TRANSACTION_AMT'],
                'purpose': row['PURPOSE'],
                'category': row['CATEGORY'],
                'date': row['TRANSACTION_DT'],
                'transaction_id': row['TRAN_ID'],
            })
        return self

    def write_entities(self):
        """MERGE an ``entity`` node, labelled with its entity type, for each parsed expenditure payee.

        Names and types are lower-cased with double quotes stripped;
        write_DISBURSES_TO matches on the same normalised name.
        """
        def statements():
            for item in self.entities:
                entity_type = self._clean(item['entity_type']).lower()
                cypher = (
                    f"MERGE (e:entity:{self._label(entity_type)} {{entity_name: $entity_name}}) "
                    "ON CREATE SET e.entity_type = $entity_type"
                )
                yield cypher, {
                    'entity_name': self._clean(item['entity_name']).lower(),
                    'entity_type': entity_type,
                }

        return self._write_batched(statements(), 'Inserting entities', len(self.entities))

    def write_DISBURSES_TO(self):
        """MERGE (committee)-[:DISBURSES_TO]->(entity) for each parsed expenditure, setting its details."""
        cypher = (
            "MATCH (e:entity {entity_name: $entity_name}) "
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MERGE (c)-[d:DISBURSES_TO]->(e) "
            "SET d.amount = $amount, "
            "d.purpose = $purpose, "
            "d.category = $category, "
            "d.date = $date, "
            "d.transaction_id = $transaction_id"
        )
        statements = (
            (cypher, {
                'entity_name': self._clean(item['entity_name']).lower(),
                'committee_id': str(item['committee_id']),
                'purpose': self._clean(item['purpose']),
                'category': self._clean(item['category']),
                'date': str(item['date']),
                'transaction_id': str(item['transaction_id']),
                'amount': item['amount'],
            })
            for item in self.DISBURSES_TO
        )
        return self._write_batched(statements, 'Inserting disbursements', len(self.DISBURSES_TO))

    # --------------------------------------- committee-to-committee money

    def load_committee_contributions(self):
        """Read the committee-contributions file into ``committee_contributions``."""
        self.committee_contributions = self._read_pipe_file(self.committee_contributions_filepath, [
            'CMTE_ID',
            'AMNDT_IND',
            'RPT_TP',
            'TRANSACTION_PGI',
            'IMAGE_NUM',
            'TRANSACTION_TP',
            'ENTITY_TP',
            'NAME',
            'CITY',
            'STATE',
            'ZIP_CODE',
            'EMPLOYER',
            'OCCUPATION',
            'TRANSACTION_DT',
            'TRANSACTION_AMT',
            'OTHER_ID',
            'CAND_ID',
            'TRAN_ID',
            'FILE_NUM',
            'MEMO_CD',
            'MEMO_TEXT',
            'SUB_ID'])
        return self

    def write_COMMITTEE_CONTRIBUTES_TO(self, starting_point=0):
        """MERGE (committee)-[:CONTRIBUTES_TO]->(committee) from ``committee_contributions``.

        Args:
            starting_point: positional row to resume from (rows before it are skipped).
        """
        insert_data = self.committee_contributions.iloc[starting_point:]
        cypher = (
            "MATCH (c1:committee {committee_id: $source_committee_id}) "
            "MATCH (c2:committee {committee_id: $target_committee_id}) "
            "MERGE (c1)-[t:CONTRIBUTES_TO]->(c2) "
            "SET t.date = $date, "
            "t.transaction_id = $transaction_id, "
            "t.amount = $amount, "
            "t.transaction_type = $transaction_type"
        )
        statements = (
            (cypher, {
                'source_committee_id': str(row['CMTE_ID']),
                'target_committee_id': str(row['OTHER_ID']),
                'date': str(row['TRANSACTION_DT']),
                'amount': row['TRANSACTION_AMT'],
                'transaction_id': str(row['TRAN_ID']),
                'transaction_type': str(row['TRANSACTION_TP']),
            })
            for row in self._rows(insert_data)
        )
        return self._write_batched(statements, 'Inserting committee contributions', len(insert_data))

    def write_LINKED_TO(self):
        """Read the candidate-committee linkage file and MERGE (candidate)-[:LINKED_TO]->(committee)."""
        candidate_committee_linkages = self._read_pipe_file(self.candidate_committee_linkages_filepath, [
            'CAND_ID',
            'CAND_ELECTION_YR',
            'FEC_ELECTION_YR',
            'CMTE_ID',
            'CMTE_TP',
            'CMTE_DSGN',
            'LINKAGE_ID'])
        cypher = (
            "MATCH (can:candidate {candidate_id: $candidate_id}) "
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MERGE (can)-[l:LINKED_TO]->(c) "
            "SET l.linkage_id = $linkage_id"
        )
        statements = (
            (cypher, {
                'candidate_id': str(row['CAND_ID']),
                'committee_id': str(row['CMTE_ID']),
                'linkage_id': str(row['LINKAGE_ID']),
            })
            for row in self._rows(candidate_committee_linkages)
        )
        return self._write_batched(
            statements, 'Writing candidate-committee linkages', len(candidate_committee_linkages)
        )

    # ------------------------------------------------ committee transactions

    def load_committee_transactions(self):
        """Read the committee-transactions file into ``committee_transactions``."""
        self.committee_transactions = self._read_pipe_file(self.committee_transactions_filepath, [
            'CMTE_ID',
            'AMNDT_IND',
            'RPT_TP',
            'TRANSACTION_PGI',
            'IMAGE_NUM',
            'TRANSACTION_TP',
            'ENTITY_TP',
            'NAME',
            'CITY',
            'STATE',
            'ZIP_CODE',
            'EMPLOYER',
            'OCCUPATION',
            'TRANSACTION_DT',
            'TRANSACTION_AMT',
            'OTHER_ID',
            'TRAN_ID',
            'FILE_NUM',
            'MEMO_CD',
            'MEMO_TEXT',
            'SUB_ID'])
        return self

    def write_transaction_entities(self):
        """MERGE one ``entity`` node per distinct counterparty in ``committee_transactions``.

        Rows are de-duplicated first so repeated counterparties don't each cost
        a Neo4j round trip.  Names keep their case (double quotes stripped),
        which is what write_committee_transactions matches on.
        """
        entities = []
        for row in tqdm(self._rows(self.committee_transactions),
                        total=len(self.committee_transactions), desc='Collecting transaction entities'):
            occupation, employer = self._occupation_and_employer(row)
            entities.append({
                'entity_name': self._clean(row['NAME']),
                'entity_type': str(row['ENTITY_TP']),
                'occupation': occupation,
                'employer': employer,
            })
        entities_df = pd.DataFrame(
            entities, columns=['entity_name', 'entity_type', 'occupation', 'employer']
        ).drop_duplicates()

        cypher = (
            "MERGE (e:entity {entity_name: $entity_name}) "
            "ON CREATE SET e.entity_type = $entity_type, "
            "e.occupation = $occupation, "
            "e.employer = $employer"
        )
        # Each row dict's keys are exactly the Cypher parameter names above.
        statements = ((cypher, row) for row in self._rows(entities_df))
        return self._write_batched(statements, 'Inserting transaction entities', len(entities_df))

    def write_committee_transactions(self):
        """MERGE (committee)-[:DISBURSES_TO]->(entity) for each row of ``committee_transactions``."""
        cypher = (
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MATCH (e:entity {entity_name: $entity_name}) "
            "MERGE (c)-[t:DISBURSES_TO]->(e) "
            "SET t.amount = $amount, "
            "t.date = $date, "
            "t.transaction_type = $transaction_type, "
            "t.report_type = $report_type, "
            "t.transaction_id = $transaction_id, "
            "t.memo_text = $memo_text"
        )
        statements = (
            (cypher, {
                'committee_id': str(row['CMTE_ID']),
                'entity_name': self._clean(row['NAME']),
                'amount': row['TRANSACTION_AMT'],
                'date': str(row['TRANSACTION_DT']),
                'transaction_type': str(row['TRANSACTION_TP']),
                'report_type': str(row['RPT_TP']),
                'transaction_id': str(row['TRAN_ID']),
                'memo_text': self._clean(row['MEMO_TEXT']),
            })
            for row in self._rows(self.committee_transactions)
        )
        return self._write_batched(
            statements, 'Inserting committee transactions', len(self.committee_transactions)
        )

    # -------------------------------------------- individual contributions

    def load_individual_contributions(self):
        """Load individual contributions of at least ``minimum_transaction`` into a pandas frame.

        The file is too large for pandas alone, so the module-level ``spark``
        session filters it before ``toPandas()``.
        """
        raw = spark.read.csv(self.individual_contributions_filepath, sep='|')
        raw = raw.toDF(
            'CMTE_ID',
            'AMNDT_IND',
            'RPT_TP',
            'TRANSACTION_PGI',
            'IMAGE_NUM',
            'TRANSACTION_TP',
            'ENTITY_TP',
            'NAME',
            'CITY',
            'STATE',
            'ZIP_CODE',
            'EMPLOYER',
            'OCCUPATION',
            'TRANSACTION_DT',
            'TRANSACTION_AMT',
            'OTHER_ID',
            'TRAN_ID',
            'FILE_NUM',
            'MEMO_CD',
            'MEMO_TEXT',
            'SUB_ID')
        # Without a schema, spark.read.csv yields string columns; cast the amount so
        # the filter is numeric and amounts reach Neo4j as numbers.
        typed = raw.withColumn('TRANSACTION_AMT', raw['TRANSACTION_AMT'].cast('double'))
        filtered = typed.filter(typed['TRANSACTION_AMT'] >= self.minimum_transaction)
        self.individual_contributions = filtered.toPandas()
        return self

    def write_contributors(self):
        """MERGE an ``entity`` node for each contributor in ``individual_contributions`` (set on create only)."""
        cypher = (
            "MERGE (e:entity {entity_name: $entity_name}) "
            "ON CREATE SET e.entity_type = $entity_type, "
            "e.occupation = $occupation, "
            "e.employer = $employer"
        )

        def statements():
            for row in self._rows(self.individual_contributions):
                occupation, employer = self._occupation_and_employer(row)
                yield cypher, {
                    'entity_name': self._clean(row['NAME']),
                    'entity_type': str(row['ENTITY_TP']),
                    'occupation': occupation,
                    'employer': employer,
                }

        return self._write_batched(
            statements(), 'Inserting individual Donors', len(self.individual_contributions)
        )

    def write_CONTRIBUTES_TO(self):
        """MERGE (entity)-[:CONTRIBUTES_TO]->(committee) for each row of ``individual_contributions``.

        Raises:
            ValueError: if a TRANSACTION_DT cannot be parsed as an integer.
        """
        cypher = (
            "MATCH (e:entity {entity_name: $entity_name}) "
            "MATCH (c:committee {committee_id: $committee_id}) "
            "MERGE (e)-[r:CONTRIBUTES_TO]->(c) "
            "SET r.transaction_type = $transaction_type, "
            "r.date = $date, "
            "r.amount = $amount, "
            "r.transaction_id = $transaction_id"
        )
        statements = (
            (cypher, {
                'entity_name': self._clean(row['NAME']),
                'committee_id': self._clean(row['CMTE_ID']),
                'transaction_type': str(row['TRANSACTION_TP']),
                # Stored as an integer here (other writers store dates as strings).
                'date': int(self._clean(row['TRANSACTION_DT'])),
                'amount': row['TRANSACTION_AMT'],
                'transaction_id': str(row['TRAN_ID']),
            })
            for row in self._rows(self.individual_contributions)
        )
        return self._write_batched(statements, 'Inserting donations', len(self.individual_contributions))

    # ------------------------------------------------------------ pipeline

    def run_inserts(self):
        """Load every FEC file used by the graph and write nodes before relationships.

        Order matters: relationship writers MATCH on nodes written by earlier
        steps (e.g. entities must exist before DISBURSES_TO/CONTRIBUTES_TO).
        """
        self.load_candidates()
        self.write_candidates()
        self.load_committees()
        self.write_committees()
        self.write_affiliated_organizations()
        self.write_AFFILIATED_WITH()
        self.write_PRINCIPAL_COMMITTEE_OF()
        self.load_committee_contributions()
        self.write_COMMITTEE_CONTRIBUTES_TO()
        self.write_LINKED_TO()
        self.load_committee_transactions()
        self.write_transaction_entities()
        self.write_committee_transactions()
        self.load_individual_contributions()
        self.write_contributors()
        self.write_CONTRIBUTES_TO()

        return self

Failed to write data to connection ResolvedIPv4Address(('34.121.155.65', 7687)) (ResolvedIPv4Address(('34.121.155.65', 7687)))


In [52]:
# Connection settings come from the environment rather than being hard-coded in
# the notebook (where they would be committed and shared).  NEO4J_PASSWORD is
# prompted for interactively when it is not set.
conn = Neo4jConnection()
conn.uri = os.environ.get('NEO4J_URI', 'bolt://myneo4j:7687')
conn.username = os.environ.get('NEO4J_USERNAME', conn.username)
conn.password = os.environ.get('NEO4J_PASSWORD') or getpass.getpass('Neo4j password: ')
conn.connect_to_graph()

Connected to Neo4j


In [53]:
# Attach the live Neo4j connection to a fresh parser, then run the full
# load/write pipeline (candidates, committees, contributions, transactions).
parser = FECParser()
parser.conn = conn
parser.run_inserts()

Inserting committee contributions:   0%|          | 0/64511 [00:00<?, ?it/s]

Failed to read from defunct connection IPv4Address(('7668c613.databases.neo4j.io', 7687)) (ResolvedIPv4Address(('34.121.155.65', 7687)))


SessionExpired: Failed to read from defunct connection IPv4Address(('7668c613.databases.neo4j.io', 7687)) (ResolvedIPv4Address(('34.121.155.65', 7687)))