In [1]:
# -*- coding: utf-8 -*-
"""
Created on Mon jan  6 15:43:48 2019

@author: Ali, Mohammed
"""

from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel,Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer,IndexToString, StringIndexer, VectorIndexer,StandardScaler,OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import functools
from pyspark.sql.functions import udf
from numpy.random import randint
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
import pandas as pd
import matplotlib.pyplot as plt
from imblearn.over_sampling import *
import seaborn as sns
from pyspark.sql import *
import time

In [2]:
# Create a SparkSession, or return the existing one if it has already been created
def get_spark(master="local", app_name="mineropredict_v1"):
    """Return a SparkSession configured with the given master and app name.

    The session is obtained through ``SparkSession.builder...getOrCreate()``,
    so calling this function repeatedly is safe.

    Parameters
    ----------
    master : str, optional
        Value passed to ``builder.master``. Defaults to ``"local"``, the value
        that used to be hard-coded.
    app_name : str, optional
        Value passed to ``builder.appName``. Defaults to
        ``"mineropredict_v1"``, the value that used to be hard-coded.

    Returns
    -------
    The object returned by ``getOrCreate()``.
    """
    return (SparkSession.builder
                .master(master)
                .appName(app_name)
                .getOrCreate())

In [3]:
# Petrophysicists prefer to interpret their results with RHOB2 on the same scale as dif,
# so RHOB is linearly rescaled into RHOB2 before dif = RHOB2 - NPHI is computed.
def preprocess(dataframe):
    """Add the derived ``RHOB2`` and ``dif`` columns, then drop incomplete rows.

    ``RHOB2`` is computed as ``0.6 * (RHOB - 1.95) - 0.15`` and ``dif`` as
    ``RHOB2 - NPHI``. ``na.drop()`` is then called with no arguments on the
    resulting frame.

    Parameters
    ----------
    dataframe
        Frame exposing ``RHOB`` and ``NPHI`` columns.

    Returns
    -------
    The frame with the two extra columns, after ``na.drop()``.
    """
    rescaled_density = 0.6 * (dataframe.RHOB - 1.95) - 0.15
    with_rhob2 = dataframe.withColumn("RHOB2", rescaled_density)

    # dif must read RHOB2 from the frame that already contains it.
    separation = with_rhob2.RHOB2 - with_rhob2.NPHI
    with_dif = with_rhob2.withColumn("dif", separation)

    return with_dif.na.drop()

In [4]:
# Load the saved model of each mineral from the output folder
def upload_models(output_folder,minerals_set=["Halite","Anhydrite","Illite","Quartz","Dolomite","Calcite"]):
    """Load one saved ``PipelineModel`` per mineral.

    Each model is read from ``"<output_folder>/<mineral>"``.

    Parameters
    ----------
    output_folder : str
        Folder under which the per-mineral models are stored.
    minerals_set : iterable of str, optional
        Names of the minerals whose models should be loaded.

    Returns
    -------
    dict
        Maps each mineral name to its loaded model.
    """
    return {
        mineral: PipelineModel.load("{}/{}".format(output_folder, mineral))
        for mineral in minerals_set
    }

In [19]:
# Resample the data with SMOTE to get balanced classes;
# also used to augment the data size
def balance(df, sqlContext, classes_sizes, label):
    """Resample ``df`` with SMOTE and return the result as a new Spark frame.

    The frame is collected with ``toPandas()``, every column except ``label``
    is used as a feature, and the resampled rows are rebuilt in the original
    column order before being handed to ``sqlContext.createDataFrame``.

    Parameters
    ----------
    df
        Spark DataFrame containing the ``label`` column plus feature columns.
        The feature columns must be usable by SMOTE (presumably numeric --
        confirm against the caller).
    sqlContext
        Object exposing ``createDataFrame(rows, schema=...)``.
    classes_sizes
        Passed unchanged as SMOTE's ``sampling_strategy``.
    label : str
        Name of the class column.

    Returns
    -------
    The frame created by ``sqlContext.createDataFrame`` with the same column
    names, in the same order, as ``df``.
    """
    pandas_df = df.toPandas()

    sampler = SMOTE(sampling_strategy=classes_sizes, random_state=69)

    print(pandas_df[label].unique())

    # Exclude the label by name. ``set(label)`` would build a set of the
    # label's *characters* and leave the label column among the features;
    # a list also keeps the column order deterministic.
    feature_cols = [col for col in pandas_df.columns if col != label]

    X_resampled, y_resampled = sampler.fit_resample(pandas_df[feature_cols].values,
                                                    pandas_df[label].values)

    # Rebuild a frame keyed by column name so that every value ends up under
    # its own column, and so the label is not coerced to the feature dtype.
    resampled = pd.DataFrame(X_resampled, columns=feature_cols)
    resampled[label] = y_resampled

    # Back to the original column order; tolist() yields plain Python values.
    rows = resampled[list(df.columns)].values.tolist()
    return sqlContext.createDataFrame(rows, schema=df.columns)

