SALARY PREDICTION

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
!tar xf 'gdrive/MyDrive/Colab Notebooks/CBDT/SalaryPrediction/spark-3.1.1-bin-hadoop3.2.tgz'

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [5]:
!pip install -q findspark

In [6]:
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import seaborn as sns
import math

In [7]:
import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency
from scipy.stats import f_oneway
from sklearn.preprocessing import StandardScaler

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [9]:
path = '/content/gdrive/MyDrive/Colab Notebooks/CBDT/SalaryPrediction/'

In [10]:
df_csv = spark.read.csv(path+'SalaryData.csv', header=True, sep=",", inferSchema=True)

In [11]:
df = df_csv\
  .withColumn("INCOME_CHARGABLE_SAL_18",col("INCOME_CHARGABLE_SAL_18").cast(IntegerType()))\
  .withColumn("CNT_YRS_EXP",col("CNT_YRS_EXP").cast(IntegerType()))\
  .withColumn("INCOME_CHARGABLE_SAL_17",col("INCOME_CHARGABLE_SAL_17").cast(IntegerType()))\
  .withColumn("EMP_PRD_MNTH",col("EMP_PRD_MNTH").cast(IntegerType()))\
  .withColumn("GRS_TI_DDCTR",col("GRS_TI_DDCTR").cast(IntegerType()))\
  .withColumn("TRNVR_DDCTR",col("TRNVR_DDCTR").cast(IntegerType()))

In [12]:
df.printSchema()
# df.columns

root
 |-- _c0: integer (nullable = true)
 |-- GROSS_SALARY_18: integer (nullable = true)
 |-- INCOME_CHARGABLE_SAL_18: integer (nullable = true)
 |-- FORM_ID_18: string (nullable = true)
 |-- RES_STATUS18: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- PST_OFC: string (nullable = true)
 |-- SUB_DISTRICT_NM: string (nullable = true)
 |-- DISTRICT_NM: string (nullable = true)
 |-- STATE_NM: string (nullable = true)
 |-- Lbk_Pncd: string (nullable = true)
 |-- Pncd_Ctgry: string (nullable = true)
 |-- Pncd_Ctgry_Dscrptn: string (nullable = true)
 |-- TOP_30_CITY: string (nullable = true)
 |-- CNT_YRS_EXP: integer (nullable = true)
 |-- DEDUCTOR_TYPE: integer (nullable = true)
 |-- INCOME_CHARGABLE_SAL_17: integer (nullable = true)
 |-- Taxpayer_Type_17: string (nullable = true)
 |-- FORM_ID_17: string (nullable = true)
 |-- RES_STATUS17: string (nullable = true)
 |-- EMP_MONTHS_BY_DT: integer (nullable = true)
 |-- EMP_PRD_MNTH: intege

In [None]:
df.show(5)

+---+---------------+-----------------------+----------+------------+---+---+------------------+------------------+-----------+--------------+--------+----------+--------------------+-----------+-----------+-------------+-----------------------+----------------+----------+------------+----------------+------------+-----------------+---------------------+------------------+------------------+---------------+-----------------+---------------+------------+--------------------+-----------+--------------------+---------------------+---------------------+
|_c0|GROSS_SALARY_18|INCOME_CHARGABLE_SAL_18|FORM_ID_18|RES_STATUS18|Age|Sex|           PST_OFC|   SUB_DISTRICT_NM|DISTRICT_NM|      STATE_NM|Lbk_Pncd|Pncd_Ctgry|  Pncd_Ctgry_Dscrptn|TOP_30_CITY|CNT_YRS_EXP|DEDUCTOR_TYPE|INCOME_CHARGABLE_SAL_17|Taxpayer_Type_17|FORM_ID_17|RES_STATUS17|EMP_MONTHS_BY_DT|EMP_PRD_MNTH|GROSS_INC_AMT_24Q|SAL_EX_HRA_PF_AMT_24Q|CHARGE_INC_AMT_24Q|TOT_DE_VIA_AMT_24Q|TOT_VIA_AMT_24Q|OTHER_INC_AMT_24Q|ASSESSMENT_YEAR|

In [26]:
# pan_count = df.select('EMP_PAN').distinct().count()
# pan_count
# 50000

50000

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Univariate Analysis: Categorical variable**

---



