In [1]:
# Import SparkSession in this cell: it is used here, before the notebook's
# import cell runs, and nothing else in the notebook imports it. Without this
# the cell only works on kernels that pre-populate the name.
from pyspark.sql import SparkSession

# Create the session used by every later cell, or reuse one that already exists.
spark = SparkSession.builder.appName("InClass Notebook").getOrCreate()

In [2]:
import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import Bucketizer

In [3]:
# Column names for the loaded dataset, in file order. They are applied
# positionally with `toDF(*columns)` when the CSV is read, so the order here
# must match the order of the fields in the source files.
columns = [
    # target and applicant / contract attributes
    "TARGET",
    "NAME_CONTRACT_TYPE",
    "CODE_GENDER",
    "FLAG_OWN_CAR",
    "FLAG_OWN_REALTY",
    "CNT_CHILDREN",
    "AMT_INCOME_TOTAL",
    "AMT_CREDIT",
    "AMT_ANNUITY",
    "NAME_INCOME_TYPE",
    "NAME_EDUCATION_TYPE",
    "NAME_FAMILY_STATUS",
    "NAME_HOUSING_TYPE",
    "DAYS_BIRTH",
    "DAYS_EMPLOYED",
    # contact flags
    "FLAG_MOBIL",
    "FLAG_EMP_PHONE",
    "FLAG_WORK_PHONE",
    "FLAG_CONT_MOBILE",
    "FLAG_PHONE",
    # household and region
    "CNT_FAM_MEMBERS",
    "REGION_RATING_CLIENT",
    "REGION_RATING_CLIENT_W_CITY",
    "REG_REGION_NOT_LIVE_REGION",
    "REG_REGION_NOT_WORK_REGION",
    "ORGANIZATION_TYPE",
] + list(map("FLAG_DOCUMENT_{}".format, range(2, 13)))  # FLAG_DOCUMENT_2 .. FLAG_DOCUMENT_12

In [4]:
# Read the header-less CSV files with schema inference, keep the first 1000
# rows returned, and name the columns positionally from `columns`.
# The frame is marked for caching BEFORE the count so that the count is the
# action that fills the cache; counting first would run a job whose result is
# thrown away and leave the cache empty until a later action.
data = (
    spark.read
    .option("inferSchema", True)
    .csv("hdfs:///user/edureka_448212/sqoop_import1/*")
    .limit(1000)
    .toDF(*columns)
    .cache()
)
data.count()  # action: materialises the cached rows
data  # last expression, so the cell still displays the DataFrame schema

DataFrame[TARGET: int, NAME_CONTRACT_TYPE: string, CODE_GENDER: string, FLAG_OWN_CAR: string, FLAG_OWN_REALTY: string, CNT_CHILDREN: int, AMT_INCOME_TOTAL: double, AMT_CREDIT: double, AMT_ANNUITY: double, NAME_INCOME_TYPE: string, NAME_EDUCATION_TYPE: string, NAME_FAMILY_STATUS: string, NAME_HOUSING_TYPE: string, DAYS_BIRTH: int, DAYS_EMPLOYED: int, FLAG_MOBIL: int, FLAG_EMP_PHONE: int, FLAG_WORK_PHONE: int, FLAG_CONT_MOBILE: int, FLAG_PHONE: int, CNT_FAM_MEMBERS: double, REGION_RATING_CLIENT: int, REGION_RATING_CLIENT_W_CITY: int, REG_REGION_NOT_LIVE_REGION: int, REG_REGION_NOT_WORK_REGION: int, ORGANIZATION_TYPE: string, FLAG_DOCUMENT_2: int, FLAG_DOCUMENT_3: int, FLAG_DOCUMENT_4: int, FLAG_DOCUMENT_5: int, FLAG_DOCUMENT_6: int, FLAG_DOCUMENT_7: int, FLAG_DOCUMENT_8: int, FLAG_DOCUMENT_9: int, FLAG_DOCUMENT_10: int, FLAG_DOCUMENT_11: int, FLAG_DOCUMENT_12: int]

In [5]:
# Derive AGE by dividing DAYS_BIRTH by -365 (DAYS_BIRTH is negative in the
# summary shown below, so AGE comes out positive).
age_expr = F.col("DAYS_BIRTH") / -365
data = data.withColumn("AGE", age_expr)

# Summary statistics for the source column and the derived one.
data.describe("DAYS_BIRTH", "AGE").show()

+-------+-----------------+------------------+
|summary|       DAYS_BIRTH|               AGE|
+-------+-----------------+------------------+
|  count|             1000|              1000|
|   mean|       -15872.748|43.486980821917804|
| stddev|4235.854370105366|11.605080466042105|
|    min|           -25104| 21.10958904109589|
|    max|            -7705| 68.77808219178083|
+-------+-----------------+------------------+



In [6]:
# Flag rows whose DAYS_EMPLOYED equals 365243 in a boolean column, then
# overwrite that value with 0 in DAYS_EMPLOYED itself. The flag is computed
# first because the second step destroys the original value.
data = (
    data
    .withColumn(
        'DAYS_EMPLOYED_ANOM',
        F.col("DAYS_EMPLOYED") == 365243,
    )
    .withColumn(
        'DAYS_EMPLOYED',
        F.when(F.col('DAYS_EMPLOYED') == 365243, 0)
         .otherwise(F.col('DAYS_EMPLOYED')),
    )
)

