#! /usr/bin/env python3
import numpy as np
import pandas as pd
##--------------------------------------------------------------------------------------
# Python version - Simplified output version
##--------------------------------------------------------------------------------------
def dlsa_mapreduce(model_mapped_sdf, sample_size):
    '''
    MapReduce for partitioned data with a given model.
    Aggregates the per-partition results and returns the global estimator.
    '''
    # Spark DataFrame to pandas DataFrame
    #--------------------------------------
    model_mapped_sdf = model_mapped_sdf.drop('par_id')
    model_mapped_pdf = model_mapped_sdf.toPandas()

    # Sum each column over all rows
    #--------------------------------------
    p = model_mapped_pdf.shape[1] - 1
    model_mapped_sum = np.array(model_mapped_pdf.sum(axis=0)).reshape(1, p + 1)

    # Extract the required results
    #--------------------------------------
    Sig_inv_sum_value = np.array(model_mapped_sum[:, 0]).reshape(1, 1)  # 1-by-1
    Sig_invMcoef_sum = np.array(model_mapped_sum[:, 1:]).reshape(p, 1)  # p-by-1

    # Generate a diagonal matrix from Sig_inv_sum_value. The simplified version
    # keeps only a scalar summary of Sigma^{-1}, so its inverse is a scalar
    # multiple of the identity.
    #--------------------------------------
    Sig_inv_sum_inv = 1 / Sig_inv_sum_value * np.identity(p)  # p-by-p

    # Compute Theta_tilde and Sig_tilde
    #--------------------------------------
    Theta_tilde = Sig_inv_sum_inv.dot(Sig_invMcoef_sum)  # p-by-1
    Sig_tilde = Sig_inv_sum_inv * sample_size            # p-by-p

    # Output
    #--------------------------------------
    out = pd.DataFrame(np.concatenate((Theta_tilde, Sig_tilde), axis=1),
                       columns=["Theta_tilde"] + model_mapped_sdf.columns[1:])
    return out
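##--------------------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original pipeline): shows how
# dlsa_mapreduce combines per-partition results. The toy `model_mapped_sdf` built
# below is a hypothetical stand-in for the output of the DLSA map step; the column
# names ("Sig_inv", "x1", "x2") are assumptions made for this demonstration.
##--------------------------------------------------------------------------------------
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("dlsa_demo").getOrCreate()

    # One row per coefficient: column 0 holds the Sigma^{-1} summary value,
    # the remaining p columns hold the Sigma^{-1} @ theta_hat contributions.
    demo_pdf = pd.DataFrame({"par_id": [0, 0],
                             "Sig_inv": [2.0, 2.0],
                             "x1": [1.0, 0.5],
                             "x2": [0.5, 1.0]})
    demo_sdf = spark.createDataFrame(demo_pdf)

    # With p = 2: Sig_inv_sum_value = 4, Sig_invMcoef_sum = [1.5, 1.5],
    # so Theta_tilde = (1/4) * [1.5, 1.5] = [0.375, 0.375].
    print(dlsa_mapreduce(demo_sdf, sample_size=100))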
##--------------------------------------------------------------------------------------
# Python version - Standard output version
##--------------------------------------------------------------------------------------
# def dlsa_mapreduce(model_mapped_sdf, sample_size):
#     '''
#     MapReduce for partitioned data with a given model.
#     Calculates the global estimator.
#     '''
#     ##----------------------------------------------------------------------------------
#     ## MERGE
#     ##----------------------------------------------------------------------------------
#     groupped_sdf = model_mapped_sdf.groupby("par_id")
#     groupped_sdf_sum = groupped_sdf.sum(*model_mapped_sdf.columns[1:])  # TODO: Error with Python < 3.7 for > 255 arguments. Location 0 is 'par_id'.
#     groupped_pdf_sum = groupped_sdf_sum.toPandas().sort_values("par_id")
#
#     p = groupped_pdf_sum.shape[0]
#
#     if groupped_pdf_sum.shape[0] == 0:  # bad chunked models
#         raise Exception("Zero-length grouped pandas DataFrame obtained, check the input.")
#     else:
#         # Extract the required results
#         #--------------------------------------
#         Sig_invMcoef_sum = groupped_pdf_sum.iloc[:, 2]  # p-by-1
#         Sig_inv_sum = groupped_pdf_sum.iloc[:, 3:]      # p-by-p
#
#         Sig_inv_sum_inv = np.linalg.inv(Sig_inv_sum)    # p-by-p
#
#         # Compute Theta_tilde and Sig_tilde
#         #--------------------------------------
#         Theta_tilde = Sig_inv_sum_inv.dot(Sig_invMcoef_sum)  # p-by-1
#         Sig_tilde = Sig_inv_sum_inv * sample_size            # p-by-p
#
#         # Reshape
#         #--------------------------------------
#         Theta_tilde = np.array(Theta_tilde).reshape(p, 1)
#         Sig_tilde = np.array(Sig_tilde).reshape(p, p)
#
#         out = pd.DataFrame(np.concatenate((Theta_tilde, Sig_tilde), axis=1),
#                            columns=["Theta_tilde"] + model_mapped_sdf.columns[3:])
#
#         return out
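##--------------------------------------------------------------------------------------
# Math sketch (illustrative, kept commented out like the standard version above):
# the combination step the standard version performs, in plain NumPy. The numbers
# are made up; with per-partition summaries Sig_inv_k and Sig_inv_k @ theta_k, the
# code above forms Theta_tilde = (sum_k Sig_inv_k)^{-1} (sum_k Sig_inv_k @ theta_k).
##--------------------------------------------------------------------------------------
# Sig_inv_sum = np.array([[4.0, 1.0],
#                         [1.0, 3.0]])    # sum of per-partition Sigma^{-1}, p-by-p
# Sig_invMcoef_sum = np.array([[2.0],
#                              [1.0]])    # sum of Sigma^{-1} @ theta_hat, p-by-1
# Theta_tilde = np.linalg.inv(Sig_inv_sum).dot(Sig_invMcoef_sum)  # weighted combination, p-by-1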