In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, lit, trim, avg, ceil
from pyspark.sql.types import StringType
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

In [0]:
import os
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('sss') \
        .getOrCreate()

In [0]:
# dbutils.fs.rm("dbfs:/FileStore/shared_uploads/sinha.ashish.4.u@gmail.com/", True) 

Out[3]: True

##Mount the S3 data

In [0]:
dbutils.fs.cp("/FileStore/tables/dcad_data/dwh-2.cfg", "file:///tmp/dwh-2.cfg")
import configparser
config = configparser.ConfigParser()
config.read_file(open('/tmp/dwh-2.cfg'))
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

In [0]:
ACCESS_KEY = KEY
# Encode the Secret Key to remove any "/" characters
SECRET_KEY = SECRET.replace("/", "%2F")
AWS_BUCKET_NAME = "s3pysparkfiledump/udacity"
MOUNT_NAME = "/mnt/Ash-tmp"

In [0]:
try:
  dbutils.fs.mount("s3a://{}:{}@{}".format(ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME), MOUNT_NAME)
except:
  print("""{} already mounted. Unmount using `dbutils.fs.unmount("{}")` to unmount first""".format(MOUNT_NAME, MOUNT_NAME))

/mnt/Ash-tmp already mounted. Unmount using `dbutils.fs.unmount("/mnt/Ash-tmp")` to unmount first


In [0]:
%fs ls /mnt/Ash-tmp

path,name,size
dbfs:/mnt/Ash-tmp/features.csv,features.csv,20069199
dbfs:/mnt/Ash-tmp/firecalls/,firecalls/,0
dbfs:/mnt/Ash-tmp/hr/,hr/,0
dbfs:/mnt/Ash-tmp/labels.csv,labels.csv,1148327
dbfs:/mnt/Ash-tmp/payment/,payment/,0
dbfs:/mnt/Ash-tmp/songs/,songs/,0
dbfs:/mnt/Ash-tmp/web_sales.csv,web_sales.csv,12010775


In [0]:
## If unmounting is required
#dbutils.fs.unmount("/mnt/Ash-tmp")

In [0]:
import pandas as pd
df_features = spark.read.csv("/mnt/Ash-tmp/features.csv", inferSchema = True, header= True)
df_label = spark.read.csv("/mnt/Ash-tmp/labels.csv", inferSchema = True, header= True)

In [0]:
print(df_features.count())
print(df_label.count())

59400
59400


#Join the two dataframe

In [0]:
print(df_features.columns)
df_label.columns

['id', 'amount_tsh', 'date_recorded', 'funder', 'gps_height', 'installer', 'longitude', 'latitude', 'wpt_name', 'num_private', 'basin', 'subvillage', 'region', 'region_code', 'district_code', 'lga', 'ward', 'population', 'public_meeting', 'recorded_by', 'scheme_management', 'scheme_name', 'permit', 'construction_year', 'extraction_type', 'extraction_type_group', 'extraction_type_class', 'management', 'management_group', 'payment', 'payment_type', 'water_quality', 'quality_group', 'quantity', 'quantity_group', 'source', 'source_type', 'source_class', 'waterpoint_type', 'waterpoint_type_group']
Out[4]: ['id', 'status_group']

In [0]:
data = df_features.join(df_label, on='id')

#df_features.join(df_label,df_label.id ===  df_features.id,"inner")