In [None]:
categorical_Vars = ['FORM_ID_18', 'RES_STATUS18', 'Sex', 'Pncd_Ctgry', 'TOP_30_CITY', 'DEDUCTOR_TYPE', 'Taxpayer_Type_17',  'FORM_ID_17',
 'RES_STATUS17', 'GRS_TI_RNG_DDCTR', 'TRNVR_RNG_DDCTR', 'CMPTD_TXPYR_STS_DDCTR', 'CMPTD_TXPYR_TYP_DDCTR']
categorical_Vars

['FORM_ID_18',
 'RES_STATUS18',
 'Sex',
 'Pncd_Ctgry',
 'TOP_30_CITY',
 'DEDUCTOR_TYPE',
 'Taxpayer_Type_17',
 'FORM_ID_17',
 'RES_STATUS17',
 'GRS_TI_RNG_DDCTR',
 'TRNVR_RNG_DDCTR',
 'CMPTD_TXPYR_STS_DDCTR',
 'CMPTD_TXPYR_TYP_DDCTR']

In [None]:
categorical_Freq = [df.groupby(x).count().toPandas() for x in categorical_Vars]

In [None]:
CategoricalColsCount = len(categorical_Freq)
cols=4
rows=math.ceil(CategoricalColsCount/cols)
fig = make_subplots(rows=rows, cols=cols)

In [None]:
for k in range(CategoricalColsCount):
  freq = categorical_Freq[k]
  fig.add_trace(go.Bar(x=freq.iloc[:,0], y=freq.iloc[:,1]), row=int(k/cols)+1, col=k%cols+1)

In [None]:
fig.update_layout(height=2000, width=800, title_text="Subplots")
fig.show()

Frequency distribution --> Skipping - PST_OFC, SUB_DISTRICT_NM, DISTRICT_NM, STATE_NM Numbers are very high and contains missing values

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Univariate Analysis: Continuous variable**

---

In [None]:
continuous_Vars = ['GROSS_SALARY_18', 'INCOME_CHARGABLE_SAL_18', 'Age', 'CNT_YRS_EXP', 'INCOME_CHARGABLE_SAL_17', 'EMP_MONTHS_BY_DT', 
                      'EMP_PRD_MNTH', 'GROSS_INC_AMT_24Q', 'SAL_EX_HRA_PF_AMT_24Q', 'CHARGE_INC_AMT_24Q', 'TOT_DE_VIA_AMT_24Q', 'TOT_VIA_AMT_24Q', 
                      'OTHER_INC_AMT_24Q', 'GRS_TI_DDCTR', 'TRNVR_DDCTR']
continuous_Vars

['GROSS_SALARY_18',
 'INCOME_CHARGABLE_SAL_18',
 'Age',
 'CNT_YRS_EXP',
 'INCOME_CHARGABLE_SAL_17',
 'EMP_MONTHS_BY_DT',
 'EMP_PRD_MNTH',
 'GROSS_INC_AMT_24Q',
 'SAL_EX_HRA_PF_AMT_24Q',
 'CHARGE_INC_AMT_24Q',
 'TOT_DE_VIA_AMT_24Q',
 'TOT_VIA_AMT_24Q',
 'OTHER_INC_AMT_24Q',
 'GRS_TI_DDCTR',
 'TRNVR_DDCTR']

In [None]:
df1 = df.select(continuous_Vars)

In [None]:
df1.printSchema()

In [None]:
df1.describe().show()

+-------+------------------+-----------------------+------------------+-----------------+-----------------------+------------------+-----------------+-----------------+---------------------+------------------+------------------+-----------------+-----------------+--------------------+--------------------+
|summary|   GROSS_SALARY_18|INCOME_CHARGABLE_SAL_18|               Age|      CNT_YRS_EXP|INCOME_CHARGABLE_SAL_17|  EMP_MONTHS_BY_DT|     EMP_PRD_MNTH|GROSS_INC_AMT_24Q|SAL_EX_HRA_PF_AMT_24Q|CHARGE_INC_AMT_24Q|TOT_DE_VIA_AMT_24Q|  TOT_VIA_AMT_24Q|OTHER_INC_AMT_24Q|        GRS_TI_DDCTR|         TRNVR_DDCTR|
+-------+------------------+-----------------------+------------------+-----------------+-----------------------+------------------+-----------------+-----------------+---------------------+------------------+------------------+-----------------+-----------------+--------------------+--------------------+
|  count|             50000|                  49999|             50000|        

