In [1]:
# Load the power data CSV (semicolon-separated, header row, "?" marks a missing value).
# The file is read only once: the original cell issued the identical read twice,
# which repeats the schema-inference scan that "inferSchema" triggers.
dfraw = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .option("sep", ";")
    .csv("/FileStore/tables/powerDataForTP.csv")
)

# Spark DataFrames are immutable, so the working frame can safely be the same
# object as the raw one; every transformation below returns a new DataFrame.
df = dfraw


In [2]:
from pyspark.sql.functions import col, avg

# Replace every missing value with the mean of its column.

# One aggregation computes the mean of every column; Spark names each result
# column "avg(<column>)".
mean_dict = {name: 'mean' for name in df.columns}
col_avgs = df.agg(mean_dict).collect()[0].asDict()

# Strip the "avg(" prefix and the trailing ")" to get back the original column names.
# .items() works on Python 2 and 3 (.iteritems() exists only on Python 2).
# A mean can be None (e.g. a column with no non-null numeric values); such
# entries are skipped because fillna() has no usable replacement for them.
avg_prefix = "avg("
col_avgs = {
    k[len(avg_prefix):-1]: v
    for k, v in col_avgs.items()
    if v is not None
}
dfmodified = df.fillna(col_avgs)

In [3]:
# Show the DataFrame after its missing values were filled with the column means.
display(dfmodified)

In [4]:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Pack the four predictor columns into the single vector column the estimators read.
vec= VectorAssembler(
  inputCols= [
  'AT',
  'V',
  'AP',
  'RH'
  ],
  outputCol = 'features'
)
data= vec.transform(dfmodified)
print(data)
modelData = data.select('features','PE')
# Fixed seed so the train/test split (and every metric below) is the same on each run.
trainData, testData = modelData.randomSplit([0.7,0.3], seed=42)

#Visualization of the variables
modelData.describe().show()
trainData.describe().show()
testData.describe().show()

#Model Creation: a single baseline fit with a small fixed regularisation
lr= LinearRegression(labelCol='PE',featuresCol='features',regParam=0.01)

#We fit the model
lrModel=lr.fit(trainData)

#we create a summary and display it
summary=lrModel.summary
summary.predictions.show(truncate=False)


#Evaluator: RMSE between the label PE and the prediction column
evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")

#Estimators: one per penalty family, each tuned by its own cross-validator
lr1= LinearRegression(labelCol='PE',featuresCol='features')
lr2= LinearRegression(labelCol='PE',featuresCol='features')
lr3= LinearRegression(labelCol='PE',featuresCol='features')

# Parameter grids.
# elasticNetParam is the L1/L2 mixing weight: 1 is a pure L1 penalty (Lasso),
# 0 is a pure L2 penalty (Ridge). The grids are set so that model 1 really is
# the Lasso and model 2 really is the Ridge, matching the labels printed below.
# Values are written as floats because both params are doubles.
reg_values = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]
param_grid1 = (ParamGridBuilder()
    .addGrid(lr1.regParam, reg_values)
    .addGrid(lr1.elasticNetParam, [1.0])
    .build())
param_grid2 = (ParamGridBuilder()
    .addGrid(lr2.regParam, reg_values)
    .addGrid(lr2.elasticNetParam, [0.0])
    .build())
# The grid must be built from lr3's own params: a Param is tied to the estimator
# instance that owns it, so a grid built from lr2 does not configure lr3.
param_grid3 = (ParamGridBuilder()
    .addGrid(lr3.regParam, reg_values)
    .addGrid(lr3.elasticNetParam, [0.2, 0.4, 0.6, 0.8])
    .build())

# cross-validation models (4 folds, model selection by RMSE)
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=param_grid1, evaluator=evaluator, numFolds=4)
cv2 = CrossValidator(estimator=lr2, estimatorParamMaps=param_grid2, evaluator=evaluator, numFolds=4)
cv3 = CrossValidator(estimator=lr3, estimatorParamMaps=param_grid3, evaluator=evaluator, numFolds=4)

#Fit cross-validation model
cv_model1 = cv1.fit(trainData)
cv_model2 = cv2.fit(trainData)
cv_model3 = cv3.fit(trainData)

#Prediction
pred_training_cv1 = cv_model1.transform(trainData)
pred_test_cv1 = cv_model1.transform(testData)

pred_training_cv2 = cv_model2.transform(trainData)
pred_test_cv2 = cv_model2.transform(testData)

