Import dependencies and open a Neo4j session, reading the connection settings from environment variables

In [None]:
import concurrent.futures
import csv
import json
import os

from neo4j import GraphDatabase

# Connection settings come from the environment so no credentials live in the notebook.
neo4j_url = os.getenv("NEO4J_CONNECTION_URL")
neo4j_user = os.getenv("NEO4J_USERNAME")
neo4j_password = os.getenv("NEO4J_PASSWORD")

# Fail fast with a clear message rather than handing None values to the driver.
missing_env = [
    name
    for name, value in (
        ("NEO4J_CONNECTION_URL", neo4j_url),
        ("NEO4J_USERNAME", neo4j_user),
        ("NEO4J_PASSWORD", neo4j_password),
    )
    if value is None
]
if missing_env:
    raise RuntimeError("Missing environment variable(s): " + ", ".join(missing_env))

# Show where we are connecting, but never echo the password: cell output is saved
# in the notebook file and would expose the secret to anyone who reads it.
print(neo4j_url, neo4j_user)

driver = GraphDatabase.driver(neo4j_url, auth=(neo4j_user, neo4j_password))
# Session shared by the row-by-row import cells in this notebook.
session = driver.session()

Import customers into the database

In [5]:
# Create one Customer node per row of customers.csv.
# NOTE: CREATE is not idempotent; re-running this cell adds duplicate Customer nodes.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("customers.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "CREATE (n:Customer {customer_id: $customer_id, owner_name: $owner_name, address1: $address1, address2: $address2, city: $city, state: $state, zip: $zip})",
            customer_id=row["customer_id"],
            owner_name=row["owner_name"],
            address1=row["address1"],
            address2=row["address2"],
            city=row["city"],
            state=row["state"],
            zip=row["zip"],
        )



Import accounts into Neo4j and link each account to its customer with a HAS relationship

In [None]:
# Create each Account and link it to its Customer in a single query. Binding the new
# node with WITH makes the HAS edge point at exactly the node just created. Re-matching
# by account_id would also hit duplicate Accounts left by an earlier run, and would
# cost a second round trip per row.
# If no Customer matches, the Account is still created, just without a HAS edge.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("accounts.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "CREATE (a:Account {account_id: $account_id, customer_id: $customer_id, account_type: $account_type, channel: $channel}) "
            "WITH a "
            "MATCH (c:Customer {customer_id: $customer_id}) "
            "CREATE (c)-[:HAS]->(a)",
            account_id=row["account_id"],
            customer_id=row["customer_id"],
            account_type=row["account_type"],
            channel=row["channel"],
        )

In [None]:
# Create one Fund node per row of funds.csv.
# NOTE(review): csv yields strings, so assets / expense_ratio / inception_date are
# stored as strings. Confirm whether downstream queries expect numbers or dates.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("funds.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "CREATE (n:Fund {fund_name: $fund_name, ticker: $ticker, assets: $assets, manager: $manager, inception_date: $inception_date, company: $company, expense_ratio: $expense_ratio})",
            fund_name=row["fund_name"],
            ticker=row["ticker"],
            assets=row["assets"],
            manager=row["manager"],
            inception_date=row["inception_date"],
            company=row["company"],
            expense_ratio=row["expense_ratio"],
        )

In [None]:
# Create one Stock node per row of stock_ticker.csv.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("stock_ticker.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "CREATE (n:Stock {ticker: $ticker, holding_company: $holding_company})",
            ticker=row["ticker"],
            holding_company=row["holding_company"],
        )

In [None]:
# Link Accounts to the Funds they purchased.
# Rows whose ticker matches no Fund produce no MATCH result, so nothing is created for
# them. Stock purchases in the same file are handled by the next cell.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("account_purchases.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "MATCH (a:Account {account_id: $account_id}), (f:Fund {ticker: $ticker}) CREATE (a)-[:PURCHASED {quantity: $quantity, purchase_date: $purchase_date}]->(f)",
            account_id=row["account_id"],
            ticker=row["ticker"],
            # The CSV column is number_of_shares; it is stored on the edge as quantity.
            quantity=row["number_of_shares"],
            purchase_date=row["purchase_date"],
        )

