In [1]:
import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`

from pyspark.sql import SparkSession

# Build (or reuse, if one is already active) the session shared by every cell below.
# NOTE(review): when a session already exists, getOrCreate returns it and the
# memory settings below may not take effect — restart the kernel to change them.
spark = (
    SparkSession.builder
    .appName("PipelineOps")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [2]:
import os

# Location of the preprocessed Adult CSV. Set the ADULT_CSV_PATH environment
# variable to point elsewhere instead of editing this machine-specific default.
DATA_PATH = os.environ.get(
    "ADULT_CSV_PATH",
    "C:/Users/Rulokat/Desktop/GitHub/apache-spark/Mllib-ApacheSpark/adult_preprocessed.csv",
)

# Read the CSV (first row is the header) and let Spark infer column types.
# The chain is parenthesized rather than backslash-continued so no dangling
# continuation can glue a following line onto this expression.
df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .option("sep", ",")
    .csv(DATA_PATH)
)

In [3]:
# Preview the first rows. limit() runs on the cluster, so only 5 rows are
# shipped to the driver instead of converting the whole dataset to pandas.
df.limit(5).toPandas()

Unnamed: 0,workclass,education,education_merged,marital_status,occupation,relationship,race,sex,native_country,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,output
0,State-gov,Bachelors,Under-Graduate,Never-married,Adm-clerical,Not-in-family,White,Male,United-States,39,77516.0,13.0,2174.0,0.0,40.0,<=50K
1,Self-emp-not-inc,Bachelors,Under-Graduate,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,50,83311.0,13.0,0.0,0.0,13.0,<=50K
2,Private,HS-grad,HS-grad,Divorced,Handlers-cleaners,Not-in-family,White,Male,United-States,38,215646.0,9.0,0.0,0.0,40.0,<=50K
3,Private,11th,High-School,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,United-States,53,234721.0,7.0,0.0,0.0,40.0,<=50K
4,Private,Bachelors,Under-Graduate,Married-civ-spouse,Prof-specialty,Wife,Black,Female,Cuba,28,338409.0,13.0,0.0,0.0,40.0,<=50K


### Data Preprocessing

In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler

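### StringIndexer

Each categorical column gets its own `StringIndexer`. With `handleInvalid="skip"`, any row whose value is null or was not seen when the indexer was fitted is filtered out. Such test rows are therefore silently excluded from the accuracy reported at the end.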

In [5]:
# Index `workclass` strings as numbers; handleInvalid="skip" filters out rows
# whose value is null or was not seen when the indexer was fitted.
workclass_indexer = StringIndexer(
    inputCol="workclass",
    outputCol="workclass_index",
    handleInvalid="skip",
)

In [6]:
# Index `education_merged` strings as numbers; invalid (unseen/null) rows are skipped.
education_merged_indexer = StringIndexer(
    inputCol="education_merged",
    outputCol="education_merged_index",
    handleInvalid="skip",
)

In [7]:
# Index `marital_status` strings as numbers; invalid (unseen/null) rows are skipped.
marital_status_indexer = StringIndexer(
    inputCol="marital_status",
    outputCol="marital_status_index",
    handleInvalid="skip",
)

In [8]:
# Index `occupation` strings as numbers; invalid (unseen/null) rows are skipped.
occupation_indexer = StringIndexer(
    inputCol="occupation",
    outputCol="occupation_index",
    handleInvalid="skip",
)

In [9]:
# Index `relationship` strings as numbers; invalid (unseen/null) rows are skipped.
relationship_indexer = StringIndexer(
    inputCol="relationship",
    outputCol="relationship_index",
    handleInvalid="skip",
)

In [10]:
# Index `race` strings as numbers; invalid (unseen/null) rows are skipped.
race_indexer = StringIndexer(
    inputCol="race",
    outputCol="race_index",
    handleInvalid="skip",
)

In [11]:
# Index `native_country` strings as numbers; invalid (unseen/null) rows are skipped.
native_country_indexer = StringIndexer(
    inputCol="native_country",
    outputCol="native_country_index",
    handleInvalid="skip",
)

### OneHotEncoder

In [12]:
# One-hot encode every indexed categorical column. Each output column is the
# input name plus "_enc", derived here so the two lists can never drift apart.
# NOTE(review): Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder —
# update the import and this cell when upgrading.
encoder_input_cols = [
    "workclass_index",
    "education_merged_index",
    "marital_status_index",
    "occupation_index",
    "relationship_index",
    "race_index",
    "native_country_index",
]
encoder = OneHotEncoderEstimator(
    inputCols=encoder_input_cols,
    outputCols=[col_name + "_enc" for col_name in encoder_input_cols],
)

In [13]:
# VectorAssembler inputs, in order: one-hot encoded categorical vectors first,
# then the raw numeric columns.
# NOTE(review): the `sex` column present in the data is not used as a
# feature — confirm the omission is intentional.
categorical_base_cols = [
    "workclass",
    "education_merged",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "native_country",
]
numeric_cols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
all_features = [base + "_index_enc" for base in categorical_base_cols] + numeric_cols

In [14]:
# Display the assembler input order for a visual check.
list(all_features)

['workclass_index_enc',
 'education_merged_index_enc',
 'marital_status_index_enc',
 'occupation_index_enc',
 'relationship_index_enc',
 'race_index_enc',
 'native_country_index_enc',
 'age',
 'fnlwgt',
 'education_num',
 'capital_gain',
 'capital_loss',
 'hours_per_week']

In [15]:
# Concatenate all encoded and numeric feature columns into one vector column.
assembler = VectorAssembler(
    inputCols=all_features,
    outputCol="vectorized_features",
)

### Label Indexer

In [16]:
# Turn the string target `output` into a numeric `label` column for the classifier.
# No handleInvalid is set, so the default ("error") applies to unseen label values.
label_indexer = StringIndexer(inputCol="output", outputCol="label")

### Feature Scaling

In [17]:
# Standardize the assembled vector into the `features` column consumed by the model.
# Defaults are kept (withStd=True, withMean=False), so sparse vectors stay sparse.
scaler = StandardScaler(inputCol="vectorized_features", outputCol="features")

### Classifier

In [18]:
from pyspark.ml.classification import LogisticRegression

In [19]:
# Logistic regression over the scaled features, predicting the indexed label.
lr_object = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

### Data Split

In [20]:
# 80/20 train/test split; the fixed seed makes the split repeatable on the same input.
split_weights = [0.8, 0.2]
train_df, test_df = df.randomSplit(split_weights, seed=142)

### Pipeline

In [21]:
from pyspark.ml import Pipeline

In [22]:
# Stage order matters: index categoricals -> one-hot encode -> assemble ->
# index the label -> scale -> classify.
pipeline_stages = [
    workclass_indexer,
    education_merged_indexer,
    marital_status_indexer,
    occupation_indexer,
    relationship_indexer,
    race_indexer,
    native_country_indexer,
    encoder,
    assembler,
    label_indexer,
    scaler,
    lr_object,
]
pipeline_object = Pipeline(stages=pipeline_stages)

In [23]:
# Fit every stage (indexers, encoder, scaler, classifier) on the training split only.
pipeline_model = pipeline_object.fit(dataset=train_df)

In [24]:
# Score the held-out split, keeping only the columns needed for evaluation.
result_df = pipeline_model.transform(test_df).select("label", "prediction")
# Preview a few rows. limit() avoids collecting the whole prediction set to the driver.
result_df.limit(5).toPandas()

Unnamed: 0,label,prediction
0,0.0,0.0
1,0.0,1.0
2,0.0,0.0
3,0.0,0.0
4,0.0,0.0


In [25]:
from sklearn.metrics import accuracy_score

In [26]:
# Collect label and prediction in a single Spark job, then split the columns.
# Two separate collects would rerun the whole pipeline twice and silently rely
# on both jobs returning rows in the same order.
result_pdf = result_df.toPandas()
y_pred = result_pdf[["prediction"]]
y_true = result_pdf[["label"]]

In [27]:
# Fraction of test rows whose predicted class matches the true label.
accuracy_score(y_true=y_true, y_pred=y_pred)

0.8471119929453262