pred_training_cv3 = cv_model3.transform(trainData)
pred_test_cv3 = cv_model3.transform(testData)

#Evaluation
# performance on training data
print("Train data performance Lasso = ", evaluator.evaluate(pred_training_cv1))
print("Train data performance Ridge = ", evaluator.evaluate(pred_training_cv2))
print("Train data performance Elastic = ", evaluator.evaluate(pred_training_cv3))

# performance on test data
print("Test data performance Lasso = ", evaluator.evaluate(pred_test_cv1))
print("Test data performance Ridge = ", evaluator.evaluate(pred_test_cv2))
print("Test data performance Elastic = ", evaluator.evaluate(pred_test_cv3))

#Intercept and coefficients
print("Best intercept Lasso = ", cv_model1.bestModel.intercept)
#print("Best coefficients Lasso = ", cv_model1.bestModel.coefficients)

print("Best intercept Ridge = ", cv_model2.bestModel.intercept)
#print("Best coefficients Ridge = ", cv_model2.bestModel.coefficients)

print("Best intercept Elastic = ", cv_model3.bestModel.intercept)
#print("Best coefficients Elastic = ", cv_model3.bestModel.coefficients)


In [5]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.regression import DecisionTreeRegressor,RandomForestRegressor
from pyspark.ml.classification import DecisionTreeClassifier

# Split the imputed (not yet assembled) data: the pipelines below run the
# VectorAssembler `vec` themselves, so they need the raw predictor columns.
# Fixed seed so the split and the reported metrics are reproducible.
trainData2, testData2 = dfmodified.randomSplit([0.7,0.3], seed=42)

# Regressors predicting PE from the assembled 'features' column.
dt = DecisionTreeRegressor(
    labelCol="PE",
    featuresCol="features",
    predictionCol="prediction",
    maxDepth=3,
)
rf = RandomForestRegressor(
    labelCol="PE",
    featuresCol="features",
    predictionCol="prediction",
    numTrees=3,
    maxDepth=3,
)

# Each pipeline assembles the feature vector, then fits its regressor.
dtPipeline = Pipeline(stages=[vec, dt])
rfPipeline = Pipeline(stages=[vec, rf])

# RMSE evaluator owned by this cell. It is deliberately not named `eval`
# (that would shadow the Python builtin) and it is the evaluator used both for
# model selection and for the scores printed below, so the cell does not
# depend on an `evaluator` variable left over from another cell.
rmse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="PE", metricName="rmse")

# Decision tree: search over the tree depth.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2,3,4,5])
             .build())

# Random forest: search over the number of bins (maxBins).
paramGrid2 = (ParamGridBuilder()
             .addGrid(rf.maxBins, [20,40,60,80,100])
             .build())

# 4-fold cross-validators, fully configured at construction time.
crossval = CrossValidator(estimator=dtPipeline, estimatorParamMaps=paramGrid,
                          evaluator=rmse_evaluator, numFolds=4)
crossval2 = CrossValidator(estimator=rfPipeline, estimatorParamMaps=paramGrid2,
                           evaluator=rmse_evaluator, numFolds=4)

# We generate the best model
dtModel = crossval.fit(trainData2).bestModel
rfModel = crossval2.fit(trainData2).bestModel

#Prediction
pred_training_cv3 = dtModel.transform(trainData2)
pred_test_cv3 = dtModel.transform(testData2)

pred_training_cv4 = rfModel.transform(trainData2)
pred_test_cv4 = rfModel.transform(testData2)

# performance on training data
print("Train data performance Decision Tree = ", rmse_evaluator.evaluate(pred_training_cv3))
print("Train data performance Random Forest = ", rmse_evaluator.evaluate(pred_training_cv4))

# performance on test data
print("Test data performance Decision Tree = ", rmse_evaluator.evaluate(pred_test_cv3))
print("Test data performance Random Forest = ", rmse_evaluator.evaluate(pred_test_cv4))


In [6]:
#################################################################PART2

In [7]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer, Tokenizer, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
import numpy as np
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.regression import DecisionTreeRegressor,RandomForestRegressor
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.feature import CountVectorizer


# Read the SMS file: one message per line, "<tag>\t<text>".
rdd=sc.textFile("/FileStore/tables/SMSSpamCollection.txt").map(lambda line: line.split('\t'))
# Encode the tag as a numeric label: 1 for "spam", 0 for anything else.
rddMap = rdd.map(lambda scmr:(1 if scmr[0]=="spam" else 0,scmr[1]))
# Convert to a DataFrame with named columns.
dfSpam = rddMap.toDF(["label","message"])

