In [1]:
#dbutils.library.installPyPI("koalas")
#dbutils.library.restartPython()

In [2]:
def run_linearreg(data, rows, cols):
  """Time a single-iteration Spark ML LinearRegression fit on ``data``.

  Columns ``_2`` .. ``_<cols>`` are assembled into a ``features`` vector and
  column ``_1`` is used as the label.

  Args:
    data: Spark DataFrame to train on. Assumed to hold numeric columns named
      ``_1`` .. ``_<cols>`` -- TODO confirm against the caller.
    rows: Row count of ``data``. Not used by this function; kept so that all
      ``run_*`` benchmarks share the same signature.
    cols: Index of the last column; feature columns are ``_2`` .. ``_<cols>``.

  Returns:
    Elapsed time in seconds (float) covering feature assembly, the split and
    the model fit.
  """
  # Import before starting the clock so the one-off module import cost of the
  # first call is not booked as training time.
  import time
  from pyspark.ml.feature import VectorAssembler
  from pyspark.ml.regression import LinearRegression

  # perf_counter() is monotonic; time.time() can jump when the system clock is
  # adjusted and would then yield a wrong (even negative) duration.
  start_time = time.perf_counter()

  X_cols = ['_' + str(i) for i in range(2, cols + 1)]
  data_vectorized = VectorAssembler(inputCols=X_cols, outputCol="features").transform(data)
  # Weights [1.0, 0.0] request a 100% / 0% split; the second part is never used.
  train, _ = data_vectorized.randomSplit([1.0, 0.0])

  lr = LinearRegression(maxIter=1, regParam=0.3, elasticNetParam=0.8)
  lr.setLabelCol('_1')
  lr.setFeaturesCol("features")

  # Train a model! Only the duration matters, so the fitted model is discarded.
  lr.fit(train)
  return time.perf_counter() - start_time


In [3]:
def run_logregclass(data, rows, cols):
  """Time a single-iteration Spark ML LogisticRegression fit on ``data``.

  Column ``_1`` is rounded to 0 decimal places and used as the label
  (presumably to turn a continuous value into discrete classes); columns
  ``_2`` .. ``_<cols>`` are assembled into a ``features`` vector.

  Args:
    data: Spark DataFrame to train on. Assumed to hold numeric columns named
      ``_1`` .. ``_<cols>`` -- TODO confirm against the caller.
    rows: Row count of ``data``. Not used by this function; kept so that all
      ``run_*`` benchmarks share the same signature.
    cols: Index of the last column; feature columns are ``_2`` .. ``_<cols>``.

  Returns:
    Elapsed time in seconds (float) covering feature assembly, the split and
    the model fit. Defining the rounded label column happens before the clock
    starts.
  """
  # Import before starting the clock so the one-off module import cost of the
  # first call is not booked as training time.
  import time
  import pyspark.sql.functions as func
  from pyspark.ml.classification import LogisticRegression
  from pyspark.ml.feature import VectorAssembler

  data_labeled = data.withColumn("_1", func.round(data["_1"], 0))

  # perf_counter() is monotonic; time.time() can jump when the system clock is
  # adjusted and would then yield a wrong (even negative) duration.
  start_time = time.perf_counter()

  X_cols = ['_' + str(i) for i in range(2, cols + 1)]
  data_vectorized = VectorAssembler(inputCols=X_cols, outputCol="features").transform(data_labeled)
  # Weights [1.0, 0.0] request a 100% / 0% split; the second part is never used.
  train, _ = data_vectorized.randomSplit([1.0, 0.0])

  lr = LogisticRegression(maxIter=1, regParam=0.3, elasticNetParam=0.8)
  lr.setLabelCol('_1')
  lr.setFeaturesCol("features")

  # Train a model! Only the duration matters, so the fitted model is discarded.
  lr.fit(train)
  return time.perf_counter() - start_time





