In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import linear_model
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import pyspark.ml.tuning as tune
from pyspark.ml import Pipeline
from pyspark.ml import feature
from pyspark.ml import regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SQLContext
from pyspark.sql import functions as fn

In [2]:
%sh wget "https://www.dropbox.com/s/9xf7gwlal3dj4bv/HEALTHRESPONACTISUMMARYMERGED.csv?dl=1" -O combinedHealth.csv
# The URL is quoted because it contains '?', which the shell would otherwise treat as a glob character.

In [3]:
healthCombined = pd.read_csv('combinedHealth.csv')
healthCombined.head()
#healthCombined = spark.read.csv('file:///databricks/driver/combinedHealth.csv')

In [4]:
list(healthCombined)

In [5]:
healthCombinedEdited = healthCombined.loc[healthCombined['ERBMI'] != -1]

In [6]:
healthCombinedEdited.EUEATSUM.value_counts(dropna = False), healthCombinedEdited.EUEATSUM.count()
#healthCombinedEdited.loc[healthCombinedEdited.tewhere == -3].tewhere

In [7]:
# Keep only rows that pass every code filter below.  Positive filters keep the
# listed codes; negated filters drop the listed negative codes.
# TRERNWA, EUFASTFDFRQ, EUMILK and EUEATSUM are deliberately neither filtered
# nor selected here.
validRows = (
    healthCombinedEdited['EUDIETSODA'].isin([1, 2, 3])
    & healthCombinedEdited['EUEXERCISE'].isin([1, 2])
    & healthCombinedEdited['EEINCOME1'].isin([1, 2, 3])
    & ~healthCombinedEdited['EUEXFREQ'].isin([-1])
    & ~healthCombinedEdited['EUFASTFD'].isin([-2, -3])
    & ~healthCombinedEdited['EUFDSIT'].isin([-2])
    & ~healthCombinedEdited['EUGENHTH'].isin([-2])
    & ~healthCombinedEdited['EUMEAT'].isin([-1, -2])
    & ~healthCombinedEdited['tewhere'].isin([-1])
)
# .copy() detaches the result from the source frame so later cells can add
# columns to it without pandas' chained-assignment warning.
healthCombinedEdited = healthCombinedEdited.loc[validRows].copy()

# Target (ERBMI) followed by the 18 predictor columns used for modelling.
healthCombinedCleaned = healthCombinedEdited[[
    'ERBMI', 'ERTPREAT', 'ERTSEAT', 'EUDIETSODA', 'EUEXERCISE', 'TEAGE',
    'EEINCOME1', 'EUEXFREQ', 'EUFASTFD', 'EUFFYDAY', 'EUFDSIT', 'EUGENHTH',
    'EUGROSHP', 'EUMEAT', 'EUPRPMEL', 'TUACTIVITY_N', 'tuactdur24', 'tewhere',
    'TESEX',
]]
# info must be *called* to print the dtype / non-null summary.
healthCombinedCleaned.info()

In [8]:
plt.hist(healthCombinedCleaned.ERTPREAT)
display()

In [9]:
sns.jointplot( x = 'ERTPREAT', y = 'ERBMI', data = healthCombinedCleaned)
display()

In [10]:
# Histogram of ERTPREAT after standardisation.  `preprocessing` is
# sklearn.preprocessing, imported in the notebook's import cell.
plt.figure()
# Cast to float first so scale() does not have to convert an integer column itself.
plt.hist(preprocessing.scale(healthCombinedCleaned['ERTPREAT'].astype(float)))
display()

In [12]:
sqlContext = SQLContext(sc)

In [13]:
new_df = sqlContext.createDataFrame(healthCombinedCleaned)
display(new_df)

In [14]:
# Pairwise plots of every column in new_df.  new_df holds exactly the cleaned
# modelling columns; TRERNWA, EUFASTFDFRQ, EUMILK and EUEATSUM are not among
# them, so selecting them here would raise a KeyError.
# pairplot creates its own figure, so no plt.figure() call is needed first.
pair = sns.pairplot(new_df.toPandas())
display()

In [15]:
training, validation, test = new_df.randomSplit([0.6,0.3,0.1],0)

In [16]:
training.count()

In [17]:
validation.count()

In [18]:
test.count()

