In [1]:
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark import SparkContext, SparkConf,StorageLevel
# create sparksession
conf = SparkConf().setAppName("MLLab").setMaster("local[2]")
sc = SparkContext(conf=conf)
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

In [2]:
USE_PLOTLY = True
if USE_PLOTLY:
    import plotly.express as px
    import plotly.graph_objs as go
    import pandas as pd
    import requests
    import plotly.io as pio
    pio.renderers.default = 'iframe'

In [3]:
features_path = "../data/features.csv"
labels_path = "../data/labels.csv"

### Problem description

Your goal is to predict the operating condition of a waterpoint for each record in the dataset. You are provided the following set of information about the waterpoints:

`amount_tsh` - Total static head (amount water available to waterpoint) <br>
`date_recorded` - The date the row was entered<br>
`funder` - Who funded the well<br>
`gps_height` - Altitude of the well<br>
`installer` - Organization that installed the well<br>
`longitude` - GPS coordinate<br>
`latitude` - GPS coordinate<br>
`wpt_name` - Name of the waterpoint if there is one<br>
`num_private` -
`basin` - Geographic water basin<br>
`subvillage` - Geographic location<br>
`region` - Geographic location<br>
`region_code` - Geographic location (coded)<br>
`district_code` - Geographic location (coded)<br>
`lga` - Geographic location<br>
`ward` - Geographic location<br>
`population` - Population around the well<br>
`public_meeting` - True/False<br>
`recorded_by` - Group entering this row of data<br>
`scheme_management` - Who operates the waterpoint<br>
`scheme_name` - Who operates the waterpoint<br>
`permit` - If the waterpoint is permitted<br>
`construction_year` - Year the waterpoint was constructed<br>
`extraction_type` - The kind of extraction the waterpoint uses<br>
`extraction_type_group` - The kind of extraction the waterpoint uses<br>
`extraction_type_class` - The kind of extraction the waterpoint uses<br>
`management` - How the waterpoint is managed<br>
`management_group` - How the waterpoint is managed<br>
`payment` - What the water costs<br>
`payment_type` - What the water costs<br>
`water_quality` - The quality of the water<br>
`quality_group` - The quality of the water<br>
`quantity` - The quantity of water<br>
`quantity_group` - The quantity of water<br>
`source` - The source of the water<br>
`source_type` - The source of the water<br>
`source_class` - The source of the water<br>
`waterpoint_type` - The kind of waterpoint<br>
`waterpoint_type_group` - The kind of waterpoint<br>

In [4]:
features = spark.read.format("csv").option("inferSchema", 'true').option("header", "true").load(features_path)
labels = spark.read.format("csv").option("inferSchema", 'true').option("header", "true").load(labels_path)

In [5]:
if USE_PLOTLY:
    fig = px.histogram(labels.toPandas(), x='status_group')
    fig.show()

In [6]:
features.columns

['id',
 'amount_tsh',
 'date_recorded',
 'funder',
 'gps_height',
 'installer',
 'longitude',
 'latitude',
 'wpt_name',
 'num_private',
 'basin',
 'subvillage',
 'region',
 'region_code',
 'district_code',
 'lga',
 'ward',
 'population',
 'public_meeting',
 'recorded_by',
 'scheme_management',
 'scheme_name',
 'permit',
 'construction_year',
 'extraction_type',
 'extraction_type_group',
 'extraction_type_class',
 'management',
 'management_group',
 'payment',
 'payment_type',
 'water_quality',
 'quality_group',
 'quantity',
 'quantity_group',
 'source',
 'source_type',
 'source_class',
 'waterpoint_type',
 'waterpoint_type_group']

In [7]:
from pyspark.sql.functions import col
full_train = features.alias('a').join(labels.alias('b'),col('b.id') == col('a.id')).drop('id')

In [8]:
full_train.take(1)

[Row(amount_tsh=6000.0, date_recorded=datetime.datetime(2011, 3, 14, 0, 0), funder='Roman', gps_height=1390, installer='Roman', longitude=34.93809275, latitude=-9.85632177, wpt_name='none', num_private=0, basin='Lake Nyasa', subvillage='Mnyusi B', region='Iringa', region_code=11, district_code=5, lga='Ludewa', ward='Mundindi', population=109, public_meeting=True, recorded_by='GeoData Consultants Ltd', scheme_management='VWC', scheme_name='Roman', permit=False, construction_year=1999, extraction_type='gravity', extraction_type_group='gravity', extraction_type_class='gravity', management='vwc', management_group='user-group', payment='pay annually', payment_type='annually', water_quality='soft', quality_group='good', quantity='enough', quantity_group='enough', source='spring', source_type='spring', source_class='groundwater', waterpoint_type='communal standpipe', waterpoint_type_group='communal standpipe', status_group='functional')]

