In [0]:
from pyspark.sql.functions import col, when, count, isnan, lag,round, lit
from pyspark.sql.window import Window

In [0]:
dbutils.fs.unmount("/mnt/gcg-data"); 
dbutils.fs.mount(source = "wasbs://gcganalysisdatalake@<storage-account>.blob.core.windows.net/", 
                 mount_point = "/mnt/data",
                 "fs.azure.account.key.<storage-account>.blob.core.windows.net", "<storage-account-key>")

/mnt/gcg-data has been unmounted.


True

In [0]:
dbutils.fs.ls("/mnt/gcg-data")

[FileInfo(path='dbfs:/mnt/gcg-data/raw-data/', name='raw-data/', size=0, modificationTime=0)]

In [0]:
spark

In [0]:
rawData = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/data-gcg/gcg-rawdata-1900-2023")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4730312714984602>, line 1[0m
[0;32m----> 1[0m rawData [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferSchema[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39mload([38;5;124m"[39m[38;5;124m/mnt/data-gcg/raw-data/gcg-rawdata-1900-2023[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()


In [0]:
rawData.show(5)

+----+-------------------------------+-----------------------+-------------------+-----------------------------+
|Year|Global Average Temperature (°C)|CO2 Concentration (ppm)|Sea Level Rise (mm)|Arctic Ice Area (million km²)|
+----+-------------------------------+-----------------------+-------------------+-----------------------------+
|1948|                          13.17|                 397.04|             116.25|                         5.97|
|1996|                           13.1|                 313.17|             277.92|                         9.66|
|2015|                          14.67|                 311.95|             290.32|                          8.4|
|1966|                          14.79|                 304.25|             189.71|                        11.83|
|1992|                          13.15|                 354.52|              14.84|                        11.23|
+----+-------------------------------+-----------------------+-------------------+--------------

In [0]:
rawData.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Global Average Temperature (°C): double (nullable = true)
 |-- CO2 Concentration (ppm): double (nullable = true)
 |-- Sea Level Rise (mm): double (nullable = true)
 |-- Arctic Ice Area (million km²): double (nullable = true)



In [0]:
# null_year_count = rawData.filter(col("year").isNull()).count()
# print(null_year_count)

In [0]:
rawData.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in rawData.columns]
   ).show()


+----+-------------------------------+-----------------------+-------------------+-----------------------------+
|Year|Global Average Temperature (°C)|CO2 Concentration (ppm)|Sea Level Rise (mm)|Arctic Ice Area (million km²)|
+----+-------------------------------+-----------------------+-------------------+-----------------------------+
|   0|                              0|                      0|                  0|                            0|
+----+-------------------------------+-----------------------+-------------------+-----------------------------+



In [0]:
# rawData.where(col("Year") == '1900').count()

In [0]:
avgByYear = rawData.groupBy("Year").avg()
avgByYear = avgByYear.orderBy("Year")
avgByYear.show(2)

+----+---------+------------------------------------+----------------------------+------------------------+----------------------------------+
|Year|avg(Year)|avg(Global Average Temperature (°C))|avg(CO2 Concentration (ppm))|avg(Sea Level Rise (mm))|avg(Arctic Ice Area (million km²))|
+----+---------+------------------------------------+----------------------------+------------------------+----------------------------------+
|1900|   1900.0|                  14.506663070837831|          350.37340524991026|      150.40828838547287|                 8.978658755843231|
|1901|   1901.0|                  14.485342725227884|          349.75713981295127|      150.54882798626747|                 8.947272404403929|
+----+---------+------------------------------------+----------------------------+------------------------+----------------------------------+
only showing top 2 rows



In [0]:
avgByYear = avgByYear.select(
    avgByYear["avg(Year)"].alias("Year"),
    avgByYear["avg(Global Average Temperature (°C))"].alias("Avg_GlobalTemperature"),
    avgByYear["avg(CO2 Concentration (ppm))"].alias("Avg_CO2Concentration"),
    avgByYear["avg(Sea Level Rise (mm))"].alias("Avg_SeaLevelRise"),
    avgByYear["avg(Arctic Ice Area (million km²))"].alias("Avg_ArcticIceArea")
)
avgByYear = avgByYear.orderBy("Year")
display(avgByYear.limit(5))

Year,Avg_GlobalTemperature,Avg_CO2Concentration,Avg_SeaLevelRise,Avg_ArcticIceArea
1900.0,14.506663070837831,350.37340524991026,150.40828838547287,8.978658755843231
1901.0,14.485342725227884,349.75713981295127,150.54882798626747,8.947272404403929
1902.0,14.47626190196308,349.2996861408254,152.1748207358646,9.03555424944164
1903.0,14.492360062327704,349.64437492508654,150.1383375284669,9.0565012585401
1904.0,14.494240694493993,349.53703175169466,150.6673183493875,8.990690926388393


In [0]:
# Year over year change in CO2 concentration
YOYDF = avgByYear.withColumn("YOYAvgCo2 change", col("Avg_CO2Concentration")- lag("Avg_CO2Concentration", 1,0).over(Window.orderBy("Year")))

# Year over year SeaLevelRise change in 
YOYDF = YOYDF.withColumn("YOYAvgSeaLevelRise change", col("Avg_SeaLevelRise")- lag("Avg_SeaLevelRise", 1,0).over(Window.orderBy("Year")))

# Year over year SeaLevelRise change in 
YOYDF = YOYDF.withColumn("YOYAvgTemperature  change", col("Avg_GlobalTemperature")- lag("Avg_GlobalTemperature", 1,0).over(Window.orderBy("Year")))

# Year over year Arctic Ice Area change
YOYDF = YOYDF.withColumn("YOYAvgArcticIceArea change", col("Avg_ArcticIceArea")- lag("Avg_ArcticIceArea", 1,0).over(Window.orderBy("Year")))

YOYDF.show(2)

+------+---------------------+--------------------+------------------+-----------------+-------------------+-------------------------+-------------------------+--------------------------+
|  Year|Avg_GlobalTemperature|Avg_CO2Concentration|  Avg_SeaLevelRise|Avg_ArcticIceArea|   YOYAvgCo2 change|YOYAvgSeaLevelRise change|YOYAvgTemperature  change|YOYAvgArcticIceArea change|
+------+---------------------+--------------------+------------------+-----------------+-------------------+-------------------------+-------------------------+--------------------------+
|1900.0|   14.506663070837831|  350.37340524991026|150.40828838547287|8.978658755843231| 350.37340524991026|       150.40828838547287|       14.506663070837831|         8.978658755843231|
|1901.0|   14.485342725227884|  349.75713981295127|150.54882798626747|8.947272404403929|-0.6162654369589973|       0.1405396007945967|     -0.02132034560994711|      -0.03138635143930202|
+------+---------------------+--------------------+---------

In [0]:
yoyCols = [c for c in YOYDF.columns if c.startswith("YOY")]
Cols = [c for c in YOYDF.columns if not c.startswith("YOY")]
YOYTrends = YOYDF.select(
    *Cols,
    *[when(col("Year") == 1900, lit(0)).otherwise(col(c)).alias(c) for c in yoyCols]
)
display(YOYTrends)

Year,Avg_GlobalTemperature,Avg_CO2Concentration,Avg_SeaLevelRise,Avg_ArcticIceArea,YOYAvgCo2 change,YOYAvgSeaLevelRise change,YOYAvgTemperature change,YOYAvgArcticIceArea change
1900.0,14.506663070837831,350.37340524991026,150.40828838547287,8.978658755843231,0.0,0.0,0.0,0.0
1901.0,14.485342725227884,349.75713981295127,150.54882798626747,8.947272404403929,-0.6162654369589973,0.1405396007945967,-0.0213203456099471,-0.031386351439302
1902.0,14.47626190196308,349.2996861408254,152.1748207358646,9.03555424944164,-0.4574536721258937,1.6259927495971451,-0.0090808232648047,0.0882818450377111
1903.0,14.492360062327704,349.64437492508654,150.1383375284669,9.0565012585401,0.3446887842611659,-2.0364832073977084,0.0160981603646259,0.0209470090984584
1904.0,14.494240694493993,349.53703175169466,150.6673183493875,8.990690926388393,-0.1073431733918823,0.5289808209205944,0.0018806321662871,-0.065810332151706
1905.0,14.48622227465785,349.76851698914595,150.27739971684747,9.000759792354875,0.2314852374512952,-0.3899186325400308,-0.0080184198361425,0.0100688659664829
1906.0,14.50161015948021,350.2692876550503,148.9720945067927,8.95512817483756,0.5007706659043265,-1.3053052100547688,0.0153878848223598,-0.0456316175173157
1907.0,14.50735192878338,349.70745163204737,149.4286302670623,8.962523442136497,-0.5618360230029111,0.4565357602695883,0.0057417693031691,0.0073952672989374
1908.0,14.489931636020753,349.90853842527093,149.75017916077329,8.944291607732207,0.2010867932235669,0.3215488937109967,-0.0174202927626261,-0.0182318344042897
1909.0,14.524319831421211,349.47765745727,149.0916892999298,9.020469445094824,-0.4308809680009631,-0.6584898608434742,0.0343881954004583,0.0761778373626178


In [0]:
decadesDF = updatedDF.withColumn("Decade", (col("Year")/10).cast("int")*10)
DecadeTrends = decadesDF.groupBy("Decade").avg("Avg_GlobalTemperature", "Avg_CO2Concentration", "Avg_SeaLevelRise", "Avg_ArcticIceArea")
DecadeTrends = DecadeTrends.select(
    DecadeTrends["Decade"],
    DecadeTrends["avg(Avg_GlobalTemperature)"].alias("Avg_GlobalTemperature"),
    DecadeTrends["avg(Avg_CO2Concentration)"].alias("Avg_CO2Concentration"),
    DecadeTrends["avg(Avg_SeaLevelRise)"].alias("Avg_SeaLevelRise"),
    DecadeTrends["avg(Avg_ArcticIceArea)"].alias("Avg_ArcticIceArea")
)
display(DecadeTrends.sort("Decade"))

Decade,Avg_GlobalTemperature,Avg_CO2Concentration,Avg_SeaLevelRise,Avg_ArcticIceArea
1900,14.49643042852139,349.7743090039253,150.14575859368648,8.989185005677328
1910,14.500568437731667,350.0161496528329,149.50144538689048,9.01370245982721
1920,14.498891876852245,350.2570725515749,150.3264759035073,9.02192778904816
1930,14.498248820165989,350.0380147813354,150.3503860169996,8.995248661272232
1940,14.499918813826108,350.0217499914089,150.12676979829666,9.003961757429886
1950,14.50802846852277,349.9663928939546,150.18039353322604,9.01460197294693
1960,14.502068997194254,349.9734365334796,150.22157935659817,8.994665656305882
1970,14.498896936905163,350.0618070358114,149.84080667877586,8.98789792214057
1980,14.496296796523978,350.0226681673336,149.73946292321386,9.000388239650672
1990,14.501375393031935,349.89330612780736,149.68306312466908,8.984979553435252


In [0]:
#dbutils.fs.mkdirs("/mnt/gcg-data/transform-data")

In [0]:
YOYTrends.write.option("header","true").csv("/mnt/gcg-data/transform-data/YOYTrends")
DecadeTrends.write.option("header","true").csv("/mnt/gcg-data/transform-data/DecadeTrends")