In [19]:
vecScaled = feature.VectorAssembler(inputCols = [ 'ERTPREAT', 'ERTSEAT', 'EUDIETSODA',  'EUEXERCISE', 'TEAGE',  'EEINCOME1', 'EUEXFREQ', 'EUFASTFD',  'EUFFYDAY', 'EUFDSIT', 'EUGENHTH'
                                             , 'EUGROSHP', 'EUMEAT',  'EUPRPMEL', 'TUACTIVITY_N',  'tuactdur24', 'tewhere', 'TESEX'], outputCol = 'features')

In [20]:
scaled = feature.StandardScaler(inputCol='features', outputCol='sclaedFeatures')

In [21]:
regScaled = regression.LinearRegression(labelCol = 'ERBMI', featuresCol = 'sclaedFeatures', maxIter=5)

In [22]:
regUnscaled = regression.LinearRegression(labelCol = 'ERBMI', featuresCol = 'features', regParam=0, elasticNetParam = 0)

In [23]:
vecIntercept = feature.VectorAssembler(inputCols=[], outputCol='emptyFeatures')

In [24]:
regIntercept = regression.LinearRegression(labelCol= 'ERBMI', featuresCol= 'emptyFeatures')

In [25]:
pipeIntercept = Pipeline(stages = [vecIntercept, regIntercept])

In [26]:
interceptModel = pipeIntercept.fit(training)

In [27]:
rmse = fn.sqrt(fn.avg((fn.col('ERBMI') - fn.col('prediction'))**2))

In [28]:
interceptModel.transform(test).select(rmse).show()

In [29]:
PipeUnscaled = Pipeline(stages = [vecScaled, regUnscaled])

In [30]:
PipeScaled = Pipeline(stages = [vecScaled, scaled, regScaled])

In [31]:
unScaledModel = PipeUnscaled.fit(training)

In [32]:
unScaledModel.transform(test).select(rmse).show()

In [33]:
linModelUnscaled = unScaledModel.stages[-1]

In [34]:
scaledModel = PipeScaled.fit(training)

In [35]:
scaledModel.transform(test).select(rmse).show()

In [36]:
linModelUnscaled.coefficients

In [37]:
linModelScaled.coefficients

In [38]:
valuesDF = pd.DataFrame( healthCombinedCleaned.drop('ERBMI', axis = 1).columns)

In [39]:
valuesDF['NotScaled'] = linModelUnscaled.coefficients

In [40]:
valuesDF['Scaled'] = linModelScaled.coefficients

In [41]:
valuesDF

In [42]:
valuesDF.columns = ['feature', 'notScaled', 'Scaled']

In [43]:
valuesDF.columns

In [44]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.barplot( y = 'Scaled', x = 'feature', data = valuesDF)
plt.xticks(rotation = 60)
display()

In [45]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.barplot( y = 'notScaled', x = 'feature', data = valuesDF)
plt.xticks(rotation = 60)
display()

In [46]:
indexDf = valuesDF.set_index('feature')
indexDf

In [47]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')

ax = fig.add_subplot(111) 
ax2 = ax.twinx() 

width = 0.4
indexDf.notScaled.plot(kind='bar', color='red', ax=ax, width=width, position=0, legend = True)
indexDf.Scaled.plot(kind='bar', color='blue', ax=ax, width=width, position=1, legend = True)

display()

In [48]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')

ax = fig.add_subplot(111) 
ax2 = ax.twinx() 

width = 0.4
indexDf.abs().notScaled.plot(kind='bar', color='red', ax=ax, width=width, position=0, legend = True)
indexDf.abs().Scaled.plot(kind='bar', color='blue', ax=ax, width=width, position=1, legend = True)



display()

In [49]:
grid = tune.ParamGridBuilder()

In [50]:
grid = grid.addGrid(reg.elasticNetParam, [0, 0.2, 0.4, 0.6, 0.8, 1])

In [51]:
grid = grid.addGrid(reg.regParam, np.arange(0,.1,.01))

In [52]:
grid = grid.build()

In [53]:
evaluator = RegressionEvaluator(labelCol=reg.getLabelCol(), predictionCol=reg.getPredictionCol())

In [54]:
crossPipe = Pipeline(stages = [vecScaled, scaled, regScaled])

In [55]:
# Standard 3-fold cross-validator over the grid; a later cell calls cv.fit(training).
cv = tune.CrossValidator(estimator=crossPipe, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)

