In [None]:

from pyspark.sql import SparkSession
import warnings

import pandas as pd
from pyspark.sql.functions import max, min, avg, year, dayofyear, to_date, date_format, dayofweek, hour, month, length
from pyspark.context import SparkContext as sc
from pyspark.sql.types import StructField, StringType, StructType
from pyspark.sql import SQLContext

PATH_TO_DATA = "../data/"
# Driver heap for local mode. Only takes effect when this call launches the
# Spark JVM; if a session already exists in the kernel, getOrCreate() reuses it.
DRIVER_MEMORY = "16g"

warnings.filterwarnings('ignore')

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("preprocessing")
    .config("spark.driver.memory", DRIVER_MEMORY)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
# SQLContext takes a SparkContext as its first argument (not a SparkSession);
# passing the session as well ties it to the session created above.
# NOTE(review): SQLContext is deprecated and not used in the visible cells —
# kept only in case later cells reference `sqlContext`.
sqlContext = SQLContext(spark.sparkContext, spark)

# Load churn.csv from PATH_TO_DATA (../data/, not the notebook's directory).
# With inferSchema, columns containing sentinel text (e.g. "Error", "?") are
# read as strings rather than numbers.
dataset = spark.read.option("header", "true").option("inferSchema", "true").csv(PATH_TO_DATA + "churn.csv")
dataset.show(5)

In [None]:
# Drop rows with a negative days_since_last_login (presumably placeholder /
# sentinel values in the raw data). Zero is kept, consistent with the `>= 0`
# rule used for avg_time_spent; rows with a null value are also dropped,
# because a null predicate does not pass the filter.
dataset = dataset.filter(dataset['days_since_last_login'] >= 0)
dataset.show(5)

In [None]:
# Keep only rows whose security_no is at most 7 characters long.
# Longer values are removed; a null security_no is removed too, because
# length(null) is null and a null predicate does not pass the filter.
security_no_ok = length(dataset['security_no']) <= 7
dataset = dataset.where(security_no_ok)
dataset.count()

In [None]:
# Discard rows with a negative avg_time_spent (zero is kept), then report
# how many rows remain.
avg_time_ok = dataset['avg_time_spent'] >= 0
dataset = dataset.where(avg_time_ok)
dataset.count()

In [None]:
# Discard rows whose avg_frequency_login_days holds the literal text "Error".
# NOTE(review): the column is presumably still string-typed here (inferSchema
# saw the "Error" text) — cast it to a numeric type if later steps need that.
login_freq_ok = dataset['avg_frequency_login_days'] != "Error"
dataset = dataset.where(login_freq_ok)
dataset.count()


In [None]:
# Keep only rows with a strictly positive points_in_wallet; values <= 0
# (and nulls) are removed.
wallet_ok = dataset['points_in_wallet'] > 0
dataset = dataset.where(wallet_ok)
dataset.count()

In [None]:
# Discard rows whose joined_through_referral is the "?" placeholder
# (a null value is discarded as well, since `!=` against null yields null).
referral_known = dataset['joined_through_referral'] != "?"
dataset = dataset.where(referral_known)
dataset.count()

# Save the cleaned dataset. Spark writes a *directory* named dataCleaned.csv
# holding one part-*.csv file per partition, not a single CSV file.
# mode="overwrite" keeps the notebook re-runnable: the default save mode
# ("errorifexists") fails as soon as the output directory already exists.
dataset.write.csv(PATH_TO_DATA + "dataCleaned.csv", header=True, mode="overwrite")