In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("GlobalTemperatures").getOrCreate()

file_path = "GlobalLandTemperaturesByCity.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.printSchema()

df.show(5)


root
 |-- dt: date (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              NULL|                         NULL|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              NULL|                         NULL|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              NULL|                         NULL|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              NULL

In [2]:
cleaned_df = df.dropna()

columns_to_drop = ["AverageTemperatureUncertainty"]
cleaned_df = cleaned_df.drop(*columns_to_drop)

cleaned_df.show(5)


+----------+------------------+-----+-------+--------+---------+
|        dt|AverageTemperature| City|Country|Latitude|Longitude|
+----------+------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|Århus|Denmark|  57.05N|   10.33E|
|1744-07-01|            16.082|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----+-------+--------+---------+
only showing top 5 rows



In [3]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

transformed_df = cleaned_df.withColumn("dt", col("dt").cast(DateType()))
transformed_df.show(5)


+----------+------------------+-----+-------+--------+---------+
|        dt|AverageTemperature| City|Country|Latitude|Longitude|
+----------+------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01|5.7879999999999985|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|            10.644|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01|14.050999999999998|Århus|Denmark|  57.05N|   10.33E|
|1744-07-01|            16.082|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----+-------+--------+---------+
only showing top 5 rows



In [4]:
filtered_df = transformed_df.filter((col("Country") == "United States") & (col("dt").cast("string").substr(1, 4) >= "2000"))
filtered_df.show(5)


+----------+------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|   City|      Country|Latitude|Longitude|
+----------+------------------+-------+-------------+--------+---------+
|2000-01-01| 8.039000000000001|Abilene|United States|  32.95N|  100.53W|
|2000-02-01|            11.908|Abilene|United States|  32.95N|  100.53W|
|2000-03-01|            14.423|Abilene|United States|  32.95N|  100.53W|
|2000-04-01|            18.274|Abilene|United States|  32.95N|  100.53W|
|2000-05-01|            25.358|Abilene|United States|  32.95N|  100.53W|
+----------+------------------+-------+-------------+--------+---------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import year, month, dayofmonth

final_df = filtered_df.withColumn("Year", year(col("dt"))) \
                      .withColumn("Month", month(col("dt"))) \
                      .withColumn("Day", dayofmonth(col("dt")))
final_df.show(5)


+----------+------------------+-------+-------------+--------+---------+----+-----+---+
|        dt|AverageTemperature|   City|      Country|Latitude|Longitude|Year|Month|Day|
+----------+------------------+-------+-------------+--------+---------+----+-----+---+
|2000-01-01| 8.039000000000001|Abilene|United States|  32.95N|  100.53W|2000|    1|  1|
|2000-02-01|            11.908|Abilene|United States|  32.95N|  100.53W|2000|    2|  1|
|2000-03-01|            14.423|Abilene|United States|  32.95N|  100.53W|2000|    3|  1|
|2000-04-01|            18.274|Abilene|United States|  32.95N|  100.53W|2000|    4|  1|
|2000-05-01|            25.358|Abilene|United States|  32.95N|  100.53W|2000|    5|  1|
+----------+------------------+-------+-------------+--------+---------+----+-----+---+
only showing top 5 rows



In [6]:
avg_temp_by_city = final_df.groupBy("City").avg("AverageTemperature").withColumnRenamed("avg(AverageTemperature)", "AvgTemperature")
avg_temp_by_city.show(5)


+-------------+------------------+
|         City|    AvgTemperature|
+-------------+------------------+
|   Charleston|19.514345454545456|
|       Corona|16.823933333333333|
|Coral Springs|23.816096969696957|
|    Anchorage|-1.013993902439024|
|    Allentown|10.888254545454545|
+-------------+------------------+
only showing top 5 rows



In [7]:
from pyspark.sql.functions import desc, asc

hottest_cities = avg_temp_by_city.orderBy(desc("AvgTemperature")).limit(5)
hottest_cities.show()

coldest_cities = avg_temp_by_city.orderBy(asc("AvgTemperature")).limit(5)
coldest_cities.show()


+----------------+------------------+
|            City|    AvgTemperature|
+----------------+------------------+
|Port Saint Lucie|23.816096969696957|
|  Pembroke Pines|23.816096969696957|
| Fort Lauderdale|23.816096969696957|
|   Coral Springs|23.816096969696957|
|         Hialeah|23.816096969696957|
+----------------+------------------+

+-----------+------------------+
|       City|    AvgTemperature|
+-----------+------------------+
|  Anchorage|-1.013993902439024|
|     Arvada| 3.391303030303032|
|Minneapolis| 6.432606060606066|
| Saint Paul| 6.432606060606066|
|Sioux Falls|7.7314303030303035|
+-----------+------------------+



In [8]:
ny_avg_temp = final_df.filter(col("City") == "New York").groupBy("Year").avg("AverageTemperature").withColumnRenamed("avg(AverageTemperature)", "YearlyAvgTemperature")
ny_avg_temp.show()


+----+--------------------+
|Year|YearlyAvgTemperature|
+----+--------------------+
|2003|               9.836|
|2007|  10.627333333333334|
|2006|            11.51925|
|2013|  12.163888888888886|
|2004|             10.3895|
|2012|             11.9715|
|2009|  10.141833333333334|
|2001|  10.930999999999997|
|2005|  10.681416666666665|
|2000|   9.969083333333334|
|2010|  11.357583333333332|
|2011|            11.27225|
|2008|  10.641666666666666|
|2002|  11.252166666666668|
+----+--------------------+



In [None]:
import matplotlib.pyplot as plt

hottest_cities_data = hottest_cities.toPandas()

plt.figure(figsize=(10, 6))
plt.bar(hottest_cities_data["City"], hottest_cities_data["AvgTemperature"], color='red')
plt.title("Top 5 Hottest Cities")
plt.xlabel("City")
plt.ylabel("Average Temperature")
plt.savefig("hottest_cities_chart.png")
plt.show()


In [None]:
final_df.write.partitionBy("Day").csv("output/cleaned_data", header=True, mode="overwrite")
