# Data Cleaning

In [1]:
%matplotlib inline

# Filter warnings
# NOTE: the "ignore" action with no category silences every warning raised by
# code that runs after this line, including deprecation notices.
import warnings
warnings.filterwarnings("ignore")

# Data manipulation
import pandas as pd
import numpy as np

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Set the default matplotlib font size (only 'font.size' is changed here)
plt.rcParams.update({'font.size': 18})

# Pyspark modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import sql, SparkContext, SparkConf
from pyspark.sql.functions import *

In [2]:
# Build the Spark session for this notebook under the application name "project"
spark = (
    SparkSession.builder
    .appName("project")
    .getOrCreate()
)

# Load data 

In [3]:
# Read the raw CSV: column names come from the header row, types are inferred
df = spark.read.csv(
    '../data/emdat_public_raw.csv',
    header=True,
    inferSchema=True,
)

# Normalise every column name to lower case
df = df.toDF(*(name.lower() for name in df.columns))

# Report the data size as (rows, columns)
n_rows, n_cols = df.count(), len(df.columns)
print("Data size:", (n_rows, n_cols))

# Preview a subset of the columns
cols = [
    'seq', 'year', 'continent', 'disaster type',
    'total deaths', 'no injured', 'no affected', 'no homeless',
    'total affected',
]

df.select(cols).show()

Data size: (15901, 43)
+----+----+---------+-------------------+------------+----------+-----------+-----------+--------------+
| seq|year|continent|      disaster type|total deaths|no injured|no affected|no homeless|total affected|
+----+----+---------+-------------------+------------+----------+-----------+-----------+--------------+
|9002|1900|   Africa|            Drought|       11000|      null|       null|       null|          null|
|9001|1900|     Asia|            Drought|     1250000|      null|       null|       null|          null|
|  12|1902| Americas|         Earthquake|        2000|      null|       null|       null|          null|
|   3|1902| Americas|  Volcanic activity|        1000|      null|       null|       null|          null|
|  10|1902| Americas|  Volcanic activity|        6000|      null|       null|       null|          null|
|   6|1903| Americas|Mass movement (dry)|          76|        23|       null|       null|            23|
|  12|1903|   Africa|  Volcanic 

In [4]:
# Print every column's name, inferred type and nullability
df.printSchema()

root
 |-- dis no: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- seq: integer (nullable = true)
 |-- disaster group: string (nullable = true)
 |-- disaster subgroup: string (nullable = true)
 |-- disaster type: string (nullable = true)
 |-- disaster subtype: string (nullable = true)
 |-- disaster subsubtype: string (nullable = true)
 |-- event name: string (nullable = true)
 |-- entry criteria: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iso: string (nullable = true)
 |-- region: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- associated dis: string (nullable = true)
 |-- associated dis2: string (nullable = true)
 |-- ofda response: string (nullable = true)
 |-- appeal: string (nullable = true)
 |-- declaration: string (nullable = true)
 |-- aid contribution: integer (nullable = true)
 |-- dis mag value: integer (nullable = true)
 |-- dis mag

# Select relevant columns for analysis

In [5]:
# Columns kept for the analysis
cols = [
    'iso', 'country', 'region', 'continent', 'year', 'disaster type',
    'latitude', 'longitude', 'local time', 'start month',
    'total deaths', 'no injured', 'no affected', 'no homeless',
    'total affected', 'total damages (us$)', 'cpi',
]
df_sel = df.select(cols)

# Replace the spaces in column names with underscores
underscored = [name.replace(' ', '_') for name in df_sel.columns]
df_sel = df_sel.toDF(*underscored)

# Register the selection as a temp view so it can be queried with SQL
df_sel.createOrReplaceTempView("raw_table")

# Preview a few of the renamed columns
cols = ['country', 'year', 'continent', 'region', 'disaster_type', 'start_month']
df_sel.select(cols).show()

+-------------+----+---------+----------------+-------------------+-----------+
|      country|year|continent|          region|      disaster_type|start_month|
+-------------+----+---------+----------------+-------------------+-----------+
|   Cabo Verde|1900|   Africa|  Western Africa|            Drought|       null|
|        India|1900|     Asia|   Southern Asia|            Drought|       null|
|    Guatemala|1902| Americas| Central America|         Earthquake|          4|
|    Guatemala|1902| Americas| Central America|  Volcanic activity|          4|
|    Guatemala|1902| Americas| Central America|  Volcanic activity|         10|
|       Canada|1903| Americas|Northern America|Mass movement (dry)|          4|
|Comoros (the)|1903|   Africa|  Eastern Africa|  Volcanic activity|       null|
|   Bangladesh|1904|     Asia|   Southern Asia|              Storm|         11|
|       Canada|1905| Americas|Northern America|Mass movement (dry)|          8|
|        India|1905|     Asia|   Souther

# Data types and missing values
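Some variables have missing values. The table below reports the number of non-null entries per column, so any count below the total number of rows (15,901) indicates missing values. Although they will not affect exploratory data analysis, we will impute them during modeling.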

In [6]:
# count(c) tallies the NON-null entries of column c, so a value below
# df_sel.count() means that column has missing values.
non_null_counts = [count(name).alias(name) for name in df_sel.columns]
df_missing = df_sel.agg(*non_null_counts)

cols = [
    'country', 'year', 'total_deaths', 'disaster_type',
    'no_injured', 'no_affected', 'no_homeless', 'total_affected',
]
df_missing.select(cols).show()

+-------+-----+------------+-------------+----------+-----------+-----------+--------------+
|country| year|total_deaths|disaster_type|no_injured|no_affected|no_homeless|total_affected|
+-------+-----+------------+-------------+----------+-----------+-----------+--------------+
|  15901|15901|       11281|        15901|      3832|       9044|       2414|         11421|
+-------+-----+------------+-------------+----------+-----------+-----------+--------------+



# Categorical attributes

## 1. Time

In [7]:
# Frequency of each recorded local time, most common first
(
    df_sel
    .groupBy('local_time')
    .count()
    .sort(desc("count"))
    .show()
)

+----------+-----+
|local_time|count|
+----------+-----+
|      null|14804|
|     21:48|    6|
|     20:38|    5|
|     15:00|    5|
|     16:30|    5|
|     05:15|    5|
|     12:55|    4|
|     15:30|    4|
|     10:26|    4|
|     20:28|    4|
|     13:35|    4|
|     12:19|    4|
|     04:47|    4|
|     06:42|    4|
|     20:06|    4|
|     03:00|    3|
|     12:15|    3|
|     13:39|    3|
|     20:18|    3|
|     16:40|    3|
+----------+-----+
only showing top 20 rows



In [8]:
# Hour = the text before the first ":" (and before any "-"), floored to a whole number
hour_text = split(col('local_time'), ":").getItem(0)
hour_text = split(hour_text, "-").getItem(0)
df_sel = df_sel.withColumn('local_hour', floor(hour_text))

In [9]:
# Frequency of each extracted hour, rarest first, so odd values appear at the top
(
    df_sel
    .groupBy('local_hour')
    .count()
    .sort(asc("count"))
    .show()
)

+----------+-----+
|local_hour|count|
+----------+-----+
|        95|    1|
|        88|    1|
|        39|    1|
|        22|   32|
|        23|   37|
|         7|   37|
|         0|   37|
|        14|   39|
|        15|   40|
|         9|   40|
|        19|   41|
|        10|   41|
|        11|   41|
|         8|   43|
|        17|   44|
|        16|   44|
|        18|   45|
|        13|   47|
|         2|   47|
|        21|   48|
+----------+-----+
only showing top 20 rows



In [10]:
@pandas_udf("long")
def replace_hour(x: pd.Series) -> pd.Series:
    """
    Pandas UDF that remaps the hour values 39, 88 and 95.

    Values equal to 39, 88 and 95 become 3, 8 and 5 respectively; every
    other value is returned unchanged.

    NOTE(review): the replacement values are an assumption about what the
    original entries were meant to be -- confirm against the data source.
    """
    hour_fixes = {39: 3, 88: 8, 95: 5}
    return x.replace(to_replace=hour_fixes)

# Overwrite local_hour with the remapped hours returned by replace_hour
fixed_hour = replace_hour(col('local_hour'))
df_sel = df_sel.withColumn('local_hour', fixed_hour)

## 2. Month

In [11]:
# Number of records per start month, most frequent first
(
    df_sel
    .groupBy('start_month')
    .count()
    .sort(desc("count"))
    .show()
)

+-----------+-----+
|start_month|count|
+-----------+-----+
|          1| 1759|
|          8| 1630|
|          7| 1599|
|          9| 1441|
|         10| 1283|
|          6| 1279|
|          5| 1200|
|         12| 1123|
|          4| 1098|
|         11| 1052|
|          2| 1029|
|          3| 1023|
|       null|  385|
+-----------+-----+



## 3. Country, Region, and Continent

In [12]:
# Number of records per country, most frequent first
(
    df_sel
    .groupBy('country')
    .count()
    .sort(desc("count"))
    .show()
)

+--------------------+-----+
|             country|count|
+--------------------+-----+
|United States of ...| 1064|
|               China|  971|
|               India|  740|
|   Philippines (the)|  663|
|           Indonesia|  568|
|               Japan|  372|
|          Bangladesh|  354|
|              Mexico|  285|
|              Brazil|  248|
|Iran (Islamic Rep...|  248|
|           Australia|  247|
|            Viet Nam|  245|
|            Pakistan|  230|
|              Turkey|  203|
|         Afghanistan|  203|
|            Colombia|  203|
|                Peru|  200|
|              France|  181|
|               Italy|  171|
|Russian Federatio...|  168|
+--------------------+-----+
only showing top 20 rows



In [13]:
# Number of records per region, most frequent first
(
    df_sel
    .groupBy('region')
    .count()
    .sort(desc("count"))
    .show()
)

+--------------------+-----+
|              region|count|
+--------------------+-----+
|       Southern Asia| 2037|
|  South-Eastern Asia| 1920|
|        Eastern Asia| 1825|
|       South America| 1268|
|    Northern America| 1211|
|      Eastern Africa| 1145|
|     Central America|  808|
|      Western Africa|  794|
|     Southern Europe|  642|
|           Caribbean|  620|
|      Eastern Europe|  534|
|      Western Europe|  517|
|        Western Asia|  491|
|       Middle Africa|  423|
|     Northern Africa|  339|
|Australia and New...|  322|
|           Melanesia|  256|
|     Northern Europe|  209|
|     Southern Africa|  207|
|        Central Asia|  138|
+--------------------+-----+
only showing top 20 rows



In [14]:
# Number of records per continent, most frequent first
(
    df_sel
    .groupBy('continent')
    .count()
    .sort(desc("count"))
    .show()
)

+---------+-----+
|continent|count|
+---------+-----+
|     Asia| 6411|
| Americas| 3907|
|   Africa| 2908|
|   Europe| 1962|
|  Oceania|  713|
+---------+-----+



## 4. Disaster type

In [15]:
# Number of records per disaster type, most frequent first
(
    df_sel
    .groupBy('disaster_type')
    .count()
    .sort(desc("count"))
    .show()
)

+--------------------+-----+
|       disaster_type|count|
+--------------------+-----+
|               Flood| 5447|
|               Storm| 4434|
|          Earthquake| 1534|
|            Epidemic| 1496|
|           Landslide|  768|
|             Drought|  758|
| Extreme temperature|  601|
|            Wildfire|  456|
|   Volcanic activity|  259|
|  Insect infestation|   96|
| Mass movement (dry)|   48|
|              Impact|    1|
|     Animal accident|    1|
|Glacial lake outb...|    1|
|                 Fog|    1|
+--------------------+-----+



# Add decade attribute (`year_in_decade`)


In [16]:
# First year of the decade each record falls in (e.g. 1903 -> 1900)
decade_start = floor(col('year') / 10) * 10
df_sel = df_sel.withColumn('year_in_decade', decade_start)

# Incorrect data entries in longitude and latitude

In [17]:
# List the coordinate pairs in raw_table where either longitude or latitude
# contains an upper-case hemisphere letter (W, E, N or S).
query = """
select 
    longitude, 
    latitude 
from raw_table
where longitude like '%W%' 
or longitude like '%E%'
or longitude like '%N%'
or longitude like '%S%'
or latitude like '%W%' 
or latitude like '%E%'
or latitude like '%N%'
or latitude like '%S%'
"""
spark.sql(query).show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  78.46 W|  1.51 N|
|  31.15 E| 30.03 N|
|  58.00 W| 48.60 N|
|  71.40 W| 35.28 S|
|  23.44 E| 38.00 N|
| 121.70 E|  8.30 S|
| 104.06 E| 30.37 N|
|  78.30 W|  0.14 S|
|  26.76 E| 45.77 N|
| 119.41 W| 34.25 N|
|  27.10 E| 38.25 N|
|  32.56 E|  2.31 S|
|  56.57 E| 27.69 N|
|   7.25 E| 43.70 N|
|  87.72 W| 14.53 N|
|  71.40 E| 34.01 N|
|   5.31 E| 50.37 N|
|  70.28 E| 34.26 N|
|  25.57 E| 43.12 N|
|  62.42 W|  3.58 S|
+---------+--------+
only showing top 20 rows



In [18]:
# Convert hemisphere-suffixed coordinates (e.g. "78.46 W", "35.28 S") into
# signed decimal strings. Simply deleting the letter would turn western
# longitudes and southern latitudes into positive numbers, i.e. place them in
# the opposite hemisphere once the column is cast to double.
for coord, letters, negative_letter in (
    ('longitude', '[EW]', 'W'),
    ('latitude', '[NS]', 'S'),
):
    # Numeric part of the value with the hemisphere letter and padding removed
    magnitude = trim(regexp_replace(col(coord), letters, ''))
    # W / S values need a leading minus, unless one is already present
    needs_minus = col(coord).contains(negative_letter) & ~magnitude.startswith('-')
    df_sel = df_sel.withColumn(
        coord,
        when(needs_minus, concat(lit('-'), magnitude)).otherwise(magnitude),
    )

# create table for sql query
df_sel.createOrReplaceTempView("raw_table2")

In [19]:
# Cross-check that no hemisphere letters (W, E, N, S) remain in raw_table2
query = """
select 
    longitude, 
    latitude 
from raw_table2
where longitude like '%W%' 
or longitude like '%E%'
or longitude like '%N%'
or longitude like '%S%'
or latitude like '%W%' 
or latitude like '%E%'
or latitude like '%N%'
or latitude like '%S%'
"""
leftover = spark.sql(query)
leftover.show()

# Enforce the invariant instead of relying on a visual check of the empty table
assert leftover.count() == 0, "hemisphere letters remain in longitude/latitude"

+---------+--------+
|longitude|latitude|
+---------+--------+
+---------+--------+



# Add number of occurrences per year

In [20]:
# Attach to every row of raw_table2 the number of records that share its year.
# The subquery counts rows per year ("group by 1" groups by the first selected
# item, i.e. year); the left join keeps every row of raw_table2.
df_final = spark.sql("""
select 
    t1.*, t2.no_occurrence
from raw_table2 as t1
left join (
    select year, count(*) as no_occurrence from raw_table2
group by 1
) as t2 on (t1.year = t2.year)
""")
# Preview the new column next to the year fields
df_final.select('year', 'year_in_decade', 'continent','no_occurrence').show()

+----+--------------+---------+-------------+
|year|year_in_decade|continent|no_occurrence|
+----+--------------+---------+-------------+
|1900|          1900|   Africa|            7|
|1900|          1900|     Asia|            7|
|1902|          1900| Americas|           10|
|1902|          1900| Americas|           10|
|1902|          1900| Americas|           10|
|1903|          1900| Americas|           12|
|1903|          1900|   Africa|           12|
|1904|          1900|     Asia|            4|
|1905|          1900| Americas|            8|
|1905|          1900|     Asia|            8|
|1906|          1900| Americas|           13|
|1906|          1900| Americas|           13|
|1906|          1900|   Europe|           13|
|1906|          1900|   Europe|           13|
|1906|          1900|     Asia|           13|
|1907|          1900|     Asia|            3|
|1907|          1900|     Asia|            3|
|1908|          1900| Americas|            3|
|1909|          1900|     Asia|   

# Convert numerical variables to numeric type

In [21]:
def df_to_numeric(df, dont_cols):
    """
    Cast every column of ``df`` to double, except those named in ``dont_cols``.

    Parameters
    ----------
    df : DataFrame
        Input Spark DataFrame; it is not modified.
    dont_cols : collection of str
        Names of the columns to leave untouched. Names that are not present
        in ``df.columns`` have no effect.

    Returns
    -------
    DataFrame
        A new DataFrame with the same column names in the same order, where
        every column not listed in ``dont_cols`` has been cast to double.
    """
    # A single select() adds one projection for all columns; calling
    # withColumn() once per column grows the query plan with every call.
    # The comprehension variable is called `name` so that it does not shadow
    # the `col` function imported from pyspark.sql.functions.
    return df.select(*[
        df[name] if name in dont_cols else df[name].cast('double').alias(name)
        for name in df.columns
    ])

# Columns to leave as they are; every other column of df_final is cast to double
dont_cols = [
    'iso', 'country', 'region', 'continent', 'disaster_type',
    'year', 'year_in_decade', 'local_time', 'start_month',
]
df_final = df_to_numeric(df=df_final, dont_cols=dont_cols)

In [22]:
# Print the column types of df_final to confirm the result of the cast
df_final.printSchema()

root
 |-- iso: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- disaster_type: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- local_time: string (nullable = true)
 |-- start_month: integer (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- no_injured: double (nullable = true)
 |-- no_affected: double (nullable = true)
 |-- no_homeless: double (nullable = true)
 |-- total_affected: double (nullable = true)
 |-- total_damages_(us$): double (nullable = true)
 |-- cpi: double (nullable = true)
 |-- local_hour: double (nullable = true)
 |-- year_in_decade: long (nullable = true)
 |-- no_occurrence: double (nullable = true)



# Save cleaned data as csv

In [23]:
# Write the cleaned data as CSV with a header row, replacing any previous
# output. coalesce(1) reduces the DataFrame to a single partition before the
# write. Spark creates a directory at the given path and places the part
# file(s) inside it.
# The earlier .format('spark.csv') call was dropped: 'spark.csv' is not a
# registered data source, and .csv() selects the CSV writer on its own.
(
    df_final.coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv("../data/emdat_public_cleaned.csv")
)