In [6]:
# function to evaluate the accuracy of the model in the test data
def evaluate(testData,mineral):
    """Print the accuracy of the predictions stored in ``testData``.

    The ``"prediction"`` column is compared against the ``"index_<mineral>"``
    label column with a ``MulticlassClassificationEvaluator`` using the
    ``"accuracy"`` metric. Nothing is returned; the score is only printed.

    Parameters
    ----------
    testData
        Frame that already holds the model predictions.
    mineral : str
        Mineral name, used to build the label column name and the message.
    """
    label_column = "index_" + mineral
    scorer = MulticlassClassificationEvaluator(
        metricName="accuracy",
        predictionCol="prediction",
        labelCol=label_column,
    )
    score = scorer.evaluate(testData)
    print("Test accuracy for {} = {}".format(mineral, score))

In [22]:
def pipeline_simple(out,data):
    """Fit the random-forest pipeline of one mineral on ``data``.

    Stages, in order:

    1. a ``StringIndexer`` fitted on ``data`` that maps ``"index<out>"`` to
       the label column ``"index_<out>"``;
    2. feature assembly into ``"features"``:
       - Halite / Anhydrite: ``dif``, ``RHOB2``, ``NPHI`` assembled directly;
       - any other mineral: ``DEEPRES`` and ``GR`` are assembled, standardised
         (``withMean=True``, ``withStd=True``) and then assembled together
         with ``RHOB2`` and ``NPHI``;
    3. a random forest (150 trees, max depth 20);
    4. an ``IndexToString`` that maps ``"prediction"`` back to the original
       label values in ``"predicted_<out>"``.

    Parameters
    ----------
    out : str
        Mineral name.
    data
        Training frame holding ``"index<out>"`` and the feature columns
        listed above.

    Returns
    -------
    The fitted pipeline returned by ``Pipeline.fit``.
    """
    label_col = "index_" + out

    # Fitted up-front because IndexToString needs the learned labels.
    string_indexer = StringIndexer(inputCol="index"+out, outputCol=label_col).fit(data)

    if out in ("Halite", "Anhydrite"):
        feature_stages = [
            VectorAssembler(inputCols=['dif','RHOB2','NPHI'], outputCol='features'),
        ]
    else:
        feature_stages = [
            VectorAssembler(inputCols=["DEEPRES","GR"], outputCol='features1'),
            # scale features to mean 0 and variance 1
            StandardScaler(inputCol="features1", outputCol="scaled_features",
                           withStd=True, withMean=True),
            VectorAssembler(inputCols=['RHOB2','NPHI',"scaled_features"],
                            outputCol='features'),
        ]

    # The classifier and the label decoder are the same for every mineral.
    rf = RF(labelCol=label_col, featuresCol='features', numTrees=150, maxDepth=20)
    labelConverter = IndexToString(inputCol="prediction", outputCol="predicted_"+out,
                                   labels=string_indexer.labels)

    stages = [string_indexer] + feature_stages + [rf, labelConverter]
    return Pipeline(stages=stages).fit(data)

In [23]:
def _make_cutoff(threshold):
    """Return a function mapping a value to 1 when it exceeds ``threshold``, else 0."""
    def cut(x):
        return 1 if x > threshold else 0
    return cut


