In [0]:
silver_country_df = spark.table("silver_land_temperatures_country")


In [0]:
from pyspark.sql.functions import year, col

country_year_df = silver_country_df.withColumn(
    "year", year(col("date"))
)


In [0]:
country_year_avg_df = (
    country_year_df
        .groupBy("country", "year")
        .agg(
            {"AverageTemperature": "avg"}
        )
        .withColumnRenamed(
            "avg(AverageTemperature)",
            "avg_yearly_temperature"
        )
)


In [0]:
from pyspark.sql.functions import avg

country_baseline_df = (
    country_year_avg_df
        .groupBy("country")
        .agg(
            avg("avg_yearly_temperature")
            .alias("historical_avg_temperature")
        )
)


In [0]:
gold_country_df = (
    country_year_avg_df
        .join(
            country_baseline_df,
            on="country",
            how="inner"
        )
)


In [0]:
gold_country_df = gold_country_df.withColumn(
    "temperature_anomaly",
    col("avg_yearly_temperature") - col("historical_avg_temperature")
)


In [0]:
gold_country_df.printSchema()
gold_country_df.show(10, truncate=False)


root
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- avg_yearly_temperature: double (nullable = true)
 |-- historical_avg_temperature: double (nullable = true)
 |-- temperature_anomaly: double (nullable = true)

+-------------------+----+----------------------+--------------------------+---------------------+
|country            |year|avg_yearly_temperature|historical_avg_temperature|temperature_anomaly  |
+-------------------+----+----------------------+--------------------------+---------------------+
|Anguilla           |1853|26.241500000000002    |26.6102766147186          |-0.36877661471859824 |
|Antigua And Barbuda|1891|26.22741666666667     |26.437816203463186        |-0.21039953679651546 |
|Antigua And Barbuda|1962|26.736750000000004    |26.437816203463186        |0.2989337965368186   |
|Argentina          |1971|14.651166666666668    |14.614787473794554        |0.036379192872114885 |
|Armenia            |1879|9.149583333333332     |8.57667016866

In [0]:
(
    gold_country_df
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold_climate_country_indicators")
)


In [0]:
spark.sql(
    """
    SELECT country, year, temperature_anomaly
    FROM gold_climate_country_indicators
    ORDER BY year DESC
    LIMIT 10
    """
).show(truncate=False)


+-----------+----+-------------------+
|country    |year|temperature_anomaly|
+-----------+----+-------------------+
|Denmark    |2013|3.1203455497382127 |
|Slovenia   |2013|2.119085929519919  |
|Niue       |2013|0.9645377830203508 |
|Turkey     |2013|2.3909458509142087 |
|South Korea|2013|1.907768304431599  |
|Congo      |2013|0.6829503261378136 |
|Mauritania |2013|1.5275959595959705 |
|Costa Rica |2013|0.8404800613496839 |
|Indonesia  |2013|0.7926573438790463 |
|Switzerland|2013|1.2166034360458502 |
+-----------+----+-------------------+



In [0]:
gold_country_df = spark.table("gold_climate_country_indicators")


In [0]:
from pyspark.sql.functions import stddev

country_anomaly_stats_df = (
    gold_country_df
        .groupBy("country")
        .agg(
            stddev("temperature_anomaly")
            .alias("anomaly_stddev")
        )
)



In [0]:
gold_country_with_stats_df = (
    gold_country_df
        .join(
            country_anomaly_stats_df,
            on="country",
            how="inner"
        )
)


In [0]:
from pyspark.sql.functions import when,col

gold_country_labeled_df = (
    gold_country_with_stats_df
        .withColumn(
            "high_climate_risk",
            when(
                col("temperature_anomaly") > col("anomaly_stddev"),
                1
            ).otherwise(0)
        )
)



In [0]:
gold_country_labeled_df.select(
    "country",
    "year",
    "temperature_anomaly",
    "anomaly_stddev",
    "high_climate_risk"
).orderBy("year", ascending=False).show(15, truncate=False)