# Tokenise on non-word characters, keeping only tokens of at least 3 characters.
regex_tokenizer = RegexTokenizer(inputCol='message', outputCol='words', pattern='\\W').setMinTokenLength(3)
regex_df = regex_tokenizer.transform(dfSpam)
regex_df.show(truncate=False)

# Build 2-grams and 3-grams from the token list.
ngram2 = NGram(n=2, inputCol='words', outputCol='2grams')
gramdf2 = ngram2.transform(regex_df)
ngram3 = NGram(n=3, inputCol='words', outputCol='3grams')
gramdf3 =ngram3.transform(gramdf2)
# Number of kept tokens per message.
count_words = udf(lambda words: len(words), IntegerType())
messageTreatedDf=gramdf3.withColumn('counts', count_words('words'))
messageTreatedDf.show()

# Hashed term frequencies for the three columns: words, 2-grams and 3-grams.
tf1 = HashingTF(inputCol='words', outputCol='wordsf')
tf_df1 = tf1.transform(messageTreatedDf)
tf2 = HashingTF(inputCol='2grams', outputCol='2gramsf')
tf_df2 = tf2.transform(tf_df1)
tf3 = HashingTF(inputCol='3grams', outputCol='3gramsf')
tf_df3 = tf3.transform(tf_df2)
# Corresponding IDF re-weighting. The word-level TF-IDF is written straight to
# 'features', the column the classifiers read.
idf1 = IDF(inputCol='wordsf', outputCol='features')
idf2 = IDF(inputCol='2gramsf', outputCol='2gramsfIDF')
idf3 = IDF(inputCol='3gramsf', outputCol='3gramsfIDF')
idf_model1 = idf1.fit(tf_df3)
dataIDF1=idf_model1.transform(tf_df3)
idf_model2 = idf2.fit(dataIDF1)
dataIDF2=idf_model2.transform(dataIDF1)
idf_model3 = idf3.fit(dataIDF2)
dataIDF3=idf_model3.transform(dataIDF2)

# No VectorAssembler is needed here because 'features' already exists. The
# original cell rebound `vec` to an unused assembler over a column
# ('wordsfIDF') that is never created, which silently replaced the assembler
# the regression pipelines depend on; that rebinding has been removed.
modelData = dataIDF3.select('label','features')
# Fixed seed so the TF-IDF train/test split is reproducible.
trainData3, testData3 = modelData.randomSplit([0.7,0.3], seed=42)



In [8]:
# Alternative feature set: raw term counts instead of TF-IDF.
dataS = dataIDF3.select('label','words')
# minDF=1 keeps every term that occurs in at least one message.
count_vec = CountVectorizer(inputCol='words', outputCol='features',  minDF=1)
modelC = count_vec.fit(dataS)
data2 = modelC.transform(dataS)
modelData2 = data2.select('label','features')
# Fixed seed so the split, and therefore every score computed on it, is
# reproducible from run to run.
trainData4, testData4 = modelData2.randomSplit([0.7,0.3], seed=42)

In [9]:
# Linear regression on the TF-IDF split (trainData3 / testData3).
# Model creation
lr= LinearRegression(labelCol='label',featuresCol='features',regParam=0.01)

# Fit the model
lrModel=lr.fit(trainData3)

# Training summary and the predictions made on the training data
summary=lrModel.summary
summary.predictions.show(truncate=False)

testResults = lrModel.evaluate(testData3)

# Residuals are (label - prediction) per row. Their average shows the bias of
# the fit; it is not an accuracy.
testResults.residuals.show(n=10)
testResults.residuals.groupBy().avg().show()
pred=testResults.predictions
# Kept under its own name: the original bound this to `df`, overwriting the
# power-data DataFrame loaded in the first cell and breaking any re-run of the
# cells that read it.
test_residuals=testResults.residuals


In [10]:
# Logistic regression fitted on trainData4 and scored on trainData4 / testData4.
lr = LogisticRegression(
    regParam=0.01,
    featuresCol='features',
    labelCol='label',
)
lrModel = lr.fit(trainData4)

# F1 score computed from the 'label' and 'prediction' columns.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)

# Score both splits with the fitted model.
pred_training = lrModel.transform(trainData4)
pred_test = lrModel.transform(testData4)

# Report training performance first, then test performance.
print("Train data performance Logistic regression = ", evaluator.evaluate(pred_training))
print("Test data performance Logistic regression = ", evaluator.evaluate(pred_test))


