<a href="https://colab.research.google.com/github/beejhay31/ML-With-Apache/blob/main/Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark



In [2]:
import pyspark

In [3]:
pyspark.__version__

'3.2.0'

In [4]:
"""
Load the Spark packages used throughout this notebook.
"""
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql.functions import *
from pyspark.ml.pipeline import Transformer,Estimator
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session used by the data-loading and training cells.
spark = (
    SparkSession.builder
    .appName('Titanic Data')
    .getOrCreate()
)

In [6]:
"""
Load a CSV file into a Spark dataframe.
@param - 
        path - path of file
        header_value - header flag; when it is true, the first row is used as the header
        
@return - dataframe of loaded intended data.
"""

def load_data(path, header_value=True):
    """Read a CSV file into a Spark DataFrame, letting Spark infer the schema.

    Args:
        path: Location of the CSV file to read.
        header_value: When true, the first row is used as the column names.
            Defaults to True, which is the value both calls below pass.

    Returns:
        The DataFrame returned by ``spark.read.csv``.
    """
    # Uses the module-level ``spark`` session created in an earlier cell.
    return spark.read.csv(path, inferSchema=True, header=header_value)


# Training and hold-out passenger data, both read with a header row.
df = load_data('train.csv', True)
df_test = load_data('test.csv', True)


In [7]:
'''
Custom Transformer class implementing the preprocessing transformation.

@param - 
       Transformer - Transformer class reference 
       df - dataframe on which the operations are carried out (passed through the transform function)
       A - A class for variable sharing.

@return -
       df - the transformed dataframe containing the generated, imputed and indexed feature columns. 

'''

class preprocess_transform(Transformer):
    """Spark ML ``Transformer`` that prepares raw Titanic passenger rows.

    ``_transform`` derives a title column (``Initial``) and ``Family_Size``,
    fills missing ``Age``/``Embarked``/``Fare`` values, string-indexes the
    categorical columns and finally drops the raw columns that are no longer
    needed (``Family_Size`` is among the dropped columns).

    NOTE(review): every statistic used here (mean ages, the Embarked mode,
    per-class fares and the string-index mappings) is computed from whichever
    DataFrame is passed in, so training data and later scoring data are not
    guaranteed to share the same fill values or index encoding. Confirm this
    is acceptable, or fit these once on the training data.
    """

    def _transform(self, df):
        """Run the full preprocessing chain on ``df``.

        Args:
            df: DataFrame with the raw passenger columns (``Name``, ``Age``,
                ``Sex``, ``Embarked``, ``Fare``, ``Pclass``, ``SibSp``,
                ``Parch`` are read by the steps below).

        Returns:
            DataFrame with ``Sex_index``, ``Embarked_index`` and
            ``Initial_index`` added and the raw text/helper columns dropped.
        """
        print("********************************  in Transform method ...************************************")

        categorical_cols = ["Sex", "Embarked", "Initial"]
        dropped_cols = ['Cabin', 'Name', 'Ticket', 'Family_Size', 'SibSp',
                        'Parch', 'Sex', 'Embarked', 'Initial']

        dataset = self._feature_generation(df)
        df_impute = self._all_impute_together(dataset)
        df_numeric = self._string_to_numeric(df_impute, categorical_cols)
        return self._drop_columns(df_numeric, dropped_cols)

    def _feature_generation(self, df):
        """Add the ``Initial`` (title) and ``Family_Size`` columns.

        Args:
            df: DataFrame containing ``Name``, ``SibSp`` and ``Parch``.

        Returns:
            DataFrame with the two generated columns.
        """
        # Rare titles are folded into the common ones.
        title_map = {
            'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
            'Dr': 'Mr', 'Major': 'Mr',
            'Lady': 'Mrs', 'Countess': 'Mrs',
            'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
            'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
        }
        # Raw string: "\." is an invalid escape sequence in a normal literal.
        df = df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
        # Restrict the replacement to the title column so that equal values
        # in other string columns are never rewritten by accident.
        df = df.replace(title_map, subset=["Initial"])
        return df.withColumn("Family_Size", col('SibSp') + col('Parch'))

    def _age_impute(self, df):
        """Fill missing ``Age`` values with the mean age of the same title.

        Args:
            df: DataFrame containing ``Initial`` and ``Age``.

        Returns:
            DataFrame with null ages replaced where a mean is available.
        """
        # Collect title and mean together so the pairs cannot get misaligned.
        mean_rows = df.groupBy("Initial").avg('Age').collect()
        for row in mean_rows:
            title, mean_age = row[0], row[1]
            if title is None or mean_age is None:
                # Nothing to match on / no known age for this title.
                continue
            needs_fill = (col("Initial") == title) & col("Age").isNull()
            df = df.withColumn("Age", when(needs_fill, mean_age).otherwise(col("Age")))
        return df

    def _embark_impute(self, df):
        """Fill missing ``Embarked`` values with the most frequent port.

        Args:
            df: DataFrame containing ``Embarked``.

        Returns:
            DataFrame with null ``Embarked`` values filled; returned
            unchanged when no row has a known port.
        """
        # Nulls are excluded so the "mode" can never be the missing value
        # itself, and first() avoids an IndexError on an empty frame.
        top_row = (
            df.where(col('Embarked').isNotNull())
            .groupBy('Embarked')
            .count()
            .sort(col('count').desc())
            .first()
        )
        if top_row is None:
            return df
        return df.fillna({'Embarked': top_row[0]})

    def _fare_impute(self, df):
        """Fill missing ``Fare`` values with the mean fare of the same class.

        Args:
            df: DataFrame containing ``Fare`` and ``Pclass``.

        Returns:
            DataFrame with null fares replaced where a class mean exists.
        """
        missing_rows = (
            df.where(col('Fare').isNull()).select('Pclass').distinct().collect()
        )
        if not missing_rows:
            return df

        # One aggregation over Fare only, instead of averaging every numeric
        # column again for each row that lacks a fare.
        class_means = {
            row[0]: row[1] for row in df.groupBy('Pclass').avg('Fare').collect()
        }
        for row in missing_rows:
            pclass = row[0]
            mean_fare = class_means.get(pclass)
            if pclass is None or mean_fare is None:
                continue
            needs_fill = col('Fare').isNull() & (col('Pclass') == pclass)
            df = df.withColumn("Fare", when(needs_fill, mean_fare).otherwise(col('Fare')))
        return df

    def _all_impute_together(self, df):
        """Apply the age, embarkation and fare imputations in that order.

        Args:
            df: DataFrame after feature generation.

        Returns:
            DataFrame with the three columns imputed.
        """
        df = self._age_impute(df)
        df = self._embark_impute(df)
        return self._fare_impute(df)

    def _string_to_numeric(self, df, col_list):
        """Add a ``<column>_index`` numeric column for each listed column.

        Args:
            df: DataFrame containing every column in ``col_list``.
            col_list: Names of the string columns to index.

        Returns:
            DataFrame with the index columns appended.
        """
        # The pipeline fits each indexer itself; pre-fitting them first
        # would only scan the data a second time.
        indexers = [
            StringIndexer(inputCol=column, outputCol=column + "_index")
            for column in col_list
        ]
        return Pipeline(stages=indexers).fit(df).transform(df)

    def _drop_columns(self, df, col_list):
        """Drop the named columns from ``df``.

        Args:
            df: DataFrame to trim.
            col_list: Names of the columns to remove.

        Returns:
            DataFrame without the listed columns.
        """
        return df.drop(*col_list)

