In [7]:
import os
import pprint
import json
import time
from datetime import timedelta
import numpy as np
import pandas as pd

# DB Import
import pymongo
import redis
import neo4j

# Data Load

In [2]:
# Read the preprocessed price CSV (path is relative to the notebook's working
# directory) into a DataFrame and preview its first five rows.
df = pd.read_csv('./../data/sp500_preprocessed.csv')
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Name
0,2020-11-13,140.401337,142.357864,139.866226,141.96489,117.566223,2167272,MMM
1,2020-11-16,144.3311,145.392975,142.29097,145.367889,120.384392,2911423,MMM
2,2020-11-17,144.682281,145.903015,142.859528,145.183945,120.232063,3097520,MMM
3,2020-11-18,145.903015,146.070236,143.252502,143.311035,118.68103,3128497,MMM
4,2020-11-19,142.61705,143.712372,141.555191,143.486618,119.854362,3571256,MMM


# Load Raw Data to MongoDB

In [3]:
# Connect to the MongoDB server at host "mongo", port 27017, and display the
# names of the databases that currently exist on it.
client = pymongo.MongoClient("mongodb://mongo:27017/")
db_list = client.list_database_names()
db_list

['acme_gourmet_meals', 'admin', 'config', 'local']

In [6]:
# Target database and collection
db = client["sp500"]           # database name: sp500
collection = db["prices"]      # collection name: prices

# Convert the DataFrame row by row into a list of dicts (one dict per row)
records = df.to_dict("records")

# Empty the existing collection first, so re-running this cell reloads the
# data instead of appending a second copy
collection.delete_many({})

# Insert every record into MongoDB
result = collection.insert_many(records)

print(f"Inserted {len(result.inserted_ids)} documents into MongoDB.")

Inserted 329803 documents into MongoDB.


# MongoDB to Neo4j

### 1. Read Raw Data from MongoDB & Preprocess

In [11]:
# Open a connection and point at the collection that holds the raw prices.
client = pymongo.MongoClient("mongodb://mongo:27017/")
db = client["sp500"]
prices_collection = db["prices"]

# Fetch every document, projecting away the "_id" field, and materialise the
# cursor into a DataFrame.
cursor = prices_collection.find(filter={}, projection={"_id": 0})
df = pd.DataFrame(list(cursor))

print(f"Data Sample from MongoDB: {len(df)} rows")
df.head()

Data Sample from MongoDB: 329803 rows


Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Name
0,2020-11-13,140.401337,142.357864,139.866226,141.96489,117.566223,2167272,MMM
1,2020-11-16,144.3311,145.392975,142.29097,145.367889,120.384392,2911423,MMM
2,2020-11-17,144.682281,145.903015,142.859528,145.183945,120.232063,3097520,MMM
3,2020-11-18,145.903015,146.070236,143.252502,143.311035,118.68103,3128497,MMM
4,2020-11-19,142.61705,143.712372,141.555191,143.486618,119.854362,3571256,MMM


In [14]:
def preprocess_basic(df):
    """Parse the ``Date`` column and order the rows by ticker, then date.

    The input frame is left untouched: the conversion is applied to a copy,
    so re-running the calling cell (or inspecting the raw frame afterwards)
    still sees the original ``Date`` values.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Date`` column parseable by ``pandas.to_datetime``
        and a ``Name`` column.

    Returns
    -------
    pandas.DataFrame
        A new frame with ``Date`` converted to datetimes, sorted by
        ``["Name", "Date"]``. Original index labels are kept.
    """
    # assign() builds a new frame instead of writing into the caller's object.
    parsed = df.assign(Date=pd.to_datetime(df["Date"]))
    return parsed.sort_values(["Name", "Date"])

