# Mt. Everest Data Cleansing 
Silver Layer Processing

### Importing libs

In [None]:
from pyspark.sql import functions as F, types as T, DataFrame as Fr, Column as C 

### Himalayan Database data

Starting with the Himalayan Database data, I'm loading each CSV file (expeditions, peaks, and members) into its own dataframe.

In [None]:
# Load the three Himalayan Database extracts: semicolon-separated CSV files
# with a header row. No schema is supplied, so every column arrives as a
# string; types are assigned in the column-selection cells further down.
df_expeditions = spark.read.csv("Files/expeditions.csv", header=True, sep=";")
df_peaks = spark.read.csv("Files/peaks.csv", header=True, sep=";")
df_members = spark.read.csv("Files/members.csv", header=True, sep=";")

I only want data about Everest, so I'll start by removing all other rows from Peaks. This filtered dataframe will be the starting point for the upcoming filters.

In [None]:
# Reduce the peaks table to the Everest row(s); the expedition and member
# filters below use this frame as their reference set of PEAKIDs.
df_peaks = df_peaks.where(F.col('PKNAME') == 'Everest')

Next I'm doing the same concerning the Expeditions, but for this I need to do three steps:
1. Rename the PEAKID column in the Expeditions dataframe
2. Join the Expeditions with the Peaks using the PEAKID column
3. Make sure the resulting dataframe keeps none of the columns that came from the Peaks dataframe

I start by renaming the Expeditions column PEAKID to PEAKID_EXP, so that it cannot clash with the PEAKID column of the Peaks dataframe.

In [None]:
# Rename the key up front so it can never clash with df_peaks.PEAKID; the
# column-selection cell further down reads it back as PEAKID_EXP.
df_expeditions = df_expeditions.withColumnRenamed('PEAKID', 'PEAKID_EXP')

# Keep only expeditions whose peak is present in the Everest-filtered df_peaks.
# A left-semi join filters rows by key existence and returns only the left
# side's columns. Nothing from df_peaks has to be dropped afterwards, and a
# repeated peak row cannot duplicate expedition rows the way an inner join would.
df_expeditions = df_expeditions.join(
    df_peaks,
    df_expeditions.PEAKID_EXP == df_peaks.PEAKID,
    'left_semi',
)

And now I'll filter the Members dataframe using the Peaks dataframe. So again:
1. Rename the PEAKID column in the Members dataframe
2. Join the Members with the Peaks using the PEAKID column
3. Make sure the resulting dataframe keeps none of the columns that came from the Peaks dataframe

In [None]:
# Rename the key up front so it can never clash with df_peaks.PEAKID; the
# column-selection cell further down reads it back as PEAKID_MEM.
df_members = df_members.withColumnRenamed('PEAKID', 'PEAKID_MEM')

# Keep only member rows whose peak is present in the Everest-filtered df_peaks.
# A left-semi join keeps only df_members' columns, so no cleanup drop is
# needed, and a repeated peak row cannot duplicate member rows.
df_members = df_members.join(
    df_peaks,
    df_members.PEAKID_MEM == df_peaks.PEAKID,
    'left_semi',
)

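After filtering, I select the columns I need and give them readable names. I also cast the numeric columns to suitable types and reformat dates as `yyyy-MM-dd` strings and times as `HH:mm` strings.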

In [None]:
# Peak-level summit date: PYEAR and PSMTDATE are joined with a space, parsed
# with the pattern 'yyyy MMM dd', and re-emitted as a 'yyyy-MM-dd' string.
peak_summit_date = F.date_format(
    F.to_date(F.concat_ws(' ', F.col('PYEAR'), F.col('PSMTDATE')), 'yyyy MMM dd'),
    'yyyy-MM-dd',
)

# Select, rename and type the peak columns (height becomes an integer).
df_peaks = df_peaks.select(
    F.col('PEAKID').alias('PeakID'),
    F.col('PKNAME').alias('PeakName'),
    F.col('PKNAME2').alias('AlternativePeakName'),
    F.col('LOCATION').alias('Location'),
    F.col('HEIGHTM').cast(T.IntegerType()).alias('Height_m'),
    F.col('OPEN').alias('Open'),
    F.col('PEXPID').alias('ExpeditionID'),
    peak_summit_date.alias('SummitDate'),
    F.col('PSUMMITERS').alias('Summiters'),
    F.col('PCOUNTRY').alias('SummitersCountry'),
)

In [None]:
def _ddmmyyyy_to_iso(col_name):
    """Parse a 'dd/MM/yyyy' string column and re-emit it as a 'yyyy-MM-dd' string."""
    return F.date_format(F.to_date(F.col(col_name), 'dd/MM/yyyy'), 'yyyy-MM-dd')


def _hhmm_to_clock(col_name):
    """Insert ':' after the first two characters of a time string ('0930' -> '09:30').

    NOTE(review): assumes the source column holds 4-character HHMM values — confirm.
    """
    raw = F.col(col_name)
    return F.concat(F.substring(raw, 1, 2), F.lit(":"), F.substring(raw, 3, 2))


# Select, rename and type the expedition columns.
# The head counts (members, hired staff, their summits and deaths) are cast to
# ShortType. ByteType only holds -128..127, and Spark turns an out-of-range
# string cast into null (or an error under ANSI mode). That would silently
# blank the count of any expedition with more than 127 people.
# NOTE(review): if expeditions_silver already exists with ByteType columns, the
# overwrite in the save cell may need the Delta 'overwriteSchema' option once.
df_expeditions = (
    df_expeditions
    .select(
        F.col('EXPID').alias('ExpeditionID'),
        F.col('PEAKID_EXP').alias('PeakID'),
        F.col('YEAR').cast(T.ShortType()).alias('ExpeditionYear'),
        F.col('ROUTE1').alias('Route'),
        F.col('NATION').alias('CountryOfOrigin'),
        F.col('LEADERS').alias('ExpeditionLeaders'),
        F.col('SPONSOR').alias('Sponsor'),
        F.col('SUCCESS1').alias('SummitSuccess'),
        _ddmmyyyy_to_iso('BCDATE').alias('BaseCampDate'),
        _ddmmyyyy_to_iso('SMTDATE').alias('SummitDate'),
        _hhmm_to_clock('SMTTIME').alias('SummitTime'),
        F.col('SMTDAYS').cast(T.ShortType()).alias('DaysToSummit'),
        _ddmmyyyy_to_iso('TERMDATE').alias('ExpeditionEndDate'),
        F.col('TERMNOTE').alias('Notes'),
        F.col('CAMPS').cast(T.ByteType()).alias('Camps'),
        # Upper case like every other source header (Spark matches names
        # case-insensitively by default, so this resolves the same column).
        F.col('ROPE').alias('RopeUsed_m'),
        F.col('TOTMEMBERS').cast(T.ShortType()).alias('TotalMembers'),
        F.col('SMTMEMBERS').cast(T.ShortType()).alias('SummitMembers'),
        F.col('MDEATHS').cast(T.ShortType()).alias('MembersDeath'),
        F.col('TOTHIRED').cast(T.ShortType()).alias('TotalHired'),
        F.col('SMTHIRED').cast(T.ShortType()).alias('SummitHired'),
        F.col('HDEATHS').cast(T.ShortType()).alias('HiredDeath'),
        F.col('O2USED').alias('UsedO2'),
        F.col('OTHERSMTS').alias('OtherSummits'),
        F.col('CAMPSITES').alias('Campsites'),
        F.col('ACCIDENTS').alias('Accidents'),
        F.col('ACHIEVMENT').alias('Achievement'),
    )
)

In [None]:
def _ddmmyyyy_to_iso(col_name):
    """Parse a 'dd/MM/yyyy' string column and re-emit it as a 'yyyy-MM-dd' string."""
    return F.date_format(F.to_date(F.col(col_name), 'dd/MM/yyyy'), 'yyyy-MM-dd')


def _hhmm_to_clock(col_name):
    """Insert ':' after the first two characters of a time string ('0930' -> '09:30').

    NOTE(review): assumes the source column holds 4-character HHMM values — confirm.
    """
    raw = F.col(col_name)
    return F.concat(F.substring(raw, 1, 2), F.lit(":"), F.substring(raw, 3, 2))


# Output columns for the members table, in their final order: renamed
# source fields, year/age casts, and reformatted dates/times.
member_columns = [
    F.col('EXPID').alias('ExpeditionID'),
    F.col('PEAKID_MEM').alias('PeakID'),
    F.col('MYEAR').cast(T.ShortType()).alias('ExpeditionYear'),
    F.col('FNAME').alias('FirstName'),
    F.col('LNAME').alias('LastName'),
    F.col('Sex').alias('Gender'),
    F.col('YOB').cast(T.ShortType()).alias('YearOfBirth'),
    F.col('CALCAGE').cast(T.ByteType()).alias('Age'),
    F.col('CITIZEN').alias('Nationality'),
    F.col('STATUS').alias('Status'),
    F.col('OCCUPATION').alias('Occupation'),
    F.col('LEADER').alias('Leader'),
    F.col('SUPPORT').alias('Support'),
    F.col('DISABLED').alias('Disabled'),
    F.col('SHERPA').alias('Sherpa'),
    F.col('TIBETAN').alias('Tibetan'),
    F.col('MSUCCESS').alias('SummitSuccess'),
    F.col('MSOLO').alias('SoloSummit'),
    _ddmmyyyy_to_iso('MSMTDATE1').alias('SummitDate'),
    _hhmm_to_clock('MSMTTIME1').alias('SummitTime'),
    F.col('MROUTE1').alias('MountainRoute'),
    F.col('MASCENT1').alias('AscentRoute'),
    F.col('MO2USED').alias('UsedO2'),
    F.col('DEATH').alias('Death'),
    _ddmmyyyy_to_iso('DEATHDATE').alias('DeathDate'),
    _hhmm_to_clock('DEATHTIME').alias('DeathTime'),
    F.col('DEATHNOTE').alias('DeathNote'),
    F.col('INJURY').alias('Injury'),
    _ddmmyyyy_to_iso('INJURYDATE').alias('InjuryDate'),
    _hhmm_to_clock('INJURYTIME').alias('InjuryTime'),
]
df_members = df_members.select(*member_columns)

### Weather data

Next, I'm going to process the weather data, starting with the current weather.

The values here are stored as "key: value" pairs in a JSON document, so I need to extract each value into its own column to build a table with the fields I'm interested in.

Note that the dates and times reflect the timezone configured for Mount Everest, which is GMT+8.

In [None]:
# Read the current-weather JSON document (spread over multiple lines).
df = spark.read.option("multiline", "true").json("Files/mteverest_weather_data.json")

# Nested "current" readings -> output column names (with units in the name).
_current_metrics = [
    ('current.weather_code', 'WeatherCode'),
    ('current.precipitation', 'Precipitation_mm'),
    ('current.temperature_2m', 'Temperature_ºC'),
    ('current.wind_speed_10m', 'WindSpeed_km/h'),
    ('current.wind_gusts_10m', 'WindGusts_km/h'),
    ('current.relative_humidity_2m', 'RelativeHumidity_%'),
    ('current.snowfall', 'Snowfall_cm'),
]

observed_at = F.col('current.time')
df_current_weather = df.select(
    F.date_format(observed_at, 'yyyy-MM-dd').alias('Date'),
    F.date_format(observed_at, 'HH:mm').alias('Time'),
    # Sunrise/sunset come from the first element of the daily arrays.
    F.date_format(F.col('daily.sunrise').getItem(0), 'HH:mm').alias('Sunrise'),
    F.date_format(F.col('daily.sunset').getItem(0), 'HH:mm').alias('Sunset'),
    F.col('latitude').alias('Latitude'),
    F.col('longitude').alias('Longitude'),
    F.col('elevation').alias('Elevation_m'),
    F.col('timezone').alias('Location'),
    F.col('timezone_abbreviation').alias('Timezone'),
    *[F.col(path).alias(label) for path, label in _current_metrics],
)

Next, I'm organizing the historical weather data. In the source, each field holds an array with the values for all timestamps. I therefore zip these arrays together and explode them into one row per timestamp.

In [None]:
# Read the historical-weather JSON document (spread over multiple lines).
df = spark.read.option("multiline", "true").json("Files/mteverest_hist_weather_data.json")

# Source path -> working column name. The hourly/daily fields are still
# arrays at this point; they are exploded into rows in the following cells.
_hist_columns = (
    ('hourly.time', 'Datetime'),
    ('daily.time', 'Date'),
    ('daily.sunrise', 'Sunrise'),
    ('daily.sunset', 'Sunset'),
    ('latitude', 'Latitude'),
    ('longitude', 'Longitude'),
    ('elevation', 'Elevation_m'),
    ('timezone', 'Location'),
    ('timezone_abbreviation', 'Timezone'),
    ('hourly.weather_code', 'WeatherCode'),
    ('hourly.precipitation', 'Precipitation'),
    ('hourly.temperature_2m', 'Temperature'),
    ('hourly.wind_speed_10m', 'WindSpeed'),
    ('hourly.wind_gusts_10m', 'WindGusts'),
    ('hourly.relative_humidity_2m', 'RelativeHumidity'),
    ('hourly.snowfall', 'Snowfall'),
)
df = df.select([F.col(source).alias(target) for source, target in _hist_columns])

In [None]:
# Array columns that run in parallel, one element per hourly timestamp.
_hourly_arrays = [
    'Datetime', 'WeatherCode', 'Precipitation', 'Temperature',
    'WindSpeed', 'WindGusts', 'RelativeHumidity', 'Snowfall',
]

# Zip the arrays element-wise into structs, then explode so that every
# timestamp becomes its own row; per-location fields are repeated on each row.
df_hist_weather = (
    df
    .withColumn('zipped', F.explode(F.arrays_zip(*[F.col(name) for name in _hourly_arrays])))
    .select(
        F.date_format(F.col('zipped.Datetime'), 'yyyy-MM-dd').alias('Date'),
        F.date_format(F.col('zipped.Datetime'), 'HH:mm').alias('Time'),
        'Latitude',
        'Longitude',
        'Elevation_m',
        'Location',
        'Timezone',
        F.col('zipped.WeatherCode').alias('WeatherCode'),
        F.col('zipped.Precipitation').alias('Precipitation_mm'),
        F.col('zipped.Temperature').alias('Temperature_ºC'),
        F.col('zipped.WindSpeed').alias('WindSpeed_km/h'),
        F.col('zipped.WindGusts').alias('WindGusts_km/h'),
        F.col('zipped.RelativeHumidity').alias('RelativeHumidity_%'),
        F.col('zipped.Snowfall').alias('Snowfall_cm'),
    )
)

I'm creating a separate dataframe, with one row per day, to store the sunrise and sunset times.

In [None]:
# Daily arrays (one element per day): zip date/sunrise/sunset together and
# explode into one row per day, keeping only the clock part of sunrise/sunset.
daily_structs = F.arrays_zip(F.col('Date'), F.col('Sunrise'), F.col('Sunset'))

df_weather_daily = (
    df
    .withColumn('zipped', F.explode(daily_structs))
    .select(
        F.col('zipped.Date').alias('Date'),
        F.date_format(F.col('zipped.Sunrise'), 'HH:mm').alias('Sunrise'),
        F.date_format(F.col('zipped.Sunset'), 'HH:mm').alias('Sunset'),
        'Latitude',
        'Longitude',
        'Elevation_m',
        'Location',
        'Timezone',
    )
)

Next, I load the weather code information. It is also stored in a CSV file, which was previously uploaded directly to the lakehouse.

In [None]:
# Weather-code lookup table: semicolon-separated CSV with a header row,
# loaded as-is (all columns as strings).
df_weather_codes = spark.read.csv("Files/weather_codes.csv", header=True, sep=";")

Finally, I save each dataframe as a silver table so that it can be used by the gold layer.

In [None]:
# Persist every silver DataFrame as a table, fully replacing previous contents.
# The tables are rebuilt from scratch on every run, so 'overwriteSchema' lets
# the overwrite also replace the table schema. Without it, renaming or
# re-typing a column in the cells above makes the overwrite fail with a
# schema-mismatch error.
# NOTE(review): 'overwriteSchema' is a Delta Lake write option; presumably the
# lakehouse tables are Delta — confirm.
silver_tables = {
    'expeditions_silver': df_expeditions,
    'peaks_silver': df_peaks,
    'members_silver': df_members,
    'weather_current_silver': df_current_weather,
    'weather_hist_silver': df_hist_weather,
    'weather_codes_silver': df_weather_codes,
    'weather_daily_silver': df_weather_daily,
}
for table_name, frame in silver_tables.items():
    frame.write.mode('overwrite').option('overwriteSchema', 'true').saveAsTable(table_name)