In [0]:
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- amount_tsh: double (nullable = true)
 |-- date_recorded: string (nullable = true)
 |-- funder: string (nullable = true)
 |-- gps_height: integer (nullable = true)
 |-- installer: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- wpt_name: string (nullable = true)
 |-- num_private: integer (nullable = true)
 |-- basin: string (nullable = true)
 |-- subvillage: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- district_code: integer (nullable = true)
 |-- lga: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- public_meeting: boolean (nullable = true)
 |-- recorded_by: string (nullable = true)
 |-- scheme_management: string (nullable = true)
 |-- scheme_name: string (nullable = true)
 |-- permit: boolean (nullable = true)
 |-- construction_year: integer (nullable = 

In [0]:
data = data.withColumn('region_code', col('region_code').cast(StringType()))
data = data.withColumn('district_code', col('district_code').cast(StringType()))
            

In [0]:
data.printSchema()


root
 |-- id: integer (nullable = true)
 |-- amount_tsh: double (nullable = true)
 |-- date_recorded: string (nullable = true)
 |-- funder: string (nullable = true)
 |-- gps_height: integer (nullable = true)
 |-- installer: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- wpt_name: string (nullable = true)
 |-- num_private: integer (nullable = true)
 |-- basin: string (nullable = true)
 |-- subvillage: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- district_code: string (nullable = true)
 |-- lga: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- public_meeting: boolean (nullable = true)
 |-- recorded_by: string (nullable = true)
 |-- scheme_management: string (nullable = true)
 |-- scheme_name: string (nullable = true)
 |-- permit: boolean (nullable = true)
 |-- construction_year: integer (nullable = tr

In [0]:
data = data.distinct()
#data.count()

In [0]:
str_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
for col in str_cols:
  data = data.withColumn(col,trim(data[col]))

Task 3 - Remove columns with null values more than a threshold. If you are disconnected, please run the previous cells by clicking on this cell, going to Runtime, then clicking Run before.

In [0]:
data.limit(10).toPandas()

Unnamed: 0,id,amount_tsh,date_recorded,funder,gps_height,installer,longitude,latitude,wpt_name,num_private,...,water_quality,quality_group,quantity,quantity_group,source,source_type,source_class,waterpoint_type,waterpoint_type_group,status_group
0,46144,0.0,2011-08-03,Isingiro Ho,0,Artisan,30.626991,-1.257051,Kwapeto,0,...,soft,good,enough,enough,shallow well,shallow well,groundwater,hand pump,hand pump,functional
1,19816,0.0,2012-10-01,Dwsp,0,DWSP,33.36241,-3.766365,Kwa Ngomho,0,...,soft,good,enough,enough,machine dbh,borehole,groundwater,hand pump,hand pump,non functional
2,9944,20.0,2011-03-13,Mkinga Distric Coun,0,DWE,39.172796,-4.765587,Tajiri,0,...,salty,salty,enough,enough,other,other,unknown,communal standpipe multiple,communal standpipe,functional
3,67743,0.0,2013-01-28,Unicef,263,UNICEF,38.486161,-11.155298,Zahanati Ya Nanyumbu,0,...,soft,good,dry,dry,machine dbh,borehole,groundwater,communal standpipe multiple,communal standpipe,non functional
4,53934,0.0,2012-11-03,Wateraid,0,Water Aid,32.7111,-5.146712,Kwa Ramadhan Musa,0,...,salty,salty,seasonal,seasonal,machine dbh,borehole,groundwater,hand pump,hand pump,non functional
5,8776,0.0,2013-03-06,Grumeti,1399,GRUMETI,34.698766,-2.147466,Zahanati,0,...,soft,good,insufficient,insufficient,rainwater harvesting,rainwater harvesting,surface,communal standpipe,communal standpipe,functional
6,69572,6000.0,2011-03-14,Roman,1390,Roman,34.938093,-9.856322,none,0,...,soft,good,enough,enough,spring,spring,groundwater,communal standpipe,communal standpipe,functional
7,54551,0.0,2012-10-09,Rwssp,0,DWE,32.620617,-4.226198,Tushirikiane,0,...,milky,milky,enough,enough,shallow well,shallow well,groundwater,hand pump,hand pump,non functional
8,34310,25.0,2013-02-25,Lottery Club,686,World vision,37.460664,-3.821329,Kwa Mahundi,0,...,soft,good,enough,enough,dam,dam,surface,communal standpipe multiple,communal standpipe,functional
9,19728,0.0,2011-07-13,Action In A,0,Artisan,31.130847,-1.825359,Shuleni,0,...,soft,good,seasonal,seasonal,rainwater harvesting,rainwater harvesting,surface,communal standpipe,communal standpipe,functional


In [0]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import isnan, when, count, col, lit, trim, avg, ceil
pd.options.display.max_columns = None
data_cnt=  data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns if c not in {'date_recorded','public_meeting','permit'}]).toPandas()
data_cnt


