In [1]:
import bus_times
import os
import define
#import analyze
import prepare
import feature_selection
import evaluate
import tools

from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.types import *

#name = "datasets/buses_10000_filtered.csv"
#name = "hdfs://King:9000/user/bdata/cern/hepmass_2000000_report.csv"
current_path = os.getcwd()


data_path = os.path.join(current_path, 'hepmass.csv')
response = "label"
#cluster_manager = 'yarn'
cluster_manager = 'local[*]'

spark_session = SparkSession.builder \
.master(cluster_manager)\
.appName("Sparkmach") \
.config("spark.driver.allowMultipleContexts", "true")\
.getOrCreate()


spark_session.sparkContext.addPyFile(os.path.join(current_path, 'define.py'))
spark_session.sparkContext.addPyFile(os.path.join(current_path, 'prepare.py'))
spark_session.sparkContext.addPyFile(os.path.join(current_path, 'feature_selection.py'))
spark_session.sparkContext.addPyFile(os.path.join(current_path, 'evaluate.py'))
spark_session.sparkContext.addPyFile(os.path.join(current_path, 'tools.py'))
##### Benchmark starting #####
print('Benchmark starting:')

list_n_samples = [10, 100, 1000]
list_n_features = [5, 10, 15]
print('List number of samples:', list_n_samples)
print('List number of features:', list_n_features)

for n_samples in list_n_samples:
    for n_features in list_n_features:
        
        ##### Generate the dataframe #####
        print('Generating the binary labeled dataframe: shape:', n_samples, n_features)
        df = tools.generate_dataframe(spark_session, n_samples=n_samples, 
                                      n_features=n_features, seed=42)
        # df.show(3)

        ##### Run the models #####
        print('Running the models')

        # STEP 0: Define workflow parameters
        #definer = define.Define(spark_session, data_path=data_path, response=response).pipeline()
        definer = define.Define(spark_session, df=df, response='response').pipeline()

        # STEP 1: Analyze data by ploting it
        #analyze.Analyze(definer).pipeline()

        # STEP 2: Prepare data by scaling, normalizing, etc. 
        preparer = prepare.Prepare(definer).pipeline()

        #STEP 3: Feature selection
        featurer = feature_selection.FeatureSelection(definer).pipeline()

        #STEP4: Evalute the algorithms by using the pipelines
        evaluator = evaluate.Evaluate(definer, preparer, featurer).pipeline()

Benchmark starting:
List number of samples: [10, 100, 1000]
List number of features: [5, 10, 15]
Generating the binary labeled dataframe: shape: 10 5
Running the models
Model:  LogisticRegression
Model:  RandomForestClassifier
Model:  DecisionTreeClassifier
Model:  GBTClassifier
Model:  LinearSVC
+----------------------+-----+-------------------+
|Model                 |Score|Time               |
+----------------------+-----+-------------------+
|DecisionTreeClassifier|0.75 |0.07020989656448365|
|GBTClassifier         |0.75 |0.37749836047490437|
|LogisticRegression    |0.25 |0.22561467091242474|
|RandomForestClassifier|0.25 |0.08697145779927572|
|LinearSVC             |0.25 |0.08187240759531657|
|Total time            |0.0  |0.8562029065688452 |
+----------------------+-----+-------------------+

Generating the binary labeled dataframe: shape: 10 10
Running the models
Model:  LogisticRegression
Model:  RandomForestClassifier


KeyboardInterrupt: 

In [2]:
0.11530292828877767 + 0.23404855330785115

0.3493514815966288

In [4]:
definer.da

# Generate data

In [29]:
from pyspark.sql.functions import rand, randn, when

def generate_dataframe(n_samples, n_features, seed=42):
    df = spark_session.range(0, n_samples)
    t = [when(df['id'] < n_samples/2, 0).otherwise(1).alias("response")]
    t += [rand(seed=seed+i).alias("f_"+str(i)) for i in range(n_features)]
    df = df.select(t)

    return df

In [31]:
df = generate_dataframe(n_samples=10, n_features=3, seed=42)
df.show()

+--------+-------------------+-------------------+-------------------+
|response|                f_0|                f_1|                f_2|
+--------+-------------------+-------------------+-------------------+
|       0| 0.6661236774413726| 0.3856203005100328|0.47129200262114224|
|       0| 0.3856203005100328|0.47129200262114224|0.46779322384305533|
|       0|0.47129200262114224|0.46779322384305533| 0.8244413413402464|
|       0|0.46779322384305533| 0.8244413413402464|0.26547180193280995|
|       0| 0.8826789945738182| 0.5750003757719865|0.22508188677695096|
|       1| 0.8244413413402464|0.26547180193280995| 0.8642043127063618|
|       1|0.26547180193280995| 0.8642043127063618| 0.8401446809670715|
|       1| 0.8642043127063618| 0.8401446809670715| 0.5665388805573012|
|       1| 0.8401446809670715| 0.5665388805573012|  0.748698453025968|
|       1| 0.3562475874559726| 0.9021459462713273| 0.5736208594213481|
+--------+-------------------+-------------------+-------------------+

