In [1]:
import pyspark.sql.functions as fn
import pyspark.sql.types
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [32]:
# 1. Filters years from 2005-2020
# 2. Unbucketize the columns
# 3. Filters chosen countries
def filterCountryData(df,countries_chosen):
    
    # range b/w 2005-2020
    years = list(map(lambda x: str(x),list(range(2005,2021,1)))) 
    
    cols =["Country Name","Country Code","Indicator Name","Indicator Code"]+years
    country_2005_20 = df.select(cols)
    
    
    # filters countries chosen and fills any missing year values with 0.00
    ts = "2020-04-01"
    countries_chosen_2005_20 = (country_2005_20
                                .filter(fn.col("Country Name").isin(countries_chosen)).fillna(0.00, subset=years)
                                .withColumn("date",fn.date_format(fn.lit(ts),"yyyy-MM-dd"))
                               )
    
    #unbucketize the data
    unpivotStr= list(map(lambda x: " '{t}',`{t}`".format(t=x),years))
    sep = ','
    unpivotExpr = "stack("+str(len(years))+", "+sep.join(unpivotStr)+") as (Year, Value)"
    columns_without_years= set(countries_chosen_2005_20.columns ) - set(years)
    
    res = countries_chosen_2005_20.select(
        "Country Name",
        "Country Code",
        "Indicator Name",
        fn.expr(unpivotExpr),
        fn.month("date").alias("month"),
        fn.dayofmonth("date").alias("day"),
        fn.quarter("date").alias("quarter")            
    )
    
    #TODO: join the dimensions to make a fact table

    return res

In [80]:
# Date Dimension
def generate_dates(spark,range_list,interval=60*60*24,dt_col="date_time_ref"): # TODO: attention to sparkSession
     """
     Create a Spark DataFrame with a single column named dt_col and a range of date within a specified interval (start and stop included).
     With hourly data, dates end at 23 of stop day

     :param spark: SparkSession or sqlContext depending on environment (server vs local)
     :param range_list: array of strings formatted as "2018-01-20" or "2018-01-20 00:00:00"
     :param interval: number of seconds (frequency), output from get_freq()
     :param dt_col: string with date column name. Date column must be TimestampType

     :returns: df from range
     """
     start,stop = range_list
     temp_df = spark.createDataFrame([(start, stop)], ("start", "stop"))
     temp_df = temp_df.select([fn.col(c).cast("timestamp") for c in ("start", "stop")])
     temp_df = temp_df.withColumn("stop",fn.date_add("stop",1).cast("timestamp"))
     temp_df = temp_df.select([fn.col(c).cast("long") for c in ("start", "stop")])
     start, stop = temp_df.first()
     return spark.range(start,stop,interval).select(fn.col("id").cast("timestamp").alias(dt_col))


def dateDimension():
    time_rng = ["2005-01-01","2020-12-31"]
    year_df= generate_dates(spark,time_rng)
    tmp = (year_df
           .withColumn("year",fn.year("date_time_ref"))
           .withColumn("month",fn.month("date_time_ref"))
           .withColumn("day",fn.dayofmonth("date_time_ref"))
           .withColumn("quarter",fn.quarter("date_time_ref"))
           .withColumn("decade",
                          fn.when(fn.col("year") % 10 >=5,fn.col("year")-fn.col("year")%10+10)
                              .otherwise(fn.col("year")- fn.col("year") % 10))
           .withColumn("year_code",fn.monotonically_increasing_id())

          )
    date_dim = (tmp
                   .select(tmp.year_code,*set(tmp.columns)-set(["year_code"]))
               )
    
    return date_dim