In [9]:
from pyspark.sql.types import DoubleType, IntegerType, DateType, BooleanType

double_columns = ['amount_tsh', 'longitude', 'latitude', ]
int_columns = ['gps_height', 'num_private', 'region_code', 'district_code', 'population','construction_year']
bool_columns = ['public_meeting','permit']
numeric_columns = double_columns + int_columns
date_columns = ['date_recorded']

indexed_columns = date_columns + ['lga']
drop_colums =  ['ward', 'funder', 'installer', 'wpt_name', 'subvillage', 'scheme_name', 'recorded_by']

target_column = 'status_group'

not_unique_columns = double_columns + int_columns + bool_columns + drop_colums + [target_column] + indexed_columns
unique_columns = [elem for elem in full_train.columns if elem not in not_unique_columns] + bool_columns

In [10]:
if USE_PLOTLY:
    import plotly.express as px
    fig = px.scatter_matrix(full_train.toPandas()[numeric_columns])
    fig.show()

In [11]:
dropped = full_train

In [12]:
for column in numeric_columns:
    dropped =  dropped.withColumn(column, dropped[column].cast("double"))

In [13]:
for column in bool_columns:
    dropped =  dropped.withColumn(column, dropped[column].cast("string"))

In [14]:
for column in date_columns:
    dropped =  dropped.withColumn(column, dropped[column].cast("string"))


In [15]:
unique_values = {}
print("unique_columns:")
for column in unique_columns:
    unique_values[column] = dropped.select(column).distinct().count()
    print(column, unique_values[column])
print('\ndropped columns:')
for column in drop_colums:
    unique_values[column] = full_train.select(column).distinct().count()
    print(column, unique_values[column])
print('\nindexed columns:')
for column in indexed_columns:
    unique_values[column] = full_train.select(column).distinct().count()
    print(column, unique_values[column])

unique_columns:
basin 9
region 21
scheme_management 13
extraction_type 18
extraction_type_group 13
extraction_type_class 7
management 12
management_group 5
payment 7
payment_type 7
water_quality 8
quality_group 6
quantity 5
quantity_group 5
source 10
source_type 7
source_class 3
waterpoint_type 7
waterpoint_type_group 6
public_meeting 3
permit 3

dropped columns:
ward 2092
funder 1898
installer 2146
wpt_name 37400
subvillage 19288
scheme_name 2697
recorded_by 1

indexed columns:
date_recorded 356
lga 125


In [16]:
dropped.schema