In [4]:
def run_dectreeclass(data, rows, cols):
  """Time a Spark ML DecisionTreeClassifier (maxDepth=10) fit on ``data``.

  Column ``_1`` is rounded to 0 decimal places and used as the label
  (presumably to turn a continuous value into discrete classes); columns
  ``_2`` .. ``_<cols>`` are assembled into a ``features`` vector.

  Args:
    data: Spark DataFrame to train on. Assumed to hold numeric columns named
      ``_1`` .. ``_<cols>`` -- TODO confirm against the caller.
    rows: Row count of ``data``. Not used by this function; kept so that all
      ``run_*`` benchmarks share the same signature.
    cols: Index of the last column; feature columns are ``_2`` .. ``_<cols>``.

  Returns:
    Elapsed time in seconds (float) covering feature assembly, the split and
    the model fit. Defining the rounded label column happens before the clock
    starts.
  """
  # Import before starting the clock so the one-off module import cost of the
  # first call is not booked as training time.
  import time
  import pyspark.sql.functions as func
  from pyspark.ml.classification import DecisionTreeClassifier
  from pyspark.ml.feature import VectorAssembler

  data_labeled = data.withColumn("_1", func.round(data["_1"], 0))

  # perf_counter() is monotonic; time.time() can jump when the system clock is
  # adjusted and would then yield a wrong (even negative) duration.
  start_time = time.perf_counter()

  X_cols = ['_' + str(i) for i in range(2, cols + 1)]
  data_vectorized = VectorAssembler(inputCols=X_cols, outputCol="features").transform(data_labeled)
  # Weights [1.0, 0.0] request a 100% / 0% split; the second part is never used.
  train, _ = data_vectorized.randomSplit([1.0, 0.0])

  dtc = DecisionTreeClassifier(maxDepth=10)
  dtc.setLabelCol('_1')
  dtc.setFeaturesCol("features")

  # Train a model! Only the duration matters, so the fitted model is discarded.
  dtc.fit(train)
  return time.perf_counter() - start_time


In [5]:
def run_dectreereg(data, rows, cols):
  """Time a Spark ML DecisionTreeRegressor (maxDepth=10) fit on ``data``.

  Columns ``_2`` .. ``_<cols>`` are assembled into a ``features`` vector and
  column ``_1`` is used as the label.

  Args:
    data: Spark DataFrame to train on. Assumed to hold numeric columns named
      ``_1`` .. ``_<cols>`` -- TODO confirm against the caller.
    rows: Row count of ``data``. Not used by this function; kept so that all
      ``run_*`` benchmarks share the same signature.
    cols: Index of the last column; feature columns are ``_2`` .. ``_<cols>``.

  Returns:
    Elapsed time in seconds (float) covering feature assembly, the split and
    the model fit.
  """
  # Import before starting the clock so the one-off module import cost of the
  # first call is not booked as training time.
  import time
  from pyspark.ml.feature import VectorAssembler
  from pyspark.ml.regression import DecisionTreeRegressor

  # perf_counter() is monotonic; time.time() can jump when the system clock is
  # adjusted and would then yield a wrong (even negative) duration.
  start_time = time.perf_counter()

  X_cols = ['_' + str(i) for i in range(2, cols + 1)]
  data_vectorized = VectorAssembler(inputCols=X_cols, outputCol="features").transform(data)
  # Weights [1.0, 0.0] request a 100% / 0% split; the second part is never used.
  train, _ = data_vectorized.randomSplit([1.0, 0.0])

  dtr = DecisionTreeRegressor(maxDepth=10)
  dtr.setLabelCol('_1')
  dtr.setFeaturesCol("features")

  # Train a model! Only the duration matters, so the fitted model is discarded.
  dtr.fit(train)
  return time.perf_counter() - start_time


