# Milestone 2 - multiple correlations

Find correlations among p >= 3 vectors that exceed a threshold t.

*   You are free to choose the value of p.
*   Start with p = 3.

As correlation measures, you should consider the following two:

*   Total correlation.
*   Pearson correlation, where the vector on each side is computed as a linear combination of the input vectors (e.g., their average). For example, consider the multiple correlation of the vectors representing Rabobank and ING (denoted R and I, respectively) with the vector representing Microsoft (denoted M). The multiple correlation is then defined as Corr(M, (R+I)/2).
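
The assignment does not spell out either measure. As a reading aid (a common textbook form, not the assignment's official definition), total correlation uses the (differential) entropy $H$, and the Pearson variant with averaging as aggregation correlates $M$ with $A = \tfrac{1}{2}(R + I)$ over a series of length $m$:

$$C(X_1,\dots,X_n) = \sum_{i=1}^{n} H(X_i) - H(X_1,\dots,X_n)$$

$$\mathrm{Corr}(M, A) = \frac{\sum_{j=1}^{m}(M_j-\bar M)(A_j-\bar A)}{\sqrt{\sum_{j=1}^{m}(M_j-\bar M)^2}\,\sqrt{\sum_{j=1}^{m}(A_j-\bar A)^2}}, \qquad A_j = \tfrac{1}{2}(R_j + I_j)$$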

You also need to define an aggregation function. Its role is (possibly) to reduce the number of input time series, e.g., by averaging as in the previous example. Other aggregation functions that might be meaningful include maximum, minimum, the identity function, variance, etc.
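
A minimal sketch of such aggregation functions, matching the two interfaces listed under the constraints. It assumes every time series is a plain Python list of floats and that all series have the same length; the function names are hypothetical and do not come from the notebook.

```python
from typing import List

import numpy as np

Vector = List[float]

# Interface 1: dataType aggrFunction(List<dataType> in) -> one vector
def aggr_avg(vectors: List[Vector]) -> Vector:
    """Element-wise mean of equally long vectors."""
    return np.mean(np.asarray(vectors, dtype=float), axis=0).tolist()

def aggr_max(vectors: List[Vector]) -> Vector:
    """Element-wise maximum."""
    return np.max(np.asarray(vectors, dtype=float), axis=0).tolist()

def aggr_min(vectors: List[Vector]) -> Vector:
    """Element-wise minimum."""
    return np.min(np.asarray(vectors, dtype=float), axis=0).tolist()

def aggr_var(vectors: List[Vector]) -> Vector:
    """Element-wise population variance (ddof=0)."""
    return np.var(np.asarray(vectors, dtype=float), axis=0).tolist()

# Interface 2: List<dataType> aggrFunction(List<dataType> in) -> list of vectors
def aggr_identity(vectors: List[Vector]) -> List[Vector]:
    """Pass the inputs through unchanged (copied)."""
    return [list(v) for v in vectors]

print(aggr_avg([[1, 2, 3], [3, 4, 5]]))  # [2.0, 3.0, 4.0]
```

Averaging, maximum, minimum and the identity are commutative in their inputs, so the order of the vectors inside one side does not matter, which is what allows the deduplication asked for in constraint 6.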

In particular:
- Build and implement an architecture that takes as input your dataset(s), a correlation function, and a suitable aggregation function, and finds the sets with high multiple correlations (above a threshold t). The value of t depends on the dataset and on the correlation/aggregation function; set it so that around 10 results are returned. Your code should not consider solutions where a vector appears more than once in the input parameters (e.g., in both in1 and in2).
- Implement the two described correlation functions with two meaningful aggregation functions,
and test your code with these. For testing, you can use the same dataset used in Milestone 1,
or a non-negligible subset of it.
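
One way to enumerate candidate inputs without repeated vectors and without scoring the same comparison twice is sketched below (hypothetical `disjoint_splits`, not the notebook's code). It assumes unique vector names, a symmetric correlation (true for Pearson) and an aggregation that ignores input order (true for averaging), so each unordered pair of disjoint sides whose sizes add up to p is produced exactly once.

```python
from itertools import combinations
from typing import Iterator, Sequence, Tuple

def disjoint_splits(names: Sequence[str], p: int) -> Iterator[Tuple[tuple, tuple]]:
    """Yield every unordered pair (in1, in2) of disjoint, non-empty groups
    with len(in1) + len(in2) == p, each exactly once."""
    for group in combinations(names, p):          # the p vectors involved
        for size1 in range(1, p // 2 + 1):        # size of the smaller side
            for in1 in combinations(group, size1):
                # With equal side sizes, (in1, in2) and (in2, in1) are the
                # same comparison: keep only the one where in1 holds group[0]
                if 2 * size1 == p and group[0] not in in1:
                    continue
                in2 = tuple(name for name in group if name not in in1)
                yield in1, in2

for in1, in2 in disjoint_splits(["R", "I", "M"], 3):
    print(in1, "vs", in2)
```

For p = 4 this yields 1-vs-3 and 2-vs-2 splits; the `group[0]` check is what removes the mirrored 2-vs-2 duplicates.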

Constraints:
1. You should define aggregation and correlation functions separately.
2. The aggregation function should implement one of the following generic interfaces (you can slightly modify it if you have a good reason):
   - `dataType aggrFunction(List<dataType> in)`, e.g., for averaging
   - `List<dataType> aggrFunction(List<dataType> in)`, e.g., the identity function
3. The correlation function should implement one of the following generic interfaces (you can slightly modify it if you have a good reason):
   - `double correlationFunction(aggrFunction(List<dataType> in))`
   - `double correlationFunction(aggrFunction(List<dataType> in1), aggrFunction(List<dataType> in2))`
4. The Spark code should take the correlation and aggregation function (any implementations
that satisfy the described interface) as input parameters.
5. Make sure that you write Spark code that scales out and avoids bottlenecks.
Hint: Think about how you can efficiently enumerate all possible combinations and assign ranges of comparisons to the individual workers.
6. When the correlation/aggregation functions are commutative and associative, you should
avoid redundant computations. This includes, e.g., computing avg(x,y)=avg(y,x) twice, or
computing corr((x,y), (z,w))=corr((w,z), (y,x))=corr((w,z), (x,y))=... multiple times.
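
The hint about assigning ranges of comparisons to workers can be realized by ranking combinations: if every combination has an index in lexicographic order, a worker only needs a `(start, end)` pair and can rebuild its own combinations locally. The sketch below uses the standard combinatorial number system (our choice, not something the assignment prescribes); the helper names are hypothetical.

```python
from itertools import combinations
from math import comb
from typing import Iterator, List, Tuple

def unrank_combination(index: int, n: int, k: int) -> List[int]:
    """Return the index-th k-combination of range(n) in lexicographic order,
    i.e. the same order that itertools.combinations(range(n), k) uses."""
    if not 0 <= index < comb(n, k):
        raise IndexError("combination index out of range")
    result = []
    x = 0  # smallest value the next element may take
    for remaining in range(k, 0, -1):
        # Skip every candidate x whose block of combinations lies before index
        while comb(n - x - 1, remaining - 1) <= index:
            index -= comb(n - x - 1, remaining - 1)
            x += 1
        result.append(x)
        x += 1
    return result

def split_ranges(total: int, workers: int) -> Iterator[Tuple[int, int]]:
    """Split [0, total) into `workers` contiguous ranges of near-equal size."""
    step, extra = divmod(total, workers)
    start = 0
    for w in range(workers):
        end = start + step + (1 if w < extra else 0)
        yield start, end
        start = end

# Sanity check: unranking reproduces itertools' order
assert [unrank_combination(i, 5, 3) for i in range(comb(5, 3))] == \
    [list(c) for c in combinations(range(5), 3)]
print(list(split_ranges(comb(5, 3), 3)))  # [(0, 4), (4, 7), (7, 10)]
```

In Spark, such ranges could be distributed with `sc.parallelize(list(split_ranges(total, workers)), workers)`, with the series shared through `sc.broadcast`, so that each task unranks only its own indices inside `flatMap` instead of receiving pre-built combinations from the driver.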

In [1]:
import os
import pandas as pd
import numpy as np

In [2]:
import findspark

findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "12g").getOrCreate()

In [3]:
df = pd.read_pickle("data_processed.pkl")

In [4]:
# Drop every entry that Spark cannot convert to a DataFrame
names = []
for i in df:
  try:
    spark.createDataFrame(df[i])
  except Exception:
    names.append(i)

for i in names:
  del df[i]

In [5]:
import pyspark.sql.functions as F

dataframes = []
for key in df.keys():
  dataframes.append(df[key])

new_df = pd.concat(dataframes)

In [6]:
# Create Spark dataframe
sparkdf = spark.createDataFrame(new_df)

In [7]:
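clean_df = sparkdf.drop('highest price', 'lowest price', 'closing price', 'volumes')
# One row per company with its list of opening prices. Note: collect_list does not
# guarantee element order after the groupBy shuffle, so the series may need to be
# sorted by their timestamps before they are correlated.
dataframe_sp = clean_df.groupBy("name").agg(F.collect_list("opening price").alias("op. price"))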

In [8]:
from scipy.stats import pearsonr

# Reuse the SparkContext of the existing SparkSession; SparkContext.getOrCreate
# would return this same context and ignore a new SparkConf anyway
sc = spark.sparkContext

In [16]:
# Create a subset of 200 series; take() returns a list of Rows on the driver
subset_rows = dataframe_sp.take(200)
subset_dataframe = spark.createDataFrame(subset_rows)

In [17]:
# (name, opening prices) tuples used as input for milestone_2
tuple_list = [(name, value) for name, value in subset_rows]

In [11]:
from scipy.stats import pearsonr

def pearson(data1, data2):
  # data1, data2: (name, vector) pairs; returns (pearsonr result, "name1 X name2")
  return (pearsonr(data1[1], data2[1]), data1[0] + " X " + data2[0])


def subsets_eq_k(A,K):
    subsets = []
    N = len(A)

    # iterate over subsets of size K
    mask = (1<<K)-1     # 2^K - 1 is always a number having exactly K 1 bits
    while mask < (1<<N):
        subset = []
        for n in range(N):
            if ((mask>>n)&1) == 1:
                subset.append(A[n])
 
        subsets.append(subset)
 
        # catch special case
        if mask == 0:
            break
 
        # determine next mask with Gosper's hack
        a = mask & -mask                # determine rightmost 1 bit
        b = mask + a                    # determine carry bit
        mask = (((mask ^ b) >> 2) // a) | b  # produce block of ones that begins at the least-significant bit (integer division avoids float rounding for large N)

    return subsets

def subsets_leq_k(A,K):
    # collection_subsets[k-1] holds all subsets of size k
    collection_subsets = [[] for _ in range(K)]
    N = len(A)

    # iterate over subsets of size less than or equal to K
    mask = 0
    while mask < (1<<N):
        subset = []
        for n in range(N):
            if ((mask>>n)&1) == 1:
                subset.append(A[n])
        if len(subset) > 0:
            collection_subsets[len(subset)-1].append(subset)
 
        # catch special case when K is zero
        if K == 0:
            break
 
        # determine next mask
        if bin(mask).count("1") < K:
            mask += 1
        else:
            mask = (mask|(mask-1))+1

    return collection_subsets 
 
def calculate_average(x):
    # x: sequence of (name, vector) pairs; average the vectors element-wise
    names = [item[0] for item in x]
    t = np.mean([np.asarray(item[1], dtype=float) for item in x], axis=0)
    # Nested twice so that flatMap yields one [(joined name, vector)] record per subset
    return [[("->".join(names), t.tolist())]]


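The notebook implements only the Pearson variant. A minimal sketch of the total-correlation measure follows, assuming the series are approximately jointly Gaussian; under that assumption total correlation reduces to -0.5 * log(det R), with R the Pearson correlation matrix of the inputs (result in nats). This closed form is an estimator choice on our part, and the function name is hypothetical. It takes the identity aggregation's output, matching `double correlationFunction(aggrFunction(List<dataType> in))`.

```python
from typing import Sequence

import numpy as np

def total_correlation_gaussian(vectors: Sequence[Sequence[float]]) -> float:
    """Total correlation sum_i H(X_i) - H(X_1, ..., X_n) in nats, estimated
    under a joint-Gaussian assumption, where it equals -0.5 * log(det R)
    for the Pearson correlation matrix R of the input series."""
    data = np.asarray(vectors, dtype=float)  # one row per time series
    corr = np.corrcoef(data)                 # n x n correlation matrix
    sign, logdet = np.linalg.slogdet(corr)
    if sign <= 0:
        # det R is not positive: some series is (numerically) a linear mix of the others
        return float("inf")
    return -0.5 * logdet

print(total_correlation_gaussian([[1, 2, 3, 4], [2, 1, 4, 3], [4, 1, 3, 2]]))
```

For two series this is -0.5 * log(1 - r^2), so it grows with |r| just like the Pearson score, but it extends naturally to p inputs without choosing two sides.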
In [18]:
import time
from itertools import groupby
from operator import itemgetter

def reduce_mapping(x, correlation):
    # x: list of (key, (name, vector)) records; every key occurs exactly twice,
    # once for each side of one multiple correlation
    results = []
    for _, group in groupby(sorted(x, key=itemgetter(0)), key=itemgetter(0)):
        side1, side2 = [record[1] for record in group]
        results.append(correlation(side1, side2))
    return results

def milestone_2(aggregation, correlation, p, data, threshold=0.7):
    # subsets[k-1] holds all subsets of size k (k = 1 .. p-1), built on the driver
    subsets = subsets_leq_k(data, p-1)
    pair_averages = [subsets[0]]
    for i in range(1, len(subsets)):
        # aggregate every subset of size i+1 into one [(name, vector)] record
        temp = spark.createDataFrame(subsets[i]).rdd.flatMap(aggregation).collect()
        pair_averages.append(temp)

    partition = max(1, len(subsets[0]) * len(subsets[1]) // 5)
    print("Pairs created....moving on....")
    t = 0
    all_combinations = [[] for i in range(partition)]
    for x in range(p // 2):
        s = p - 2 - x  # side sizes x+1 and s+1 add up to p
        for a in range(len(pair_averages[x])):
            for b in range(len(pair_averages[s])):
                # equal side sizes: (a, b) and (b, a) are the same comparison
                if x == s and b <= a:
                    continue
                names1 = pair_averages[s][b][0][0].split("->")
                names2 = pair_averages[x][a][0][0].split("->")
                # skip combinations in which a vector appears on both sides
                if set(names1).isdisjoint(names2):
                    index = t % partition
                    tmp1 = (t, (pair_averages[s][b][0][0], pair_averages[s][b][0][1]))
                    tmp2 = (t, (pair_averages[x][a][0][0], pair_averages[x][a][0][1]))
                    all_combinations[index].append(tmp1)
                    all_combinations[index].append(tmp2)
                    if t % 100000 == 0:
                        print(t)
                    t = t + 1

    print("We have arrived at the reduce part of this assignment, best regards. Me, Stefan,Arngrimur")
    start = time.time()
    # all_combinations (two full vectors per combination) is built on the driver and
    # shipped to the JVM by sc.parallelize, which is where the OutOfMemoryError below is raised
    res = sc.parallelize(all_combinations)
    res = res.flatMap(lambda x: reduce_mapping(x, correlation)).filter(lambda line: abs(line[0][0]) >= threshold).collect()
    end = time.time()
    print("With partitioning --> ", end-start)
    print(res[:5])

milestone_2(calculate_average, pearson, 3, tuple_list)

Pairs created....moving on....
0
100000
200000
300000
400000
500000
600000
700000
800000
900000
1000000
1100000
1200000
1300000
1400000
1500000
1600000
1700000
1800000
1900000
2000000
2100000
2200000
2300000
2400000
2500000
2600000
2700000
2800000
2900000
3000000
3100000
3200000
3300000
3400000
3500000
3600000
3700000
3800000
3900000
We have arrived at the reduce part of this assignment, best regards. Me, Stefan,Arngrimur


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRDD$.readRDDFromInputStream(PythonRDD.scala:188)
	at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:175)
	at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
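
The progress counter above stops just below 4 million, and that count can be checked by hand: with 200 series and p = 3, every combination puts one series on one side and a pair of the remaining 199 series on the other. The snippet below only reproduces that arithmetic; it does not run Spark.

```python
from math import comb

n_series = 200  # subset size used above: dataframe_sp.take(200)

# One series alone, the other side is a pair drawn from the remaining series
n_combinations = n_series * comb(n_series - 1, 2)
# Equivalently: choose the 3 series, then which one of them stands alone
assert n_combinations == comb(n_series, 3) * 3
print(n_combinations)      # 3940200

# all_combinations keeps two (key, (name, vector)) records per combination
print(2 * n_combinations)  # 7880400
```

So roughly 7.9 million full price vectors are materialized on the driver before `sc.parallelize` hands them to the JVM (the traceback shows the failure in `PythonRDD.readRDDFromFile`), which is consistent with the heap running out despite the 12g driver memory setting.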


In [None]:
import time

def milestone_2(aggregation, correlation, p, data, threshold=0.7):
    subsets = subsets_leq_k(data, p-1)
    pair_averages = [subsets[0]]
    for i in range(1, len(subsets)):
        # aggregate every subset of size i+1 into one [(name, vector)] record
        temp = spark.createDataFrame(subsets[i]).rdd.flatMap(aggregation).collect()
        pair_averages.append(temp)

    print("Pairs created....moving on....")
    t = 0
    all_combinations = []
    for x in range(p // 2):
        s = p - 2 - x  # side sizes x+1 and s+1 add up to p
        for a in range(len(pair_averages[x])):
            for b in range(len(pair_averages[s])):
                # equal side sizes: (a, b) and (b, a) are the same comparison
                if x == s and b <= a:
                    continue
                names1 = pair_averages[s][b][0][0].split("->")
                names2 = pair_averages[x][a][0][0].split("->")
                # skip combinations in which a vector appears on both sides
                if set(names1).isdisjoint(names2):
                    tmp1 = (t, (pair_averages[s][b][0][0], pair_averages[s][b][0][1]))
                    tmp2 = (t, (pair_averages[x][a][0][0], pair_averages[x][a][0][1]))
                    all_combinations.append(tmp1)
                    all_combinations.append(tmp2)
                    if t % 100000 == 0:
                        print(t)
                    t = t + 1
    print(t)
    print("We have arrived at the reduce part of this assignment, best regards. Me, Stefan, Arngrimur")
    start = time.time()
    # Each key t has exactly two values, so reduceByKey applies correlation(side1, side2)
    # once per key; keep only correlations whose absolute value reaches the threshold
    res = (sc.parallelize(all_combinations)
             .reduceByKey(correlation)
             .filter(lambda kv: abs(kv[1][0][0]) >= threshold)
             .collect())
    end = time.time()
    print("Took --> ", end-start)
    print(res[:5])

milestone_2(calculate_average, pearson, 3, tuple_list)
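
Both versions filter with a fixed threshold of 0.7, while the assignment asks for a t that returns about 10 results. A small sketch of deriving t from the scores themselves (hypothetical helper; the toy scores and labels are made up for illustration):

```python
import heapq
from typing import List, Tuple

def threshold_for_top_k(results: List[Tuple[float, str]], k: int = 10) -> float:
    """Return t such that keeping results with |r| >= t keeps the k strongest
    (more if several tie at t, all of them if there are fewer than k)."""
    top = heapq.nlargest(k, (abs(r) for r, _ in results))
    return top[-1] if top else float("inf")

# Toy (r, label) values, made up for illustration only
results = [(0.91, "A X B->C"), (-0.85, "D X E->F"), (0.42, "G X H->I")]
t = threshold_for_top_k(results, k=2)
print(t)  # 0.85
print([res for res in results if abs(res[0]) >= t])
```

On an RDD of scored combinations, `RDD.top(10, key=...)` with a key returning the absolute correlation would give the same top entries without collecting every result to the driver.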
