In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

spark = SparkSession.builder \
    .appName('Weather') \
    .config('spark.master', 'spark://spark-master:7077') \
    .config('spark.hadoop.fs.defaultFS', 'hdfs://hadoop-namenode:9000') \
    .getOrCreate()

# Чтение исходных данных
df = spark.read.option('header', 'true').csv('hdfs://hadoop-namenode:9000/user/jovyan/input/weather_data.csv')
df.printSchema()

# Преобразование типа столбца
df = df.withColumn('date', to_date(df['date'], 'yyyy-MM-dd'))
df.printSchema()
df.show()

# Поиск пустых значений
# df.filter(
#     " OR ".join([f"{c} IS NULL" for c in df.columns])
# ).show()

# Топ 5 самых жарких дней за все время наблюдений
print('Топ 5 самых жарких дней за все время наблюдений')
df.createOrReplaceTempView('weather')

# print('Отладка')
# spark.sql("""
#     select 
#         station_id, date, precipitation
#     from weather
#     where station_id = 'station_5'
#     order by date desc
# """).show()

spark.sql("""
    with temp_rank as (
        select 
            date,
            temperature,
            rank() over (order by cast(temperature as decimal(12, 10)) desc) as temp_rank
        from weather
    )
    select tr.date, tr.temperature
    from temp_rank tr
    where tr.temp_rank in (1, 2, 3, 4, 5)
    order by temp_rank
""").show()


print('Метеостанция с наибольшм количеством осадков за последний год')

spark.sql("""
    with max_year as (
        select extract(year from max(date)) as last_year
        from weather
    ),
    max_prec as (
        select max(cast(precipitation as decimal(13,10))) as max_prec
        from weather
    )
    select 
        station_id, 
        sum(cast(precipitation as decimal(13,10))) as sum
    from weather
    cross join max_year
    cross join max_prec
    where extract(year from date) = max_year.last_year
    group by station_id
    order by sum desc
    limit 1
""").show()


print('Средняя температура по месяцам за все время наблюдений')

spark.stop()

root
 |-- station_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- wind_speed: string (nullable = true)

root
 |-- station_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- temperature: string (nullable = true)
 |-- precipitation: string (nullable = true)
 |-- wind_speed: string (nullable = true)

+----------+----------+-------------------+------------------+-------------------+
|station_id|      date|        temperature|     precipitation|         wind_speed|
+----------+----------+-------------------+------------------+-------------------+
| station_7|2022-06-28| -6.751842212861652| 23.67004407474563|  5.458999894629757|
| station_4|2020-04-07|  -9.57484426026233|2.9858244485444665|  6.828505392085966|
| station_8|2018-12-22|  19.34342035369938| 33.58211407730149|  8.975576079892296|
| station_5|2021-09-09| 30.880953114964086|29.110437988411558|  18.264653608