# filling rows that contain missing values 

In [1]:
import findspark

In [2]:
findspark.init('/home/abhi/spark-2.2.1-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark=SparkSession.builder.appName('preprocess').getOrCreate()

In [6]:
df=spark.read.csv('/home/abhi/project/orange_small_train.data',inferSchema=True,header=True,sep='\t')

In [7]:
from pyspark.sql.functions import *
null_count=df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).collect()

## column-wise null count

In [8]:
null_count

[Row(Var1=49298, Var2=48759, Var3=48760, Var4=48421, Var5=48513, Var6=5529, Var7=5539, Var8=50000, Var9=49298, Var10=48513, Var11=48760, Var12=49442, Var13=5539, Var14=48760, Var15=50000, Var16=48513, Var17=48421, Var18=48421, Var19=48421, Var20=50000, Var21=5529, Var22=5009, Var23=48513, Var24=7230, Var25=5009, Var26=48513, Var27=48513, Var28=5011, Var29=49298, Var30=49298, Var31=50000, Var32=50000, Var33=49153, Var34=48759, Var35=5009, Var36=48759, Var37=48421, Var38=5009, Var39=50000, Var40=48759, Var41=49298, Var42=50000, Var43=48759, Var44=5009, Var45=49656, Var46=48759, Var47=49298, Var48=50000, Var49=48759, Var50=49298, Var51=46253, Var52=50000, Var53=49298, Var54=48759, Var55=50000, Var56=49354, Var57=0, Var58=49298, Var59=49180, Var60=48513, Var61=49153, Var62=49442, Var63=49306, Var64=49762, Var65=5539, Var66=49306, Var67=48513, Var68=48759, Var69=48513, Var70=48513, Var71=48871, Var72=22380, Var73=0, Var74=5539, Var75=48759, Var76=5009, Var77=49298, Var78=5009, Var79=50000, 

## setting threshold for null values of a column 

In [9]:
my_list=[]
thresh=0.60*df.count()
i=0
while i<len(df.columns):
    if(null_count[0][i]<thresh):
        my_list.append("Var{}".format(i+1))
    i+=1

## list of columns that are below the threshold

In [10]:
my_list

['Var6',
 'Var7',
 'Var13',
 'Var21',
 'Var22',
 'Var24',
 'Var25',
 'Var28',
 'Var35',
 'Var38',
 'Var44',
 'Var57',
 'Var65',
 'Var72',
 'Var73',
 'Var74',
 'Var76',
 'Var78',
 'Var81',
 'Var83',
 'Var85',
 'Var94',
 'Var109',
 'Var112',
 'Var113',
 'Var119',
 'Var123',
 'Var125',
 'Var126',
 'Var132',
 'Var133',
 'Var134',
 'Var140',
 'Var143',
 'Var144',
 'Var149',
 'Var153',
 'Var160',
 'Var163',
 'Var173',
 'Var181',
 'Var189',
 'Var192',
 'Var193',
 'Var195',
 'Var196',
 'Var197',
 'Var198',
 'Var199',
 'Var200',
 'Var202',
 'Var203',
 'Var204',
 'Var205',
 'Var206',
 'Var207',
 'Var208',
 'Var210',
 'Var211',
 'Var212',
 'Var214',
 'Var216',
 'Var217',
 'Var218',
 'Var219',
 'Var220',
 'Var221',
 'Var222',
 'Var223',
 'Var225',
 'Var226',
 'Var227',
 'Var228',
 'Var229']

## selecting the columns that are below the threshold

In [11]:
my_cols=df.select(my_list)
my_cols.count()

50000

## identifying the type of data present in each column

In [12]:
num_list=[]
cat_list=[]
for name,dtype in my_cols.dtypes:
    if(dtype=='int' or dtype=='double' or dtype=='float'):
        num_list.append(name)
    else:
        cat_list.append(name)


## filling the missing values of numeric data type columns with mean

In [13]:
for c in num_list:
        average=my_cols.select(mean(c)).collect()
        my_cols = my_cols.fillna(average[0][0],c)

## filling the missing values of categorical data with mode
## frequently repeated value is placed in missing values of the column

In [14]:
for c in cat_list:
    freq_count=my_cols.groupBy(c).count()
    freq_list=freq_count.orderBy(freq_count["count"].desc()).collect()
    if(freq_list[0][0]==None):
        my_cols = my_cols.fillna(freq_list[1][0],c)
    else:
        my_cols = my_cols.fillna(freq_list[0][0],c)

## total rows after filling the missing values

In [15]:
my_cols.count()

50000

In [16]:
null_count2=my_cols.select([count(when(isnull(c), c)).alias(c) for c in my_cols.columns]).collect()

## nullcount of each column after filling missing values(which is  zero for every column)

In [17]:
null_count2

[Row(Var6=0, Var7=0, Var13=0, Var21=0, Var22=0, Var24=0, Var25=0, Var28=0, Var35=0, Var38=0, Var44=0, Var57=0, Var65=0, Var72=0, Var73=0, Var74=0, Var76=0, Var78=0, Var81=0, Var83=0, Var85=0, Var94=0, Var109=0, Var112=0, Var113=0, Var119=0, Var123=0, Var125=0, Var126=0, Var132=0, Var133=0, Var134=0, Var140=0, Var143=0, Var144=0, Var149=0, Var153=0, Var160=0, Var163=0, Var173=0, Var181=0, Var189=0, Var192=0, Var193=0, Var195=0, Var196=0, Var197=0, Var198=0, Var199=0, Var200=0, Var202=0, Var203=0, Var204=0, Var205=0, Var206=0, Var207=0, Var208=0, Var210=0, Var211=0, Var212=0, Var214=0, Var216=0, Var217=0, Var218=0, Var219=0, Var220=0, Var221=0, Var222=0, Var223=0, Var225=0, Var226=0, Var227=0, Var228=0, Var229=0)]

In [18]:
churn_label = spark.read.csv('/home/abhi/project/orange_small_train_churn.labels',inferSchema=True,header=False)

## using lit() to add a new column that only '1' in each row

In [19]:
from pyspark.sql.functions import lit

my_cols = my_cols.withColumn("order",lit(1))
churn_label = churn_label.withColumn("order",lit(1))

## creating another column rowNum that contains consecutive values

In [20]:
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window

my_cols = my_cols.select("*", F.row_number().over(Window.partitionBy("order").orderBy("order")).alias("rowNum"))
churn_label = churn_label.select("*", F.row_number().over(Window.partitionBy("order").orderBy("order")).alias("rowNum"))

## joining the feature columns data frame with label dataframe

In [21]:
result = my_cols.join(churn_label,my_cols.rowNum == churn_label.rowNum)

## schema of the resultant dataframe

In [22]:
result.printSchema()

root
 |-- Var6: integer (nullable = true)
 |-- Var7: integer (nullable = true)
 |-- Var13: integer (nullable = true)
 |-- Var21: integer (nullable = true)
 |-- Var22: integer (nullable = true)
 |-- Var24: integer (nullable = true)
 |-- Var25: integer (nullable = true)
 |-- Var28: double (nullable = false)
 |-- Var35: integer (nullable = true)
 |-- Var38: integer (nullable = true)
 |-- Var44: integer (nullable = true)
 |-- Var57: double (nullable = false)
 |-- Var65: integer (nullable = true)
 |-- Var72: integer (nullable = true)
 |-- Var73: integer (nullable = true)
 |-- Var74: integer (nullable = true)
 |-- Var76: integer (nullable = true)
 |-- Var78: integer (nullable = true)
 |-- Var81: double (nullable = false)
 |-- Var83: integer (nullable = true)
 |-- Var85: integer (nullable = true)
 |-- Var94: integer (nullable = true)
 |-- Var109: integer (nullable = true)
 |-- Var112: integer (nullable = true)
 |-- Var113: double (nullable = false)
 |-- Var119: integer (nullable = true)
 |-- 

## dropping order and rowNum columns which are used to join dataframes

In [23]:
result = result.drop("order","rowNum")

## Renaming the _c0 column to label

In [24]:
result = result.withColumnRenamed("_c0","Label")

## generating the csv file for classification

In [27]:
result.repartition(1).write.csv('/home/abhi/project/cleaned_data2',sep=',',header=True)