# TAREA 1

## Instalación de Spark

In [1]:
import pyspark
print(pyspark.__version__)

3.5.2


In [3]:
import subprocess
java_home = subprocess.check_output('echo %JAVA_HOME%', shell=True).decode().strip()
spark_home = subprocess.check_output('echo %SPARK_HOME%', shell=True).decode().strip()
print('JAVA_HOME:', java_home)
print('SPARK_HOME:', spark_home)

JAVA_HOME: C:\Program Files\Java\jdk-11
SPARK_HOME: C:\Spark


In [4]:
import os
import sys
import findspark
findspark.init()
findspark.find()
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import col, to_date, concat, lpad, lit
spark= SparkSession.builder.appName("Mi primera").getOrCreate()
spark

## Conjunto de Datos

Em-DATA contiene registros de desastres naturales alrededor del mundo, reuniendo información de distintas fuentes como la ONU, ONGs y reaseguradoras.
El conjunto de datos seleccionado trae la información referente al continente americano (2000 - actualidad) que el proyecto pone a disposición pública. 

Se incluye la ubicación del evento por coordenadas geográficas, se fecha de inicio y fin, una serie de clasificaciones, número de heridos, muertes y desplazados, así como el impacto económico a través del costo de reconstrucción e indemnizaciones por seguro.

Actualmente trabajo como analista de reservas de seguros de daños en LATAM. Los desastres naturales tienen gran impacto para la industria aseguradora al ser considerados eventos catastróficos. De ahí mi interés por trabajar con este conjunto de datos que además es libre.

In [28]:
df = spark.read.csv('disasters.csv', header=True, inferSchema=True)
#https://doc.emdat.be/docs/data-structure-and-content/emdat-public-table/

In [29]:
def clean_column_names(df):
    new_columns = [col.replace(" ", "_").lower().strip() for col in df.columns]
    df = df.toDF(*new_columns)
    return df

In [30]:
df = clean_column_names(df)

In [31]:
df.printSchema()

root
 |-- disno: string (nullable = true)
 |-- historic: string (nullable = true)
 |-- classification_key: string (nullable = true)
 |-- disaster_group: string (nullable = true)
 |-- disaster_subgroup: string (nullable = true)
 |-- disaster_type: string (nullable = true)
 |-- disaster_subtype: string (nullable = true)
 |-- external_ids: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- iso: string (nullable = true)
 |-- country: string (nullable = true)
 |-- subregion: string (nullable = true)
 |-- region: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- associated_types: string (nullable = true)
 |-- ofda_bha_response: string (nullable = true)
 |-- appeal: string (nullable = true)
 |-- declaration: string (nullable = true)
 |-- aid_contribution: integer (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- magnitude_scale: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lon

In [35]:
excl = ['disno','external_ids']
df.select([col for col in df.columns if col not in excl]).describe().show()

+-------+--------+------------------+--------------+-----------------+-------------+------------------+--------------------+----+--------------------+--------------------+--------+--------------------+--------------------+--------------------+-----------------+------+-----------+------------------+------------------+---------------+------------------+------------------+-----------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+--------------------+-----------------------------+------------------+-----------------------+-----------------+---------------------+----------------+--------------------+--------------------+-------------------+
|summary|historic|classification_key|disaster_group|disaster_subgroup|disaster_type|  disaster_subtype|          event_name| iso|             country|           subregion|  region|            locat

In [41]:
def create_date_column(df, year_col, month_col, day_col, new_col_name):
    df = df.withColumn(month_col, lpad(col(month_col), 2, "0")) \
           .withColumn(day_col, lpad(col(day_col), 2, "0"))
    
    return df.withColumn(new_col_name, to_date(concat(col(year_col), 
                                                        lit("-"), 
                                                        col(month_col), 
                                                        lit("-"), 
                                                        col(day_col)), 
                                                "yyyy-MM-dd"))

df = create_date_column(df, "start_year", "start_month", "start_day", "start_date")
df = create_date_column(df, "end_year", "end_month", "end_day", "end_date")

In [44]:
df.filter(df.disaster_group == "Natural").orderBy(F.desc("start_date")).show(5)

+-------------+--------+------------------+--------------+-----------------+-------------+----------------+--------------------+-----------------+---+--------------------+--------------------+--------+--------------------+------+----------------+-----------------+------+-----------+----------------+---------+---------------+--------+---------+-----------+----------+-----------+---------+--------+---------+-------+------------+-------+--------+--------+--------------+--------------------+-----------------------------+--------------+-----------------------+------------+---------------------+----+-----------+----------+-----------+----------+----------+
|        disno|historic|classification_key|disaster_group|disaster_subgroup|disaster_type|disaster_subtype|        external_ids|       event_name|iso|             country|           subregion|  region|            location|origin|associated_types|ofda_bha_response|appeal|declaration|aid_contribution|magnitude|magnitude_scale|latitude|longit

In [48]:
df = df.withColumn("duration_days", F.datediff(col("end_date"), col("start_date")))

In [45]:
df.groupBy("country").sum("insured_damage_adjusted").orderBy(F.desc("sum(insured_damage_adjusted)")).show(truncate=False)

+--------------------------+----------------------------+
|country                   |sum(insured_damage_adjusted)|
+--------------------------+----------------------------+
|United States of America  |758511085                   |
|Puerto Rico               |38421087                    |
|Mexico                    |17277689                    |
|Chile                     |12564374                    |
|Canada                    |10727427                    |
|Cayman Islands            |2488594                     |
|Saint Martin (French Part)|1530472                     |
|Saint Barth�lemy          |1303735                     |
|Peru                      |852322                      |
|Brazil                    |737726                      |
|Ecuador                   |710949                      |
|Sint Maarten (Dutch part) |621537                      |
|Jamaica                   |568881                      |
|El Salvador               |499041                      |
|Bahamas      

In [52]:
df.groupBy("disaster_subtype").count().show(truncate=False)

+--------------------------------+-----+
|disaster_subtype                |count|
+--------------------------------+-----+
|Forest fire                     |82   |
|Cold wave                       |42   |
|Fire (Industrial)               |3    |
|Water                           |93   |
|Flood (General)                 |310  |
|Sand/Dust storm                 |1    |
|Coastal flood                   |6    |
|Road                            |386  |
|Tsunami                         |4    |
|Rail                            |26   |
|Oil spill                       |3    |
|Ash fall                        |39   |
|Collapse (Industrial)           |12   |
|Miscellaneous accident (General)|23   |
|Tropical cyclone                |428  |
|Infectious disease (General)    |2    |
|Explosion (Industrial)          |34   |
|Parasitic disease               |3    |
|Landslide (wet)                 |69   |
|Blizzard/Winter storm           |81   |
+--------------------------------+-----+
only showing top