In [None]:
# Link Accounts to the Stocks they purchased (same file as the Fund purchases cell).
# Rows whose ticker matches no Stock produce no MATCH result, so nothing is created.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("account_purchases.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "MATCH (a:Account {account_id: $account_id}), (s:Stock {ticker: $ticker}) CREATE (a)-[:PURCHASED {quantity: $quantity, purchase_date: $purchase_date}]->(s)",
            account_id=row["account_id"],
            ticker=row["ticker"],
            # The CSV column is number_of_shares; it is stored on the edge as quantity.
            quantity=row["number_of_shares"],
            purchase_date=row["purchase_date"],
        )

In [None]:
# Link each Fund to the Stocks it holds, recording the holding percentage on the edge.
# Rows where either ticker is unknown produce no MATCH result and create nothing.
# newline="" is required by the csv module so quoted fields containing line breaks
# are parsed correctly.
with open("fund_holdings.csv", "r", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        session.run(
            "MATCH (f:Fund {ticker: $fund_ticker}), (s:Stock {ticker: $holding_ticker}) CREATE (f)-[:HOLDS {percent: $percent}]->(s)",
            fund_ticker=row["fund_ticker"],
            holding_ticker=row["holding_ticker"],
            percent=row["percentage"],
        )

In [None]:
# Function to process a batch of rows
def process_batch(batch):
    """Write one batch of daily-close rows to Neo4j.

    Each row becomes a ``DayClose`` node. If a ``Stock`` node with the same ticker
    exists, it is linked to the new node by a ``DAILY_CLOSE`` relationship.

    Args:
        batch: iterable of dict rows (e.g. from ``csv.DictReader``) containing the
            keys ticker, date, close, volume, open, high and low.
    """
    # Reuse the notebook's driver instead of building (and leaking) a new one per
    # batch. The driver can be shared across threads; sessions cannot, so each
    # call opens its own session and the ``with`` block guarantees it is closed.
    with driver.session() as batch_session:
        for row in batch:
            # Create and link in a single query. A Cypher variable such as `n` is only
            # bound inside the query that introduces it, so linking from a separate
            # query would CREATE a brand-new empty node instead of this DayClose.
            # The DayClose node is created even when no Stock matches.
            batch_session.run(
                "CREATE (n:DayClose {ticker: $ticker, date: $date, close: $close, volume: $volume, open: $open, high: $high, low: $low}) "
                "WITH n "
                "MATCH (s:Stock {ticker: $ticker}) "
                "CREATE (s)-[:DAILY_CLOSE]->(n)",
                ticker=row["ticker"],
                date=row["date"],
                close=row["close"],
                volume=row["volume"],
                open=row["open"],
                high=row["high"],
                low=row["low"],
            )

# Function to read the CSV and divide it into batches
def process_csv_in_parallel(file_path, batch_size=1000):
    """Yield the rows of a CSV file in lists of at most ``batch_size`` rows.

    Despite its name, this function does no parallel work itself. It is a lazy
    batching generator whose batches can be handed to worker threads.

    Args:
        file_path: path of a CSV file whose first line is the header row.
        batch_size: maximum number of rows per yielded list.

    Yields:
        Lists of dict rows (as produced by ``csv.DictReader``). Every list except
        possibly the last holds exactly ``batch_size`` rows. Nothing is yielded when
        the file has no data rows.
    """
    # newline="" is required by the csv module so quoted fields containing line
    # breaks are parsed correctly.
    with open(file_path, "r", newline="") as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        # Emit the final, partially filled batch, if any.
        if batch:
            yield batch

# Main function to run the batches in parallel
def main(file_path="daily_close.csv", max_workers=5, batch_size=1000):
    """Load a daily-close CSV into Neo4j using a pool of worker threads.

    Args:
        file_path: CSV file to import (defaults to ``daily_close.csv``).
        max_workers: number of worker threads writing batches concurrently.
        batch_size: number of rows handed to each ``process_batch`` call.

    Raises:
        Any exception raised by ``process_batch`` in a worker thread.
    """
    batches = process_csv_in_parallel(file_path, batch_size=batch_size)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map only re-raises a worker's exception when that result is
        # retrieved. Iterating the results makes failed batches surface instead of
        # being silently dropped.
        # NOTE: Executor.map collects its input iterable immediately, so every batch
        # of the file is read into memory up front.
        for _ in executor.map(process_batch, batches):
            pass