Unnamed: 0,id,amount_tsh,funder,gps_height,installer,longitude,latitude,wpt_name,num_private,basin,subvillage,region,region_code,district_code,lga,ward,population,recorded_by,scheme_management,scheme_name,construction_year,extraction_type,extraction_type_group,extraction_type_class,management,management_group,payment,payment_type,water_quality,quality_group,quantity,quantity_group,source,source_type,source_class,waterpoint_type,waterpoint_type_group,status_group
0,0,0,3635,0,3655,0,0,0,0,0,371,0,0,0,0,0,0,0,3877,28166,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


#Optional Step when we want to remove data which has more than 40 per null value

In [0]:
data_cnt=  data.select([(count(when(isnan(c) | col(c).isNull(), c))/data.count()).alias(c) for c in data.columns if c not in {'date_recorded','public_meeting','permit'}]).collect()

In [0]:
agg_dict_list = [row.asDict() for row in data_cnt]
agg_dict = agg_dict_list[0]
print(agg_dict)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-2220046690990348>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0magg_dict_list[0m [0;34m=[0m [0;34m[[0m[0mrow[0m[0;34m.[0m[0masDict[0m[0;34m([0m[0;34m)[0m [0;32mfor[0m [0mrow[0m [0;32min[0m [0mdata_cnt[0m[0;34m][0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0magg_dict[0m [0;34m=[0m [0magg_dict_list[0m[0;34m[[0m[0;36m0[0m[0;34m][0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0mprint[0m[0;34m([0m[0magg_dict[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m<command-2220046690990348>[0m in [0;36m<listcomp>[0;34m(.0)[0m
[0;32m----> 1[0;31m [0magg_dict_list[0m [0;34m=[0m [0;34m[[0m[0mrow[0m[0;34m.[0m[0masDict[0m[0;34m([0m[0;34m)[0m [0;32mfor[0m [0mrow[0m [0;32min[0m [0mdata_cnt[0m[0;34m][0m[0;34m[0m[0;34m[

In [0]:
col_nulls = [i for i in agg_dict if agg_dict[i] > .4 ]
data = data.drop(*col_nulls)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-1918953434311759>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mcol_nulls[0m [0;34m=[0m [0;34m[[0m[0mi[0m [0;32mfor[0m [0mi[0m [0;32min[0m [0magg_dict[0m [0;32mif[0m [0magg_dict[0m[0;34m[[0m[0mi[0m[0;34m][0m [0;34m>[0m [0;36m.4[0m [0;34m][0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mdata[0m [0;34m=[0m [0mdata[0m[0;34m.[0m[0mdrop[0m[0;34m([0m[0;34m*[0m[0mcol_nulls[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mNameError[0m: name 'agg_dict' is not defined

Task 4 - Group, aggregate, create pivot table. If you are disconnected, please run the previous cells by clicking on this cell, going to Runtime, then clicking Run before.

In [0]:
data.groupBy('water_quality').count().orderBy('count',ascending= False).show()

+------------------+-----+
|     water_quality|count|
+------------------+-----+
|              soft|50818|
|             salty| 4856|
|           unknown| 1876|
|             milky|  804|
|          coloured|  490|
|   salty abandoned|  339|
|          fluoride|  200|
|fluoride abandoned|   17|
+------------------+-----+



In [0]:
data= data.drop('recordedBy')
data = data.drop('scheme_name')

In [0]:
data.groupBy('status_group').pivot('region').sum('amount_tsh').show()

+--------------------+---------+-------------+------+---------+------+--------+-----------+-------+--------+--------+-----+--------+-------+-------+---------+--------+---------+---------+--------+------+--------+
|        status_group|   Arusha|Dar es Salaam|Dodoma|   Iringa|Kagera|  Kigoma|Kilimanjaro|  Lindi| Manyara|    Mara|Mbeya|Morogoro| Mtwara| Mwanza|    Pwani|   Rukwa|   Ruvuma|Shinyanga| Singida|Tabora|   Tanga|
+--------------------+---------+-------------+------+---------+------+--------+-----------+-------+--------+--------+-----+--------+-------+-------+---------+--------+---------+---------+--------+------+--------+
|functional needs ...|  10280.0|          0.0|   0.0| 118400.0|   0.0|270770.0|   129248.0| 3625.0| 71315.0|   900.0|  0.0| 70005.0|36240.0|10000.0|    520.0|292980.0| 125650.0|   2200.0|  6505.0|   0.0|  4310.0|
|          functional|1125485.0|     181175.0|   0.0|5515710.0|   0.0|530725.0|  746942.25|24560.0|312745.0| 96786.0|  0.0|587642.0|52710.0|33550.0|

Task 5 - Convert categories with low frequency to Others, impute missing values. If you are disconnected, please run the previous cells by clicking on this cell, going to Runtime, then clicking Run before.

In [0]:
str_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
for column in str_cols:
  print(data.groupBy(column).count().orderBy('count',ascending=False).show())
  values_cat = data.groupBy(column).count().collect()
  less_than = [x[0] for x in values_cat if x[1] <1000 ]
  data = data.withColumn(column, when(col(column).isin(less_than),'others').otherwise(col(column)))
  data.groupBy(column).count().orderBy('count',ascending = False).show()

+-------------+-----+
|date_recorded|count|
+-------------+-----+
|   2011-03-15|  572|
|   2011-03-17|  558|
|   2013-02-03|  546|
|   2011-03-14|  520|
|   2011-03-16|  513|
|   2011-03-18|  497|
|   2011-03-19|  466|
|   2013-02-04|  464|
|   2013-01-29|  459|
|   2011-03-04|  458|
|   2013-02-14|  444|
|   2013-01-24|  435|
|   2011-03-05|  434|
|   2013-02-15|  429|
|   2013-03-15|  428|
|   2011-03-11|  426|
|   2013-01-30|  421|
|   2013-02-16|  418|
|   2011-03-23|  417|
|   2011-03-09|  416|
+-------------+-----+
only showing top 20 rows

None
+-------------+-----+
|date_recorded|count|
+-------------+-----+
|       others|59400|
+-------------+-----+

+--------------------+-----+
|              funder|count|
+--------------------+-----+
|Government Of Tan...| 9084|
|                null| 3635|
|              Danida| 3114|
|              Hesawa| 2202|
|               Rwssp| 1374|
|          World Bank| 1349|
|                Kkkt| 1287|
|        World Vision| 1246|
|          

In [0]:
data.groupBy('population').count().orderBy('population').show()

+----------+-----+
|population|count|
+----------+-----+
|         0|21381|
|         1| 7025|
|         2|    4|
|         3|    4|
|         4|   13|
|         5|   44|
|         6|   19|
|         7|    3|
|         8|   23|
|         9|   11|
|        10|  163|
|        11|    7|
|        12|   43|
|        13|   12|
|        14|   18|
|        15|  193|
|        16|    8|
|        17|    6|
|        18|   18|
|        19|    3|
+----------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.window import Window
data.withColumn('population', when(col('population')<2,lit(None)).otherwise(col('population')))
w = Window.partitionBy(data['district_code'])
data = data.withColumn('population',when(col('population').isNull(),avg(data['population']).over(w)).otherwise(col('population')))

Task 6 - Make visualizations. If you are disconnected, please run the previous cells by clicking on this cell, going to Runtime, then clicking Run before.

In [0]:
color_status = {'functional': 'green', 'non functional': 'red', 'functional needs repair': 'blue'}