# Weather Enhanced MN Phenology Dataset
Combines phenology and weather data at the county level for the state of Minnesota.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, DateType

INPUT_FOLDER = "data/"
OUTPUT_FOLDER = "out/"
# Column names for the whitespace-delimited NOAA climate files: a combined
# FIPS/year key followed by one value per calendar month.
CLIMHEADER = ["FIPS-YEAR",
              "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
              "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]

## Phenology and County Codes
Done together since FIPS codes aid with phenology data cleanup

### Data clean up for phenology data
- remove empty counties
- drop duplicates
- align multiple spellings
- correct abbreviations, etc...

County changes to make, based on the mismatched county names:
- "ASHLAND" county is in WI, remove since it's out of scope for this project
- "RAMSEY " has an extra space to take out
- "ST.LOUIS" and "ST. LOUIS" should be "ST LOUIS"
- "BR" is probably "Brown", it's the only "Br*" county and there are no counties with "BR" as initials
- "MORRISON COUNTY" should just be "MORRISON"
- "AITKIN/ITASCA" & "AITKIN /ITASCA" could be either one, remove for sake of cleanliness

In [2]:
def processPhenology(spark, phenology):
    """
    Read the phenology CSV and apply the county-name cleanup.

    Steps: drop rows without a COUNTY, drop exact duplicate rows, drop the
    out-of-scope / ambiguous counties, then map alternate spellings of a
    county to a single canonical name.

    :param spark: active SparkSession used to read the file.
    :param phenology: CSV file name (with a header row), relative to INPUT_FOLDER.
    :return: the cleaned phenology DataFrame.
    """
    # Alternate spelling -> canonical county name, checked in this order.
    countyFixes = [
        ("RAMSEY ", "RAMSEY"),
        ("ST.LOUIS", "ST LOUIS"),
        ("ST. LOUIS", "ST LOUIS"),
        ("BR", "BROWN"),
        ("MORRISON COUNTY", "MORRISON"),
    ]

    reader = spark.read.option("header", True)
    phn = reader.csv(INPUT_FOLDER + phenology)
    phn = phn.filter("COUNTY is not NULL").dropDuplicates()
    # ASHLAND is in WI; the AITKIN/ITASCA variants could be either county.
    phn = phn.filter("COUNTY not in ('ASHLAND', 'AITKIN /ITASCA', 'AITKIN/ITASCA')")

    county = phn.COUNTY
    (firstBad, firstGood), rest = countyFixes[0], countyFixes[1:]
    fixedCounty = when(county == firstBad, firstGood)
    for bad, good in rest:
        fixedCounty = fixedCounty.when(county == bad, good)
    # Any name not listed above is kept unchanged.
    return phn.withColumn("COUNTY", fixedCounty.otherwise(county))

In [3]:
def processCounties(spark, counties):
    """
    Load the county FIPS code table and keep only the Minnesota counties.

    :param spark: active SparkSession used to read the file.
    :param counties: multi-line JSON file name, relative to INPUT_FOLDER.
    :return: DataFrame of the rows whose State is "MN".
    """
    sourcePath = INPUT_FOLDER + counties
    allCounties = spark.read.json(sourcePath, multiLine=True)
    # Filter to Minnesota counties only.
    return allCounties.filter(allCounties.State == "MN")

#### Data Quality Check
Compare phenology data's counties to FIPS codes to determine invalid county names. Zero mismatches is a pass.

In [4]:
def chkPhnCntMismatch(mnphn_df, fips_df):
    """
    Data quality check: every phenology county name must match a county.

    County names are compared case-insensitively against ``fips_df.Name``.

    :param mnphn_df: phenology DataFrame with a COUNTY column.
    :param fips_df: county DataFrame with a Name column.
    :return: True when no distinct phenology COUNTY is missing from fips_df,
        otherwise False.
    """
    # Use the fips_df argument (not the notebook-global mnfips_df) so the
    # check validates whatever county table the caller passes in.
    unmatched = mnphn_df.join(
        fips_df, lower(mnphn_df.COUNTY) == lower(fips_df.Name), "leftanti"
    ).select("COUNTY").distinct()
    return unmatched.count() == 0

#### Climate Data
Precipitation and temperature per county and month.

For precipitation, the range of values is listed as 00.00 to 99.99, and "-9.99" indicates no reading, so it should be null. For temperature, the range is -50 to 140 and "-99.99" indicates no reading.

In [5]:
@udf(returnType=DoubleType())
def noRead2null(obsv, nullVal):
    '''
    Replace a climate "no reading" sentinel with null.

    Used for both the precipitation and the temperature columns.

    :param obsv: observed value for one month (may be null).
    :param nullVal: sentinel value that marks a missing reading.
    :return: None when obsv is null or equals nullVal, otherwise obsv as a float.
    '''
    if obsv is None or obsv == nullVal:
        return None
    # The UDF is declared DoubleType; Spark may silently turn a returned
    # Python int (e.g. from an integer-typed column) into null, so always
    # hand back a float.
    return float(obsv)

In [6]:
def processClim(spark, clim, noReadVal):
    """
    Load one NOAA county climate file and null out its "no reading" values.

    :param spark: active SparkSession used to build the DataFrame.
    :param clim: whitespace-delimited climate file name, relative to INPUT_FOLDER.
    :param noReadVal: sentinel marking a missing monthly reading
        (e.g. -9.99 for precipitation, -99.99 for temperature).
    :return: DataFrame with the CLIMHEADER columns, where month values equal
        to noReadVal are replaced with null.
    """
    # Climate tables use a variable number of spaces as the delimiter.
    # pandas can handle this where the pyspark CSV reader can't, so use it first.
    # FIPS-YEAR is read as text so any leading zeros in the code are kept
    # (pandas would otherwise parse it as an integer and drop them).
    clim_pd = pd.read_csv(INPUT_FOLDER + clim,
                          sep=r"\s+",
                          names=CLIMHEADER,
                          dtype={CLIMHEADER[0]: str})
    clim_df = spark.createDataFrame(clim_pd)
    nullVal = lit(noReadVal)
    # Apply the sentinel -> null conversion to every month column.
    for monthCol in CLIMHEADER[1:]:
        clim_df = clim_df.withColumn(monthCol, noRead2null(col(monthCol), nullVal))
    return clim_df

#### Data Quality Check
For precipitation, the range of values is listed as 00.00 to 99.99, so make sure there are no values outside it. The same function works for temperature as well (range -50 to 140).

In [7]:
def chkClimRange(df, low, high):
    """
    Data quality check: every non-null monthly value lies within [low, high].

    :param df: climate DataFrame with JAN..DEC columns.
    :param low: smallest valid value (inclusive).
    :param high: largest valid value (inclusive).
    :return: True when no row has a month value outside the range, else False.
        Null ("no reading") values are not counted as invalid, because
        ``NULL NOT BETWEEN ...`` does not evaluate to true in SQL.
    """
    months = ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
              "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]
    clauses = ["{m} NOT BETWEEN {lo} AND {hi}".format(m=m, lo=low, hi=high)
               for m in months]
    # A row is invalid if any one of its months is out of range.
    condition = (" OR\n" + " " * 12).join(clauses)
    return not df.filter(condition).count()

## Create Analytic Tables
Star schema with _Observations_ as the fact table and the following dimension tables:
_Biological_, _County_, _Climate_, _Time_

### Biological
- species
- common name
- genus
- lifeform
- group
- invasive


In [8]:
def createBioTbl(mnphn_df):
    """
    Build the Biological dimension table from the phenology data.

    :param mnphn_df: cleaned phenology DataFrame.
    :return: DataFrame of distinct (species, common_name, genus, lifeform,
        group, mn_invasive) rows.
    """
    # (source column, output column). The genus source column is selected
    # as "GENUS " with a trailing space.
    renames = [
        ("SPECIES", "species"),
        ("SPECIES (COMMON NAME)", "common_name"),
        ("GENUS ", "genus"),
        ("LIFEFORM", "lifeform"),
        ("GROUP", "group"),
        ("INVASIVE", "mn_invasive"),
    ]
    bioCols = [col(src).alias(dst) for src, dst in renames]
    return mnphn_df.select(*bioCols).distinct()

### County
- county_code
- county
- state

In [9]:
def createCntyTbl(mnfips_df):
    """
    Build the County dimension table from the county FIPS data.

    :param mnfips_df: county DataFrame with FIPS, Name and State columns.
    :return: DataFrame with columns county_code, county and state.
    """
    renames = {"FIPS": "county_code", "Name": "county", "State": "state"}
    return mnfips_df.select(*[col(src).alias(dst) for src, dst in renames.items()])

### Climate
- clim_id
- county_code
- year
- month
- tmpMin
- tmpMax
- tmpAvg
- pcpn

County code gets a little tricky here, since NOAA uses different state codes from the FIPS codes everyone else uses. For this MN-restricted set it's easy to go from 21 => 27, but a national set would need more care.
Also, the climate data is monthly data, so there's no day involved.

In [10]:
def convertClim(df, dataCol):
    '''
    Unpivot a wide climate table (one column per month) into one row per month.

    FIPS-YEAR is split into county_code (characters 1-5) and year
    (characters 8-11) and then dropped. Each JAN..DEC column becomes its own
    row, tagged with a two-digit month string '01'..'12'.
    Thanks to Doug on this post for the method: https://stackoverflow.com/a/64130519

    :param df: climate DataFrame with FIPS-YEAR and JAN..DEC columns.
    :param dataCol: name to give the unpivoted value column (e.g. 'pcpn').
    :return: DataFrame with columns county_code, year, month, dataCol.
    '''
    fipsYear = col("FIPS-YEAR")
    keyed = df.withColumn("county_code", fipsYear.substr(1, 5)) \
        .withColumn("year", fipsYear.substr(8, 4)) \
        .drop("FIPS-YEAR")

    monthCols = ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                 "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]
    # Pair every month value with its number: ['01', JAN], ['02', FEB], ...
    # Each pair mixes a string literal with the value, so Spark coerces both
    # to a common (string) type; joinClimTbls casts them back to numbers.
    pairs = [array(lit("%02d" % number), col(name))
             for number, name in enumerate(monthCols, start=1)]

    # One output row per (input row, month).
    longDf = keyed.withColumn("months", explode(array(*pairs)))
    return longDf.select(
        "county_code",
        "year",
        col("months")[0].alias("month"),
        col("months")[1].alias(dataCol))

In [11]:
def createClimTbls(pcpn_df, tmpMin_df, tmpMax_df, tmpAvg_df):
    """
    Restrict the four climate data frames to Minnesota and unpivot them.

    NOAA's climate files use state code 21 for Minnesota, while the county
    table uses the FIPS code 27, so the leading "21" of FIPS-YEAR is turned
    into "27" before convertClim extracts the county code.

    :return: tuple (pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl), each with
        columns county_code, year, month and its named value column.
    """
    def _toFipsMn(df):
        # Keep only NOAA state 21 (Minnesota). Cast explicitly so the LIKE
        # works whether FIPS-YEAR was loaded as a number or as text.
        fipsYear = col("FIPS-YEAR")
        mnOnly = df.filter(fipsYear.cast("string").like('21%'))
        # Assumes an 11-digit FIPS-YEAR (convertClim's substr(8, 4) relies on
        # the same layout): adding 6 * 10**9 turns the leading "21" into "27".
        return mnOnly.withColumn(
            "FIPS-YEAR", (fipsYear.cast('bigint') + 6000000000).cast('string'))

    # Same filter + code change, then unpivot, for each of the four tables.
    pcpn_tbl = convertClim(_toFipsMn(pcpn_df), 'pcpn')
    tmpMin_tbl = convertClim(_toFipsMn(tmpMin_df), 'tmpMin')
    tmpMax_tbl = convertClim(_toFipsMn(tmpMax_df), 'tmpMax')
    tmpAvg_tbl = convertClim(_toFipsMn(tmpAvg_df), 'tmpAvg')
    return pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl

In [12]:
def joinClimTbls(pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl):
    """
    Inner-join the four per-month climate tables into one Climate table.

    Rows are matched on (county_code, year, month). A combination missing
    from any of the four inputs is dropped. A clim_id key is added and the
    columns are cast to their final types.

    :return: DataFrame with county_code, year (int), month (int),
        pcpn / tmpMin / tmpMax / tmpAvg (double) and clim_id.
    """
    joinKeys = ["county_code", "year", "month"]
    clim_tbl = pcpn_tbl
    for other in (tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl):
        clim_tbl = clim_tbl.join(other, joinKeys)

    # Add the id column first, then set the data types.
    clim_tbl = clim_tbl.withColumn("clim_id", monotonically_increasing_id())
    targetTypes = [("year", 'int'), ("month", 'int'),
                   ("pcpn", 'double'), ("tmpMin", 'double'),
                   ("tmpMax", 'double'), ("tmpAvg", 'double')]
    for name, sparkType in targetTypes:
        clim_tbl = clim_tbl.withColumn(name, col(name).cast(sparkType))
    return clim_tbl

#### Data Quality Check
Ensure the join was successful.
Spot-check a random entry of the joined climate table: each source table should have exactly one matching row, with the same value.

In [13]:
def chkClimJoin(clim_tbl, pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl, seed=None):
    """
    Data quality check: spot-check one joined climate row against its sources.

    A random row of clim_tbl is looked up in each source table by
    (county_code, year, month). Each lookup must return exactly one row, and
    its value must equal the joined value. A null ("no reading") value
    matches a null.

    :param seed: optional seed for the random sample, for reproducible runs.
    :return: True when the sampled row matches all four source tables.
    :raises Exception: when clim_tbl is empty, when a lookup does not return
        exactly one row, or when a value differs from the joined table.
    """
    sampled = clim_tbl.sample(False, 0.0001, seed).take(1)
    # Such a tiny fraction can sample nothing; fall back to the first row.
    randRow = sampled[0] if sampled else clim_tbl.first()
    if randRow is None:
        raise Exception("Join failed, climate table is empty.")

    key = (col("county_code") == randRow["county_code"]) \
        & (col("year") == randRow["year"]) \
        & (col("month") == randRow["month"])

    def _asFloat(value):
        # Source tables keep the values as strings; a missing reading is null.
        return None if value is None else float(value)

    sources = [(pcpn_tbl, "pcpn"), (tmpMin_tbl, "tmpMin"),
               (tmpMax_tbl, "tmpMax"), (tmpAvg_tbl, "tmpAvg")]
    for srcTbl, colName in sources:
        # collect once instead of running count() and first() as two jobs.
        matches = srcTbl.select(col(colName)).where(key).collect()
        if len(matches) != 1:
            raise Exception("Join failed, expected 1 {} entry but found {}."
                            .format(colName, len(matches)))
        if randRow[colName] != _asFloat(matches[0][colName]):
            raise Exception("Join failed, mismatched data.")
    return True

### Observations (fact table)
- obsv_id
- date
- species
- county_code
- clim_id
- event
Join in the county data to the phenology set, so we can select it back out.

In [14]:
def createObsvTbl(mnphn_df, mnfips_df, clim_tbl):
    """
    Build the Observations fact table.

    Phenology rows are joined to the county table (case-insensitive county
    name) to pick up the FIPS county code. The observation date is parsed
    from YEAR and DAY, and each row is joined to the climate table on
    (county_code, year, month) to get its clim_id.

    :param mnphn_df: cleaned phenology DataFrame (YEAR, DAY, COUNTY, SPECIES, EVENT).
    :param mnfips_df: county DataFrame (Name, FIPS).
    :param clim_tbl: Climate table with county_code, year, month and clim_id.
    :return: DataFrame with obsv_id, date, species, county_code, clim_id, event.
    """
    withFips = mnphn_df.join(mnfips_df, lower(mnphn_df.COUNTY) == lower(mnfips_df.Name))
    # DAY is parsed with the pattern 'dd-MMM'; prepend YEAR to get a full date.
    obsvDate = from_unixtime(unix_timestamp(
        concat_ws("-", col("YEAR"), col("DAY")), 'yyyy-dd-MMM')).cast("date")
    obsv_tbl = withFips.select(
        monotonically_increasing_id().alias("obsv_id"),
        obsvDate.alias("date"),
        col("YEAR").cast("int").alias("year"),
        # Take the month from the full date. Parsing DAY alone ('dd-MMM')
        # fills in year 1970, so 29-Feb could not be parsed and leap-day
        # observations would be lost in the climate join below.
        month(obsvDate).alias("month"),
        col("SPECIES").alias("species"),
        col("FIPS").alias("county_code"),
        col("EVENT").alias("event"))
    # Join with the climate table to extract the clim_ids. This also prunes
    # rows whose date could not be parsed (their month is null).
    obsv_tbl = obsv_tbl.join(clim_tbl, ["county_code", "year", "month"])
    return obsv_tbl.select("obsv_id", "date", "species", "county_code", "clim_id", "event")

### Time Table
The most specific time item is date from observations, so go from there.
- date
- day
- week
- month
- year
- weekday

In [15]:
def createTimeTbl(obsv_tbl):
    """
    Build the Time dimension table: one row per distinct observation date.

    :param obsv_tbl: Observations table with a ``date`` column.
    :return: DataFrame with columns date, day (day of month), week (week of
        year), month, year and weekday (formatted with the Spark datetime
        pattern "E").
    """
    dateCol = col("date")
    # Break each unique date into its calendar parts.
    timeCols = [
        dateCol,
        dayofmonth(dateCol).alias("day"),
        weekofyear(dateCol).alias("week"),
        month(dateCol).alias("month"),
        year(dateCol).alias("year"),
        date_format(dateCol, "E").alias("weekday"),
    ]
    uniqueDates = obsv_tbl.select('date').distinct()
    return uniqueDates.select(*timeCols)

## Write out Analysis Tables to Parquet files

In [16]:
def writeTables(obsvTbl=None, bioTbl=None, cntyTbl=None, climTbl=None,
                timeTbl=None, rowLimit=1000):
    """
    Write the analysis tables out as parquet files under OUTPUT_FOLDER.

    When omitted, each table argument falls back to the notebook-level table
    (obsv_tbl, bio_tbl, cnty_tbl, clim_tbl, time_tbl), so the original
    zero-argument call still works. Passing the tables explicitly avoids
    depending on hidden notebook state.

    :param rowLimit: maximum number of rows written per table. The default of
        1000 is due to size constraints on the Udacity Workspace; pass None
        to write every row.
    """
    # (table, output name, partition columns), written in this order.
    outputs = [
        (obsvTbl if obsvTbl is not None else obsv_tbl,
         "obsv.parquet", ("date", "county_code")),
        (bioTbl if bioTbl is not None else bio_tbl,
         "bio.parquet", ()),
        (cntyTbl if cntyTbl is not None else cnty_tbl,
         "cnty.parquet", ("state",)),
        (climTbl if climTbl is not None else clim_tbl,
         "clim.parquet", ("year", "county_code")),
        (timeTbl if timeTbl is not None else time_tbl,
         "time.parquet", ("year",)),
    ]
    for table, fileName, partitionCols in outputs:
        if rowLimit is not None:
            table = table.limit(rowLimit)
        writer = table.write
        if partitionCols:
            writer = writer.partitionBy(*partitionCols)
        writer.mode("overwrite").parquet(OUTPUT_FOLDER + fileName)

## Run ETL

In [17]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

Extract and clean-up on Phenology and Counties.
Prints True when the county-mismatch data quality check passes (no unmatched county names).

In [18]:
# Load both inputs, then confirm every phenology county name matches a
# Minnesota county (prints True when it passes).
mnfips_df = processCounties(spark, "USDA-NRCS_FIPS_Codes.json")
mnphn_df = processPhenology(spark, "2018.4.6-MNPN-WEBSITE-master-dataset.csv")
print(chkPhnCntMismatch(mnphn_df, mnfips_df))

True


Extract and clean up the precipitation and temperature data. Each cell prints True when its range check passes, i.e. no measurement falls outside the valid range (0 to 99.99 for precipitation, -50 to 140 for temperature).

In [19]:
# Precipitation: -9.99 marks a missing reading; valid values are 0 to 99.99.
pcpn_df = processClim(spark, "climdiv-pcpncy-v1.0.0-20211104", noReadVal=-9.99)
print(chkClimRange(pcpn_df, low=0, high=99.99))

True


In [20]:
# Maximum temperature: -99.99 marks a missing reading; valid values are -50 to 140.
tmpMax_df = processClim(spark, "climdiv-tmaxcy-v1.0.0-20211104", -99.99)
# Range-check the max-temperature frame just loaded (not pcpn_df).
print(chkClimRange(tmpMax_df, -50, 140))

True


In [21]:
# Minimum temperature: -99.99 marks a missing reading; valid values are -50 to 140.
tmpMin_df = processClim(spark, "climdiv-tmincy-v1.0.0-20211104", -99.99)
# Range-check the min-temperature frame just loaded (not pcpn_df).
print(chkClimRange(tmpMin_df, -50, 140))

True


In [22]:
# Average temperature: -99.99 marks a missing reading; valid values are -50 to 140.
tmpAvg_df = processClim(spark, "climdiv-tmpccy-v1.0.0-20211104", -99.99)
# Range-check the average-temperature frame just loaded (not pcpn_df).
print(chkClimRange(tmpAvg_df, -50, 140))

True


Create analytics tables.

In [23]:
# Dimension tables built directly from the cleaned inputs.
cnty_tbl = createCntyTbl(mnfips_df)
bio_tbl = createBioTbl(mnphn_df)
# Climate: restrict each source to MN and unpivot it, then join the four.
pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl = createClimTbls(
    pcpn_df, tmpMin_df, tmpMax_df, tmpAvg_df)
clim_tbl = joinClimTbls(pcpn_tbl, tmpMin_tbl, tmpMax_tbl, tmpAvg_tbl)
# Fact table (needs clim_tbl for clim_id), then the time dimension built from it.
obsv_tbl = createObsvTbl(mnphn_df, mnfips_df, clim_tbl)
time_tbl = createTimeTbl(obsv_tbl)

In [24]:
# Data quality check: spot-check one row of the joined climate table.
print(chkClimJoin(clim_tbl, pcpn_tbl=pcpn_tbl, tmpMin_tbl=tmpMin_tbl,
                  tmpMax_tbl=tmpMax_tbl, tmpAvg_tbl=tmpAvg_tbl))

True


In [25]:
# Write the analysis tables to parquet under OUTPUT_FOLDER.
writeTables()

## Sample Query
A query to examine precipitation in the month of the last flower.

In [26]:
# Register each analysis table as a temp view so it can be queried with Spark SQL.
for viewName, viewTbl in [("OBSV", obsv_tbl), ("BIO", bio_tbl), ("CNTY", cnty_tbl),
                          ("CLIM", clim_tbl), ("TIME", time_tbl)]:
    viewTbl.createOrReplaceTempView(viewName)

In [27]:
# Hennepin County "LAST FLOWER" observations with the monthly precipitation,
# highest precipitation first.
lastFlowerQuery = """
        SELECT o.date,
        c.pcpn,
        cy.county,
        o.event,
        b.common_name
        FROM OBSV o
        JOIN CLIM c
        ON o.clim_id == c.clim_id
        JOIN BIO b
        ON o.species == b.species
        JOIN CNTY cy
        ON o.county_code == cy.county_code
        WHERE o.event == 'LAST FLOWER' AND
        cy.county == 'Hennepin'
        ORDER BY c.pcpn DESC
        """
spark.sql(lastFlowerQuery).limit(10).toPandas()

Unnamed: 0,date,pcpn,county,event,common_name
0,1987-07-23,10.1,Hennepin,LAST FLOWER,AMERICAN TOAD
1,1987-07-23,10.1,Hennepin,LAST FLOWER,SNOWSHOE HARES
2,1987-07-23,10.1,Hennepin,LAST FLOWER,YELLOW BILLED CUCKOO
3,1987-07-03,10.1,Hennepin,LAST FLOWER,WHITE WATER LILY
4,1987-07-23,10.1,Hennepin,LAST FLOWER,NEW JERSEY TEA
5,1987-07-23,10.1,Hennepin,LAST FLOWER,CUT LEAVED WATER HOREHOUND
6,1987-07-23,10.1,Hennepin,LAST FLOWER,WHITE PRAIRIE CLOVER
7,1987-07-03,10.1,Hennepin,LAST FLOWER,BUTTERFLY WEED
8,1987-07-23,10.1,Hennepin,LAST FLOWER,WATER HOREHOUND
9,1987-07-23,10.1,Hennepin,LAST FLOWER,YELLOW BELLIED CUCKOO
