## Importación de librerías necesarias

(En caso de no haberlo hecho, estas se deben instalar previamente)

In [3]:
from pyspark.sql import SparkSession
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import isnan, when, count, col, lit, trim, avg, ceil
from pyspark.sql.types import StringType
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

## Configuración del entorno

Si este bloque de código genera error, se debe modificar el primer parámetro (JAVA_HOME), ajustándolo a la versión instalada.

In [4]:
# Set the JAVA_HOME and SPARK_HOME environment variables for this process.
# Both values are machine-specific paths; adjust them to the locally installed versions.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/opt/spark/spark-3.2.1-bin-hadoop3.2",
    }
)

## Descarga de datasets desde AWS

In [5]:
# Download the features CSV from S3 and save it locally as features.csv
# (the file name the later read step expects).
!wget https://s3.amazonaws.com/drivendata/data/7/public/4910797b-ee55-40a7-8668-10efd5c1b960.csv -O features.csv
# Download the labels CSV from the same S3 location and save it as labels.csv.
!wget https://s3.amazonaws.com/drivendata/data/7/public/0bf8bc6e-30d0-4c50-956a-603fc693d966.csv -O labels.csv

--2022-05-30 08:06:40--  https://s3.amazonaws.com/drivendata/data/7/public/4910797b-ee55-40a7-8668-10efd5c1b960.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.163.40
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.163.40|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20069199 (19M) [text/csv]
Saving to: ‘features.csv’


2022-05-30 08:06:45 (4.78 MB/s) - ‘features.csv’ saved [20069199/20069199]

--2022-05-30 08:06:45--  https://s3.amazonaws.com/drivendata/data/7/public/0bf8bc6e-30d0-4c50-956a-603fc693d966.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.172.232
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.172.232|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1148327 (1.1M) [text/csv]
Saving to: ‘labels.csv’


2022-05-30 08:06:46 (2.90 MB/s) - ‘labels.csv’ saved [1148327/1148327]



## Definición del contexto de Spark utilizando todos los núcleos disponibles

In [6]:
# Create (or reuse) a Spark session that runs locally; the "local[*]" master
# asks Spark to use all available cores on this machine.
sc = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

22/05/30 08:07:39 WARN Utils: Your hostname, DESKTOP-L28I83P resolves to a loopback address: 127.0.1.1; using 172.22.69.52 instead (on interface eth0)
22/05/30 08:07:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/30 08:07:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Creación de dataframes a partir de la data descargada

In [7]:
# Load both downloaded CSV files into Spark DataFrames.
# header=True takes the column names from the first row of each file;
# inferSchema=True lets Spark infer column types instead of reading everything as string.
feature = sc.read.csv(
    "features.csv",
    header=True,
    inferSchema=True,
)
label = sc.read.csv(
    "labels.csv",
    header=True,
    inferSchema=True,
)

                                                                                

## Revisión de filas y columnas

In [8]:
# Report the row count of each DataFrame, then the column names of each,
# one value per line (features first, labels second).
print(
    feature.count(),
    label.count(),
    feature.columns,
    label.columns,
    sep="\n",
)

59400
59400
['id', 'amount_tsh', 'date_recorded', 'funder', 'gps_height', 'installer', 'longitude', 'latitude', 'wpt_name', 'num_private', 'basin', 'subvillage', 'region', 'region_code', 'district_code', 'lga', 'ward', 'population', 'public_meeting', 'recorded_by', 'scheme_management', 'scheme_name', 'permit', 'construction_year', 'extraction_type', 'extraction_type_group', 'extraction_type_class', 'management', 'management_group', 'payment', 'payment_type', 'water_quality', 'quality_group', 'quantity', 'quantity_group', 'source', 'source_type', 'source_class', 'waterpoint_type', 'waterpoint_type_group']
['id', 'status_group']


## Cruce de las dos tablas

In [9]:
# Combine features and labels on their shared "id" column
# (the join type is left at Spark's default).
data = feature.join(label, on="id")

# Show the joined row count followed by the resulting column names.
print(data.count(), data.columns, sep="\n")

59400
['id', 'amount_tsh', 'date_recorded', 'funder', 'gps_height', 'installer', 'longitude', 'latitude', 'wpt_name', 'num_private', 'basin', 'subvillage', 'region', 'region_code', 'district_code', 'lga', 'ward', 'population', 'public_meeting', 'recorded_by', 'scheme_management', 'scheme_name', 'permit', 'construction_year', 'extraction_type', 'extraction_type_group', 'extraction_type_class', 'management', 'management_group', 'payment', 'payment_type', 'water_quality', 'quality_group', 'quantity', 'quantity_group', 'source', 'source_type', 'source_class', 'waterpoint_type', 'waterpoint_type_group', 'status_group']


## Impresión del schema de la tabla y 10 primeros registros

In [11]:
# printSchema() writes the schema tree to stdout itself and returns None, so it
# is called directly; wrapping it in print() would append a stray "None" line.
data.printSchema()

