In [2]:
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder \
    .master("local") \
    .appName("MoveoutPrediction") \
    .getOrCreate()

In [5]:
pandasDF = pd.read_csv( "./../data/gig_leases.csv" )
moveoutSchema = StructType( [ StructField("property_id", LongType() )\
                             ,StructField("property_unit_id", LongType() )\
                             ,StructField("unit_space_id", LongType() )\
                             ,StructField("lease_id", LongType() )\
                             ,StructField( "move_in_date", StringType() )\
                             ,StructField( "lease_interval_type_id", LongType() )\
                             ,StructField( "first_renewal_start_date", StringType(), True )\
                             ,StructField( "first_renewal_end_date", StringType(), True )\
                             ,StructField( "last_renewal_start_date", StringType(), True )\
                             ,StructField( "last_renewal_end_date", StringType(), True )\
                             ,StructField( "first_notice_date", StringType(), True )\
                             ,StructField( "last_notice_date", StringType(), True )\
                             ,StructField( "number_of_units", LongType() )\
                             ,StructField( "move_in_year_cohort", IntegerType() )\
                             ,StructField( "estimated_move_out", StringType(), True )\
                             ,StructField( "six_month_before_end", StringType(), True )\
                            ,StructField( "censored", ShortType() )\
                            ,StructField( "duration", LongType() )\
                            ,StructField( "is_male", LongType() )\
                            ,StructField( "current_age", FloatType() )\
                            ,StructField( "age_at_move_in", FloatType() )\
                            ,StructField( "six_month_payment_count", FloatType() )
                            ,StructField( "market_rent", FloatType() )
                            ,StructField( "amenity_rent_flag", FloatType() )] )




In [6]:
sdf = spark.createDataFrame( pandasDF, schema = moveoutSchema )
sdf = sdf.replace('NaN', None)
# show only 5 rows
sdf = sdf.select( col('lease_id'), col('lease_interval_type_id'), col('number_of_units'), col('move_in_year_cohort'),
           col('duration'), col('is_male'), col('current_age'), col('age_at_move_in'), 
           col('six_month_payment_count'), col('market_rent'), col('amenity_rent_flag'), col('censored') )   
 

In [7]:
current_age = sdf.approxQuantile("current_age", [ 0.5 ], 0.25)
print(current_age)
age_at_move_in = sdf.approxQuantile("age_at_move_in", [ 0.5 ], 0.25)
print(age_at_move_in)
six_month_payment_count = sdf.approxQuantile("six_month_payment_count", [ 0.5 ], 0.25)
print(six_month_payment_count)
market_rent = sdf.approxQuantile("market_rent", [ 0.5 ], 0.25)
print(market_rent)
amenity_rent_flag = sdf.approxQuantile("amenity_rent_flag", [ 0.5 ], 0.25)
print(amenity_rent_flag)

sdf.na.fill( { 'current_age':current_age[0], 'age_at_move_in':age_at_move_in[0], 
             'six_month_payment_count' : six_month_payment_count[0], 
             'market_rent': market_rent[0], 'amenity_rent_flag' : amenity_rent_flag[0] } )

[34.0]
[31.0]
[6.0]
[2131.0]
[1.0]


DataFrame[lease_id: bigint, lease_interval_type_id: bigint, number_of_units: bigint, move_in_year_cohort: int, duration: bigint, is_male: bigint, current_age: float, age_at_move_in: float, six_month_payment_count: float, market_rent: float, amenity_rent_flag: float, censored: smallint]

In [8]:
# survival analysis
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

assembler = VectorAssembler( inputCols=[ 'duration', 'six_month_payment_count' ], outputCol="features", handleInvalid= "skip" )
modelDf = assembler.transform( sdf ).select( col("lease_id").alias("label"), col("features"), col("censored").alias("censor") )

aftsr = AFTSurvivalRegression()
aftsr.setMaxIter(30)
model = aftsr.fit(modelDf)

model.transform(modelDf).show(10)

+--------+----------+------+--------------------+
|   label|  features|censor|          prediction|
+--------+----------+------+--------------------+
|13106763|[32.0,7.0]|     0|3.448702679086453...|
|13034395|[63.0,7.0]|     1|2.838609937424484E16|
|13153039|[12.0,7.0]|     0|3.910246756697529...|
|13141760|[40.0,7.0]|     0|3.279717146759322...|
|13165497|[28.0,6.0]|     0|3.523816754992268...|
|13021375|[49.0,7.0]|     0|3.099485477751760...|
|12952136|[77.0,6.0]|     1|2.590417179008697...|
|12974990|[25.0,6.0]|     0|3.590836059172275...|
|13103483|[32.0,6.0]|     0|3.436399397315663...|
|12882356|[60.0,6.0]|     1|2.882277955268536...|
+--------+----------+------+--------------------+
only showing top 10 rows

