# In [2]:
import pandas as pd 
import numpy as np 
import os

# Read the stock CSV from blob storage with Spark: the first line is the
# header, malformed rows are dropped and column types are inferred.
data = spark.read.csv(
    "wasb://bigdatahadoophdistorage1-container@bigdatahadoophdistorage1.blob.core.windows.net/stockdata_small.csv",
    header=True,
    mode="DROPMALFORMED",
    inferSchema=True,
)

# Materialise the Spark DataFrame as a pandas DataFrame
df = data.toPandas()

# Vertices DataFrame: the Ticker, Name and Price columns, renamed to
# id, id_name and id_price.
# NOTE(review): if the file holds several rows per Ticker (e.g. one per Date),
# `node` contains repeated ids -- confirm whether de-duplication is wanted.
node = df[['Ticker', 'Name', 'Price']].rename(
    columns={'Ticker': 'id', 'Name': 'id_name', 'Price': 'id_price'}
)
node.head(10)

# Pivot to wide format (one row per Date, one column per Ticker, PxChange as
# the values), then flatten the result back into a plain DataFrame in which
# Date is an ordinary column.
dfwide = df.pivot(index='Date', columns='Ticker', values='PxChange')
df = pd.DataFrame(dfwide.to_records())

# Drop the Date column so that only the per-ticker series are correlated
df_drop = df.drop(['Date'], axis=1)

# Pairwise correlation between the ticker columns
df_corr = df_drop.corr()

# Correlation values as a NumPy array. `.values` is used instead of
# `DataFrame.as_matrix()`, which was deprecated and later removed from pandas.
matrix = df_corr.values

# Number of rows and columns of the correlation matrix
count_row, count_col = df_corr.shape

# Ticker labels, in the same order as the rows/columns of `matrix`
ticker = list(df_corr.index)

# One edge per unordered ticker pair, taken from the strict lower triangle
# (j < i), so self-correlations and mirrored duplicates are skipped.
# `dst` is ticker[j] so that the label matches the matrix[i, j] value stored
# as the weight. The rows are collected first and the frame is built once,
# rather than growing a DataFrame one cell at a time.
edge_rows = [
    (ticker[i], ticker[j], matrix[i, j])
    for i in range(count_row)
    for j in range(i)
]
adj_df = pd.DataFrame(edge_rows, columns=['src', 'dst', 'weight'])

adj_df.shape   # see Adjacency Dataframe shape

# Working copy for the derived columns, so `adj_df` keeps only src/dst/weight
adj_df_drop = adj_df.copy()

# Force a float dtype for the correlation column (with no edges at all the
# column would otherwise be of object dtype)
adj_df_drop["weight"] = adj_df_drop["weight"].astype(float)

# Two derived columns: abs_weight and Color.
# Color is 'Red' for a positive weight and 'Blue' otherwise (zero, negative or NaN).
adj_df_drop['abs_weight'] = adj_df_drop['weight'].abs()
adj_df_drop['Color'] = np.where(adj_df_drop['weight'] > 0, 'Red', 'Blue')

# Arrange the Adjacency DataFrame columns by name
adjacency_df = adj_df_drop[['src', 'dst', 'weight', 'abs_weight', 'Color']]

adjacency_df.head(10)

# Convert the edge (link) and node pandas DataFrames into Spark DataFrames
edge = spark.createDataFrame(adjacency_df)
node = spark.createDataFrame(node)


def _save_as_single_csv(frame, target):
    """Write the Spark DataFrame *frame* to *target* as CSV with a header row.

    The frame is repartitioned to a single partition before writing, and any
    existing output at *target* is overwritten.
    """
    (frame.repartition(1)
          .write.format("csv")
          .option("header", True)
          .mode("overwrite")
          .save(target))


# Save both DataFrames into the blob storage container
_save_as_single_csv(edge, "wasb://bigdatahadoophdistorage1-container@bigdatahadoophdistorage1.blob.core.windows.net/edge")
_save_as_single_csv(node, "wasb://bigdatahadoophdistorage1-container@bigdatahadoophdistorage1.blob.core.windows.net/node")