# Spark DataFrame

### Download dataset

In [None]:
# Download the dataset next to the notebook; -O fixes the local file name.
# The URL is quoted so the shell never treats the `?` in `?dl=0` as a glob character.
!wget -O natural_disasters_BIG.csv "https://www.dropbox.com/s/b74bpkbr89l8k43/natural_disasters_BIG.csv?dl=0"

--2019-11-15 13:03:00--  https://www.dropbox.com/s/b74bpkbr89l8k43/natural_disasters_BIG.csv?dl=0
Resolving www.dropbox.com (www.dropbox.com)... 162.125.68.1, 2620:100:6024:1::a27d:4401
Connecting to www.dropbox.com (www.dropbox.com)|162.125.68.1|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /s/raw/b74bpkbr89l8k43/natural_disasters_BIG.csv [following]
--2019-11-15 13:03:01--  https://www.dropbox.com/s/raw/b74bpkbr89l8k43/natural_disasters_BIG.csv
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://ucc9819570c7610595a55e17ea82.dl.dropboxusercontent.com/cd/0/inline/AsYw-bQV7AFGhJ6MKrcp1CROm7tmOb86tjiT0Hq5zvHRUkZOwI19Kp61JY8AxPd7Luhm0OnTwBxxgoWIxSLfRsk8Y5qymNCvXc1eyheuXYri1w/file# [following]
--2019-11-15 13:03:01--  https://ucc9819570c7610595a55e17ea82.dl.dropboxusercontent.com/cd/0/inline/AsYw-bQV7AFGhJ6MKrcp1CROm7tmOb86tjiT0Hq5zvHRUkZOwI19Kp61JY8AxPd7Luhm0OnTwBxxgoWIxSLfRsk8

### 1. How many disasters occurred in continent C?

In [None]:
%%file dataframe_1.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Local Spark session; `sc` is the underlying SparkContext used for the raw-text read.
spark = SparkSession.builder.master('local[*]').appName('disasters').getOrCreate()
sc = spark.sparkContext

try:
    # Read the CSV as plain text and split every line on commas.
    # NOTE(review): a plain split does not understand quoted fields that contain
    # commas -- confirm the dataset has none.
    fields = sc.textFile('natural_disasters_BIG.csv') \
               .map(lambda line: line.split(','))

    # Keep only column 5 (continent) and column 9 (occurrences, parsed as int).
    occurrence_rows = fields.map(lambda arr: Row(continent=arr[5], count=int(arr[9])))
    disasterRowsDF = spark.createDataFrame(occurrence_rows)

    # Total occurrences per continent. `sum` is pyspark.sql.functions.sum
    # (brought in by the star import), not the Python builtin.
    continentsDF = disasterRowsDF.groupBy('continent') \
                                 .agg(sum('count').alias('count'))

    # Print the per-continent totals (show() prints at most 20 rows by default).
    continentsDF.show()

except Exception as e:
    # Best-effort script: report the failure message instead of a traceback.
    print(e)

finally:
    # Release the SparkContext on every exit path, success or failure.
    sc.stop()

In [None]:
# Run the script written by the %%file cell above in a bash subshell; `time` reports how long the run took.
!bash -c "time python dataframe_1.py"

### 2. In which regions were there disasters of type X?

In [None]:
%%file dataframe_2.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Local Spark session; `sc` is the underlying SparkContext used for the raw-text read.
spark = SparkSession.builder.master('local[*]').appName('disasters').getOrCreate()
sc = spark.sparkContext

try:
    # Read the CSV as plain text and split every line on commas.
    # NOTE(review): a plain split does not understand quoted fields that contain
    # commas -- confirm the dataset has none.
    fields = sc.textFile('natural_disasters_BIG.csv') \
               .map(lambda line: line.split(','))

    # Keep only column 3 (disaster type) and column 6 (region).
    type_region_rows = fields.map(lambda arr: Row(type_x=arr[3], region=arr[6]))
    disasterRowsDF = spark.createDataFrame(type_region_rows)

    # For every disaster type, gather the distinct regions it was recorded in.
    regionByType = disasterRowsDF.groupBy('type_x') \
                                 .agg(collect_set('region').alias('region'))

    # show() cuts every cell to 20 characters by default, which would hide most of
    # each region list; disable truncation so the full sets are printed.
    regionByType.show(truncate=False)

except Exception as e:
    # Best-effort script: report the failure message instead of a traceback.
    print(e)

finally:
    # Release the SparkContext on every exit path, success or failure.
    sc.stop()

In [None]:
# Run the script written by the %%file cell above in a bash subshell; `time` reports how long the run took.
!bash -c "time python dataframe_2.py"

### 3. What are the probabilities of getting injured or dying in a natural disaster of type T in the continent C during decade D (190x, 191x, 192x, ..., 199x, 200x, 201x)?