In [6]:
def run_naivebayesclass(data, rows, cols):
  """Time a Spark ML multinomial NaiveBayes (smoothing=1.0) fit on ``data``.

  Column ``_1`` is rounded to 0 decimal places and used as the label
  (presumably to turn a continuous value into discrete classes); columns
  ``_2`` .. ``_<cols>`` are assembled into a ``features`` vector.

  Args:
    data: Spark DataFrame to train on. Assumed to hold numeric columns named
      ``_1`` .. ``_<cols>`` -- TODO confirm against the caller.
    rows: Row count of ``data``. Not used by this function; kept so that all
      ``run_*`` benchmarks share the same signature.
    cols: Index of the last column; feature columns are ``_2`` .. ``_<cols>``.

  Returns:
    Elapsed time in seconds (float) covering feature assembly, the split and
    the model fit. Defining the rounded label column happens before the clock
    starts.
  """
  # Import before starting the clock so the one-off module import cost of the
  # first call is not booked as training time.
  import time
  import pyspark.sql.functions as func
  from pyspark.ml.classification import NaiveBayes
  from pyspark.ml.feature import VectorAssembler

  data_labeled = data.withColumn("_1", func.round(data["_1"], 0))

  # perf_counter() is monotonic; time.time() can jump when the system clock is
  # adjusted and would then yield a wrong (even negative) duration.
  start_time = time.perf_counter()

  X_cols = ['_' + str(i) for i in range(2, cols + 1)]
  data_vectorized = VectorAssembler(inputCols=X_cols, outputCol="features").transform(data_labeled)
  # Weights [1.0, 0.0] request a 100% / 0% split; the second part is never used.
  train, _ = data_vectorized.randomSplit([1.0, 0.0])

  nbc = NaiveBayes(smoothing=1.0, modelType="multinomial")
  nbc.setLabelCol('_1')
  nbc.setFeaturesCol("features")

  # Train a model! Only the duration matters, so the fitted model is discarded.
  nbc.fit(train)
  return time.perf_counter() - start_time


In [7]:
from pyspark.mllib.random import RandomRDDs
import time
import databricks.koalas as ks

# Benchmarks executed for every generated dataset, in this order. Each entry is
# (timing function, progress-message template, "model" value, "modeltype" value).
# The decision-tree entries are disabled, matching the "nodectree" output file.
benchmarks = [
  (run_linearreg, 'linreg executed in {} seconds', "linearreg", "regression"),
#  (run_dectreereg, 'dectreereg executed in {} seconds', "dectree", "regression"),
  (run_logregclass, 'logregclass executed in {} seconds', "logreg", "classification"),
#  (run_dectreeclass, 'dectreeclass executed in {} seconds', "dectree", "classification"),
  (run_naivebayesclass, 'naivebayesclass executed in {} seconds', "naivebayes", "classification"),
]

# One record (dict) per benchmark run, accumulated across the whole grid.
rows_list = []

# Grid: 100k..1M rows in steps of 100k, 100..1000 columns in steps of 100.
for rows in range(100000,1000001,100000):
  for cols in range(100,1001,100):
    print("generating a dataframe of {} rows x {} cols".format(rows,cols))
    # `sc` is not defined in this notebook; presumably the SparkContext that
    # the hosting environment injects -- TODO confirm.
    data  = RandomRDDs.uniformVectorRDD(sc, rows,cols).map(lambda a : a.tolist()).toDF()
    print("running on dataframe of {} rows x {} cols".format(rows,cols))

    for runner, message, model, modeltype in benchmarks:
      t = runner(data, rows, cols)
      print(message.format(round(t,3)))
      result = {"model":model,"modeltype":modeltype,"rows": rows, "cols": cols, "time": t, "nodes":2, "lib":"mllib" }
      rows_list.append(result)
      # Pause for 10 seconds between consecutive benchmark runs.
      time.sleep(10)

    # Re-write the full result set after every (rows, cols) combination,
    # presumably so partial results survive an interrupted run.
    df = ks.DataFrame(rows_list, columns=['model','modeltype','nodes','lib','rows', 'cols', 'time'])
    df.to_csv("/FileStore/tables/mlib_2n_nodectree_20200308.csv")

print(rows_list)
    
    

In [8]:
import pandas as pd

# Show the accumulated benchmark records as a table; the explicit column list
# fixes the left-to-right column order of the rendered frame.
result_columns = ['model','modeltype','nodes','lib','rows', 'cols', 'time']
df = pd.DataFrame(rows_list, columns=result_columns)
display(df)

model,modeltype,nodes,lib,rows,cols,time
linearreg,regression,2,mllib,100000,100,9.7911696434021
logreg,classification,2,mllib,100000,100,2.5348896980285645
naivebayes,classification,2,mllib,100000,100,3.220440626144409
linearreg,regression,2,mllib,100000,200,7.452404499053955
logreg,classification,2,mllib,100000,200,2.8204562664031982
naivebayes,classification,2,mllib,100000,200,2.4518041610717773
linearreg,regression,2,mllib,100000,300,10.303661108016968
logreg,classification,2,mllib,100000,300,4.182396411895752
naivebayes,classification,2,mllib,100000,300,3.5619900226593018
linearreg,regression,2,mllib,100000,400,14.174526929855348