+----------------+----+-------------------+-------------------+-----------------+
|country         |year|temperature_anomaly|anomaly_stddev     |high_climate_risk|
+----------------+----+-------------------+-------------------+-----------------+
|Senegal         |2013|1.3378230245492837 |0.5171282308439741 |1                |
|Congo           |2013|0.6829503261378136 |0.4630028550272012 |1                |
|Turkey          |2013|2.3909458509142087 |0.6743681475213483 |1                |
|Costa Rica      |2013|0.8404800613496839 |0.4888799985894067 |1                |
|South Korea     |2013|1.907768304431599  |0.6407764052083886 |1                |
|Niue            |2013|0.9645377830203508 |0.48065576687723277|1                |
|South America   |2013|0.38067484662576234|0.44553606897451453|0                |
|Indonesia       |2013|0.7926573438790463 |0.44345379366440657|1                |
|French Polynesia|2013|0.8088832201413787 |0.4786543338333781 |1                |
|Denmark        

In [0]:
(
    gold_country_labeled_df
        .write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable("gold_climate_country_indicators")
)



In [0]:
spark.sql(
    """
    SELECT high_climate_risk, COUNT(*) AS count
    FROM gold_climate_country_indicators
    GROUP BY high_climate_risk
    """
).show()


+-----------------+-----+
|high_climate_risk|count|
+-----------------+-----+
|                0|41788|
|                1| 6455|
+-----------------+-----+



In [0]:
silver_state_df = spark.table("silver_land_temperatures_state")


In [0]:
from pyspark.sql.functions import year, col

state_year_df = silver_state_df.withColumn(
    "year", year(col("date"))
)


In [0]:
state_year_avg_df = (
    state_year_df
        .groupBy("country", "state", "year")
        .agg(
            {"AverageTemperature": "avg"}
        )
        .withColumnRenamed(
            "avg(AverageTemperature)",
            "avg_yearly_temperature"
        )
)


In [0]:
from pyspark.sql.functions import avg

state_baseline_df = (
    state_year_avg_df
        .groupBy("country", "state")
        .agg(
            avg("avg_yearly_temperature")
            .alias("historical_avg_temperature")
        )
)


In [0]:
gold_state_df = (
    state_year_avg_df
        .join(
            state_baseline_df,
            on=["country", "state"],
            how="inner"
        )
)


In [0]:
gold_state_df = gold_state_df.withColumn(
    "temperature_anomaly",
    col("avg_yearly_temperature") - col("historical_avg_temperature")
)


In [0]:
from pyspark.sql.functions import stddev

state_anomaly_stats_df = (
    gold_state_df
        .groupBy("country", "state")
        .agg(
            stddev("temperature_anomaly")
            .alias("anomaly_stddev")
        )
)


In [0]:
from pyspark.sql.functions import when

gold_state_labeled_df = (
    gold_state_df
        .join(
            state_anomaly_stats_df,
            on=["country", "state"],
            how="inner"
        )
        .withColumn(
            "high_climate_risk",
            when(
                col("temperature_anomaly") > col("anomaly_stddev"),
                1
            ).otherwise(0)
        )
)


In [0]:
gold_state_labeled_df.select(
    "country",
    "state",
    "year",
    "temperature_anomaly",
    "high_climate_risk"
).orderBy("year", ascending=False).show(15, truncate=False)


+-------------+----------------------------+----+-------------------+-----------------+
|country      |state                       |year|temperature_anomaly|high_climate_risk|
+-------------+----------------------------+----+-------------------+-----------------+
|United States|Virginia                    |2013|2.003028177451206  |1                |
|Australia    |Australian Capital Territory|2013|0.7102371474864331 |1                |
|Russia       |Novosibirsk                 |2013|2.443347293083901  |1                |
|United States|Alaska                      |2013|2.7879861980460316 |1                |
|United States|Maryland                    |2013|2.208315965102152  |1                |
|China        |Shanxi                      |2013|2.894225812529438  |1                |
|United States|West Virginia               |2013|2.15846900584795   |1                |
|Russia       |Sakhalin                    |2013|1.0657213963963956 |1                |
|Russia       |Aga Buryat       

In [0]:
(
    gold_state_labeled_df
        .write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable("gold_climate_state_indicators")
)


