In [1]:
!pip install sparkmagic
!pip install pymongo
!pip install mlflow





In [2]:
import mlflow
import mlflow.spark
from mlflow import spark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = (SparkSession.builder.config("spark.jars.packages", "org.mlflow.mlflow-spark").getOrCreate())

In [3]:
SparkContext.setSystemProperty('spark.executor.memory', '8g')
SparkContext.setSystemProperty('spark.driver.memory','3g')
SparkContext.setSystemProperty('spark.executor.heartbeatInterval','10000s')
SparkContext.setSystemProperty('spark.default.parallelism', '40')
SparkContext.setSystemProperty('spark.executor.cores', '2')
SparkContext.setSystemProperty("spark.executor.instances", '2')
SparkContext.setSystemProperty("spark.sql.shuffle.partitions", '40')


sc = SparkContext.getOrCreate();
sc.getConf().getAll()

[('spark.sql.shuffle.partitions', '40'),
 ('spark.files',
  'file:///Users/alkeshbharati/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.2.jar,file:///Users/alkeshbharati/.ivy2/jars/org.mongodb_mongo-java-driver-3.12.5.jar'),
 ('spark.app.id', 'local-1597442978919'),
 ('spark.jars',
  'file:///Users/alkeshbharati/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.2.jar,file:///Users/alkeshbharati/.ivy2/jars/org.mongodb_mongo-java-driver-3.12.5.jar'),
 ('spark.executor.id', 'driver'),
 ('spark.executor.cores', '2'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.driver.port', '61035'),
 ('spark.repl.local.jars',
  'file:///Users/alkeshbharati/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.2.jar,file:///Users/alkeshbharati/.ivy2/jars/org.mongodb_mongo-java-driver-3.12.5.jar'),
 ('spark.driver.memory', '3g'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.instances', '2'),
 ('spark.jars.packages', 'org.mlfl

In [4]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.flight_test").load()
#df.repartition(40)
#print('partitions: ' + str(df.rdd.getNumPartitions()))
#print(type(df))


In [5]:
d = df.select("DAY_OF_MONTH", "DAY_OF_WEEK", "OP_CARRIER", "ORIGIN", "DEST", "DEP_DELAY", "ARR_DELAY" )
d = d.withColumn('DEP_DELAY', when(col('DEP_DELAY') == '', None).otherwise(col('DEP_DELAY')))
d = d.withColumn('ARR_DELAY', when(col('ARR_DELAY') == '', None).otherwise(col('ARR_DELAY')))

In [6]:
from pyspark.sql.functions import col
d = d.where(col("DEP_DELAY").isNotNull())
d = d.where(col("ARR_DELAY").isNotNull())

In [7]:
data = d.select("DAY_OF_MONTH", "DAY_OF_WEEK", "OP_CARRIER", "ORIGIN", "DEST", "DEP_DELAY", ((col("ARR_DELAY") > 15).cast("Int").alias("label")))
db=d.select( "OP_CARRIER", "ORIGIN", "DEST")
data.show(10)
from pyspark.sql.types import IntegerType
data = data.withColumn("DEP_DELAY", data["DEP_DELAY"].cast(IntegerType()))
data = data.withColumn("label", data["label"].cast(IntegerType()))

+------------+-----------+----------+------+----+---------+-----+
|DAY_OF_MONTH|DAY_OF_WEEK|OP_CARRIER|ORIGIN|DEST|DEP_DELAY|label|
+------------+-----------+----------+------+----+---------+-----+
|           1|          3|        WN|   ONT| SJC|       -4|    0|
|           1|          3|        WN|   ONT| SMF|       59|    1|
|           1|          3|        WN|   ONT| SMF|       -5|    0|
|           1|          3|        WN|   ONT| SMF|        0|    0|
|           1|          3|        WN|   ONT| SMF|       -3|    0|
|           1|          3|        WN|   ONT| SMF|       -5|    0|
|           1|          3|        WN|   ONT| SMF|       -4|    0|
|           1|          3|        WN|   ONT| SMF|        5|    0|
|           1|          3|        WN|   ORF| BWI|       60|    1|
|           1|          3|        WN|   ORF| BWI|       -1|    0|
+------------+-----------+----------+------+----+---------+-----+
only showing top 10 rows



In [8]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
#train_rows = train.count()
#test_rows = test.count()
#print("Training Rows:", train_rows, " Testing Rows:", test_rows)

In [9]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml.classification import  RandomForestClassifier

experiment = 'Flight_Prediction'
#mlflow.set_tracking_uri('file:/Users/alkeshbharati/Desktop/Flight_Prediction_MLflow')
mlflow.set_experiment(experiment)

with mlflow.start_run(run_name='First-Model'):
    encoding_var = [i[0] for i in db.dtypes if (i[1]=='string') & (i[0]!='y')]
    string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
    #onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
    #label_indexes = StringIndexer(inputCol = 'label', outputCol = 'label', handleInvalid = 'keep')
    assembler = VectorAssembler(inputCols = ["DAY_OF_MONTH" ,"DAY_OF_WEEK"] + ['IDX_' + c for c in encoding_var], outputCol = "catFeatures")
    catIdx = VectorIndexer(inputCol = assembler.getOutputCol(), outputCol = "idxCatFeatures")
    numVect = VectorAssembler(inputCols = ["DEP_DELAY"], outputCol="numFeatures")
    minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
    featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
    lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
    #rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed = 8464,
    #                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)
    pipeline = Pipeline(stages= string_indexes + [ assembler, catIdx, numVect, minMax, featVect, lr])
    piplineModel = pipeline.fit(train)
    prediction = piplineModel.transform(test)
    predicted = prediction.select("features", "prediction", "trueLabel")
    predicted.show(100, truncate=False)
    evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    aur = evaluator.evaluate(prediction)
    print ("AUR = ", aur)
    mlflow.log_param("dataset_shape", data.shape)
    mlflow.log_metric("AUR", aur)
    mlflow.spark.log_model(piplineModel, "Pre-model")
    mlflow.end_run()

+--------------------------------------------+----------+---------+
|features                                    |prediction|trueLabel|
+--------------------------------------------+----------+---------+
|[1.0,2.0,1.0,1.0,5.0,0.02526395173453997]   |0.0       |0        |
|[1.0,2.0,1.0,31.0,0.0,0.02564102564102564]  |0.0       |0        |
|[1.0,2.0,1.0,31.0,5.0,0.03506787330316742]  |0.0       |0        |
|[1.0,2.0,1.0,31.0,6.0,0.024509803921568627] |0.0       |1        |
|[1.0,2.0,1.0,55.0,17.0,0.027526395173453996]|0.0       |0        |
|[1.0,2.0,1.0,16.0,17.0,0.02526395173453997] |0.0       |0        |
|[1.0,2.0,1.0,62.0,3.0,0.023755656108597284] |0.0       |0        |
|[1.0,2.0,1.0,3.0,54.0,0.03619909502262444]  |0.0       |0        |
|[1.0,2.0,1.0,3.0,22.0,0.026772247360482653] |0.0       |0        |
|[1.0,2.0,1.0,3.0,42.0,0.026018099547511313] |0.0       |0        |
|[1.0,2.0,1.0,3.0,19.0,0.027149321266968326] |0.0       |0        |
|[1.0,2.0,1.0,3.0,86.0,0.024509803921568627] |0.

AttributeError: 'DataFrame' object has no attribute 'shape'

In [10]:
with mlflow.start_run(run_name='Final-Model'):
    paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()
    cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)
    mlflow.log_param("Threshold", 0.4)
    mlflow.log_param("No of Iteration", 10)
    mlflow.log_param("regularization", 0.3)
    model = cv.fit(train)
    newPrediction = model.transform(test)
    # Recalculate the Area Under ROC
    evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
    aur2 = evaluator.evaluate(prediction)
    print( "AUR2 = ", aur2)
    pip_model = model.bestModel
    mlflow.spark.save_model(pip_model, "/Users/alkeshbharati/Desktop/Flight_Prediction_MLflow")
    mlflow.log_metric("AUR", aur2)
    mlflow.spark.log_model(pip_model, "Final-model")
    mlflow.end_run()

AUR2 =  0.9188037667594018