In [56]:
from pyspark.sql.functions import rand
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel

In [57]:
# Three independent, initially empty lists.  CrossValidatorVerbose._fit appends
# parameter names to `a`, parameter values to `b` and fold metrics to `c`.
a, b, c = [], [], []

In [58]:
class CrossValidatorVerbose(CrossValidator):
    """CrossValidator that prints every fold / parameter-map result while fitting.

    Besides printing, each evaluation is recorded in the notebook-level lists
    ``a`` (parameter name), ``b`` (parameter value) and ``c`` (fold metric):
    one entry is appended to each list for every parameter in the evaluated
    parameter map.
    """

    def _fit(self, dataset):
        """Cross-validate every parameter map and refit the best one.

        :param dataset: Spark DataFrame that is split into folds and on which
            the winning parameter map is finally refitted.
        :return: CrossValidatorModel built from the refitted best model and
            the list of per-parameter-map average metrics.
        """
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        # A seeded random column decides which fold each row belongs to.
        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        # Running sum of the metric for each parameter map across folds.
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            # Rows whose random value falls in [i*h, (i+1)*h) form the validation fold.
            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                metric = eva.evaluate(model.transform(validation, paramMap))
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

                # Record this evaluation in the notebook-level lists.
                for (param, val) in paramMap.items():
                    a.append(param.name)
                    b.append(val)
                    c.append(metric)

        # The summed metric ranks parameter maps the same way the average does.
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))

In [59]:
cvVer = CrossValidatorVerbose(estimator = crossPipe, estimatorParamMaps = grid, evaluator= evaluator, numFolds = 3)

In [60]:
cvVer.fit(training).transform(test)

In [61]:
fitWithCrossValidation = cvVer.fit(training)

In [62]:
modelOnTest = fitWithCrossValidation.transform(test)

In [63]:
newDict = {}
for i in range(0, len(a), 2):
  newDict[a[i] + " " + `b[i]` + " " + a[i + 1] + " " + `b[i + 1]`] = c[i]

In [64]:
for key, value in sorted(newDict.iteritems(), key=lambda (k,v): (v,k)):
    print "%s: %s" % (key, value)

In [66]:
finalModelFit =  cv.fit(training)

In [67]:
evaluator.evaluate(finalModelFit.transform(test))

In [68]:
pred =  finalModelFit.transform(test)

In [69]:
pred.select('ERBMI', 'prediction').show(500)

In [70]:
pred.select(rmse).show()

In [71]:
best = finalModelFit.bestModel.stages[-1]

In [72]:
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel

In [73]:
class CrossValidatorVerbose(CrossValidator):
    """Cross-validator that reports each fold / parameter-map metric as it is computed.

    NOTE: a class with this same name is already defined earlier in the
    notebook.  From this cell onwards the name refers to this version, which
    only prints results and does not append to any notebook-level lists.
    """

    def _fit(self, dataset):
        """Evaluate every parameter map on every fold, then refit the best on ``dataset``."""
        estimator = self.getOrDefault(self.estimator)
        paramMaps = self.getOrDefault(self.estimatorParamMaps)
        scorer = self.getOrDefault(self.evaluator)
        metricName = scorer.getMetricName()

        foldCount = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        foldWidth = 1.0 / foldCount

        # Seeded random column used to assign rows to folds.
        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        # One running metric total per parameter map.
        metrics = [0.0] * len(paramMaps)

        for foldNum in range(1, foldCount + 1):
            print("Comparing models on fold %d" % foldNum)

            # Validation rows are those with a random value in [lower, upper).
            lower = (foldNum - 1) * foldWidth
            upper = foldNum * foldWidth
            inFold = (df[randCol] >= lower) & (df[randCol] < upper)
            validation = df.filter(inFold)
            train = df.filter(~inFold)

            for idx, paramMap in enumerate(paramMaps):
                model = estimator.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                metric = scorer.evaluate(model.transform(validation, paramMap))
                metrics[idx] += metric

                shownParams = {param.name: val for (param, val) in paramMap.items()}
                print("params: %s\t%s: %f\tavg: %f" % (
                    shownParams, metricName, metric, metrics[idx] / foldNum))

        # Highest total wins when larger is better, lowest total otherwise.
        chooseBest = np.argmax if scorer.isLargerBetter() else np.argmin
        bestIndex = chooseBest(metrics)

        bestParams = paramMaps[bestIndex]
        bestModel = estimator.fit(dataset, bestParams)
        avgMetrics = [total / foldCount for total in metrics]
        shownBest = {param.name: val for (param, val) in bestParams.items()}
        print("Best model:\nparams: %s\t%s: %f" % (
            shownBest, metricName, avgMetrics[bestIndex]))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))

