# Task 4

Загрузите [данные по изменению температуры поверхности земли](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data). Для этого может понадобится зарегистрироваться на [Kaggle](https://kaggle.com). Затем нужно будет работать с данными, которые содержатся в файле **GlobalLandTemperaturesByMajorCity.csv**

**NB** Все подсчеты необходимо делать с помощью `PySpark`, без применения `pandas api`. Можно использоать `SQL`.

In [15]:
import zipfile
import pandas as pd
import matplotlib.pyplot as plt

import pyspark
import pyspark.sql.functions as F

from pyspark.sql.window import Window
from pyspark.sql import SparkSession

In [17]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark") \
    .getOrCreate()

#Распаковка данных и создание датафрейма
with zipfile.ZipFile("weather_data.zip") as z:
    with z.open("GlobalLandTemperaturesByMajorCity.csv") as f:
        df = spark.createDataFrame(pd.read_csv(f))

In [18]:
df.show(5)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
only showing top 5 rows



# Задание 4.1 (1 балл)

В последующих заданиях будут учитываться данные начиная с 01.01.1950. Для этого создайте новый `DataFrame`, в котором удалены все строки до 01.01.1950. Используйте созданный DataFrame в последующих заданиях.

In [25]:
new_df = df.filter((F.year('dt') >= 1950))
new_df = new_df.dropna()
new_df = new_df.withColumn(colName="AverageTemperature", col=F.col("AverageTemperature").cast("double"))

In [26]:
new_df.show(5)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1950-01-01|26.773000000000003|                        0.239|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-02-01|            27.527|                        0.348|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-03-01|            28.344|                        0.431|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-04-01|             27.83|                        0.467|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-05-01|            26.896|                        0.248|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
only showing top 5 rows



# Задание 4.2 (2 балла)

Найдите город, для которого выборочная дисперсия температур на приведенных данных максимальна.

In [27]:
var_df = new_df.groupBy("City").agg(F.variance("AverageTemperature").alias("TemperatureVariance"))
max_var_city = var_df.orderBy(F.desc("TemperatureVariance")).first()
print(max_var_city['City'])

Harbin


# Задание 4.3 (2 баллов)

Посчитайте данные по среднегодовой температуре в Санкт-Петербурге. Определите года, в которых средняя температура была выше, чем в предыдущем  и последующем году.

In [29]:
spb_df = new_df.filter((F.col("City") == "Saint Petersburg") & F.col("AverageTemperature").isNotNull())
spb_df = spb_df.withColumn(colName="Year", col=F.col("dt").substr(1, 4).cast("int"))

# Опредение средней температуры по годам
avg_temp_by_year = spb_df.groupBy("Year").agg(F.avg("AverageTemperature").alias("AvgTempByYear"))
year_window = Window.orderBy("Year")

# Средние температуры для предыдущего и последующего года
avg_temp_by_year = avg_temp_by_year.withColumn(colName="PrevYearTemperature", col=F.lag("AvgTempByYear").over(year_window))
avg_temp_by_year = avg_temp_by_year.withColumn(colName="NextYearTemperature", col=F.lead("AvgTempByYear").over(year_window))

result3 = avg_temp_by_year.filter(
    (F.col("AvgTempByYear") > F.col("PrevYearTemperature")) &
    (F.col("AvgTempByYear") > F.col("NextYearTemperature"))
)

result3.show()

+----+------------------+-------------------+-------------------+
|Year|     AvgTempByYear|PrevYearTemperature|NextYearTemperature|
+----+------------------+-------------------+-------------------+
|1953| 4.840083333333333|  3.749333333333333|              4.454|
|1957| 4.856249999999999| 2.5654999999999997| 3.2618333333333336|
|1959| 5.026000000000001| 3.2618333333333336|  4.108249999999999|
|1961| 5.842083333333334|  4.108249999999999|  4.038250000000001|
|1964| 4.414166666666666| 3.4835833333333333|  3.702166666666667|
|1967| 4.618666666666667| 2.8411666666666666| 3.4489999999999994|
|1972| 5.294333333333333|  4.157750000000001|  4.421500000000001|
|1975| 5.885249999999999|  5.642583333333334| 2.5989166666666668|
|1977|3.9462500000000005| 2.5989166666666668| 2.6957500000000003|
|1979|4.0605833333333345| 2.6957500000000003| 3.6558333333333333|
|1983| 5.237583333333333|  4.482166666666667|  5.008583333333333|
|1986| 4.054749999999999| 2.6400833333333336| 2.2548333333333335|
|1989| 6.5

# Задание 4.4 (4 балла)

Найдите города, для которых:
1. Разница между максимальным и минимальным значением среднегодовой температуры в выборке максимальна.
2. Самая большая средняя разница между средней температурой января и средней температурой июля.
3. Наибольшее среднее количество месяцев с отрицательной температурой в году.

1. Город, в котором разница между максимальным и минимальным значением среднегодовой температуры в выборке максимальна

In [31]:
df41 = new_df.withColumn("Year", F.col("dt").substr(1, 4).cast("int"))

avg_temp_by_year = df41.groupBy("City", "Year").agg(F.avg("AverageTemperature").alias("AvgTempByYear"))
temp_range = avg_temp_by_year.groupBy("City").agg(
    pyspark.sql.functions.max("AvgTempByYear").alias("MaxAvgTemperature"),
    pyspark.sql.functions.min("AvgTempByYear").alias("MinAvgTemperature")
)

temp_range = temp_range.withColumn("TemperatureRange", F.col("MaxAvgTemperature") - F.col("MinAvgTemperature"))
max_temp_range = temp_range.orderBy(F.desc("TemperatureRange")).first()
max_temp_range['City']

'Mashhad'

2. Город, в котором самая большая разница между средней температурой января и июля

In [32]:
df42 = new_df.withColumn("Month", F.month(F.col("dt")))
temp1 = df42.filter(F.col("Month") == 1).groupBy("City").agg(F.avg("AverageTemperature").alias("JanuaryAvgTemperature"))
temp7 = df42.filter(F.col("Month") == 7).groupBy("City").agg(F.avg("AverageTemperature").alias("JulyAvgTemperature"))
temperature_difference = temp1.join(temp7, "City").withColumn("TemperatureDifference", pyspark.sql.functions.abs(F.col("JulyAvgTemperature") - F.col("JanuaryAvgTemperature")))
max_temperature_difference = temperature_difference.orderBy(F.col("TemperatureDifference").desc()).first()
max_temperature_difference['City']

'Harbin'

3.Город у которого наибольшее среднее количество месяцев с отрицательной температурой в году

In [35]:
df43 = new_df.withColumn("Month", F.month(F.col("dt"))).withColumn("Year", F.year(F.col("dt")))
negative_temperatures_monthly = df43.withColumn("Year", F.year(F.col("dt"))).groupBy("City", "Year", "Month").agg(F.avg("AverageTemperature").alias("AverageTemperature"))
negative_temperatures_monthly = negative_temperatures_monthly.withColumn("UnderZero", F.when(F.col("AverageTemperature") < 0, 1).otherwise(0))
#Количество месяцев с отрицательной температурой в разбивке по городам и годам
negative_temperatures_yearly = negative_temperatures_monthly.groupBy("City", "Year").agg(pyspark.sql.functions.sum("UnderZero").alias("NegativeMonths"))
average_negative_months = negative_temperatures_yearly.groupBy("City").agg(F.avg("NegativeMonths").alias("AverageNegativeMonths"))
max_neg_months_city = average_negative_months.orderBy(F.col("AverageNegativeMonths").desc()).first()
max_neg_months_city['City']

'Harbin'