<a href="https://colab.research.google.com/github/Blind5518/pyspark_weather_data/blob/main/weather_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**1. Загрузка библиотек,  импорт и чтение данных**

In [None]:
# установка pyspark и создание SparkSession
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, year, month, desc, col, coalesce

spark = SparkSession.builder.appName("WeatherAnalysis").getOrCreate()



In [None]:
# pyspark-скрипт - читать /weather_data.csv
df = spark.read.csv("/weather_data.csv", header=True, inferSchema=True)

**2. Обработка данных**

In [None]:
# перевод в date и числовые колонки
df = (
    df.withColumn("station_id", col("station_id"))
      .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
      .withColumn("temperature", col("temperature").cast("double"))
      .withColumn("precipitation", col("precipitation").cast("double"))
      .withColumn("wind_speed", col("wind_speed").cast("double"))
)


# заполняем пропуски средними по каждой станции
station_means = (
    df.groupBy("station_id")
      .agg(
          avg("temperature").alias("temp_mean"),
          avg("precipitation").alias("prec_mean"),
          avg("wind_speed").alias("wind_mean"),
      )
)

df = df.join(station_means, on="station_id", how="left")

# Корректно заполняем пропуски средними по станции:
df = df.withColumn("temperature",  coalesce(col("temperature"), col("temp_mean"))) \
       .withColumn("precipitation", coalesce(col("precipitation"), col("prec_mean"))) \
       .withColumn("wind_speed",    coalesce(col("wind_speed"), col("wind_mean"))) \
       .select("station_id", "date", "temperature", "precipitation", "wind_speed")

**3. Анализ данных**

In [None]:
# 3.1 Топ-5 самых жарких дней
top5_hot = df.orderBy(desc("temperature")).limit(5)

# 3.2 Станция с максимальными осадками за последний год
year_max = df.agg({"date": "max"}).collect()[0][0].year
prec_last_year = (
    df.filter(year(col("date")) == year_max)
      .groupBy("station_id")
      .agg(avg("precipitation").alias("total_prec"))
      .orderBy(desc("total_prec"))
      .limit(1)
)

# 3.3 Средняя температура по месяцам за всё время
monthly_avg_temp = (
    df.groupBy(year(col("date")).alias("year"), month(col("date")).alias("month"))
      .agg(avg("temperature").alias("avg_temp"))
      .orderBy("year", "month")
)

**4. Вывод результатов**

In [None]:
print("Топ-5 самых жарких дней:")
top5_hot.show()

print("Станция с максимальными осадками за последний год:")
prec_last_year.show()

print("Средняя температура по месяцам:")
monthly_avg_temp.show()

Топ-5 самых жарких дней:
+----------+----------+------------------+------------------+------------------+
|station_id|      date|       temperature|     precipitation|        wind_speed|
+----------+----------+------------------+------------------+------------------+
| station_8|2021-08-20|39.982828249354846|31.582175754144583|19.143939270384692|
| station_9|2023-12-02| 39.96797489293784|  38.6258143039702| 19.73126228228821|
| station_9|2022-03-28|  39.8246894248997| 18.61001959126096|11.623908474419132|
| station_3|2019-02-11| 39.76737697836647| 34.56059046932702|18.160301787499485|
| station_6|2020-06-10| 39.69147838355929| 3.466416519388238| 11.75394036575092|
+----------+----------+------------------+------------------+------------------+

Станция с максимальными осадками за последний год:
+----------+-----------------+
|station_id|       total_prec|
+----------+-----------------+
| station_6|35.28693149722922|
+----------+-----------------+

Средняя температура по месяцам:
+----+