In [8]:
pip install mlflow



In [9]:
import mlflow

In [10]:
from pyspark.ml.classification import GBTClassifier 
from pyspark.ml.classification import RandomForestClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator  

# Preprocess the training data with the custom transformer defined above.
my_model = preprocess_transform()
df = my_model.transform(df)

# Assemble the model inputs and configure the classifier.
feature = VectorAssembler(
    inputCols=['Pclass', 'Age', 'Fare', 'Sex_index', 'Embarked_index'],
    outputCol="features",
)
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)
pipeline = Pipeline(stages=[feature, rf])

# Tune the number of trees with 3-fold cross-validation scored on accuracy.
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100, 300]).build()
evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy"
)
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
)
cvModel = crossval.fit(df)

# Persist the cross-validated winner. Previously a separate 10-tree pipeline
# fit was logged and saved, so the tuning above never reached the stored model.
model = cvModel.bestModel

# Score the hold-out data with the tuned model.
df_test = my_model.transform(df_test)
prediction = cvModel.transform(df_test)

mlflow.spark.log_model(model, "spark-model16")
mlflow.spark.save_model(model, "spark-model_test")

********************************  in Transform method ...************************************
********************************  in Transform method ...************************************


In [15]:
from flask import Flask
#from sklearn.externals import joblib
from flask import Flask, jsonify, request
import pandas as pd
import pyspark
from pyspark.ml import Pipeline,PipelineModel
#import preprocess_file
#import train_model
from pyspark.sql import SparkSession
from pyspark.sql import Row,SQLContext
from collections import OrderedDict

# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# Spark session used by the request handlers.
st = SparkSession.builder.appName('Titanic').getOrCreate()

# SQLContext bound to the same SparkContext; predict_func reads JSON through it.
sqlContext = SQLContext(st.sparkContext)

@app.route('/')
def home():
    """Render the landing page template ``home.html``."""
    # render_template is not among the names this cell imports from flask,
    # so bring it into scope here to avoid a NameError on the first request.
    from flask import render_template

    return render_template("home.html")