def calculate_return(df):
    """Add a per-ticker simple return column computed from ``Adj Close``.

    The return of a row is the percentage change of ``Adj Close`` relative to
    the previous row of the same ``Name`` group, in the frame's existing row
    order -- so the frame should already be sorted by date within each ticker.

    The input frame is not modified; the new column is attached to a copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``Name`` and ``Adj Close`` columns.

    Returns
    -------
    pandas.DataFrame
        A new frame with an extra ``return`` column. Rows whose return is NaN
        (at least the first row of every ticker) are dropped.
    """
    per_ticker_change = df.groupby("Name")["Adj Close"].pct_change()

    # "return" is a Python keyword, so it has to be passed via dict unpacking.
    with_return = df.assign(**{"return": per_ticker_change})

    # The first observation of each ticker has no predecessor -> NaN.
    return with_return.dropna(subset=["return"])

def pivot_return(df):
    """Reshape long-format returns into a wide Date-by-Name matrix.

    Parameters
    ----------
    df : pandas.DataFrame
        Long-format frame with ``Date``, ``Name`` and ``return`` columns.

    Returns
    -------
    pandas.DataFrame
        One row per ``Date``, one column per ``Name``, cells holding
        ``return``. Any date on which at least one ticker has no value is
        removed, so the result contains no NaN.
    """
    wide = df.pivot(index="Date", columns="Name", values="return")
    # Keep only the dates for which every ticker has a return.
    return wide.dropna(axis="index", how="any")

In [13]:
# Run the preprocessing chain: parse/sort first, then derive per-ticker returns.
df = df.pipe(preprocess_basic).pipe(calculate_return)
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Name,return
17557,2020-11-16,12.87,12.96,12.41,12.79,12.79,104867800,AAL,0.044935
17558,2020-11-17,12.48,12.81,12.23,12.7,12.7,61956500,AAL,-0.007037
17559,2020-11-18,12.91,13.36,12.73,12.74,12.74,99888500,AAL,0.00315
17560,2020-11-19,12.73,13.04,12.63,12.79,12.79,58809400,AAL,0.003925
17561,2020-11-20,12.8,12.91,12.46,12.53,12.53,58685500,AAL,-0.020328


In [15]:
# Reshape the long return table into a Date x Name matrix and preview it.
pivot = df.pipe(pivot_return)
pivot.head()

Name,AAL,AAPL,ABBV,ABT,ACN,ADBE,ADM,AFL,AIG,AIZ,...,WAB,WAT,WDC,WHR,WMB,WRB,WST,WYNN,XOM,ZBRA
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2020-11-16,0.044935,0.008721,-0.006866,0.009503,0.013111,-0.017876,0.011031,0.041185,0.031351,0.026964,...,0.028453,-0.003482,0.05088,-0.000689,0.009481,0.023377,0.003574,0.02766,0.05765,0.000347
2020-11-17,-0.007037,-0.007565,0.008642,-0.005367,-0.004613,0.015186,-0.004959,0.001164,-0.001819,0.010794,...,-0.002515,-0.002804,-0.019734,0.013097,0.029659,-0.00525,-0.017876,-0.005383,0.013365,-0.015001
2020-11-18,0.00315,-0.011391,-0.004939,-0.029099,-0.016775,-0.018122,-0.010643,-0.003256,0.003384,-0.02143,...,0.017369,-0.028115,-0.005384,0.016224,-0.039846,-0.017116,-0.019178,0.014963,-0.037755,-0.009419
2020-11-19,0.003925,0.005168,0.009623,0.011205,0.014475,0.015104,-0.007104,0.020065,-0.000259,-0.005161,...,-0.003855,0.007388,0.002824,-0.001866,0.008,-0.029749,-0.000249,0.032518,0.005107,0.061645
2020-11-20,-0.020328,-0.010957,0.011738,-0.000991,0.000987,-0.007483,-0.006337,0.002973,0.001816,-0.007634,...,-0.018936,-0.004197,-0.004459,0.001818,0.008929,-0.02064,0.012482,-0.035038,-0.0123,-0.009487


In [16]:
# Pairwise Pearson correlation between the Name columns of the return matrix
corr = pivot.corr(method="pearson")
corr.head()