StructType(List(StructField(amount_tsh,DoubleType,true),StructField(date_recorded,StringType,true),StructField(funder,StringType,true),StructField(gps_height,DoubleType,true),StructField(installer,StringType,true),StructField(longitude,DoubleType,true),StructField(latitude,DoubleType,true),StructField(wpt_name,StringType,true),StructField(num_private,DoubleType,true),StructField(basin,StringType,true),StructField(subvillage,StringType,true),StructField(region,StringType,true),StructField(region_code,DoubleType,true),StructField(district_code,DoubleType,true),StructField(lga,StringType,true),StructField(ward,StringType,true),StructField(population,DoubleType,true),StructField(public_meeting,StringType,true),StructField(recorded_by,StringType,true),StructField(scheme_management,StringType,true),StructField(scheme_name,StringType,true),StructField(permit,StringType,true),StructField(construction_year,DoubleType,true),StructField(extraction_type,StringType,true),StructField(extraction_type

In [17]:
from pyspark.sql.functions import avg
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in include
    ))
    return df.fillna(stats.first().asDict())

def fill_with_median(df, include=list()):
    medians = df.approxQuantile(include, [0.5], 0.01)
    assert isinstance(include, (tuple,list))
    medians = list(map(lambda x: x[0],medians))
    medians=dict(zip(include,medians))
    print(medians)
    return df.fillna(medians)


dropped = fill_with_median(dropped, numeric_columns)

{'amount_tsh': 0.0, 'longitude': 34.90038636, 'latitude': -5.02762847, 'gps_height': 365.0, 'num_private': 0.0, 'region_code': 12.0, 'district_code': 3.0, 'population': 25.0, 'construction_year': 1986.0}


In [18]:
features_column = "features"
label_column = "label"

In [19]:
dropped.select(numeric_columns).take(5)

[Row(amount_tsh=6000.0, longitude=34.93809275, latitude=-9.85632177, gps_height=1390.0, num_private=0.0, region_code=11.0, district_code=5.0, population=109.0, construction_year=1999.0),
 Row(amount_tsh=0.0, longitude=34.6987661, latitude=-2.14746569, gps_height=1399.0, num_private=0.0, region_code=20.0, district_code=2.0, population=280.0, construction_year=2010.0),
 Row(amount_tsh=25.0, longitude=37.46066446, latitude=-3.82132853, gps_height=686.0, num_private=0.0, region_code=21.0, district_code=4.0, population=250.0, construction_year=2009.0),
 Row(amount_tsh=0.0, longitude=38.48616088, latitude=-11.15529772, gps_height=263.0, num_private=0.0, region_code=90.0, district_code=63.0, population=58.0, construction_year=1986.0),
 Row(amount_tsh=0.0, longitude=31.13084671, latitude=-1.82535885, gps_height=0.0, num_private=0.0, region_code=18.0, district_code=1.0, population=0.0, construction_year=0.0)]

In [20]:
dropped.schema
from pyspark.sql.functions import isnan, when, count, col
temp_df = dropped.select([target_column])
temp_df.select([count(when(isnan(c), c)).alias(c) for c in temp_df.columns]).show()

+------------+
|status_group|
+------------+
|           0|
+------------+



In [21]:
dropped.columns

['amount_tsh',
 'date_recorded',
 'funder',
 'gps_height',
 'installer',
 'longitude',
 'latitude',
 'wpt_name',
 'num_private',
 'basin',
 'subvillage',
 'region',
 'region_code',
 'district_code',
 'lga',
 'ward',
 'population',
 'public_meeting',
 'recorded_by',
 'scheme_management',
 'scheme_name',
 'permit',
 'construction_year',
 'extraction_type',
 'extraction_type_group',
 'extraction_type_class',
 'management',
 'management_group',
 'payment',
 'payment_type',
 'water_quality',
 'quality_group',
 'quantity',
 'quantity_group',
 'source',
 'source_type',
 'source_class',
 'waterpoint_type',
 'waterpoint_type_group',
 'status_group']

In [22]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler as SC
from pyspark.ml import Pipeline

encoders = []

assembler_numeric = VectorAssembler(
    inputCols=numeric_columns,
    outputCol='numeric')
scaler = SC(inputCol='numeric', outputCol='_'+'numeric',
                        withStd=True, withMean=True)


    
for column in unique_columns + indexed_columns:
    enc = StringIndexer(inputCol=column,
                                 outputCol="_"+column)
    
    enc.setHandleInvalid('keep')
    encoders.append(enc)

new_indexed_columns = list(map(lambda x: "_"+x, indexed_columns))
new_unique_columns = list(map(lambda x: "_"+x, unique_columns))
new_new = list(map(lambda x: "_"+x, new_unique_columns))
encoder2 = OneHotEncoderEstimator(inputCols=new_unique_columns,
                                 outputCols=new_new)


#encoder3 = OneHotEncoderEstimator(inputCols=bool_columns,
#                                 outputCols=list(map(lambda x: "_"+x, bool_columns)))

print(new_indexed_columns, unique_columns)

label_encoder = StringIndexer(inputCol=target_column,
                                 outputCol=label_column)

assembler = VectorAssembler(
    inputCols=(['_numeric'] + new_new + new_indexed_columns),
    outputCol=features_column)



pipeline = Pipeline(stages=[assembler_numeric, scaler, *encoders, encoder2, label_encoder, assembler])


model = pipeline.fit(dropped)
encoded = model.transform(dropped)

preprocessed_data = encoded.drop(*(new_unique_columns+unique_columns + bool_columns))


['_date_recorded', '_lga'] ['basin', 'region', 'scheme_management', 'extraction_type', 'extraction_type_group', 'extraction_type_class', 'management', 'management_group', 'payment', 'payment_type', 'water_quality', 'quality_group', 'quantity', 'quantity_group', 'source', 'source_type', 'source_class', 'waterpoint_type', 'waterpoint_type_group', 'public_meeting', 'permit']


In [23]:
preprocessed_data.select(features_column).take(1)

[Row(features=SparseVector(183, {0: 1.8956, 1: 0.1311, 2: -1.4088, 3: 1.0412, 4: -0.0387, 5: -0.2443, 6: -0.0654, 7: -0.1504, 8: 0.7339, 15: 1.0, 18: 1.0, 39: 1.0, 51: 1.0, 69: 1.0, 82: 1.0, 89: 1.0, 101: 1.0, 111: 1.0, 118: 1.0, 120: 1.0, 128: 1.0, 134: 1.0, 139: 1.0, 144: 1.0, 154: 1.0, 161: 1.0, 164: 1.0, 171: 1.0, 177: 1.0, 180: 1.0, 181: 3.0, 182: 37.0}))]

In [24]:
preprocessed_data.select([features_column, label_column]).count()

59400

In [25]:
from pyspark.sql.functions import isnan, when, count, col
temp_df = preprocessed_data.select([label_column])
temp_df.select([count(when(isnan(c), c)).alias(c) for c in temp_df.columns]).show()

+-----+
|label|
+-----+
|    0|
+-----+



### Vis projections

In [26]:

if USE_PLOTLY:
    import numpy as np
    vis_X  = np.stack(preprocessed_data.select([features_column]).toPandas()[features_column].apply(lambda x : np.array(x.toArray())).values)
    print(vis_X)
    print(vis_X.shape)


[[ 1.89564913e+00  1.31050627e-01 -1.40877876e+00 ...  1.00000000e+00
   3.00000000e+00  3.70000000e+01]
 [-1.05969136e-01  9.46091901e-02  1.20792394e+00 ...  0.00000000e+00
   1.88000000e+02  2.20000000e+01]
 [-9.76290594e-02  5.15153845e-01  6.39746070e-01 ...  0.00000000e+00
   9.10000000e+01  8.10000000e+01]
 ...
 [-1.05969136e-01 -9.18770586e-03 -1.03339510e+00 ...  1.00000000e+00
   1.66000000e+02  3.20000000e+01]
 [-1.05969136e-01  2.71626514e-01 -2.28287934e-01 ...  0.00000000e+00
   6.40000000e+01  7.30000000e+01]
 [-1.05969136e-01  6.13119652e-01 -3.53504700e-01 ...  0.00000000e+00
   1.80000000e+01  4.20000000e+01]]
(59400, 183)


In [27]:
if USE_PLOTLY:
    vis_Y  = np.stack(preprocessed_data.select([label_column]).toPandas()[label_column])


In [30]:
if USE_PLOTLY:
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler
    
    X_embedded = StandardScaler().fit_transform(PCA(n_components=2).fit_transform(vis_X))
    print(X_embedded)
    fig = px.scatter(x=X_embedded[:,0], y=X_embedded[:,1], color=vis_Y)
    print(X_embedded)
    fig.show()

[[-1.28474841 -0.14029904]
 [ 1.41110138 -0.47928638]
 [-0.01934886  1.30990496]
 ...
 [ 1.08781893 -0.17911527]
 [-0.4092007   1.03904494]
 [-1.06814168  0.02932886]]
[[-1.28474841 -0.14029904]
 [ 1.41110138 -0.47928638]
 [-0.01934886  1.30990496]
 ...
 [ 1.08781893 -0.17911527]
 [-0.4092007   1.03904494]
 [-1.06814168  0.02932886]]


### Train model

In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = preprocessed_data.select([features_column, label_column]).randomSplit([0.8, 0.2], seed=42)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol=label_column, featuresCol=features_column)
dt.setMaxBins(400)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", label_column, features_column).show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol=label_column, predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

print("Accuracy = %g " % (accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  1.0|(183,[0,1,2,3,4,5...|
|       1.0|  1.0|(183,[0,1,2,3,4,5...|
|       0.0|  1.0|(183,[0,1,2,3,4,5...|
|       1.0|  0.0|(183,[0,1,2,3,4,5...|
|       1.0|  0.0|(183,[0,1,2,3,4,5...|
+----------+-----+--------------------+
only showing top 5 rows

Test Error = 0.280792 
Accuracy = 0.719208 