In [74]:
# Display the fitted cross-validation result object.
finalModelFit

In [75]:
# Text description of the fitted object's params.
finalModelFit.explainParams()

In [76]:
# Last stage of the best pipeline found by cross-validation.
BestModel = finalModelFit.bestModel.stages[-1]
BestModel

In [77]:
BestModel.coefficients

In [78]:
# Add the best model's coefficients as a new column of the feature-indexed table.
indexDf['BestScaled'] = BestModel.coefficients

In [79]:
# Same column on the un-indexed table used by the seaborn bar plots.
valuesDF['BestScaled'] = BestModel.coefficients

In [80]:
indexDf

In [81]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')

ax = fig.add_subplot(111) 
ax2 = ax.twinx() 

width = 0.4
indexDf.BestScaled.plot(kind='bar', color='red', ax=ax, width=width, position=0, legend = True)
indexDf.Scaled.plot(kind='bar', color='blue', ax=ax, width=width, position=1, legend = True)

display()

In [82]:
plt.figure()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.barplot( y = 'BestScaled', x = 'feature', data = valuesDF)
plt.xticks(rotation = 60)
display()

In [83]:
# The cells below only display attributes of BestModel and of its `summary`
# object; none of them assigns a variable.
BestModel.hasSummary

In [84]:
BestModel.intercept

In [85]:
BestModel.summary.meanSquaredError

In [86]:
BestModel.summary.r2

In [87]:
# NOTE(review): pValues may not be available for every solver / regularisation setting — confirm.
BestModel.summary.pValues

In [88]:
BestModel.summary.rootMeanSquaredError

In [89]:
# Number of rows in the summary's residuals frame.
BestModel.summary.residuals.count()

In [90]:
BestModel.params

In [91]:
# Collects every prediction (cast to float, aliased 'pre') to the driver.
BestModel.summary.predictions.selectExpr('cast(prediction as float) pre').collect()

In [92]:
BestModel.summary.predictions.select('prediction')

In [93]:
# Shows the residuals cast to float under the alias 'res'.
BestModel.summary.residuals.selectExpr('cast(residuals as float) res').show()

In [94]:
BestModel

In [95]:
BestModel.summary.predictionCol

In [96]:
pred = BestModel.summary.predictions.toPandas()

In [97]:
type(pred)

In [98]:
pred.head()

In [99]:
a = pred.prediction

In [100]:
resd = BestModel.summary.residuals.toPandas()

In [101]:
type(resd)

In [102]:
b = resd.residuals

In [103]:
dfa = pd.DataFrame(data = a)

In [104]:
dfa.columns

In [105]:
dfa.dtypes

In [106]:
dfa['re'] = b

In [107]:
dfa.prediction

In [108]:
dfa.plot.scatter('prediction', 're')
display() 

In [109]:
dfa

In [110]:
dfa.describe()

In [113]:
rmse = fn.sqrt(fn.avg((fn.col('ERBMI') - fn.col('prediction'))**2))

In [114]:
testModel.transform(test).select(rmse).show()

In [115]:
testModel.transform(test).show(500)

In [116]:
testModel.stages[1].coefficients

In [117]:
testModel.stages[1].intercept

In [118]:
testModel.stages[1].

In [121]:
healthCombinedCleaned = healthCombinedEdited.loc[~healthCombinedEdited['TRERNWA'].isin([-1])]
healthCombinedCleaned.TRERNWA.value_counts(dropna = False)

In [122]:
X_train, X_test, y_train, y_test = train_test_split(healthCombinedCleaned.drop('ERBMI', axis = 1), healthCombinedCleaned.ERBMI, test_size = 0.3, random_state = 21 )

In [123]:
reg = linear_model.LinearRegression()

In [124]:
reg.fit(X_train, y_train)

In [125]:
reg.score(X_test, y_test)

In [126]:
healthCombinedEdited.head()