In [11]:
# Decision tree classifier (depth limit 30) fitted on trainData4.
dt = DecisionTreeClassifier(
    maxDepth=30,
    predictionCol="prediction",
    featuresCol='features',
    labelCol='label',
)
dtModel = dt.fit(trainData4)

# F1 score computed from the 'label' and 'prediction' columns.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)

# Score both splits with the fitted tree.
pred_training = dtModel.transform(trainData4)
pred_test = dtModel.transform(testData4)

# Report training performance first, then test performance.
print("Train data performance Decision Tree = ", evaluator.evaluate(pred_training))
print("Test data performance Decision Tree = ", evaluator.evaluate(pred_test))

In [12]:
# Random forest classifier fitted on trainData4.
# NOTE(review): numTrees=1 makes this a single-tree forest; confirm that is intended.
rf = RandomForestClassifier(labelCol='label',featuresCol='features', predictionCol="prediction", numTrees=1, maxDepth=30)

# Fit the model
rfModel=rf.fit(trainData4)

# F1 score computed from the 'label' and 'prediction' columns
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Performance on training data. The printed label now names the model that was
# actually evaluated (it previously said "Logistic regression").
pred_training=rfModel.transform(trainData4)
print("Train data performance Random forest = ", evaluator.evaluate(pred_training))

# Performance on test data
pred_test=rfModel.transform(testData4)
print("Test data performance Random forest = ", evaluator.evaluate(pred_test))

In [13]:
# Multinomial naive Bayes (smoothing 1.0) fitted on trainData4.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

nb.setLabelCol("label")\
  .setPredictionCol("prediction")\
  .setFeaturesCol("features")

# Fit the model
nbModel=nb.fit(trainData4)

# F1 score computed from the 'label' and 'prediction' columns
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Performance on training data. The printed label now names the model that was
# actually evaluated (it previously said "Logistic regression").
pred_training=nbModel.transform(trainData4)
print("Train data performance Naive Bayes = ", evaluator.evaluate(pred_training))

# Performance on test data
pred_test=nbModel.transform(testData4)
print("Test data performance Naive Bayes = ", evaluator.evaluate(pred_test))

In [14]:
# Model optimisation: 4-fold cross-validation of logistic regression on the
# TF-IDF split (trainData3 / testData3).
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# F1 score drives both model selection and the final report.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)

# Estimator whose regParam is tuned by the grid below.
lr1 = LogisticRegression(
    regParam=0.01,
    featuresCol='features',
    labelCol='label',
)

# Candidate regularisation strengths.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(
    lr1.regParam,
    [0.0001,0.001, 0.01, 0.1, 0.3, 0.5, 0.7, 0.9, 1],
)
param_grid1 = grid_builder.build()

cv1 = CrossValidator(
    numFolds=4,
    evaluator=evaluator,
    estimatorParamMaps=param_grid1,
    estimator=lr1,
)

# Fit, then score both splits with the selected model.
cv_model1 = cv1.fit(trainData3)
pred_training_cv1 = cv_model1.transform(trainData3)
pred_test_cv1 = cv_model1.transform(testData3)

# Report training performance first, then test performance.
print("Train data performance Logistic regression = ", evaluator.evaluate(pred_training_cv1))
print("Test data performance Logistic regression = ", evaluator.evaluate(pred_test_cv1))


In [15]:
# Model optimisation: 4-fold cross-validation of logistic regression on the
# count-vectoriser split (trainData4 / testData4).
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# F1 score drives both model selection and the final report.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)

# Estimator whose regParam is tuned by the grid below.
lr1 = LogisticRegression(
    regParam=0.01,
    featuresCol='features',
    labelCol='label',
)

# Candidate regularisation strengths.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(
    lr1.regParam,
    [0.0001,0.001, 0.01, 0.1, 0.3, 0.5, 0.7, 0.9, 1],
)
param_grid1 = grid_builder.build()

cv1 = CrossValidator(
    numFolds=4,
    evaluator=evaluator,
    estimatorParamMaps=param_grid1,
    estimator=lr1,
)

# Fit, then score both splits with the selected model.
cv_model1 = cv1.fit(trainData4)
pred_training_cv1 = cv_model1.transform(trainData4)
pred_test_cv1 = cv_model1.transform(testData4)

# Report training performance first, then test performance.
print("Train data performance Logistic regression = ", evaluator.evaluate(pred_training_cv1))
print("Test data performance Logistic regression = ", evaluator.evaluate(pred_test_cv1))