# Preview the first 10 rows; only this small slice is converted to pandas
# so the notebook can render it as a table.
display(data.limit(10).toPandas())

root
 |-- id: integer (nullable = true)
 |-- amount_tsh: double (nullable = true)
 |-- date_recorded: string (nullable = true)
 |-- funder: string (nullable = true)
 |-- gps_height: integer (nullable = true)
 |-- installer: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- wpt_name: string (nullable = true)
 |-- num_private: integer (nullable = true)
 |-- basin: string (nullable = true)
 |-- subvillage: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- district_code: integer (nullable = true)
 |-- lga: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- public_meeting: boolean (nullable = true)
 |-- recorded_by: string (nullable = true)
 |-- scheme_management: string (nullable = true)
 |-- scheme_name: string (nullable = true)
 |-- permit: boolean (nullable = true)
 |-- construction_year: integer (nullable = 

Unnamed: 0,id,amount_tsh,date_recorded,funder,gps_height,installer,longitude,latitude,wpt_name,num_private,...,water_quality,quality_group,quantity,quantity_group,source,source_type,source_class,waterpoint_type,waterpoint_type_group,status_group
0,69572,6000.0,2011-03-14,Roman,1390,Roman,34.938093,-9.856322,none,0,...,soft,good,enough,enough,spring,spring,groundwater,communal standpipe,communal standpipe,functional
1,8776,0.0,2013-03-06,Grumeti,1399,GRUMETI,34.698766,-2.147466,Zahanati,0,...,soft,good,insufficient,insufficient,rainwater harvesting,rainwater harvesting,surface,communal standpipe,communal standpipe,functional
2,34310,25.0,2013-02-25,Lottery Club,686,World vision,37.460664,-3.821329,Kwa Mahundi,0,...,soft,good,enough,enough,dam,dam,surface,communal standpipe multiple,communal standpipe,functional
3,67743,0.0,2013-01-28,Unicef,263,UNICEF,38.486161,-11.155298,Zahanati Ya Nanyumbu,0,...,soft,good,dry,dry,machine dbh,borehole,groundwater,communal standpipe multiple,communal standpipe,non functional
4,19728,0.0,2011-07-13,Action In A,0,Artisan,31.130847,-1.825359,Shuleni,0,...,soft,good,seasonal,seasonal,rainwater harvesting,rainwater harvesting,surface,communal standpipe,communal standpipe,functional
5,9944,20.0,2011-03-13,Mkinga Distric Coun,0,DWE,39.172796,-4.765587,Tajiri,0,...,salty,salty,enough,enough,other,other,unknown,communal standpipe multiple,communal standpipe,functional
6,19816,0.0,2012-10-01,Dwsp,0,DWSP,33.36241,-3.766365,Kwa Ngomho,0,...,soft,good,enough,enough,machine dbh,borehole,groundwater,hand pump,hand pump,non functional
7,54551,0.0,2012-10-09,Rwssp,0,DWE,32.620617,-4.226198,Tushirikiane,0,...,milky,milky,enough,enough,shallow well,shallow well,groundwater,hand pump,hand pump,non functional
8,53934,0.0,2012-11-03,Wateraid,0,Water Aid,32.7111,-5.146712,Kwa Ramadhan Musa,0,...,salty,salty,seasonal,seasonal,machine dbh,borehole,groundwater,hand pump,hand pump,non functional
9,46144,0.0,2011-08-03,Isingiro Ho,0,Artisan,30.626991,-1.257051,Kwapeto,0,...,soft,good,enough,enough,shallow well,shallow well,groundwater,hand pump,hand pump,functional


## Categorización de las variables `region_code` y `district_code` para su posterior utilización en modelos

In [12]:
# region_code and district_code are read as integers, but they are treated here
# as categorical labels, so both columns are cast to string.
data = (
    data
    .withColumn("region_code", col("region_code").cast(StringType()))
    .withColumn("district_code", col("district_code").cast(StringType()))
)

In [13]:
# The bare expression shows the DataFrame's repr, which lists every column
# name with its type (region_code and district_code should now read "string").
data

DataFrame[id: int, amount_tsh: double, date_recorded: string, funder: string, gps_height: int, installer: string, longitude: double, latitude: double, wpt_name: string, num_private: int, basin: string, subvillage: string, region: string, region_code: string, district_code: string, lga: string, ward: string, population: int, public_meeting: boolean, recorded_by: string, scheme_management: string, scheme_name: string, permit: boolean, construction_year: int, extraction_type: string, extraction_type_group: string, extraction_type_class: string, management: string, management_group: string, payment: string, payment_type: string, water_quality: string, quality_group: string, quantity: string, quantity_group: string, source: string, source_type: string, source_class: string, waterpoint_type: string, waterpoint_type_group: string, status_group: string]

## Verificación del cambio de tipo

In [15]:
# Print the schema tree again to confirm that region_code and district_code
# are now string columns after the cast above.
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- amount_tsh: double (nullable = true)
 |-- date_recorded: string (nullable = true)
 |-- funder: string (nullable = true)
 |-- gps_height: integer (nullable = true)
 |-- installer: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- wpt_name: string (nullable = true)
 |-- num_private: integer (nullable = true)
 |-- basin: string (nullable = true)
 |-- subvillage: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- district_code: string (nullable = true)
 |-- lga: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- public_meeting: boolean (nullable = true)
 |-- recorded_by: string (nullable = true)
 |-- scheme_management: string (nullable = true)
 |-- scheme_name: string (nullable = true)
 |-- permit: boolean (nullable = true)
 |-- construction_year: integer (nullable = tr

## Eliminación de registros duplicados

In [16]:
# Keep a single row for each distinct "id" value.
data = data.dropDuplicates(subset=["id"])

# Row count after de-duplication (displayed as the cell's result).
data.count()

59400

## Identificación de columnas de tipo String

In [17]:
# data.dtypes yields (column name, type name) pairs; collect the names of the
# columns whose type name begins with "string".
str_col = [
    column_name
    for column_name, type_name in data.dtypes
    if type_name.startswith('string')
]

# Display the resulting list of string columns.
str_col

['date_recorded',
 'funder',
 'installer',
 'wpt_name',
 'basin',
 'subvillage',
 'region',
 'region_code',
 'district_code',
 'lga',
 'ward',
 'recorded_by',
 'scheme_management',
 'scheme_name',
 'extraction_type',
 'extraction_type_group',
 'extraction_type_class',
 'management',
 'management_group',
 'payment',
 'payment_type',
 'water_quality',
 'quality_group',
 'quantity',
 'quantity_group',
 'source',
 'source_type',
 'source_class',
 'waterpoint_type',
 'waterpoint_type_group',
 'status_group']

## Eliminación de espacios en blanco

In [18]:
# Strip leading and trailing whitespace from every string column.
# This is done in a single select() rather than one withColumn() call per
# column: each withColumn() adds its own projection, so looping over many
# columns builds an unnecessarily deep query plan. Non-string columns are
# passed through unchanged, and the original column order and names are kept.
data = data.select(
    [
        trim(data[name]).alias(name) if name in str_col else data[name]
        for name in data.columns
    ]
)

# Preview the first 20 rows of the trimmed data as a pandas table.
display(data.limit(20).toPandas())

                                                                                

Unnamed: 0,id,amount_tsh,date_recorded,funder,gps_height,installer,longitude,latitude,wpt_name,num_private,...,water_quality,quality_group,quantity,quantity_group,source,source_type,source_class,waterpoint_type,waterpoint_type_group,status_group
0,26,0.0,2013-03-03,Tassaf,949,TASSAF,35.166289,-11.12649,Kwa Mzee Nassibu,0,...,soft,good,insufficient,insufficient,spring,spring,groundwater,communal standpipe,communal standpipe,functional
1,27,0.0,2013-02-23,Government Of Tanzania,964,RUVUMA BASIN,35.142487,-10.68131,Kwa Mzee Hesawa,0,...,soft,good,dry,dry,spring,spring,groundwater,communal standpipe,communal standpipe,non functional
2,28,0.0,2011-07-29,,0,,33.678669,-9.392807,Kilabuni,0,...,soft,good,enough,enough,spring,spring,groundwater,communal standpipe,communal standpipe,functional
3,31,1000.0,2013-02-11,Danida,576,DANIDA,37.236365,-11.00932,Ipanje Primary School,0,...,soft,good,enough,enough,river,river/lake,surface,communal standpipe,communal standpipe,non functional
4,34,0.0,2013-02-07,Bgm,0,BGM,32.685457,-3.880662,Kwa,0,...,soft,good,insufficient,insufficient,shallow well,shallow well,groundwater,other,other,non functional
5,44,50.0,2011-03-27,District Council,521,Handeni Trunk Main(,38.178901,-5.459401,Kwa Ramadhani Kasidi,5,...,soft,good,enough,enough,river,river/lake,surface,communal standpipe,communal standpipe,functional
6,65,500.0,2013-01-03,Tassaf,1228,Community,36.924137,-3.357655,Marko Akyo,0,...,soft,good,seasonal,seasonal,river,river/lake,surface,communal standpipe,communal standpipe,functional
7,76,0.0,2012-10-30,Grumeti,1394,GRUMETI,34.422111,-2.004985,Shule Ya Msingi Motukeri,0,...,soft,good,insufficient,insufficient,rainwater harvesting,rainwater harvesting,surface,communal standpipe,communal standpipe,functional
8,78,3000.0,2013-02-02,Danida,402,DANIDA,37.692945,-11.17178,Kwa Mzee Subati,0,...,salty,salty,insufficient,insufficient,machine dbh,borehole,groundwater,hand pump,hand pump,functional
9,81,0.0,2012-11-04,Hesawa,0,DWE,0.0,-2e-08,Kwarugeni,0,...,salty,salty,insufficient,insufficient,shallow well,shallow well,groundwater,other,other,non functional
