# Criação da sessão do PySpark

In [66]:
#iniciar spark
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

# Spark configuration: download the hadoop-aws (S3A) and SQL Server connector
# packages, and let S3A take its credentials from the instance profile provider.
conf = (
    SparkConf()
    .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.2,com.microsoft.azure:spark-mssql-connector_2.12:1.2.0')
    .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.InstanceProfileCredentialsProvider')
)

# getOrCreate() returns the already-running session if there is one.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

# Lendo arquivo CSV vindo de um bucket da S3

In [67]:
# Load the cholera cases CSV (comma-delimited, first line is the header) from S3.
# No schema is supplied here, so the columns come back as strings.
cholera_reader = spark.read.options(delimiter=',', header='true')
df_cholera = cholera_reader.csv('s3a://andre-sprint03-sptech-bucket-bruto/cholera-cases.csv')

# Tratamento dos nomes das colunas

In [68]:
# Inspect the column names exactly as they were read from the CSV header.
df_cholera.columns

['Country',
 'Year',
 'Number of reported cases of cholera',
 'Number of reported deaths from cholera',
 'Cholera case fatality rate',
 'WHO Region']

In [69]:
from pyspark.sql.functions import col

# Rename the verbose CSV headers to short snake_case names.
# The mapping order is the output column order (same as the source order).
column_aliases = {
    "Country": "country",
    "Year": "year",
    "Number of reported cases of cholera": "cases_cholera",
    "Number of reported deaths from cholera": "deaths_cholera",
    "Cholera case fatality rate": "fatality_rate",
    "WHO Region": "region",
}
df_cholera = df_cholera.select(
    [col(original).alias(short) for original, short in column_aliases.items()]
)
df_cholera.columns

['country',
 'year',
 'cases_cholera',
 'deaths_cholera',
 'fatality_rate',
 'region']

# Leitura de um XML

In [70]:
# Parse the water/sanitation XML file into a pandas DataFrame.
# pd.read_xml already returns a DataFrame, so wrapping the result in
# pd.DataFrame(...) again is redundant and has been removed.
sanitation_water_xml = pd.read_xml('sanitation-water-global.xml')
sanitation_water_xml

Unnamed: 0,index,Year,Region,Country_code,Country,Population_using_at_least_basic_drinking_water_services_Rural,Population_using_at_least_basic_drinking_water_services_Total,Population_using_at_least_basic_drinking_water_services_Urban,Population_using_safely_managed_drinking_water_services_Rural,Population_using_safely_managed_drinking_water_services_Total,...,Population_using_at_least_basic_sanitation_services_Urban,Population_using_safely_managed_sanitation_services_Rural,Population_using_safely_managed_sanitation_services_Total,Population_using_safely_managed_sanitation_services_Urban,Population_with_basic_handwashing_facilities_at_home_Rural,Population_with_basic_handwashing_facilities_at_home_Total,Population_with_basic_handwashing_facilities_at_home_Urban,Population_practising_open_defecation_Rural,Population_practising_open_defecation_Total,Population_practising_open_defecation_Urban
0,0,2000,Africa,AGO,Angola,21.15264,41.14431,61.06653,,,...,47.51445,,,,,,,66.71371,42.86652,19.10217
1,1,2000,Africa,BDI,Burundi,47.84253,50.66312,82.04812,,,...,40.55778,,,,,,,2.82610,2.75996,2.02413
2,2,2000,Africa,BEN,Benin,52.03753,61.45970,76.61732,,,...,19.02374,,,,0.00000,1.02804,2.68187,86.00783,67.60516,38.00048
3,3,2000,Africa,BFA,Burkina Faso,50.78503,54.91951,73.95518,,,...,50.31496,,,,2.09808,5.88390,23.31431,84.91952,71.39430,9.12255
4,4,2000,Africa,BWA,Botswana,53.57892,75.19897,94.20359,,,...,68.77863,,,,,,,42.15451,21.71464,3.74742
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3458,3458,2001,Europe,POL,Poland,,,,,,...,94.28275,,79.97399,,,,,0.00000,0.00000,0.00000
3459,3459,2001,Western Pacific,KOR,Republic of Korea,,,,,,...,,,86.68383,,,,,,0.00000,
3460,3460,2002,Europe,POL,Poland,,,,,,...,94.54662,,80.64503,,,,,0.00000,0.00000,0.00000
3461,3461,2003,Europe,POL,Poland,,,,,,...,94.81047,,81.29703,,,,,0.00000,0.00000,0.00000


In [71]:
# Convert the pandas DataFrame into a Spark DataFrame (no explicit schema is passed).
df_sanitation_water = spark.createDataFrame(sanitation_water_xml)

In [72]:
# List the columns of the Spark DataFrame built from the XML data.
df_sanitation_water.columns

