In [126]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import month
from pyspark.sql.functions import year
from pyspark.sql.functions import col

spark = SparkSession.builder \
.appName("read")  \
.getOrCreate()
"""
station_id: ID метеостанции
date: Дата наблюдения (в формате YYYY-MM-DD)
temperature: Средняя температура в градусах Цельсия
precipitation: Количество осадков в миллиметрах
wind_speed: Средняя скорость ветра в метрах в секунду
"""
"""
1) Преобразуйте столбец date в формат даты. +
2) Заполните пропущенные значения, если такие в csv файле есть (например, используя средние значения по метеостанциям). + средние по месяцам
3) Найдите топ-5 самых жарких дней за все время наблюдений. +
4) Найдите метеостанцию с наибольшим количеством осадков за последний год.
5) Подсчитайте среднюю температуру по месяцам за все время наблюдений.
"""
df = spark.read.option("header","true").csv("/content/weather_data.csv")
df_date = (df.withColumn("date", df["date"].cast('date'))
                 .withColumn("temperature", df["temperature"].cast('float'))
                 .withColumn("precipitation", df["precipitation"].cast('float'))
                 .withColumn("wind_speed", df["wind_speed"].cast('float'))
                 )
df_date = df_date.withColumn("month", month("date")).alias("month")
df_date = df_date.withColumn("year", year("date")).alias("year")
#df_date.printSchema()

print((df_date.filter((df_date.temperature.isNull() | df_date.precipitation.isNull()
                      | df_date.wind_speed.isNull() | (df_date.temperature == 0.0)
                      | (df_date.precipitation == 0.0) | (df_date.wind_speed == 0.0))).count()))

grouped_df = (df_date.filter((df_date.temperature.isNotNull() & df_date.precipitation.isNotNull()
                      & df_date.wind_speed.isNotNull() & (df_date.temperature != 0.0)
                      & (df_date.precipitation != 0.0) & (df_date.wind_speed != 0.0))).groupBy("station_id", "month")
.agg({"temperature": "avg", "precipitation": "avg", "wind_speed": "avg"})
.withColumnRenamed("avg(temperature)", "avg_temperature")
.withColumnRenamed("avg(precipitation)", "avg_precipitation")
.withColumnRenamed("avg(wind_speed)", "avg_wind_speed")
)
#grouped_df.show()

df_fill_na = (df_date.join(grouped_df, ['station_id', 'month']
                           , how="inner")
     .select(F.coalesce("temperature", "avg_temperature").alias("temperature")
     , F.coalesce("precipitation", "avg_precipitation").alias("precipitation")
     , F.coalesce("wind_speed", "avg_wind_speed").alias("wind_speed")
     , df_date.station_id, df_date.date, df_date.month, df_date.year))
#df_fill_na.where((df_fill_na.station_id=="station_9") & (df_fill_na.date=='2021-10-15')).show()

df_heat_days = (df_fill_na.groupBy("date").agg({"temperature": "max"})
                      .withColumnRenamed("max(temperature)", "temperature"))
#df_top5_heat_days = df_heat_days.orderBy(F.desc('temperature'))
###### 3
df_heat_days.sort('temperature', ascending=False).show(5, truncate=False)
#df_top5_heat_days = df_heat_days.orderBy(F.col("temperature").astype("double").desc())
#df_top5_heat_days = df_heat_days.orderBy(col("temperature").desc())
#df_top5_heat_days.show(5)
df_sum_precipitation = (df_fill_na.groupBy("year","station_id").agg({"precipitation": "sum"})
                      .withColumnRenamed("sum(precipitation)", "sum_precipitation")
                      )
###### 4
df_sum_precipitation.select("station_id","sum_precipitation").orderBy(col("year").desc(),col("sum_precipitation").desc()).show(1)

###### 5
df_avg_temp = (df_fill_na.groupBy("month").agg({"temperature": "avg"})
                      .withColumnRenamed("avg(temperature)", "avg_temperature"))

df_avg_temp.select("month","avg_temperature").orderBy(col("month").asc()).show()

spark.stop()

0
+----------+-----------------+
|date      |temperature      |
+----------+-----------------+
|2021-08-20|39.98283004760742|
|2023-12-02|39.96797561645508|
|2022-03-28|39.82468795776367|
|2019-02-11|39.76737594604492|
|2020-06-10|39.69147872924805|
+----------+-----------------+
only showing top 5 rows

+----------+-----------------+
|station_id|sum_precipitation|
+----------+-----------------+
| station_5|642.9302670955658|
+----------+-----------------+
only showing top 1 row

+-----+------------------+
|month|   avg_temperature|
+-----+------------------+
|    1|11.356518470007797|
|    2|  9.06722993850708|
|    3| 7.244080115937525|
|    4|12.024529053670603|
|    5| 9.902883278588726|
|    6|13.421092370936744|
|    7| 6.185718382110021|
|    8|10.967800280712183|
|    9| 9.596744181906304|
|   10| 9.098843419781097|
|   11| 7.265890174138714|
|   12|11.218592057529005|
+-----+------------------+

