In [96]:
from operator import add

from pyspark.mllib.linalg.distributed import RowMatrix

from pyspark.ml.feature import RFormula

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.stat import Statistics

from pyspark.sql import Row
from pyspark.sql.types import *

# Functions

# Load test data

In [40]:
# Load the raw white-wine CSV and normalise it to plain comma-separated
# text: drop the double quotes and turn the ';' field separator into ','.
wwine_rdd = (
    sc.textFile("wine-data/winequality-white.csv")
      .map(lambda line: line.replace('"', ''))
      .map(lambda line: line.replace(';', ','))
)

In [41]:
# Get header: the first line holds the column names; spaces are replaced by
# underscores so they are valid DataFrame / RFormula identifiers
# (e.g. "fixed acidity" -> "fixed_acidity").
header_str = wwine_rdd.first()
# Build a real list (not a `map` object). On Python 3, map() is a one-shot
# iterator: Row(*header) would consume it and leave nothing for the
# schema=header argument passed to createDataFrame later.
header = [h.replace(' ', '_') for h in header_str.split(',')]
# Row "class" with these field names (not used by the visible cells).
Wine = Row(*header)


In [42]:
# Parse every data line (everything except the header, skipping blank lines
# that would make float('') fail) into a list of floats, one per column.
data = wwine_rdd.filter(lambda line: line and line != header_str)
# List comprehension rather than map(float, ...) so each record is a real
# list on Python 3 too (createDataFrame cannot infer a schema from a map object).
data = data.map(lambda line: [float(field) for field in line.split(',')])

# Create DataFrame: column names come from `header`; types are inferred
# from the float values (all double, see the printed schema).
df = sqlContext.createDataFrame(data, schema=header)
df.printSchema()

root
 |-- fixed_acidity: double (nullable = true)
 |-- volatile_acidity: double (nullable = true)
 |-- citric_acid: double (nullable = true)
 |-- residual_sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free_sulfur_dioxide: double (nullable = true)
 |-- total_sulfur_dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: double (nullable = true)



In [43]:
# Assemble the model inputs with an R-style formula: `quality` is the response
# (written to the 'target' column) and the measurements listed below are the
# predictors (packed into the vector column 'features').
predictors = [
    'fixed_acidity', 'volatile_acidity', 'citric_acid', 'residual_sugar',
    'chlorides', 'free_sulfur_dioxide', 'total_sulfur_dioxide', 'density',
    'pH', 'sulphates', 'alcohol',
]
formula = RFormula(formula='quality ~ ' + '+'.join(predictors),
                   featuresCol='features',
                   labelCol='target')
out_df = formula.fit(df).transform(df)

# Univariate selection

In [164]:
# Functions
def d_corr(v1, v2, method=None):
    """Distributed correlation between two numeric RDDs.

    Thin wrapper around ``pyspark.mllib.stat.Statistics.corr``.

    :param v1: RDD of floats (e.g. one feature column).
    :param v2: RDD of floats with the same cardinality as ``v1``, aligned
        element-by-element with it.
    :param method: correlation method passed through to ``Statistics.corr``:
        ``'pearson'`` (used when ``None``, the original behaviour) or
        ``'spearman'``.
    :return: the correlation coefficient as a float.
    """
    return Statistics.corr(v1, v2, method=method)

def d_ftest(v, t):
    """One-way ANOVA F statistic of feature values ``v`` grouped by labels ``t``.

    Same statistic as ``scipy.stats.f_oneway`` applied to the groups of ``v``
    defined by ``t``:

        F = [sum_g n_g (mean_g - mean)^2 / (k - 1)]
            / [sum_i (v_i - mean_{g(i)})^2 / (n - k)]

    with n samples and k distinct labels.

    :param v: RDD of floats, the feature values.
    :param t: RDD of hashable labels aligned element-by-element with ``v``
        (``t.zip(v)`` requires the same number of partitions and the same
        number of elements in each partition).
    :return: the F statistic as a float.
    :raises ValueError: if there are fewer than two groups, or no more samples
        than groups (a degrees-of-freedom term would be zero).
    :raises ZeroDivisionError: if every value equals its group mean
        (zero within-group variance).
    """
    # (label, value) pairs: the grouping key must come first for *ByKey ops.
    pairs = t.zip(v)

    # Per-group (sum, count) in a single pass. There is one entry per distinct
    # label, so the result is small enough to bring back to the driver.
    group_stats = pairs.aggregateByKey(
        (0.0, 0),
        lambda acc, x: (acc[0] + x, acc[1] + 1),
        lambda a, b: (a[0] + b[0], a[1] + b[1]),
    ).collectAsMap()

    n_groups = len(group_stats)
    n_samples = sum(count for _, count in group_stats.values())
    if n_groups < 2 or n_samples <= n_groups:
        raise ValueError(
            "F-test needs at least 2 groups and more samples than groups "
            "(got %d samples in %d groups)" % (n_samples, n_groups))

    overall_mean = sum(total for total, _ in group_stats.values()) / float(n_samples)
    group_mean = dict((label, total / float(count))
                      for label, (total, count) in group_stats.items())

    # Between-group variability: mean square with k - 1 degrees of freedom.
    ss_between = sum(count * (group_mean[label] - overall_mean) ** 2
                     for label, (_, count) in group_stats.items())
    ms_between = ss_between / float(n_groups - 1)

    # Within-group variability: mean square with n - k degrees of freedom.
    # The small driver-side `group_mean` dict is shipped with the closure,
    # avoiding a shuffle-based join.
    ss_within = pairs.map(lambda lv: (lv[1] - group_mean[lv[0]]) ** 2).reduce(add)
    ms_within = ss_within / float(n_samples - n_groups)

    return ms_between / ms_within

In [165]:
# Extract features and target as separate RDDs. Both come from the same
# DataFrame, so they stay aligned element-by-element, which Statistics.corr
# and the RDD.zip inside d_ftest rely on. They are cached because every
# per-feature score below re-reads them.
feats = out_df.select('features').rdd.map(lambda row: row['features']).cache()
target = out_df.select('target').rdd.map(lambda row: row['target']).cache()

n_features = feats.first().size

# Univariate scores, one entry per feature-vector position
# (presumably the order of `predictors` in the RFormula; TODO confirm).
corr_scores = []
f_scores = []
for feat in range(n_features):
    # Bind the index as a default argument so the closure does not depend on
    # the global `feat` still holding this value when Spark serializes it
    # (which happens lazily, at the first action on the RDD).
    vector = feats.map(lambda x, i=feat: x[i])
    corr_scores.append(d_corr(vector, target))
    f_scores.append(d_ftest(vector, target))