In [7]:
# Ratio features built from existing columns, plus a `label` column that is a
# copy of TARGET.
# NOTE(review): rows with a zero or missing denominator would presumably give
# null ratios, which the later VectorAssembler may reject - confirm on full data.
data = (
    data
    .withColumn('CREDIT_INCOME_PERCENT', F.col('AMT_CREDIT') / F.col('AMT_INCOME_TOTAL'))
    .withColumn('ANNUITY_INCOME_PERCENT', F.col('AMT_ANNUITY') / F.col('AMT_INCOME_TOTAL'))
    .withColumn('CREDIT_TERM', F.col('AMT_ANNUITY') / F.col('AMT_CREDIT'))
    .withColumn('DAYS_EMPLOYED_PERCENT', F.col('DAYS_EMPLOYED') / F.col('DAYS_BIRTH'))
    .withColumn("label", F.col("TARGET"))
)

In [8]:
# Input columns for the VectorAssembler, in the order they are assembled into
# the feature vector. The `_index`, `_Vec` and `bucketedData` columns do not
# exist on `data` yet; they are produced by earlier stages of the pipeline.
feature_cols = (
    # raw numeric / flag columns
    [
        'CNT_CHILDREN',
        'AMT_INCOME_TOTAL',
        'AMT_CREDIT',
        'AMT_ANNUITY',
        'DAYS_EMPLOYED',
        'FLAG_MOBIL',
        'FLAG_EMP_PHONE',
        'FLAG_WORK_PHONE',
        'FLAG_CONT_MOBILE',
        'FLAG_PHONE',
        'CNT_FAM_MEMBERS',
        'REGION_RATING_CLIENT',
        'REGION_RATING_CLIENT_W_CITY',
        'REG_REGION_NOT_LIVE_REGION',
        'REG_REGION_NOT_WORK_REGION',
    ]
    # FLAG_DOCUMENT_2 .. FLAG_DOCUMENT_12
    + list(map('FLAG_DOCUMENT_{}'.format, range(2, 13)))
    # StringIndexer outputs used directly
    + [
        'NAME_CONTRACT_TYPE_index',
        'CODE_GENDER_index',
        'FLAG_OWN_CAR_index',
        'FLAG_OWN_REALTY_index',
    ]
    # OneHotEncoder outputs
    + [
        'NAME_INCOME_TYPE_Vec',
        'NAME_EDUCATION_TYPE_Vec',
        'ORGANIZATION_TYPE_Vec',
    ]
    # engineered columns
    + [
        'AGE',
        'DAYS_EMPLOYED_ANOM',
        'bucketedData',
        'CREDIT_INCOME_PERCENT',
        'ANNUITY_INCOME_PERCENT',
        'CREDIT_TERM',
        'DAYS_EMPLOYED_PERCENT',
    ]
)

In [9]:
# One StringIndexer per categorical column, writing to "<column>_index".
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in (
        'NAME_CONTRACT_TYPE',
        'CODE_GENDER',
        'FLAG_OWN_CAR',
        'FLAG_OWN_REALTY',
        'NAME_INCOME_TYPE',
        'NAME_EDUCATION_TYPE',
        'ORGANIZATION_TYPE',
    )
]

# One-hot encode three of the indexed columns, reading "<column>_index" and
# writing "<column>_Vec". The other four indexed columns are used as indices.
encoder = [
    OneHotEncoder(inputCol=name + "_index", outputCol=name + "_Vec")
    for name in (
        'NAME_INCOME_TYPE',
        'NAME_EDUCATION_TYPE',
        'ORGANIZATION_TYPE',
    )
]

In [10]:
# Combine every column in `feature_cols` into one "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Logistic regression estimator: 10 iterations, regularisation strength 0.3,
# elastic-net mixing 0.8.
lr = (
    LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
)

In [11]:
# Bucket boundaries for AGE: [0, 25), [25, 35), [35, 55), [55, 100].
splits = [0, 25.0, 35.0, 55.0, 100.0]

# Map AGE into the bucket index column "bucketedData".
bucketizer = (
    Bucketizer()
    .setSplits(splits)
    .setInputCol("AGE")
    .setOutputCol("bucketedData")
)

In [12]:
# Stage order: bucketise AGE, index the categorical columns, one-hot encode
# three of them, assemble the feature vector, then fit logistic regression.
pipeline = Pipeline().setStages(
    [bucketizer] + indexers + encoder + [assembler, lr]
)

# Fit every stage on `data`; the result is the fitted PipelineModel.
pmodel = pipeline.fit(data)

In [13]:
# Model persistence: write the fitted pipeline to HDFS, replacing any model
# already saved at this path.
(
    pmodel
    .write()
    .overwrite()
    .save('hdfs:///user/edureka_448212/pymodellr_ujjwal')
)

In [14]:
# Score `data` with the in-memory fitted pipeline. This is the same frame the
# pipeline was fitted on, and `predictions` is reassigned later from the
# reloaded model.
predictions = pmodel.transform(data)

In [15]:
# Reload the fitted pipeline from the HDFS path it was saved to.
# NOTE(review): this rebinds `pipeline`, which previously held the unfitted
# Pipeline, to a fitted PipelineModel; a distinct name would avoid confusion.
pipeline = PipelineModel.load("hdfs:///user/edureka_448212/pymodellr_ujjwal")

In [16]:
# Score `data` again, this time with the model reloaded from HDFS; this
# replaces the earlier `predictions`.
predictions = pipeline.transform(data)

In [17]:
# Print the first row of the scored frame (all columns).
predictions.show(n=1)

+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+----------------+--------------------+--------------------+-----------------+----------+-------------+----------+--------------+---------------+----------------+----------+---------------+--------------------+---------------------------+--------------------------+--------------------------+--------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------+----------------+----------------+-----------------+------------------+---------------------+----------------------+-------------------+---------------------+-----+------------+------------------------+-----------------+------------------+---------------------+----------------------+-------------------------+-----------------------+--------------------+-----------------------+---------------------+--------------------+