In [None]:
%%file dataframe_3.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Local Spark session; `sc` is the underlying SparkContext used for the raw-text read.
spark = SparkSession.builder.master('local[*]').appName('disasters').getOrCreate()
sc = spark.sparkContext

try:
    # Read the CSV as plain text and split every line on commas.
    # NOTE(review): a plain split does not understand quoted fields that contain
    # commas -- confirm the dataset has none.
    fields = sc.textFile('natural_disasters_BIG.csv') \
               .map(lambda line: line.split(','))

    # Columns that must be non-empty for a row to be usable:
    # 3 = disaster type, 5 = continent, 10 = deaths, 11 = injured, 14 = total affected.
    required_columns = (3, 5, 10, 11, 14)
    complete_rows = fields.filter(
        lambda arr: all(len(arr[i]) > 0 for i in required_columns))

    # The decade is the year (column 0) with its last digit replaced by 0,
    # e.g. "1987" -> "1980". Counts are parsed as integers.
    disasterRows = complete_rows.map(
        lambda arr: Row(decade=arr[0][:-1] + "0",
                        type_=arr[3],
                        continent=arr[5],
                        deaths=int(arr[10]),
                        injured=int(arr[11]),
                        affected=int(arr[14])))
    disasterRowsDF = spark.createDataFrame(disasterRows)

    # Sum deaths, injured and total affected for every (decade, type, continent).
    # `sum` is pyspark.sql.functions.sum (star import), not the Python builtin.
    summed_stats_columns = disasterRowsDF.groupBy('decade', 'type_', 'continent') \
                                         .agg(sum('deaths').alias('deaths'),
                                              sum('injured').alias('injured'),
                                              sum('affected').alias('affected'))

    # Probabilities are the summed deaths / injured divided by the summed total
    # affected. A single select on the aggregated frame keeps the grouping keys,
    # adds both ratios and leaves the raw sums out of the result.
    final_result = summed_stats_columns.select(
        'decade', 'type_', 'continent',
        (summed_stats_columns['deaths'] / summed_stats_columns['affected']).alias('death_prob'),
        (summed_stats_columns['injured'] / summed_stats_columns['affected']).alias('injury_prob'))

    # Print the result (show() prints only the first 20 rows by default).
    final_result.show()

except Exception as e:
    # Best-effort script: report the failure message instead of a traceback.
    print(e)

finally:
    # Release the SparkContext on every exit path, success or failure.
    sc.stop()



In [None]:
# Run the script written by the %%file cell above in a bash subshell; `time` reports how long the run took.
!bash -c "time python dataframe_3.py"

### 4. Optional Exercise. What are the mean total damage and the most costly disaster's expenses for a certain disaster subgroup in each country?

In [None]:
%%file dataframe_4.py
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Local Spark session; `sc` is the underlying SparkContext used for the raw-text read.
spark = SparkSession.builder.master('local[*]').appName('disasters').getOrCreate()
sc = spark.sparkContext

try:
    # Read the CSV as plain text and split every line on commas.
    # NOTE(review): a plain split does not understand quoted fields that contain
    # commas -- confirm the dataset has none.
    fields = sc.textFile('natural_disasters_BIG.csv') \
               .map(lambda line: line.split(','))

    # Discard rows whose total-damage field (column 15) is empty.
    rows_with_damage = fields.filter(lambda arr: len(arr[15]) > 0)

    # Keep the country (column 8), disaster subgroup (column 2), total damage
    # (column 15, as float) and occurrences (column 9, as int).
    disasterRows = rows_with_damage.map(
        lambda arr: Row(country=arr[8],
                        subgroup=arr[2],
                        total_damage=float(arr[15]),
                        occurrences=int(arr[9])))
    disasterRowsDF = spark.createDataFrame(disasterRows)

    # For every (country, subgroup): total damage, total occurrences and the
    # single largest damage value. `sum` / `max` are the pyspark.sql.functions
    # versions (star import), not the Python builtins.
    summed_stats_columns = disasterRowsDF.groupBy('country', 'subgroup') \
                                         .agg(sum('total_damage').alias('total_damage'),
                                              sum('occurrences').alias('occurrences'),
                                              max('total_damage').alias('max_damage'))

    # Mean damage is the summed damage divided by the summed occurrences. A single
    # select keeps the keys and max_damage, adds the mean and leaves the raw sums out.
    final_result = summed_stats_columns.select(
        'country', 'subgroup', 'max_damage',
        (summed_stats_columns['total_damage'] / summed_stats_columns['occurrences']).alias('mean_damage'))

    # Print the result (show() prints only the first 20 rows by default).
    final_result.show()

except Exception as e:
    # Best-effort script: report the failure message instead of a traceback.
    print(e)

finally:
    # Release the SparkContext on every exit path, success or failure.
    sc.stop()


In [None]:
# Run the script written by the %%file cell above in a bash subshell; `time` reports how long the run took.
!bash -c "time python dataframe_4.py"