In [90]:
def naturalDisasterDim(df,filePath,countries_chosen):
    """
        creates natural disaster dimension + look up table
    
        df - date dataframe
        filePath - filePath to natural disaster csv
        countries_chosen - list of strings of countries to work on

    """
    
    # reads csv
    natural_disaster_df = (spark
                       .read
                       .format('csv')
                       .option("inferSchema",True)
                       .option("header",True)
                       .load(filePath))
    
    
    # reconfigures column names 
    tmp_nd = (natural_disaster_df
                  # replaces United States of America -> united states
                  .withColumn("Country",fn.when(fn.lower(fn.col("Country")).contains("united states"),"united states").otherwise(fn.lower(fn.col("Country"))))
                  .withColumn("start_month",fn.col("Start Month"))
                  .withColumn("start_year",fn.col("Start Year"))
                  .withColumn("start_day",fn.col("Start Day"))
                  .withColumn("end_month",fn.col("End Month"))
                  .withColumn("end_year",fn.col("End Year"))
                  .withColumn("end_day",fn.col("End Day"))
                  .drop("year")
             )

    # join on start year
    nd_j_on_date = df.join(tmp_nd,tmp_nd.start_year==df.year).select(*tmp_nd.columns)
    
    
    # filter countries chosen
    tmp = (nd_j_on_date
           .filter(fn.col("Country").isin(list(map(lambda x: x.lower(),countries_chosen))))
           .withColumn("NaturalDisasterKey",fn.monotonically_increasing_id())
          )
    
    
    lookup = (tmp
              .select(
                  "NaturalDisasterKey",
                  "Country",
                  "start_year",
                  "start_month",
                  "start_day",
                  "end_year",
                  "end_month",
                  "end_day"
              ))
    
    natural_disaster_dimension = (tmp
                                  .select(fn.col("NaturalDisasterKey"),
                                          fn.col("country").alias("region"),
                                          fn.col("Disaster Type").alias("disaster_type"),
                                          fn.col("disaster subtype").alias("diaster_subtype"),
                                          fn.col("disaster subsubtype").alias("disaster_nestedsubtype"),
                                           fn.col("disaster subgroup").alias("disaster_subgroup"),
                                           fn.col("event name").alias("event_name"),
                                           fn.col("no injured").alias("ttl_injured"),
                                          fn.col("total deaths").alias("ttl_death"),
                                   fn.col("no affected").alias("ttl_affected"),
                                   fn.col("Total Damages ('000 US$)").alias("ttl_damaged_usd_thousands"),
                                   fn.col("ofda response").alias("ofda_response"))
                                 )
    
    return natural_disaster_dimension,lookup

In [6]:
# def countryDimension(df,countries_chosen):
    
#     tmp = (df
#                .filter(fn.lower(fn.col("Country")).isin(list(map(lambda x: x.lower(),countries_chosen))))
#                .withColumn("")
           
    

In [None]:
spark = SparkSession.builder.appName("ds_datastage").getOrCreate()

In [91]:
#MAIN block
countries_chosen = ["United States", "Canada","Mexico","Thailand","Finland","Nigeria","Somalia","Norway","Japan"]



df=spark.read.format("csv").option("header",True).option("inferSchema",True).load("AssignmentData/HNP_StatsData.csv")

#filtered data
# filterdCountryDf=filterCountryData(df,countries_chosen)
dateDim = dateDimension()
naturalDisasterDimension, nd_lookup=naturalDisasterDim(
    dateDim,
    countries_chosen=countries_chosen,
    filePath="AssignmentData/ExternalSources/DISASTERS/1900_2021_DISASTERS.xlsx - emdat data.csv"
)
# countryDimension = 

# display(filterdCountryDf.toPandas())
# display(dateDim.toPandas())
display(naturalDisasterDimension.toPandas())
display(nd_lookup.toPandas())

Unnamed: 0,NaturalDisasterKey,region,disaster_type,diaster_subtype,disaster_nestedsubtype,disaster_subgroup,event_name,ttl_injured,ttl_death,ttl_affected,ttl_damaged_usd_thousands,ofda_response
0,0,japan,Earthquake,Ground movement,,Geophysical,,735.0,1.0,2800.0,400000.0,
1,1,japan,Earthquake,Ground movement,,Geophysical,,735.0,1.0,2800.0,400000.0,
2,2,japan,Earthquake,Ground movement,,Geophysical,,735.0,1.0,2800.0,400000.0,
3,3,japan,Earthquake,Ground movement,,Geophysical,,735.0,1.0,2800.0,400000.0,
4,4,japan,Earthquake,Ground movement,,Geophysical,,735.0,1.0,2800.0,400000.0,
...,...,...,...,...,...,...,...,...,...,...,...,...
295114,8590004000,united states,Wildfire,Forest fire,,Climatological,August Complex fire,,32.0,,11000000.0,
295115,8590004001,united states,Wildfire,Forest fire,,Climatological,August Complex fire,,32.0,,11000000.0,
295116,8590004002,united states,Wildfire,Forest fire,,Climatological,August Complex fire,,32.0,,11000000.0,
295117,8590004003,united states,Wildfire,Forest fire,,Climatological,August Complex fire,,32.0,,11000000.0,