In [0]:
spark.sql(
    """
    SELECT high_climate_risk, COUNT(*) AS count
    FROM gold_climate_state_indicators
    GROUP BY high_climate_risk
    """
).show()


+-----------------+-----+
|high_climate_risk|count|
+-----------------+-----+
|                0|48006|
|                1| 5969|
+-----------------+-----+



In [0]:
silver_city_df = spark.table("silver_land_temperatures_city")



In [0]:
from pyspark.sql.functions import year, month, col

city_time_df = (
    silver_city_df
        .withColumn("year", year(col("date")))
        .withColumn("month", month(col("date")))
)


In [0]:
from pyspark.sql.window import Window

city_window = (
    Window
        .partitionBy("country", "city")
        .orderBy("date")
        .rowsBetween(-11, 0)
)


In [0]:
from pyspark.sql.functions import avg

city_rolling_df = city_time_df.withColumn(
    "rolling_12_month_avg_temp",
    avg(col("AverageTemperature")).over(city_window)
)


In [0]:
city_baseline_df = (
    city_rolling_df
        .groupBy("country", "city")
        .agg(
            avg("rolling_12_month_avg_temp")
            .alias("historical_avg_temperature")
        )
)


In [0]:
gold_city_df = (
    city_rolling_df
        .join(
            city_baseline_df,
            on=["country", "city"],
            how="inner"
        )
        .withColumn(
            "temperature_anomaly",
            col("rolling_12_month_avg_temp") - col("historical_avg_temperature")
        )
)


In [0]:
from pyspark.sql.functions import stddev

city_anomaly_stats_df = (
    gold_city_df
        .groupBy("country", "city")
        .agg(
            stddev("temperature_anomaly")
            .alias("anomaly_stddev")
        )
)


In [0]:
from pyspark.sql.functions import when

gold_city_labeled_df = (
    gold_city_df
        .join(
            city_anomaly_stats_df,
            on=["country", "city"],
            how="inner"
        )
        .withColumn(
            "high_climate_risk",
            when(
                col("temperature_anomaly") > col("anomaly_stddev"),
                1
            ).otherwise(0)
        )
)


In [0]:
gold_city_labeled_df.select(
    "country",
    "city",
    "date",
    "rolling_12_month_avg_temp",
    "temperature_anomaly",
    "high_climate_risk"
).orderBy("date", ascending=False).show(15, truncate=False)


+-----------+--------------+----------+-------------------------+--------------------+-----------------+
|country    |city          |date      |rolling_12_month_avg_temp|temperature_anomaly |high_climate_risk|
+-----------+--------------+----------+-------------------------+--------------------+-----------------+
|China      |Tianjin       |2013-09-01|11.389181818181818       |-0.38190884188702867|0                |
|Afghanistan|Kabul         |2013-09-01|14.441454545454546       |0.1284281279561661  |0                |
|Brazil     |Fortaleza     |2013-09-01|28.181545454545457       |1.1839278254759975  |1                |
|Brazil     |Belo Horizonte|2013-09-01|22.237636363636362       |1.1611880202985176  |1                |
|China      |Chongqing     |2013-09-01|17.657363636363637       |0.8390864649406318  |1                |
|Australia  |Sydney        |2013-09-01|18.126636363636365       |1.1219154858313303  |1                |
|China      |Wuhan         |2013-09-01|17.2226363636363

In [0]:
(
    gold_city_labeled_df
        .write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable("gold_climate_city_indicators")
)


In [0]:
spark.sql(
    """
    SELECT high_climate_risk, COUNT(*) AS count
    FROM gold_climate_city_indicators
    GROUP BY high_climate_risk
    """
).show()


+-----------------+------+
|high_climate_risk| count|
+-----------------+------+
|                0|212051|
|                1| 27126|
+-----------------+------+



In [0]:
spark.sql(
    """
    SELECT high_climate_risk, COUNT(*) AS count
    FROM gold_climate_city_indicators
    GROUP BY high_climate_risk
    """
).show()


+-----------------+------+
|high_climate_risk| count|
+-----------------+------+
|                0|212051|
|                1| 27126|
+-----------------+------+