Name,AAL,AAPL,ABBV,ABT,ACN,ADBE,ADM,AFL,AIG,AIZ,...,WAB,WAT,WDC,WHR,WMB,WRB,WST,WYNN,XOM,ZBRA
Name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
AAL,1.0,0.358587,0.105113,0.154355,0.340835,0.277675,0.193886,0.330466,0.398398,0.27551,...,0.44471,0.262909,0.391045,0.384523,0.219133,0.203442,0.115683,0.492826,0.214489,0.42612
AAPL,0.358587,1.0,0.155794,0.302663,0.493741,0.51844,0.181448,0.278763,0.288941,0.268568,...,0.401087,0.35519,0.384953,0.367298,0.186899,0.180615,0.28252,0.357867,0.180384,0.490968
ABBV,0.105113,0.155794,1.0,0.317016,0.197907,0.117421,0.202256,0.256149,0.24646,0.21893,...,0.178418,0.263413,0.105617,0.118685,0.199857,0.266136,0.168508,0.093407,0.214454,0.118142
ABT,0.154355,0.302663,0.317016,1.0,0.375968,0.317319,0.19176,0.291029,0.252522,0.247224,...,0.22697,0.401943,0.156563,0.218799,0.156394,0.246784,0.29874,0.14855,0.08064,0.269295
ACN,0.340835,0.493741,0.197907,0.375968,1.0,0.519886,0.202839,0.363977,0.368828,0.324126,...,0.441089,0.416808,0.328423,0.328969,0.229416,0.283415,0.313081,0.315691,0.18435,0.480112


In [43]:
def get_edges_by_threshold(corr, threshold=0.8):
    """Turn a correlation matrix into a de-duplicated edge list.

    Every cell whose absolute value is at least ``threshold`` becomes a
    candidate edge from its row label (``src``) to its column label (``dst``).
    Self-pairs are removed, and of the two orientations ``(A, B)`` /
    ``(B, A)`` only the first one met in row-major order is kept.

    Parameters
    ----------
    corr : pandas.DataFrame
        Matrix of pairwise values; index labels are sources, column labels
        are destinations.
    threshold : float, default 0.8
        Minimum absolute value for a cell to be kept.

    Returns
    -------
    pandas.DataFrame
        Columns ``src``, ``dst``, ``corr`` and ``pair_key`` (the sorted
        ``(src, dst)`` tuple) with a fresh 0..n-1 index. When nothing
        qualifies, an empty frame with those same columns is returned.
    """
    values = corr.to_numpy()

    # Select qualifying cells directly instead of where().stack(): this does
    # not depend on stack() silently dropping NaN, which differs between
    # pandas versions. NaN compares False, so missing cells are never kept.
    with np.errstate(invalid="ignore"):
        keep = np.abs(values) >= threshold
    row_pos, col_pos = np.nonzero(keep)  # row-major order, same as stack()

    edges = pd.DataFrame({
        "src": corr.index.to_numpy()[row_pos],
        "dst": corr.columns.to_numpy()[col_pos],
        "corr": values[row_pos, col_pos],
    })

    # Remove self-pairs (the diagonal of a correlation matrix).
    edges = edges[edges["src"] != edges["dst"]]

    # Key each row on its sorted pair so (A, B) and (B, A) collide. A plain
    # comprehension is used rather than DataFrame.apply(axis=1): apply returns
    # a DataFrame for an empty input, which cannot be assigned to one column.
    pair_keys = [tuple(sorted(pair)) for pair in zip(edges["src"], edges["dst"])]
    edges = edges.assign(
        pair_key=pd.Series(pair_keys, index=edges.index, dtype=object)
    )

    return edges.drop_duplicates(subset=["pair_key"]).reset_index(drop=True)

In [44]:
# Extract the strongly correlated ticker pairs (|corr| >= 0.8) as an edge list.
edges = get_edges_by_threshold(corr, threshold=0.8)
edges.head()

