In [1]:
import os

import findspark
findspark.init( '/usr/local/spark' )
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
def list_dataframes() :
    """Return the names of module-level variables that hold a Spark DataFrame.

    Scans this module's global namespace and collects every name whose
    value is an instance of ``pyspark.sql.DataFrame``.
    """
    from pyspark.sql import DataFrame

    names = []
    for name, value in globals().items() :
        if isinstance( value, DataFrame ) :
            names.append( name )
    return names

##### Creating a Spark session

In [3]:
# Build a Spark session against the 'local' master, or reuse one that is
# already running, and report which Spark version it is backed by.
Myspark = (
    SparkSession.builder
    .master( 'local' )
    .appName( 'Features' )
    .getOrCreate()
)
print( Myspark.version )

2.4.0


##### Reading the file

In [4]:
# Location of the input CSV. It defaults to the original hard-coded path, but
# can be redirected without editing the notebook by setting the
# FEATURE_SELECTION_DATA environment variable before the kernel starts.
DATA_PATH = os.environ.get(
    'FEATURE_SELECTION_DATA',
    '/home/demetrius/Documents/GitHub/feature-selection-spark/data.csv',
)

# header=True takes the column names from the first row; inferSchema=True
# lets Spark derive each column's type from the data.
DATA = Myspark.read.csv( DATA_PATH, inferSchema=True, header=True )

In [5]:
# Show each column name with the type Spark inferred for it while reading the CSV.
DATA.dtypes

[('label', 'double'),
 ('a1', 'int'),
 ('a2', 'double'),
 ('b1', 'int'),
 ('b2', 'double'),
 ('c1', 'double'),
 ('c2', 'double'),
 ('c3', 'double'),
 ('d1', 'double'),
 ('d2', 'double'),
 ('d3', 'double')]

In [6]:
# Peek at the first record only, printed vertically (one field per line).
DATA.show( 1, vertical=True )

-RECORD 0---------------------
 label | 0.05505462852698298  
 a1    | 1                    
 a2    | 1.45847822002572     
 b1    | 0                    
 b2    | -1.529300753353182   
 c1    | 0.1574185974126949   
 c2    | 1.8302946297668106   
 c3    | -0.08012381649940253 
 d1    | -0.5668537223849737  
 d2    | 2.400231562805219    
 d3    | 3.2270143265957785   
only showing top 1 row



In [8]:
# Divisor applied to the row count below to derive the minimum leaf size,
# i.e. each leaf must hold roughly 1/groups of the rows.
groups = 12

In [9]:
# Minimum number of rows a tree node must keep: about 1/groups of the data.
# Clamped to 1 because a small or empty DATA would round to 0, and this value
# is passed as minInstancesPerNode, which Spark requires to be >= 1.
leaf = max( 1, round( DATA.count() / groups ) )

In [10]:
# Display the computed minimum leaf size.
print( leaf )

833


In [11]:
# Every column except the target is a model input. Excluding 'label' by name
# (instead of slicing off the first column) keeps the target out of the
# features even if the CSV column order changes. 'label' is the default
# labelCol of the regressor and evaluator used in this notebook.
featurenames = [ name for name in DATA.columns if name != 'label' ]

In [12]:
# Pipeline stage that packs the individual feature columns into the single
# 'features' vector column.
vector = VectorAssembler(
    outputCol='features',
    inputCols=featurenames,
)

In [13]:
# A random forest limited to exactly one tree. `leaf` is the minimum number of
# rows each node must keep; the fixed seed makes the fit repeatable.
tree = RandomForestRegressor(
    seed=333,
    maxDepth=30,
    minInstancesPerNode=leaf,
    numTrees=1,
)

In [14]:
# Chain the two stages: assemble the feature vector first, then fit the tree on it.
pipe = Pipeline( stages=[ vector, tree ] )

In [20]:
# Fit the whole pipeline on the full dataset (no hold-out split is made in the visible cells).
model = pipe.fit( DATA )

In [18]:
# Mean-absolute-error scorer. The label and prediction columns are spelled out
# here; they are the same values the evaluator uses by default.
evaluator = RegressionEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='mae',
)

In [21]:
# Score the fitted model on DATA, the same rows it was trained on, so the
# value is a training error rather than an out-of-sample estimate.
evaluator.evaluate( model.transform( DATA ) )

0.7150469803437026

In [22]:
# stages[ 1 ] is the fitted tree model (stage 0 is the assembler). The indices
# are positions within the assembled feature vector, i.e. in `featurenames`
# order; presumably only features with non-zero importance are listed (verify).
model.stages[ 1 ].featureImportances.indices

array([4, 9], dtype=int32)

In [8]:
from pyspark.ml.regression import RandomForestRegressor

In [None]:
# Shut down the Spark session once the analysis is finished.
Myspark.stop()