Histogram of continuous variables --> 'Age', 'CNT_YRS_EXP', 'EMP_PRD_MNTH', 'INCOME_CHARGABLE_SAL_18', 'INCOME_CHARGABLE_SAL_17', 'CHARGE_INC_AMT_24Q'

In [None]:
continuous_Var = 'Age'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
continuous_Var = 'CNT_YRS_EXP'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
continuous_Var = 'EMP_PRD_MNTH'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
continuous_Var = 'INCOME_CHARGABLE_SAL_18'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
# <= 4.75 million
continuous_Var = 'INCOME_CHARGABLE_SAL_18'
px.histogram(df1.filter(df1.INCOME_CHARGABLE_SAL_18 <= 4750000).select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
continuous_Var = 'INCOME_CHARGABLE_SAL_17'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
# <= 4.75 million
continuous_Var = 'INCOME_CHARGABLE_SAL_17'
px.histogram(df1.filter(df1.INCOME_CHARGABLE_SAL_17 <= 4750000).select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
continuous_Var = 'CHARGE_INC_AMT_24Q'
px.histogram(df1.select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

In [None]:
# <= 4.75 million
continuous_Var = 'CHARGE_INC_AMT_24Q'
px.histogram(df1.filter(df1.CHARGE_INC_AMT_24Q <= 4750000).select(continuous_Var).sort(continuous_Var).toPandas(), x=continuous_Var).show()

***Standard Scaling***

In [None]:
continuous_Var = 'INCOME_CHARGABLE_SAL_18'
df_in = df1.select(continuous_Var).sort(continuous_Var).toPandas()
scaler = StandardScaler().fit(df_in)
df_out = scaler.transform(df_in)
px.histogram(df_out).show()

In [None]:
# <= 4.75 million
threshold = (4750000-scaler.mean_)/scaler.scale_
df_out = df_out[df_out <= threshold]
px.histogram(df_out).show()

In [None]:
continuous_Var = 'INCOME_CHARGABLE_SAL_17'
df_in = df1.select(continuous_Var).sort(continuous_Var).toPandas()
scaler = StandardScaler().fit(df_in)
df_out = scaler.transform(df_in)
px.histogram(df_out).show()

In [None]:
# <= 4.75 million
threshold = (4750000-scaler.mean_)/scaler.scale_
df_out = df_out[df_out <= threshold]
px.histogram(df_out).show()

In [None]:
continuous_Var = 'CHARGE_INC_AMT_24Q'
df_in = df1.select(continuous_Var).sort(continuous_Var).toPandas()
scaler = StandardScaler().fit(df_in)
df_out = scaler.transform(df_in)
px.histogram(df_out).show()

In [None]:
# <= 4.75 million
threshold = (4750000-scaler.mean_)/scaler.scale_
df_out = df_out[df_out <= threshold]
px.histogram(df_out).show()

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Bivariate Analysis --> continuous vs continuous**

---



**Scatter plot between continuous variables**

In [None]:
continuous_Vars_Stat = ['Age', 'CNT_YRS_EXP', 'EMP_PRD_MNTH', 'INCOME_CHARGABLE_SAL_18', 'INCOME_CHARGABLE_SAL_17', 'CHARGE_INC_AMT_24Q']

In [None]:
df2 = df.select(continuous_Vars_Stat).toPandas()

In [None]:
fig = px.scatter_matrix(df2)
fig.show()

In [None]:
fig = px.scatter(x=df2.Age, y=df2.INCOME_CHARGABLE_SAL_18)
fig.show()

In [None]:
fig = px.scatter(x=df2.CHARGE_INC_AMT_24Q, y=df2.INCOME_CHARGABLE_SAL_18)
fig.show()

**correlation**

In [None]:
corr_Matrix = df2.corr()
corr_Matrix

In [None]:
sns.heatmap(corr_Matrix)  

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Bivariate Analysis --> categorical vs categorical**

---



**Chi-square test**

In [None]:
# create contingency table
df3 = df.groupBy('TOP_30_CITY').pivot('FORM_ID_17').count().toPandas().set_index('TOP_30_CITY')
df3.head()


In [None]:
# Chi-square test
stat, p, dof, expected = chi2_contingency(df3)

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Bivariate Analysis --> continuous vs categorical**

---



**Anova test**

In [None]:
ITR = df.select(['FORM_ID_17']).distinct().toPandas().squeeze().tolist()
df4 = df.select(['FORM_ID_17', 'INCOME_CHARGABLE_SAL_18'])
df4_list = [df4.filter(df4.FORM_ID_17 == i).filter(df4.INCOME_CHARGABLE_SAL_18.isNotNull()).select('INCOME_CHARGABLE_SAL_18').toPandas().squeeze() for i in ITR]

In [None]:
F, p = f_oneway(*df4_list)
print(F, p)

1123.1414513687826 0.0


**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Feature Engineering**

---



In [13]:
from pyspark.sql.functions import when

In [14]:
df5 = df.withColumn("FORM_ID_17_1", when(df.FORM_ID_17 == 'ITR-1',1).otherwise(0))\
         .withColumn("FORM_ID_17_2", when(df.FORM_ID_17 == 'ITR-2',1).otherwise(0))\
         .withColumn("FORM_ID_17_3", when(df.FORM_ID_17 == 'ITR-3',1).otherwise(0))\
         .withColumn("FORM_ID_17_4", when(df.FORM_ID_17 == 'ITR-4',1).otherwise(0))                                                             

In [15]:
df5 =  df5.withColumn("RES_STATUS17_NOR", when(df.RES_STATUS17 == 'NOR',1).otherwise(0))\
            .withColumn("RES_STATUS17_RES", when(df.RES_STATUS17 == 'RES',1).otherwise(0))\
            .withColumn("RES_STATUS17_NRI", when(df.RES_STATUS17 == 'NRI',1).otherwise(0))   

In [16]:
df5 = df5.filter(df5.Sex.isNotNull())

In [17]:
df5 =  df5.withColumn("Sex_F", when(df.Sex == 'F',1).otherwise(0))\
          .withColumn("Sex_M", when(df.Sex == 'M',1).otherwise(0))\
          .withColumn("Sex_T", when(df.Sex == 'T',1).otherwise(0))   

In [18]:
df5 =  df5.withColumn("Pncd_Ctgry_NA", when(df.Pncd_Ctgry == 'NA',1).otherwise(0))\
          .withColumn("Pncd_Ctgry_M", when(df.Pncd_Ctgry == 'Mixed',1).otherwise(0))\
          .withColumn("Pncd_Ctgry_R", when(df.Pncd_Ctgry == 'Rural',1).otherwise(0))\
          .withColumn("Pncd_Ctgry_30C", when(df.Pncd_Ctgry == '30City',1).otherwise(0))\
          .withColumn("Pncd_Ctgry_U", when(df.Pncd_Ctgry == 'Urban',1).otherwise(0))\
          .withColumn("Pncd_Ctgry_NULL", when(df.Pncd_Ctgry == '',1).otherwise(0))

In [19]:
df5 =  df5.withColumn("Taxpayer_Type_17_SP", when(df.Taxpayer_Type_17 == 'Salaried Person',1).otherwise(0))\
          .withColumn("Taxpayer_Type_17_PROF", when(df.Taxpayer_Type_17 == 'Professional',1).otherwise(0))\
          .withColumn("Taxpayer_Type_17_PROP", when(df.Taxpayer_Type_17 == 'Proprietor',1).otherwise(0))\
          .withColumn("Taxpayer_Type_17_O", when(df.Taxpayer_Type_17 == 'Others',1).otherwise(0))

In [20]:
df5 =  df5.withColumn("TOP_30_CITY_Y", when(df.TOP_30_CITY == 'Yes',1).otherwise(0))\
          .withColumn("TOP_30_CITY_N", when(df.TOP_30_CITY == 'No',1).otherwise(0))\
          .withColumn("TOP_30_CITY_NULL", when(df.TOP_30_CITY == '',1).otherwise(0))

In [21]:
df5 =  df5.withColumn("GRS_TI_RNG_DDCTR_VS1", when(df.GRS_TI_RNG_DDCTR == 'a. Very Small (< 2.5 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_VS2", when(df.GRS_TI_RNG_DDCTR == 'a. Very Small (< 10 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_S1", when(df.GRS_TI_RNG_DDCTR == 'b. Small (2.5 Lac - 5 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_S2", when(df.GRS_TI_RNG_DDCTR == 'b. Small (10 Lac - 50 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_M1", when(df.GRS_TI_RNG_DDCTR == 'c. Medium (5 Lac - 10 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_M2", when(df.GRS_TI_RNG_DDCTR == 'c. Medium (50 Lac - 1 Cr)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_L1", when(df.GRS_TI_RNG_DDCTR == 'd. Large (10 Lac - 50 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_L2", when(df.GRS_TI_RNG_DDCTR == 'd. Large (1 Cr - 10 Cr)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_VL1", when(df.GRS_TI_RNG_DDCTR == 'e. Very Large (>= 50 Lac)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_VL2", when(df.GRS_TI_RNG_DDCTR == 'e. Very Large (>= 10 Cr)',1).otherwise(0))\
          .withColumn("GRS_TI_RNG_DDCTR_U", when(df.GRS_TI_RNG_DDCTR == 'Unknown',1).otherwise(0))

In [22]:
df5 =  df5.withColumn("CMPTD_TXPYR_STS_DDCTR_A", when(df.CMPTD_TXPYR_STS_DDCTR == 'AJP',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_AB", when(df.CMPTD_TXPYR_STS_DDCTR == 'AOP/BOI',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_C", when(df.CMPTD_TXPYR_STS_DDCTR == 'Company',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_G", when(df.CMPTD_TXPYR_STS_DDCTR == 'Government',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_H", when(df.CMPTD_TXPYR_STS_DDCTR == 'HUF',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_I", when(df.CMPTD_TXPYR_STS_DDCTR == 'Individual',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_L", when(df.CMPTD_TXPYR_STS_DDCTR == 'Local Authority',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_STS_DDCTR_T", when(df.CMPTD_TXPYR_STS_DDCTR == 'Trust',1).otherwise(0))

In [23]:
df5 =  df5.withColumn("CMPTD_TXPYR_TYP_DDCTR_BUS", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Business',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_CR", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Charitable/Religious',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_COM", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Company (PAN)',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_FRM", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Firm',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_FC", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Foreign Company',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_LLP", when(df.CMPTD_TXPYR_TYP_DDCTR == 'LLP',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_OTH", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Others',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_PVTL", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Private Limited',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_PROF", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Professional',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_PROP", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Proprietor',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_PUBL", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Public Limited',1).otherwise(0))\
          .withColumn("CMPTD_TXPYR_TYP_DDCTR_SAL", when(df.CMPTD_TXPYR_TYP_DDCTR == 'Salaried Person',1).otherwise(0))

In [24]:
df5 =  df5.withColumn("DEDUCTOR_TYPE_11", when(df.DEDUCTOR_TYPE == '11',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_12", when(df.DEDUCTOR_TYPE == '12',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_13", when(df.DEDUCTOR_TYPE == '13',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_14", when(df.DEDUCTOR_TYPE == '14',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_15", when(df.DEDUCTOR_TYPE == '15',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_16", when(df.DEDUCTOR_TYPE == '16',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_17", when(df.DEDUCTOR_TYPE == '17',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_18", when(df.DEDUCTOR_TYPE == '18',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_19", when(df.DEDUCTOR_TYPE == '19',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_20", when(df.DEDUCTOR_TYPE == '20',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_21", when(df.DEDUCTOR_TYPE == '21',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_22", when(df.DEDUCTOR_TYPE == '22',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_23", when(df.DEDUCTOR_TYPE == '23',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_24", when(df.DEDUCTOR_TYPE == '24',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_25", when(df.DEDUCTOR_TYPE == '25',1).otherwise(0))\
          .withColumn("DEDUCTOR_TYPE_26", when(df.DEDUCTOR_TYPE == '26',1).otherwise(0))

In [None]:
# pan_count = df5.select('EMP_PAN').distinct().count()
# pan_count
# # 50000

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Outlier Detection and Removal**

---



In [28]:
def outlierDetectionRange(quantiles):
  quantile_25, quantile_50, quantile_75 = quantiles
  a =  quantile_50-1.5*(quantile_75-quantile_25)
  b =  quantile_50+1.5*(quantile_75-quantile_25)
  return {'min' : a, 'max' : b}

In [29]:
quantile_list = ['INCOME_CHARGABLE_SAL_18', 'CHARGE_INC_AMT_24Q', 'INCOME_CHARGABLE_SAL_17']
df_quantile = df5.approxQuantile(quantile_list, [0.25,0.5,0.75], 0.25)
df_quantile

[[22.0, 482029.0, 147163685.0],
 [0.0, 457697.0, 221735507.0],
 [0.0, 430180.0, 136591195.0]]

In [30]:
interQuantileRange = {quantile_list[i] : outlierDetectionRange(df_quantile[i]) for i in range(len(quantile_list))}
interQuantileRange
# {'INCOME_CHARGABLE_SAL_18': {'min': -220263494.5, 'max': 221227494.5},
#  'CHARGE_INC_AMT_24Q': {'min': -332145645.5, 'max': 333060875.5},
#  'INCOME_CHARGABLE_SAL_17': {'min': -204456612.5, 'max': 205316972.5}}

{'INCOME_CHARGABLE_SAL_18': {'min': -220263465.5, 'max': 221227523.5},
 'CHARGE_INC_AMT_24Q': {'min': -332145563.5, 'max': 333060957.5},
 'INCOME_CHARGABLE_SAL_17': {'min': -204456612.5, 'max': 205316972.5}}

In [31]:
df6 = df5.filter((df5.INCOME_CHARGABLE_SAL_18 <= interQuantileRange['INCOME_CHARGABLE_SAL_18']['max'])\
               & (df5.INCOME_CHARGABLE_SAL_18 >= interQuantileRange['INCOME_CHARGABLE_SAL_18']['min']))

In [32]:
df6 = df6.filter((df6.CHARGE_INC_AMT_24Q <= interQuantileRange['CHARGE_INC_AMT_24Q']['max'])\
               & (df6.CHARGE_INC_AMT_24Q >= interQuantileRange['CHARGE_INC_AMT_24Q']['min']))

In [33]:
df6 = df6.filter((df6.INCOME_CHARGABLE_SAL_17 <= interQuantileRange['INCOME_CHARGABLE_SAL_17']['max'])\
               & (df6.INCOME_CHARGABLE_SAL_17 >= interQuantileRange['INCOME_CHARGABLE_SAL_17']['min']))

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Standard Scaling**

---



In [34]:
df_scaler = df6.select(['INCOME_CHARGABLE_SAL_18', 'CHARGE_INC_AMT_24Q', 'INCOME_CHARGABLE_SAL_17'])

In [35]:
df_stats = df_scaler.describe().toPandas()
df_stats

Unnamed: 0,summary,INCOME_CHARGABLE_SAL_18,CHARGE_INC_AMT_24Q,INCOME_CHARGABLE_SAL_17
0,count,49773.0,49773.0,49773.0
1,mean,942016.656560786,926780.6389006088,836804.523215398
2,stddev,1736794.5340572968,1969964.6937781388,1643872.6854271607
3,min,22.0,0.0,0.0
4,max,147163685.0,221735507.0,136591195.0


In [36]:
df7 = df6.withColumn("INCOME_CHARGABLE_SAL_18_scaler", (df6.INCOME_CHARGABLE_SAL_18-df_stats.iloc[1,1])/df_stats.iloc[2,1])\
         .withColumn("CHARGE_INC_AMT_24Q_scaler", (df6.CHARGE_INC_AMT_24Q-df_stats.iloc[1,2])/df_stats.iloc[2,2])\
         .withColumn("INCOME_CHARGABLE_SAL_17_scaler", (df6.INCOME_CHARGABLE_SAL_17-df_stats.iloc[1,3])/df_stats.iloc[2,3])

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**Missing values**

---



In [37]:
df8 = df7.filter(df7.INCOME_CHARGABLE_SAL_18_scaler.isNotNull())
df8 = df8.filter(df8.INCOME_CHARGABLE_SAL_17_scaler.isNotNull())
df8 = df8.filter(df8.CHARGE_INC_AMT_24Q_scaler.isNotNull())
df8 = df8.filter(df8.Age.isNotNull())
df8 = df8.filter(df8.CNT_YRS_EXP.isNotNull())
df8 = df8.filter(df8.EMP_PRD_MNTH.isNotNull())

In [38]:
len(df8.columns)

109

*Features selection*

In [39]:
df9 = df8.drop('_c0', 'EMP_PAN', 'DEDUCTOR_TAN', 'DEDUCTOR_PAN')

df9 = df9.drop('GROSS_SALARY_18','FORM_ID_18','RES_STATUS18','Sex','PST_OFC','SUB_DISTRICT_NM','DISTRICT_NM','STATE_NM','Lbk_Pncd','Pncd_Ctgry',
'Pncd_Ctgry_Dscrptn','TOP_30_CITY','DEDUCTOR_TYPE','Taxpayer_Type_17','FORM_ID_17','RES_STATUS17','EMP_MONTHS_BY_DT','GROSS_INC_AMT_24Q',
'SAL_EX_HRA_PF_AMT_24Q','TOT_DE_VIA_AMT_24Q','TOT_VIA_AMT_24Q','OTHER_INC_AMT_24Q','ASSESSMENT_YEAR','GRS_TI_DDCTR','GRS_TI_RNG_DDCTR','TRNVR_DDCTR',
'TRNVR_RNG_DDCTR','CMPTD_TXPYR_STS_DDCTR','CMPTD_TXPYR_TYP_DDCTR','Sex_T','Pncd_Ctgry_NULL')

df9 = df9.drop('CHARGE_INC_AMT_24Q', 'INCOME_CHARGABLE_SAL_17')

**+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++**

**MODELLING**

---



**Date split into train and test**

In [40]:
from sklearn.model_selection import train_test_split

In [41]:
train, test = train_test_split(df9.toPandas(), test_size=0.3)

In [42]:
train.shape, test.shape

((33628, 75), (14413, 75))

In [43]:
X_train = train.drop(['INCOME_CHARGABLE_SAL_18','INCOME_CHARGABLE_SAL_18_scaler'], axis=1)
y_train = train.INCOME_CHARGABLE_SAL_18_scaler

In [44]:
X_test = test.drop(['INCOME_CHARGABLE_SAL_18', 'INCOME_CHARGABLE_SAL_18_scaler'], axis=1)
y_test_log = test.INCOME_CHARGABLE_SAL_18_scaler
y_test = test.INCOME_CHARGABLE_SAL_18

*Mean absolute percentage error*

In [65]:
INCOME_CHARGABLE_SAL_18_mean = float(df_stats.iloc[1,1])
INCOME_CHARGABLE_SAL_18_stddev = float(df_stats.iloc[2,1])

In [66]:
def MAPE_fn(predict,expect):
  return (np.abs(expect-(predict*INCOME_CHARGABLE_SAL_18_stddev+INCOME_CHARGABLE_SAL_18_mean))/expect)*100

**Linear Regression**

In [61]:
from sklearn.linear_model import LinearRegression

In [64]:
reg = LinearRegression().fit(X_train, y_train)
reg.score(X_train, y_train), reg.coef_, reg.intercept_

(0.9385096286227228,
 array([ 4.27488830e-04,  2.86548798e-03,  4.51590182e-03, -4.72123457e-02,
         6.43664917e-02, -7.76005289e-03, -9.39409312e-03, -2.92382537e-02,
        -2.18738954e-02,  5.11121490e-02, -3.55868913e-03,  3.55868913e-03,
         4.03450614e-03,  1.11613550e-02,  4.57404399e-03,  2.11288117e-02,
         1.20634128e-02, -2.35387910e-02, -3.68690037e-02, -7.20637747e-02,
         1.32471569e-01,  1.91264157e-03, -2.00043445e-03, -6.24500451e-17,
         1.17717300e-02,  5.26177546e-03,  4.39235340e-03,  2.16140092e-02,
         2.40658240e-03,  1.61291747e-02,  2.45658849e-02,  3.47217139e-02,
         3.77381870e-02,  1.53767690e-02,  1.81834459e-02, -3.43458670e-02,
        -3.02712251e-02, -6.28755994e-02, -6.45303095e-03, -2.56427376e-02,
        -3.38122619e-02, -3.74460821e-02, -3.71006856e-02,  4.21126949e-03,
         2.41692882e-02,  2.43045553e-02, -4.13968924e-02,  5.89012917e-02,
        -1.77427579e-02,  2.28590996e-03,  5.39839177e-02,  5.05815

In [69]:
y_pred = reg.predict(X_test)
y_expct  = np.array(y_test)

In [70]:
MAPE_np = MAPE_fn(y_pred, y_expct)

**Decision Tree**

In [45]:
from sklearn.tree import DecisionTreeRegressor
from sklearn import metrics

In [46]:
regressor = DecisionTreeRegressor(random_state=123, max_depth=7)
regressor.fit(X_train,y_train)

In [47]:
y_pred = regressor.predict(X_test)
y_expct  = np.array(y_test)

In [50]:
MAPE_np = MAPE_fn(y_pred, y_expct)

**CACHE**

In [51]:
MAPE_np[:3]

array([25.95039413, 21.54947519, 10.60890187])

**Logrithm**

---



In [52]:
from pyspark.sql.functions import log

In [53]:
df7 = df6.withColumn("INCOME_CHARGABLE_SAL_18_log", log(df6.INCOME_CHARGABLE_SAL_18))\
         .withColumn("CHARGE_INC_AMT_24Q_log", log(df6.CHARGE_INC_AMT_24Q))\
         .withColumn("INCOME_CHARGABLE_SAL_17_log", log(df6.INCOME_CHARGABLE_SAL_17))

In [54]:
MAPE_np[:23]

array([25.95039413, 21.54947519, 10.60890187, 29.04041898, 23.78132666,
       37.04562021,  9.82303777, 11.93090838, 11.3667902 , 10.58130463,
        1.90051158,  8.00439912,  7.83700012,  4.22143177,  2.21563904,
        2.34095476,  1.01529967,  3.40268965,  1.19249981,  1.4958023 ,
        2.10826833,  1.74179073, 87.47117063])

In [55]:
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.stat import Correlation

In [56]:
dataset = [[Vectors.dense([1, 1, 1, 1])],
           [Vectors.dense([1, 1, 1, 1])],
           [Vectors.dense([1, 1, 1, 1])],
           [Vectors.dense([1, 1, 1, 1])],
           [Vectors.dense([1, 1, 1, 1])]]
dataset = spark.createDataFrame(dataset, ['features'])
pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
print(str(pearsonCorr).replace('nan', 'NaN'))

DenseMatrix([[ 1., NaN, NaN, NaN],
             [NaN,  1., NaN, NaN],
             [NaN, NaN,  1., NaN],
             [NaN, NaN, NaN,  1.]])


In [57]:
dataset.show()

+-----------------+
|         features|
+-----------------+
|[1.0,1.0,1.0,1.0]|
|[1.0,1.0,1.0,1.0]|
|[1.0,1.0,1.0,1.0]|
|[1.0,1.0,1.0,1.0]|
|[1.0,1.0,1.0,1.0]|
+-----------------+



In [58]:
histogram_Continuous_Vars = ['INCOME_CHARGABLE_SAL_18', 'Age', 'CNT_YRS_EXP', 'INCOME_CHARGABLE_SAL_17', 'EMP_PRD_MNTH', 'CHARGE_INC_AMT_24Q']

In [59]:
ContinuousColsCount=len(histogram_Continuous_Vars)

In [60]:
figures = []
for k in range(ContinuousColsCount):
  ContinuousVar = histogram_Continuous_Vars[k]
  # fig.add_trace(go.Histogram(x=df1.select(ContinuousVar).toPandas(), xbins=go.histogram.XBins(size=1)), row=k+1, col=1)
  figures.append(px.histogram(df1.select(ContinuousVar).sort(ContinuousVar).toPandas(), x=ContinuousVar, nbins=100))

NameError: ignored

In [None]:
m = []
for k in range(12):
  m.append((int(k/2)+1,k%2+1))

m

In [None]:
df_FORM_ID_18 = df.groupby('FORM_ID_18').count().toPandas()
df_RES_STATUS18 = df.groupby('RES_STATUS18').count().toPandas()
df_Sex = df.groupby('Sex').count().toPandas()
df_ = df.groupby('').count().toPandas()


In [None]:
ITR_Type = df.select('FORM_ID_18').distinct().toPandas()
ITR_Type

In [None]:
!ls

In [None]:
!pwd

In [None]:
np_array = np.array(df1.select(continuous_Var).sort(continuous_Var).toPandas())

In [None]:
from sklearn import preprocessing
import numpy as np
X_train = np.array([[ 1., -1.,  2.],
                   [ 2.,  0.,  0.],
                  [ 0.,  1., -1.]])
scaler = preprocessing.StandardScaler().fit(np_array)
scaler

In [None]:
# d = pd.DataFrame(np_StandardScaler, columns = [continuous_Var])

np.array(np_StandardScaler)