['index',
 'Year',
 'Region',
 'Country_code',
 'Country',
 'Population_using_at_least_basic_drinking_water_services_Rural',
 'Population_using_at_least_basic_drinking_water_services_Total',
 'Population_using_at_least_basic_drinking_water_services_Urban',
 'Population_using_safely_managed_drinking_water_services_Rural',
 'Population_using_safely_managed_drinking_water_services_Total',
 'Population_using_safely_managed_drinking_water_services_Urban',
 'Population_using_safely_managed_drinking_water_services_Urban.1',
 'Population_using_at_least_basic_sanitation_services_Total',
 'Population_using_at_least_basic_sanitation_services_Urban',
 'Population_using_safely_managed_sanitation_services_Rural',
 'Population_using_safely_managed_sanitation_services_Total',
 'Population_using_safely_managed_sanitation_services_Urban',
 'Population_with_basic_handwashing_facilities_at_home_Rural',
 'Population_with_basic_handwashing_facilities_at_home_Total',
 'Population_with_basic_handwashing_facil

In [73]:
# Preview the cholera rows after the column rename.
df_cholera.show()

+-----------+----+-------------+--------------+-------------+--------------------+
|    country|year|cases_cholera|deaths_cholera|fatality_rate|              region|
+-----------+----+-------------+--------------+-------------+--------------------+
|Afghanistan|2016|          677|             5|          0.7|Eastern Mediterra...|
|Afghanistan|2015|        58064|             8|         0.01|Eastern Mediterra...|
|Afghanistan|2014|        45481|             4|          0.0|Eastern Mediterra...|
|Afghanistan|2013|         3957|            14|         0.35|Eastern Mediterra...|
|Afghanistan|2012|           12|             0|          0.1|Eastern Mediterra...|
|Afghanistan|2011|         3733|            44|         1.18|Eastern Mediterra...|
|Afghanistan|2010|         2369|            10|         0.42|Eastern Mediterra...|
|Afghanistan|2009|          662|            11|         1.66|Eastern Mediterra...|
|Afghanistan|2008|         4384|            22|          0.5|Eastern Mediterra...|
|Afg

# Tratando os datatypes do dataframe

In [74]:
# Print the cholera schema to see which columns need a type conversion.
df_cholera.printSchema()

root
 |-- country: string (nullable = true)
 |-- year: string (nullable = true)
 |-- cases_cholera: string (nullable = true)
 |-- deaths_cholera: string (nullable = true)
 |-- fatality_rate: string (nullable = true)
 |-- region: string (nullable = true)



In [75]:
# Print the water/sanitation schema to see which columns need a type conversion.
df_sanitation_water.printSchema()

root
 |-- index: long (nullable = true)
 |-- Year: long (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Rural: double (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Total: double (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Urban: double (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Rural: double (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Total: double (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Urban: double (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Urban.1: double (nullable = true)
 |-- Population_using_at_least_basic_sanitation_services_Total: double (nullable = true)
 |-- Population_using_at_least_basic_sanitation_services_Urban:

In [76]:
from pyspark.sql.types import IntegerType, DecimalType
from pyspark.sql.functions import col

# Convert the cholera columns to their target types and upper-case the names.
# Each entry is (source column, target type or None to keep the type, output name).
cholera_casts = [
    ('country', None, 'COUNTRY'),
    ('year', IntegerType(), 'YEAR'),
    ('cases_cholera', IntegerType(), 'CASES_CHOLERA'),
    ('deaths_cholera', IntegerType(), 'DEATHS_CHOLERA'),
    ('fatality_rate', DecimalType(18, 2), 'FATALITY_RATE'),
    ('region', None, 'REGION'),
]

df_cholera = df_cholera.select([
    (col(source) if target_type is None else col(source).cast(target_type)).alias(output_name)
    for source, target_type, output_name in cholera_casts
])

In [77]:
# Confirm the new column names and types after the cast.
df_cholera.printSchema()

root
 |-- COUNTRY: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- CASES_CHOLERA: integer (nullable = true)
 |-- DEATHS_CHOLERA: integer (nullable = true)
 |-- FATALITY_RATE: decimal(18,2) (nullable = true)
 |-- REGION: string (nullable = true)



In [78]:
from pyspark.sql.types import IntegerType, DecimalType, StringType, DoubleType
from pyspark.sql.functions import col

# Convert the water/sanitation DataFrame types: Year to integer, the identifying
# columns to string and every indicator column to decimal(18, 2).
# Columns that are not listed below ('index' and
# 'Population_using_safely_managed_drinking_water_services_Urban.1') are dropped.
# NOTE(review): the visible column listing shows no
# 'Population_using_at_least_basic_sanitation_services_Rural'; presumably the
# dropped '...Urban.1' column is that indicator mislabelled upstream — confirm
# against the XML before relying on this export.
identifier_columns = ['Region', 'Country_code', 'Country']
indicator_columns = [
    'Population_using_at_least_basic_drinking_water_services_Rural',
    'Population_using_at_least_basic_drinking_water_services_Total',
    'Population_using_at_least_basic_drinking_water_services_Urban',
    'Population_using_safely_managed_drinking_water_services_Rural',
    'Population_using_safely_managed_drinking_water_services_Total',
    'Population_using_safely_managed_drinking_water_services_Urban',
    'Population_using_at_least_basic_sanitation_services_Total',
    'Population_using_at_least_basic_sanitation_services_Urban',
    'Population_using_safely_managed_sanitation_services_Rural',
    'Population_using_safely_managed_sanitation_services_Total',
    'Population_using_safely_managed_sanitation_services_Urban',
    'Population_with_basic_handwashing_facilities_at_home_Rural',
    'Population_with_basic_handwashing_facilities_at_home_Total',
    'Population_with_basic_handwashing_facilities_at_home_Urban',
    'Population_practising_open_defecation_Rural',
    'Population_practising_open_defecation_Total',
    'Population_practising_open_defecation_Urban',
]

df_sanitation_water = df_sanitation_water.select(
    col('Year').cast(IntegerType()),
    *[col(name).cast(StringType()) for name in identifier_columns],
    *[col(name).cast(DecimalType(18, 2)) for name in indicator_columns]
)

In [79]:
# Schema of the DataFrame after the type conversion
# (df_sanitation_water was already reassigned to the cast result).
df_sanitation_water.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Rural: decimal(18,2) (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Total: decimal(18,2) (nullable = true)
 |-- Population_using_at_least_basic_drinking_water_services_Urban: decimal(18,2) (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Rural: decimal(18,2) (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Total: decimal(18,2) (nullable = true)
 |-- Population_using_safely_managed_drinking_water_services_Urban: decimal(18,2) (nullable = true)
 |-- Population_using_at_least_basic_sanitation_services_Total: decimal(18,2) (nullable = true)
 |-- Population_using_at_least_basic_sanitation_services_Urban: decimal(18,2) (nullable = true)
 |-- Population_using_safely_managed_sanitati

# Verificando se o dataframe possui dados NaN ou Null

In [80]:
# Preview the rows to look for missing values.
df_sanitation_water.show()

+----+------+------------+--------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------------+-------------------------------------------+-------------------------------------------+--------------------------

In [81]:
# Print summary statistics (count, mean, stddev, min, max) as one table per
# indicator column. A count lower than the number of rows means the column
# has missing values.
described_columns = [
    'Population_using_at_least_basic_drinking_water_services_Rural',
    'Population_using_at_least_basic_drinking_water_services_Total',
    'Population_using_at_least_basic_drinking_water_services_Urban',
    'Population_using_safely_managed_drinking_water_services_Rural',
    'Population_using_safely_managed_drinking_water_services_Total',
    'Population_using_safely_managed_drinking_water_services_Urban',
    'Population_using_at_least_basic_sanitation_services_Total',
    'Population_using_at_least_basic_sanitation_services_Urban',
    'Population_using_safely_managed_sanitation_services_Rural',
    'Population_using_safely_managed_sanitation_services_Total',
    'Population_using_safely_managed_sanitation_services_Urban',
    'Population_with_basic_handwashing_facilities_at_home_Rural',
    'Population_with_basic_handwashing_facilities_at_home_Total',
    'Population_with_basic_handwashing_facilities_at_home_Urban',
    'Population_practising_open_defecation_Rural',
    'Population_practising_open_defecation_Total',
    'Population_practising_open_defecation_Urban',
]

for column_name in described_columns:
    df_sanitation_water.describe(column_name).show()

+-------+-------------------------------------------------------------+
|summary|Population_using_at_least_basic_drinking_water_services_Rural|
+-------+-------------------------------------------------------------+
|  count|                                                         2953|
|   mean|                                                    74.984047|
| stddev|                                            24.67612595035673|
|    min|                                                         4.08|
|    max|                                                       100.00|
+-------+-------------------------------------------------------------+

+-------+-------------------------------------------------------------+
|summary|Population_using_at_least_basic_drinking_water_services_Total|
+-------+-------------------------------------------------------------+
|  count|                                                         3449|
|   mean|                                                    84

In [82]:
# Replace the missing values with 0.
# NOTE(review): the original rationale was that the minimum of every decimal
# column is 0, but the describe() output in this notebook reports min = 4.08 for
# Population_using_at_least_basic_drinking_water_services_Rural, so 0 is not the
# observed minimum everywhere. Filling with 0 also makes "not reported"
# indistinguishable from a real 0% — confirm this is intended before exporting.
df_sanitation_water = df_sanitation_water.na.fill(0)

In [83]:
# DataFrame after replacing the missing values.
df_sanitation_water.show()

+----+------+------------+--------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------------+----------------------------------------------------------+-------------------------------------------+-------------------------------------------+--------------------------

In [84]:
# Preview the cholera DataFrame after the type conversion.
df_cholera.show()

+-----------+----+-------------+--------------+-------------+--------------------+
|    COUNTRY|YEAR|CASES_CHOLERA|DEATHS_CHOLERA|FATALITY_RATE|              REGION|
+-----------+----+-------------+--------------+-------------+--------------------+
|Afghanistan|2016|          677|             5|         0.70|Eastern Mediterra...|
|Afghanistan|2015|        58064|             8|         0.01|Eastern Mediterra...|
|Afghanistan|2014|        45481|             4|         0.00|Eastern Mediterra...|
|Afghanistan|2013|         3957|            14|         0.35|Eastern Mediterra...|
|Afghanistan|2012|           12|             0|         0.10|Eastern Mediterra...|
|Afghanistan|2011|         3733|            44|         1.18|Eastern Mediterra...|
|Afghanistan|2010|         2369|            10|         0.42|Eastern Mediterra...|
|Afghanistan|2009|          662|            11|         1.66|Eastern Mediterra...|
|Afghanistan|2008|         4384|            22|         0.50|Eastern Mediterra...|
|Afg

In [85]:
# Print summary statistics as one table per numeric cholera column; differing
# counts show that some columns have missing values.
for column_name in ('CASES_CHOLERA', 'DEATHS_CHOLERA', 'FATALITY_RATE'):
    df_cholera.describe(column_name).show()

+-------+------------------+
|summary|     CASES_CHOLERA|
+-------+------------------+
|  count|              2469|
|   mean| 3718.379100850547|
| stddev|14904.906044893929|
|    min|                 0|
|    max|            340311|
+-------+------------------+

+-------+------------------+
|summary|    DEATHS_CHOLERA|
+-------+------------------+
|  count|              2373|
|   mean|378.08849557522126|
| stddev|3570.2829791470526|
|    min|                 0|
|    max|            124227|
+-------+------------------+

+-------+------------------+
|summary|     FATALITY_RATE|
+-------+------------------+
|  count|              2363|
|   mean|          5.758028|
| stddev|15.566521726111958|
|    min|              0.00|
|    max|            450.00|
+-------+------------------+



In [86]:
# Replace nulls with 0; the stated rationale is that the minimum of these columns is 0.
# NOTE(review): na.fill(0) is called without a column subset, so it is not
# limited to the three described columns, and it makes "not reported"
# indistinguishable from a real 0 — confirm this is intended.
df_cholera = df_cholera.na.fill(0)

In [87]:
# Preview the cholera DataFrame after filling the nulls.
df_cholera.show()

+-----------+----+-------------+--------------+-------------+--------------------+
|    COUNTRY|YEAR|CASES_CHOLERA|DEATHS_CHOLERA|FATALITY_RATE|              REGION|
+-----------+----+-------------+--------------+-------------+--------------------+
|Afghanistan|2016|          677|             5|         0.70|Eastern Mediterra...|
|Afghanistan|2015|        58064|             8|         0.01|Eastern Mediterra...|
|Afghanistan|2014|        45481|             4|         0.00|Eastern Mediterra...|
|Afghanistan|2013|         3957|            14|         0.35|Eastern Mediterra...|
|Afghanistan|2012|           12|             0|         0.10|Eastern Mediterra...|
|Afghanistan|2011|         3733|            44|         1.18|Eastern Mediterra...|
|Afghanistan|2010|         2369|            10|         0.42|Eastern Mediterra...|
|Afghanistan|2009|          662|            11|         1.66|Eastern Mediterra...|
|Afghanistan|2008|         4384|            22|         0.50|Eastern Mediterra...|
|Afg

# Fazendo upload dos datasets no S3

In [88]:
# Export the treated cholera data to S3 as CSV with a header row,
# overwriting whatever already exists at the target path.
(
    df_cholera.write
    .mode('overwrite')
    .option("header", "true")
    .csv('s3a://andre-sprint03-sptech-bucket-tratados/cases_of_cholera')
)

22/06/12 15:33:12 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/12 15:33:12 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [89]:
# Export the treated water/sanitation data to S3 as CSV with a header row,
# overwriting whatever already exists at the target path.
# NOTE(review): the recorded output of this cell contains a socketserver
# traceback after the write — verify that the export completed.
(
    df_sanitation_water.write
    .mode('overwrite')
    .option("header", "true")
    .csv('s3a://andre-sprint03-sptech-bucket-tratados/sanitation_water')
)

22/06/12 15:33:15 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/12 15:33:15 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/12 15:33:15 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
----------------------------------------                                        
Exception happened during processing of request from ('127.0.0.1', 34328)
Traceback (most recent call last):
  File "/usr/lib64/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, se