In [127]:
healthCombinedEdited.ERINCOME.value_counts()

In [128]:
healthCombinedEdited[healthCombinedEdited.ERINCOME]

In [129]:
mostEating = healthCombinedEdited.ERTSEAT.value_counts(dropna = False)[0:25].index.tolist()
tryingEating = healthCombinedEdited.loc[healthCombinedEdited['ERTSEAT'].isin(mostEating)]

In [130]:
healthCombinedEdited.loc[healthCombinedEdited['ERTSEAT'].isin(mostEating)].ERTSEAT.value_counts(dropna = False)

In [131]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.regplot( x = tryingEating["ERTSEAT"], y = tryingEating["ERBMI"], fit_reg = False)
display()

In [132]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot( x = "ERTSEAT", y = "ERBMI", data = tryingEating)
display()

In [133]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
tryingExer = healthCombinedEdited.loc[healthCombinedEdited['EUEXERCISE'].isin([1,2])]
sns.boxplot( x = "EUEXERCISE", y = "ERBMI", data = tryingExer)
display()

In [134]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot( x = "TESEX", y = "ERBMI", data = healthCombinedEdited)
display()

In [135]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'EEINCOME1', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['EEINCOME1'].isin([1,2, 3])])
display()

In [136]:
healthCombinedEdited.ERTPREAT.max()

In [137]:
# Bin labels for pd.cut, listed from the lowest bin to the highest.
# pd.cut hands out labels in ascending bin order, so the list must ascend too;
# otherwise the smallest values would be labelled 'Very High'.
labels = ['Very Low', 'Low', 'Med', 'High', 'Very High']
labels

In [138]:
healthCombinedEdited['TimeSecondaryEating'] = pd.cut(healthCombinedEdited.ERTPREAT, 5, right = False, labels = labels)

In [139]:
healthCombinedEdited['TimeSecondaryEating'].value_counts()

In [140]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'TimeSecondaryEating', y =  "ERBMI", data = healthCombinedEdited)
display()

In [141]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'EUDIETSODA', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['EUDIETSODA'].isin([1,2, 3])])
display()

In [142]:
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'EUDRINK', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['EUDRINK'].isin([1,2])])
display()

In [143]:
healthCombinedEdited.EUFASTFD.value_counts()
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'EUFASTFD', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['EUFASTFD'].isin([1,2])])
display()

In [144]:
healthCombinedEdited.EUFASTFDFRQ.value_counts()
healthCombinedEdited['FastFoodFrequrency'] = pd.cut(healthCombinedEdited.EUFASTFDFRQ, 5, right = False, labels = labels)
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'FastFoodFrequrency', y =  "ERBMI", data = healthCombinedEdited)
display()

In [145]:
healthCombinedEdited.EUGENHTH.value_counts(dropna = False)
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'EUGENHTH', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['EUGENHTH'].isin([1,2,3,4,5])])
display()

In [146]:
healthCombinedEdited.TESCHENR.value_counts(dropna = False)
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'TESCHENR', y =  "ERBMI", data = healthCombinedEdited.loc[healthCombinedEdited['TESCHENR'].isin([1,2])])
display()

In [147]:
healthCombinedEdited.EUFASTFDFRQ.value_counts()
healthCombinedEdited['FastFoodFrequrency'] = pd.cut(healthCombinedEdited.EUFASTFDFRQ, 5, right = False, labels = labels)
fig=plt.figure(figsize=(18, 16), dpi= 80, facecolor='w', edgecolor='k')
sns.boxplot(x = 'FastFoodFrequrency', y =  "ERBMI", data = healthCombinedEdited)
display()

In [148]:
# Rows whose TRERNWA is not the -1 code.  .copy() makes tempDF an independent
# frame, so adding the 'Earnings' column does not write into a slice of
# healthCombinedEdited (pandas' chained-assignment warning).
tempDF = healthCombinedEdited.loc[healthCombinedEdited['TRERNWA'] != -1].copy()
# Bin TRERNWA into 5 equal-width, left-closed intervals named by `labels`.
tempDF['Earnings'] = pd.cut(tempDF.TRERNWA, 5, right=False, labels=labels)
fig = plt.figure(figsize=(18, 16), dpi=80, facecolor='w', edgecolor='k')
sns.boxplot(x='Earnings', y="ERBMI", data=tempDF)
display()