def learn(dataframe, output_folder, sqlContext, classes_sizes, evaluation=True,cuts=
          {"Halite":0.7,"Anhydrite":0.75,"Illite":0.3,"Calcite":0.45,
           "Dolomite":0.5,"Quartz":0.4}):
    """Train and save one model per mineral listed in ``cuts``.

    For every mineral the frame is preprocessed, the mineral column is
    binarised against its threshold into ``"index<mineral>"``, the classes are
    rebalanced with ``balance`` and a pipeline is fitted with
    ``pipeline_simple``. Each model is written (overwriting any previous one)
    to ``"<output_folder>/<mineral>"``.

    Parameters
    ----------
    dataframe
        Input frame; must contain one column per mineral in ``cuts`` plus the
        columns required by ``preprocess`` and ``pipeline_simple``.
    output_folder : str
        Folder under which the models are saved.
    sqlContext
        Forwarded to ``balance``.
    classes_sizes
        Forwarded to ``balance`` (SMOTE ``sampling_strategy``). Its keys must
        match the values of the binarised label column as they come back from
        ``toPandas()`` -- ``udf`` is called without a return type, so these
        are presumably the strings ``"0"`` / ``"1"``; confirm.
    evaluation : bool, optional
        When true (default) the balanced data is split 70/30, the model is
        fitted on the 70% part and its accuracy on the rest is printed by
        ``evaluate``. When false the model is fitted on all balanced data.
    cuts : dict, optional
        Maps each mineral name to its binarisation threshold.

    Returns
    -------
    list of float
        Elapsed seconds per mineral, in iteration order. The first entry also
        includes the time spent in ``preprocess``.
    """
    durations = []
    started = time.time()
    dataframe = preprocess(dataframe)
    for out, threshold in cuts.items():
        # Binarise the mineral: 1 if above its threshold, else 0. The threshold
        # is bound through a factory so the function does not depend on the
        # loop variables at the time Spark serialises it.
        cutoff = udf(_make_cutoff(threshold))
        label_col = "index" + out
        df = dataframe.withColumn(label_col, cutoff(out))

        # Balance on the binarised label: SMOTE needs discrete classes, and the
        # raw mineral column holds the values being thresholded.
        df_subsample = balance(df, sqlContext, classes_sizes, label_col)

        if evaluation:
            trainingData, testData = df_subsample.randomSplit([0.7,0.3], seed=69)
            model = pipeline_simple(out, trainingData)
            evaluate(model.transform(testData), out)
        else:
            model = pipeline_simple(out, df_subsample)

        model.write().overwrite().save("{}/{}".format(output_folder,out))
        durations.append(time.time() - started)
        started = time.time()
    return durations

In [9]:
def predict(models_folder,dataframe,minerals_set=["Halite","Anhydrite","Calcite","Quartz","Illite","Dolomite"],visualize=True):
    """Apply the saved per-mineral models to ``dataframe``.

    For each mineral, an existing column with the mineral's name is dropped,
    the frame is preprocessed, the model loaded from
    ``"<models_folder>/<mineral>"`` is applied and its ``probability`` column
    is kept as ``"probability_<mineral>"``. The intermediate columns
    (``prediction``, ``probability``, ``rawPrediction``, ``features``,
    ``features1``, ``scaled_features``) are dropped afterwards.

    Parameters
    ----------
    models_folder : str
        Folder holding one saved model per mineral.
    dataframe
        Spark DataFrame with the columns needed by ``preprocess`` and by the
        saved models.
    minerals_set : list of str, optional
        Minerals to predict. The names must match the folder names the models
        were saved under.
    visualize : bool, optional
        When true, the ``"predicted_<mineral>"`` column of each mineral is
        plotted in its own figure.

    Returns
    -------
    The Spark DataFrame with the prediction columns added.
    """
    models=upload_models(models_folder,minerals_set)
    for mineral in minerals_set:
        if mineral in dataframe.columns:
            dataframe=dataframe.drop(mineral)
        # NOTE(review): preprocess is re-applied on every iteration. It is kept
        # here because hoisting it would change which rows its na.drop() call
        # removes when mineral columns that are still present contain nulls.
        dataframe=preprocess(dataframe)
        dataframe=models[mineral].transform(dataframe)
        dataframe = dataframe.withColumn("probability_"+mineral,dataframe.probability)
        dataframe=dataframe.drop("prediction","probability","rawPrediction","features","features1","scaled_features")
    if visualize:
        # Collect to pandas once instead of once per mineral.
        df_visu=dataframe.toPandas()
        for mineral in minerals_set:
            fig,ax=plt.subplots()
            pd.to_numeric(df_visu["predicted_"+mineral]).plot(ax=ax)
            ax.set_title(mineral)
            ax.set_xlabel("depth")
            ax.set_ylabel("predictions")
            plt.show()
            # Release the figure so a long minerals_set does not pile up figures.
            plt.close(fig)

    return dataframe