Unnamed: 0,NaturalDisasterKey,Country,start_year,start_month,start_day,end_year,end_month,end_day
0,0,japan,2005,3.0,20.0,2005,3.0,20.0
1,1,japan,2005,3.0,20.0,2005,3.0,20.0
2,2,japan,2005,3.0,20.0,2005,3.0,20.0
3,3,japan,2005,3.0,20.0,2005,3.0,20.0
4,4,japan,2005,3.0,20.0,2005,3.0,20.0
...,...,...,...,...,...,...,...,...
295114,8590004000,united states,2020,8.0,16.0,2020,10.0,1.0
295115,8590004001,united states,2020,8.0,16.0,2020,10.0,1.0
295116,8590004002,united states,2020,8.0,16.0,2020,10.0,1.0
295117,8590004003,united states,2020,8.0,16.0,2020,10.0,1.0


In [71]:
### TESTING BLOCK


## LOOKUP TABLE LOGIC
# 2006-2010
tmp = dateDim.filter(fn.col("year")==2006).select(fn.col("year").alias("year_2"))
dateDim_a = dateDim.alias("a")
tmp_b = tmp.alias("b")

test2 = dateDim_a.join(tmp_b.alias("b"),tmp_b.year_2<dateDim_a.year)
test3 = dateDim_a.join(tmp_b.alias("b"),2010>dateDim_a.year)

test2.show()
test3.show()

test3.intersect(test2).show()

+------------+----+------+------+
|   year_code|year|decade|year_2|
+------------+----+------+------+
| 17179869184|2007|  2010|  2006|
| 25769803776|2008|  2010|  2006|
| 34359738368|2009|  2010|  2006|
| 42949672960|2010|  2010|  2006|
| 51539607552|2011|  2010|  2006|
| 60129542144|2012|  2010|  2006|
| 68719476736|2013|  2010|  2006|
| 77309411328|2014|  2010|  2006|
| 85899345920|2015|  2020|  2006|
| 94489280512|2016|  2020|  2006|
|103079215104|2017|  2020|  2006|
|111669149696|2018|  2020|  2006|
|120259084288|2019|  2020|  2006|
|128849018880|2020|  2020|  2006|
+------------+----+------+------+

+-----------+----+------+------+
|  year_code|year|decade|year_2|
+-----------+----+------+------+
|          0|2005|  2010|  2006|
| 8589934592|2006|  2010|  2006|
|17179869184|2007|  2010|  2006|
|25769803776|2008|  2010|  2006|
|34359738368|2009|  2010|  2006|
+-----------+----+------+------+

+-----------+----+------+------+
|  year_code|year|decade|year_2|
+-----------+----+-----

In [None]:
### TEST FUNCTION
# df - dataframe
# col - column to observe
def nullCount(df,cl):
    non_null =(df
     .filter(fn.col(cl).isNotNull())
     .select(df.ttl_damaged_usd_thousands)
     .groupBy(df.ttl_damaged_usd_thousands)
     .count()
     .sort(naturalDisasterDimension.ttl_damaged_usd_thousands.asc())
    )
    
    print("Number of non null values: "+non_null.count())
    null = (naturalDisasterDimension
        .filter(naturalDisasterDimension.ttl_damaged_usd_thousands.isNull())
        .groupBy(naturalDisasterDimension.ttl_damaged_usd_thousands)
        .count().alias("null count")
        .show()
    )