@app.route('/')
def train_model_fun():
    """Run ``train_model.training_stage()`` and return a plain-text status.

    NOTE(review): ``'/'`` is also routed to ``home`` in this cell, so this
    view looks unreachable — confirm and give it its own URL.
    NOTE(review): ``train_model`` only appears in a commented-out import in
    this cell — confirm it is imported somewhere before this is called.
    """
    train_model.training_stage()
    return 'model_train is done'

@app.route('/predict',methods=['POST'])
def predict_func():
    """Score the passenger record(s) posted as JSON and return one prediction.

    The request body may be a single JSON object or a list of records. The
    records are loaded into a Spark DataFrame, preprocessed with
    ``preprocess_transform`` and scored with the pipeline model stored at
    ``'spark-model_test'``.

    Returns:
        A JSON object with the ``PassengerId`` and predicted ``Survived``
        value of the first scored row, or a JSON error with status 400 when
        the body is missing/empty or no row could be scored.

    NOTE(review): the model is loaded from disk on every request, and the
    directory is written elsewhere in this file with
    ``mlflow.spark.save_model`` — confirm ``PipelineModel.load`` can read
    that layout.
    """
    import json  # local import: this cell's import list does not include json

    test_data = request.get_json()
    if test_data is None:
        return jsonify({'error': 'request body must be JSON'}), 400

    # The JSON reader expects one JSON document per RDD element. A bare dict
    # would be parallelized as its keys, and dict elements would be passed on
    # as Python reprs, so normalise to a list of JSON strings first.
    records = test_data if isinstance(test_data, list) else [test_data]
    json_lines = [
        record if isinstance(record, str) else json.dumps(record)
        for record in records
    ]
    if not json_lines:
        return jsonify({'error': 'no records supplied'}), 400

    new_df = sqlContext.read.json(st.sparkContext.parallelize(json_lines))
    model = PipelineModel.load('spark-model_test')
    process_data = preprocess_transform().transform(new_df)
    predicted_data = model.transform(process_data)

    result = predicted_data.select("PassengerId", "prediction")
    # show() prints the table itself and returns None, so it must not be
    # passed to print().
    print("my prediction")
    result.show()

    # Fetch the first row once instead of collecting each column separately.
    first_row = result.first()
    if first_row is None:
        return jsonify({'error': 'no rows could be scored'}), 400

    output = {'PassengerId': first_row[0], 'Survived': first_row[1]}
    print(output)
    return jsonify(output)

cols = ['age', 'sex', 'bmi', 'children', 'smoker', 'region']

@app.route('/')
def home():
    """Render ``home.html``.

    NOTE(review): a view named ``home`` is already defined for ``'/'``
    earlier in this cell. Flask normally refuses to register a second,
    different function under an existing endpoint name — confirm and remove
    one of the two definitions.
    NOTE(review): ``render_template`` is not among the names this cell
    imports from flask — confirm it is in scope.
    """
    return render_template("home.html")

@app.route('/predict',methods=['POST'])
def predict():
    """Score the submitted form values and render the result in ``home.html``.

    The form values are placed, in order, under the column names in ``cols``
    and passed to ``predict_model``; the first ``Label`` value is shown as an
    integer.

    NOTE(review): ``np``, ``predict_model`` and ``render_template`` are not
    imported in this cell, and ``'/predict'`` is also routed to
    ``predict_func`` — confirm whether this view is still needed.
    """
    form_values = list(request.form.values())
    feature_row = np.array(form_values)
    data_unseen = pd.DataFrame([feature_row], columns=cols)
    scored = predict_model(model, data=data_unseen, round=0)
    label = int(scored.Label[0])
    message = 'Expected Bill will be {}'.format(label)
    return render_template('home.html', pred=message)

@app.route('/predict_api',methods=['POST'])
def predict_api():
    """Score one JSON record and return its first ``Label`` value as JSON.

    The request body is parsed as JSON regardless of its content type
    (``force=True``), wrapped into a one-row pandas DataFrame and passed to
    ``predict_model``.

    NOTE(review): ``predict_model`` is not imported in this cell — confirm
    where it is expected to come from.
    """
    payload = request.get_json(force=True)
    frame = pd.DataFrame([payload])
    scored = predict_model(model, data=frame)
    return jsonify(scored.Label[0])

# Start the Flask server only when this code runs as the main module.
# NOTE(review): debug=True is passed here — confirm this is only ever used
# for local development.
if __name__ == '__main__':
    app.run(debug=True)




In [28]:
# Same launch guard as at the end of the previous cell: run the Flask server
# with debug=True when executed as the main module.
if __name__ == "__main__":
    app.run(debug=True)

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: on


OSError: ignored