# Задание 5.

Загрузите [данные по изменению температуры поверхности земли](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data). Для этого может понадобится зарегистрироваться на [Kaggle](https://kaggle.com). Затем нужно будет работать с данными, которые содержатся в файле **GlobalLandTemperaturesByMajorCity.csv**

**NB** Все подсчеты необходимо делать с помощью `PySpark`, без применения `pandas api`. Можно использоать `SQL`.

In [None]:
import zipfile

import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession

%matplotlib inline

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark") \
    .getOrCreate()

with zipfile.ZipFile("archive.zip") as z:
    with z.open("GlobalLandTemperaturesByMajorCity.csv") as f:
        pandas_df = pd.read_csv(f)
        df = spark.createDataFrame(pandas_df)

df.show(10)

# Задание 4.1 (1 балл)

В последующих заданиях будут учитываться данные начиная с 01.01.1950. Для этого создайте новый `DataFrame`, в котором удалены все строки до 01.01.1950. Используйте созданный DataFrame в последующих заданиях.  

In [3]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, round

# Создание SparkSession
spark = SparkSession.builder.master("local[*]").appName("PySpark").getOrCreate()

# Путь к файлу
file_path = 'C:/Users/MSI-PC/Desktop/my tipo dz po pitonu/task-4-2024/GlobalLandTemperaturesByMajorCity.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)

# Преобразование столбца с датами в формат даты
df = df.withColumn('dt', to_date(col('dt'), 'yyyy-MM-dd'))

# Фильтрация строк
filtered_df = df.filter(col('dt') >= '1950-01-01')

rounded_df = filtered_df.withColumn('AverageTemperature', round(col('AverageTemperature'), 2))

rounded_df.show(10)



+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1950-01-01|             26.77|                        0.239|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-02-01|             27.53|                        0.348|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-03-01|             28.34|                        0.431|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-04-01|             27.83|                        0.467|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-05-01|              26.9|                        0.248|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-06-01|             25.45|                        0.209|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1950-07-01|             24.88|                        0.403|Abidjan|Côte

# Задание 4.2 (2 балла)

Найдите город, для которого выборочная дисперсия температур на приведенных данных максимальна. 

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, variance

# Создание SparkSession
spark = SparkSession.builder.master("local[*]").appName("PySpark").getOrCreate()

# Путь к файлу
file_path = 'C:/Users/MSI-PC/Desktop/my tipo dz po pitonu/task-4-2024/GlobalLandTemperaturesByMajorCity.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)

# Вычисление выборочной дисперсии температур для каждого города
variance_df = filtered_df.groupBy('City').agg(variance(col('AverageTemperature')).alias('temp_variance'))

# Поиск города с максимальной дисперсией
max_variance_city = variance_df.orderBy(col('temp_variance').desc()).first()

# Вывод ответа
print(f"Город с максимальной дисперсией: {max_variance_city['City']}, Дисперсия: {max_variance_city['temp_variance']}")

# Остановка SparkSession
spark.stop()


Город с максимальной дисперсией: Harbin, Дисперсия: 218.898615951821


# Задание 4.3 (2 баллов)

Посчитайте данные по среднегодовой температуре в Санкт-Петербурге. Определите года, в которых средняя температура была выше, чем в предыдущем  и последующем году. 

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, avg, lag
from pyspark.sql.window import Window

# Создание SparkSession
spark = SparkSession.builder.master("local[*]").appName("PySpark").getOrCreate()

# Путь к файлу
file_path = 'C:/Users/MSI-PC/Desktop/my tipo dz po pitonu/task-4-2024/GlobalLandTemperaturesByMajorCity.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)

# Преобразование столбца с датами в формат даты
df = df.withColumn('dt', to_date(col('dt'), 'yyyy-MM-dd'))

# Фильтрация строк, относящихся к Санкт-Петербургу
spb_df = df.filter(col('City') == 'Saint Petersburg')

# Вычисление среднегодовой температуры
yearly_avg_temp_df = spb_df.withColumn('year', year(col('dt'))) \
    .groupBy('year') \
    .agg(avg(col('AverageTemperature')).alias('avg_temp')) \
    .orderBy('year')

window_spec = Window.orderBy('year')
yearly_avg_temp_df = yearly_avg_temp_df.withColumn('prev_avg_temp', lag('avg_temp').over(window_spec)) \
    .withColumn('next_avg_temp', lag('avg_temp', -1).over(window_spec))

# Поиск по году
peak_years_df = yearly_avg_temp_df.filter((col('avg_temp') > col('prev_avg_temp')) & (col('avg_temp') > col('next_avg_temp')))

# Вывод результата
peak_years_df.show()

# Остановка SparkSession
spark.stop()


+----+------------------+------------------+------------------+
|year|          avg_temp|     prev_avg_temp|     next_avg_temp|
+----+------------------+------------------+------------------+
|1744|             6.192|             0.416|          -6.38575|
|1754|3.8869166666666657|3.7727500000000003| 3.849083333333334|
|1757| 4.532916666666667|           4.18025|2.1264166666666666|
|1759|3.5047499999999996|2.1264166666666666|           2.19575|
|1761| 4.078166666666667|           2.19575|           3.87675|
|1764| 4.192166666666666|2.8677499999999996|3.9683333333333324|
|1766| 4.296833333333333|3.9683333333333324|3.8264999999999993|
|1770|3.8831666666666664|3.7726666666666673|2.9468333333333336|
|1773| 4.927666666666667| 3.447249999999999| 3.911749999999999|
|1775| 5.293666666666666| 3.911749999999999| 4.320916666666666|
|1779| 5.063166666666667|3.9546666666666668| 3.489166666666667|
|1781|3.8560000000000003| 3.489166666666667|2.8357500000000004|
|1783|3.8662499999999995|2.8357500000000

# Задание 4.4 (4 балла)

Найдите города, для которых: 
1. Разница между максимальным и минимальным значением среднегодовой температуры в выборке максимальна.
2. Самая большая средняя разница между средней температурой января и средней температурой июля.
3. Наибольшее среднее количество месяцев с отрицательной температурой в году.

City with maximum temperature range: Montreal, Range: 13.92222222222222


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `1` cannot be resolved. Did you mean one of the following? [`dt`, `City`, `year`, `month`, `Country`].;
'Project [dt#622, AverageTemperature#631, AverageTemperatureUncertainty#610, City#611, Country#612, Latitude#613, Longitude#614, year#701, month#710, abs(('1 - '7)) AS temp_diff#720]
+- Filter month#710 IN (1,7)
   +- Project [dt#622, AverageTemperature#631, AverageTemperatureUncertainty#610, City#611, Country#612, Latitude#613, Longitude#614, year#701, month(dt#622) AS month#710]
      +- Project [dt#622, AverageTemperature#631, AverageTemperatureUncertainty#610, City#611, Country#612, Latitude#613, Longitude#614, year(dt#622) AS year#701]
         +- Project [dt#622, round(AverageTemperature#609, 2) AS AverageTemperature#631, AverageTemperatureUncertainty#610, City#611, Country#612, Latitude#613, Longitude#614]
            +- Project [to_date(dt#608, Some(yyyy-MM-dd), Some(GMT+03:00), false) AS dt#622, AverageTemperature#609, AverageTemperatureUncertainty#610, City#611, Country#612, Latitude#613, Longitude#614]
               +- Relation [dt#608,AverageTemperature#609,AverageTemperatureUncertainty#610,City#611,Country#612,Latitude#613,Longitude#614] csv