Unnamed: 0,src,dst,corr,pair_key
0,AMP,RJF,0.80805,"(AMP, RJF)"
1,APA,DVN,0.842056,"(APA, DVN)"
2,APA,FANG,0.822818,"(APA, FANG)"
3,AVB,EQR,0.935682,"(AVB, EQR)"
4,BAC,C,0.814273,"(BAC, C)"


### 2. Load Data to Neo4j

In [45]:
# Create the Neo4j driver over the Bolt port (7687).
# The URI and credentials can be overridden through the NEO4J_URI, NEO4J_USER
# and NEO4J_PASSWORD environment variables. The fallbacks are the values this
# notebook was written with; outside a local dev container, set the
# environment variables so the password does not live in the notebook.
driver = neo4j.GraphDatabase.driver(
    uri=os.environ.get("NEO4J_URI", "neo4j://neo4j:7687"),
    auth=(
        os.environ.get("NEO4J_USER", "neo4j"),
        os.environ.get("NEO4J_PASSWORD", "ucb_mids_w205"),
    ),
)

In [47]:
def create_schema(tx):
    """Create the uniqueness constraint on ``Stock.name`` if it is missing.

    Parameters
    ----------
    tx
        Object exposing ``run(query)``; the statement is sent through it.
        The ``IF NOT EXISTS`` clause in the statement means it does not try
        to create the constraint a second time.
    """
    constraint_query = """
        CREATE CONSTRAINT IF NOT EXISTS
        FOR (s:Stock)
        REQUIRE s.name IS UNIQUE
    """
    tx.run(constraint_query)

def create_stock_nodes(tx, names):
    """Upsert one ``Stock`` node per entry of ``names``.

    Parameters
    ----------
    tx
        Object exposing ``run(query, **params)``.
    names : list
        Values bound to the ``$names`` query parameter; each one is MERGEd as
        the ``name`` property of a ``Stock`` node, so existing nodes are
        matched rather than duplicated.
    """
    merge_query = """
        UNWIND $names AS name
        MERGE (s:Stock {name: name})
    """
    tx.run(merge_query, names=names)

def create_correlation_edges(tx, edge_list):
    """Upsert ``CORRELATED_WITH`` relationships between existing Stock nodes.

    Parameters
    ----------
    tx
        Object exposing ``run(query, **params)``.
    edge_list : list of dict
        Bound to the ``$edge_list`` query parameter. The query reads the
        ``src``, ``dst`` and ``corr`` keys of each entry: both endpoints are
        MATCHed by name, the relationship is MERGEd without a direction, and
        its ``corr`` property is set to the entry's value.
    """
    edge_query = """
        UNWIND $edge_list AS edge
        MATCH (a:Stock {name: edge.src})
        MATCH (b:Stock {name: edge.dst})
        MERGE (a)-[r:CORRELATED_WITH]-(b)
        SET r.corr = edge.corr
    """
    tx.run(edge_query, edge_list=edge_list)

In [48]:
# Load the graph in three steps: constraint, nodes, relationships.
# The try/finally ensures driver.close() runs even when one of the writes
# raises, so a failed load does not leave the driver open.
try:
    with driver.session(database="neo4j") as session:
        # Create Schema
        session.execute_write(create_schema)

        # Create Nodes: one per distinct ticker in the return table
        tickers = df["Name"].unique().tolist()
        session.execute_write(create_stock_nodes, tickers)

        # Create Edges
        edge_records = edges[["src", "dst", "corr"]].to_dict("records")
        print("Edge number to load to Neo4j:", len(edge_records))

        # Send the edges in fixed-size batches, one write transaction per
        # batch, so a single transaction stays bounded as the data grows.
        batch_size = 1000
        for i in range(0, len(edge_records), batch_size):
            batch = edge_records[i:i+batch_size]
            session.execute_write(create_correlation_edges, batch)
            print(f"{i + len(batch)} / {len(edge_records)} edges loaded")
finally:
    driver.close()

Neo4j에 올릴 엣지 수: 44
44 / 44 edges loaded
