In [47]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
import pyspark.sql.functions as fn
import pyspark.ml.feature as ml

In [48]:
spark=SparkSession.builder.appName("Logistic Reggression").getOrCreate()

In [49]:
spark_df=spark.read.csv("/home/hdoop/Downloads/classification_2.csv",inferSchema=True,header=True)

                                                                                

In [50]:
spark_df.describe().show()

[Stage 104:>                                                        (0 + 1) / 1]

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+-----------+------+
|summary|               Age|  Work_Class|      Final_Weight|    Education| Education_Number|Marital_Status|       Occupation|Relationship|              Race |    Sex|      Capital_Gain|    Capital_Loss|    Hours_per_Week|    Country|Income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+-----------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|      32561| 32561|
|   mean| 38.58164675532078|        

                                                                                

In [51]:
spark_df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Work_Class: string (nullable = true)
 |-- Final_Weight: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Education_Number: integer (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race : string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Capital_Gain: integer (nullable = true)
 |-- Capital_Loss: integer (nullable = true)
 |-- Hours_per_Week: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Income: string (nullable = true)



In [52]:
spark_df.show()

+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|Age|       Work_Class|Final_Weight|    Education|Education_Number|      Marital_Status|        Occupation|  Relationship|              Race |    Sex|Capital_Gain|Capital_Loss|Hours_per_Week|       Country|Income|
+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov|       77516|    Bachelors|              13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc|       83311|    Bachelors|              13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|  

<h2>PreProcessing

Removing The Null Value

In [53]:
print(spark_df.columns)

['Age', 'Work_Class', 'Final_Weight', 'Education', 'Education_Number', 'Marital_Status', 'Occupation', 'Relationship', 'Race ', 'Sex', 'Capital_Gain', 'Capital_Loss', 'Hours_per_Week', 'Country', 'Income']


In [54]:
# Here the values are "?"
for col in spark_df.columns:
    try:
        spark_df=spark_df.filter(~fn.col(col).isin("?"))
    except:
        continue
spark_df.show()

+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|Age|       Work_Class|Final_Weight|    Education|Education_Number|      Marital_Status|        Occupation|  Relationship|              Race |    Sex|Capital_Gain|Capital_Loss|Hours_per_Week|       Country|Income|
+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov|       77516|    Bachelors|              13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc|       83311|    Bachelors|              13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|  

<h3> Encoding The Classification Columns

In [55]:
colums=list(spark_df.columns)
colums=['Work_Class','Education','Occupation','Country']
print(colums)
indexer=[ml.StringIndexer(inputCol=col,outputCol=col+"_encoded") for col in colums]    


['Work_Class', 'Education', 'Occupation', 'Country']


Fitting in the data

In [56]:
from  pyspark.ml import Pipeline
pipeline=Pipeline(stages=indexer)
new_data=pipeline.fit(spark_df).transform(spark_df)
new_data.show()

                                                                                

+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+------------------+-----------------+------------------+---------------+
|Age|       Work_Class|Final_Weight|    Education|Education_Number|      Marital_Status|        Occupation|  Relationship|              Race |    Sex|Capital_Gain|Capital_Loss|Hours_per_Week|       Country|Income|Work_Class_encoded|Education_encoded|Occupation_encoded|Country_encoded|
+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+------------------+-----------------+------------------+---------------+
| 39|        State-gov|       77516|    Bachelors|              13|       Never-married|      Adm-clerical| Not-in-family|              White|

<h3>Scalling the Numeric Data

In [57]:
feature_column = ["Age","Hours_per_Week", "Capital_Gain"]

assemblers = [ml.VectorAssembler(inputCols=[col], outputCol=col + "_vector") for col in feature_column]
scalers = [ml.MinMaxScaler(inputCol=col + "_vector", outputCol=col + "_scaled") for col in feature_column]
pipeline = Pipeline(stages=assemblers+scalers)
scalerModel = pipeline.fit(new_data)
new_data = scalerModel.transform(new_data)

new_data.show()

                                                                                

+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+------------------+-----------------+------------------+---------------+----------+---------------------+-------------------+--------------------+---------------------+--------------------+
|Age|       Work_Class|Final_Weight|    Education|Education_Number|      Marital_Status|        Occupation|  Relationship|              Race |    Sex|Capital_Gain|Capital_Loss|Hours_per_Week|       Country|Income|Work_Class_encoded|Education_encoded|Occupation_encoded|Country_encoded|Age_vector|Hours_per_Week_vector|Capital_Gain_vector|          Age_scaled|Hours_per_Week_scaled| Capital_Gain_scaled|
+---+-----------------+------------+-------------+----------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+---

Replacing the columns

In [58]:
new_data=new_data.withColumn("Work_Class",fn.col("Work_Class_encoded"))
new_data=new_data.withColumn("Education",fn.col("Education_encoded"))
new_data=new_data.withColumn("Occupation",fn.col("Occupation_encoded"))
new_data=new_data.withColumn("Country",fn.col("Country_encoded"))
new_data=new_data.withColumn("Hours_per_Week",fn.col("Hours_per_Week_scaled"))
new_data=new_data.withColumn("Age",fn.col("Age_scaled"))
new_data=new_data.withColumn("Capital_Gain",fn.col("Capital_Gain_scaled"))


In [59]:
print(new_data.columns)

['Age', 'Work_Class', 'Final_Weight', 'Education', 'Education_Number', 'Marital_Status', 'Occupation', 'Relationship', 'Race ', 'Sex', 'Capital_Gain', 'Capital_Loss', 'Hours_per_Week', 'Country', 'Income', 'Work_Class_encoded', 'Education_encoded', 'Occupation_encoded', 'Country_encoded', 'Age_vector', 'Hours_per_Week_vector', 'Capital_Gain_vector', 'Age_scaled', 'Hours_per_Week_scaled', 'Capital_Gain_scaled']


In [60]:
new_data.show()

+--------------------+----------+------------+---------+----------------+--------------------+----------+--------------+-------------------+-------+--------------------+------------+--------------------+-------+------+------------------+-----------------+------------------+---------------+----------+---------------------+-------------------+--------------------+---------------------+--------------------+
|                 Age|Work_Class|Final_Weight|Education|Education_Number|      Marital_Status|Occupation|  Relationship|              Race |    Sex|        Capital_Gain|Capital_Loss|      Hours_per_Week|Country|Income|Work_Class_encoded|Education_encoded|Occupation_encoded|Country_encoded|Age_vector|Hours_per_Week_vector|Capital_Gain_vector|          Age_scaled|Hours_per_Week_scaled| Capital_Gain_scaled|
+--------------------+----------+------------+---------+----------------+--------------------+----------+--------------+-------------------+-------+--------------------+------------+--

<h4>Selecting the necessary columns for model 

In [61]:
columns=["Age","Work_Class","Education","Occupation","Country","Hours_per_Week","Capital_Loss","Capital_Gain","Income"]
processed_data=new_data.select(*[col for col in columns])

In [62]:
processed_data.show()

+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+
|                 Age|Work_Class|Education|Occupation|Country|      Hours_per_Week|Capital_Loss|        Capital_Gain|Income|
+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+
|[0.3013698630136986]|       4.0|      2.0|       3.0|    0.0|[0.39795918367346...|           0|[0.02174021740217...| <=50K|
|[0.4520547945205479]|       1.0|      2.0|       2.0|    0.0|[0.12244897959183...|           0|               [0.0]| <=50K|
|[0.2876712328767123]|       0.0|      0.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]| <=50K|
|[0.4931506849315068]|       0.0|      5.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]| <=50K|
|[0.1506849315068493]|       0.0|      2.0|       0.0|    9.0|[0.39795918367346...|           0|               [0.0]| <=50K|


Replacing Income By 0 and 1

In [63]:
processed_data=processed_data.withColumn('Income',fn.trim(fn.col('Income')))

In [64]:
processed_data=processed_data.withColumn('Income',fn.when(fn.col('Income') == '>50K',1).otherwise(0))
processed_data.show()

+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+
|                 Age|Work_Class|Education|Occupation|Country|      Hours_per_Week|Capital_Loss|        Capital_Gain|Income|
+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+
|[0.3013698630136986]|       4.0|      2.0|       3.0|    0.0|[0.39795918367346...|           0|[0.02174021740217...|     0|
|[0.4520547945205479]|       1.0|      2.0|       2.0|    0.0|[0.12244897959183...|           0|               [0.0]|     0|
|[0.2876712328767123]|       0.0|      0.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]|     0|
|[0.4931506849315068]|       0.0|      5.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]|     0|
|[0.1506849315068493]|       0.0|      2.0|       0.0|    9.0|[0.39795918367346...|           0|               [0.0]|     0|


<h3>Adding the feature

In [65]:
train,test=processed_data.randomSplit([0.8,0.3])

In [66]:
print(processed_data.columns)
feature_column=['Age', 'Work_Class', 'Education', 'Occupation', 'Country','Hours_per_Week', 'Capital_Loss', 'Capital_Gain']

['Age', 'Work_Class', 'Education', 'Occupation', 'Country', 'Hours_per_Week', 'Capital_Loss', 'Capital_Gain', 'Income']


In [67]:
assemblers = ml.VectorAssembler(inputCols=feature_column ,outputCol="features")

new_data = assemblers.transform(processed_data)

In [68]:
new_data.show()

+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+--------------------+
|                 Age|Work_Class|Education|Occupation|Country|      Hours_per_Week|Capital_Loss|        Capital_Gain|Income|            features|
+--------------------+----------+---------+----------+-------+--------------------+------------+--------------------+------+--------------------+
|[0.3013698630136986]|       4.0|      2.0|       3.0|    0.0|[0.39795918367346...|           0|[0.02174021740217...|     0|[0.30136986301369...|
|[0.4520547945205479]|       1.0|      2.0|       2.0|    0.0|[0.12244897959183...|           0|               [0.0]|     0|[0.45205479452054...|
|[0.2876712328767123]|       0.0|      0.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]|     0|(8,[0,3,5],[0.287...|
|[0.4931506849315068]|       0.0|      5.0|       9.0|    0.0|[0.39795918367346...|           0|               [0.0]|     0|

<h3>Spliting The data 

In [69]:
train_df,test_df=new_data.randomSplit([0.8,0.2])

In [70]:
train_df = train_df.withColumn("Income", fn.col("Income").cast("double"))
test_df = test_df.withColumn("Income", fn.col("Income").cast("double"))


<h2> Fitting The model

In [71]:
from pyspark.ml.classification import LogisticRegression
regr = LogisticRegression(featuresCol='features', labelCol='Income')

# Fit the model to the training data
regr_model = regr.fit(train_df)


                                                                                

In [72]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [73]:
predections=regr_model.transform(test_df)

In [74]:
predections.show()

[Stage 161:>                                                        (0 + 1) / 1]

+-----+----------+---------+----------+-------+--------------------+------------+--------------------+------+--------------------+--------------------+--------------------+----------+
|  Age|Work_Class|Education|Occupation|Country|      Hours_per_Week|Capital_Loss|        Capital_Gain|Income|            features|       rawPrediction|         probability|prediction|
+-----+----------+---------+----------+-------+--------------------+------------+--------------------+------+--------------------+--------------------+--------------------+----------+
|[0.0]|       0.0|      0.0|       3.0|    0.0|[0.19387755102040...|           0|               [0.0]|   0.0|(8,[3,5],[3.0,0.1...|[3.32713002978421...|[0.96534789348172...|       0.0|
|[0.0]|       0.0|      0.0|       4.0|    0.0|[0.19387755102040...|           0|               [0.0]|   0.0|(8,[3,5],[4.0,0.1...|[3.45508065386226...|[0.96938229591713...|       0.0|
|[0.0]|       0.0|      0.0|       5.0|    1.0|[0.3469387755102041]|           0

                                                                                

Type casting

In [76]:
predect_data = predections.withColumn("prediction", predections["prediction"].cast("double"))
predect_data= predections.withColumn("label", predections["Income"].cast("double"))

<h4> Calculating the accuracy of model

In [78]:
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy=evaluator.evaluate(predect_data)
print(accuracy)

[Stage 173:>                                                        (0 + 1) / 1]

0.8021599624354359


                                                                                