In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ad389f5ac6edb8d25b6ca54e01a124481f293c05c4a3a9d66481b02a7017bc5b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
import zipfile

import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession

%matplotlib inline

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark") \
    .getOrCreate()

with open("GlobalLandTemperaturesByMajorCity.csv") as f:
    pandas_df = pd.read_csv(f)
    df = spark.createDataFrame(pandas_df)

df.show(10)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-06-01|            24.844|                        1.402|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-07-01|24.058000000000003|                        1.254|Abidjan|Côte

В последующих заданиях будут учитываться данные начиная с 01.01.1950. Для этого создайте новый DataFrame, в котором удалены все строки до 01.01.1950. Используйте созданный DataFrame в последующих заданиях.

In [41]:
from pyspark.sql.functions import col

df = df.withColumn("dt", col("dt").cast("date"))
df = df.filter(col("dt") >= '1950-01-01')
df = df.filter(col("AverageTemperature").isNotNull())
df = df.dropna(subset=["AverageTemperature"])

df.show(10)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|Year|Month|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+----+-----+
|1950-01-01|26.773000000000003|                        0.239|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|1950|    1|
|1950-02-01|            27.527|                        0.348|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|1950|    2|
|1950-03-01|            28.344|                        0.431|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|1950|    3|
|1950-04-01|             27.83|                        0.467|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|1950|    4|
|1950-05-01|            26.896|                        0.248|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|1950|    5|
|1950-06-01|            25.454|                        0.209|Abidjan|Côte D'Ivoire|   5.

Найдите город, для которого выборочная дисперсия температур на приведенных данных максимальна.

In [48]:
from pyspark.sql.functions import variance, desc

variance_df = df.groupBy("City").agg(variance("AverageTemperature").alias("TemperatureVariance"))
max_variance_city = variance_df.orderBy(desc("TemperatureVariance")).first()

max_variance_city

Row(City='Harbin', TemperatureVariance=218.898615951821)

Посчитайте данные по среднегодовой температуре в Санкт-Петербурге. Определите года, в которых средняя температура была выше, чем в предыдущем и последующем году.

In [43]:
from pyspark.sql.functions import year, avg
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, col

spb_data = df.filter(col("City") == "Saint Petersburg")
spb_data = spb_data.withColumn("Year", year(col("dt")))

# Cреднегодовая температура для каждого года
avg_yearly_temp = spb_data.groupBy("Year").agg(avg("AverageTemperature").alias("AvgTemp")).orderBy("Year")

window_spec = Window.orderBy("Year")
avg_yearly_temp = avg_yearly_temp.withColumn("PrevYearTemp", lag("AvgTemp").over(window_spec))
avg_yearly_temp = avg_yearly_temp.withColumn("NextYearTemp", lead("AvgTemp").over(window_spec))

years_with_high_temps = avg_yearly_temp.filter(
    (col("AvgTemp") > col("PrevYearTemp")) & (col("AvgTemp") > col("NextYearTemp"))
).select("Year")

years_with_high_temps.show()

+----+
|Year|
+----+
|1953|
|1957|
|1959|
|1961|
|1964|
|1967|
|1972|
|1975|
|1977|
|1979|
|1983|
|1986|
|1989|
|1992|
|1995|
|1997|
|2000|
|2002|
|2005|
|2008|
+----+
only showing top 20 rows



Найдите города, для которых:

1. Разница между максимальным и минимальным значением среднегодовой температуры в выборке максимальна.

In [44]:
from pyspark.sql.functions import min, max

df = df.withColumn("Year", year(col("dt")))
avg_yearly_temp_city = df.groupBy("City", "Year").agg(avg("AverageTemperature").alias("AvgTemp"))
temp_diff_city = avg_yearly_temp_city.groupBy("City").agg(
    (max("AvgTemp") - min("AvgTemp")).alias("TempDiff")
)
max_temp_diff_city = temp_diff_city.orderBy(col("TempDiff").desc()).first()

max_temp_diff_city


Row(City='Mashhad', TempDiff=5.250000000000002)

2. Самая большая средняя разница между средней температурой января и средней температурой июля.

In [45]:
from pyspark.sql.functions import month, countDistinct

jan_jul_data = df.filter((month(col("dt")) == 1) | (month(col("dt")) == 7))

jan_jul_avg_temp = jan_jul_data.groupBy("City", "Year", month(col("dt")).alias("Month")).agg(avg("AverageTemperature").alias("AvgTemp"))
jan_avg_temp = jan_jul_avg_temp.filter(col("Month") == 1).select("City", "Year", col("AvgTemp").alias("JanTemp"))
jul_avg_temp = jan_jul_avg_temp.filter(col("Month") == 7).select("City", "Year", col("AvgTemp").alias("JulTemp"))
jan_jul_avg_temp_diff = jan_avg_temp.join(jul_avg_temp, on=["City", "Year"])
jan_jul_avg_temp_diff = jan_jul_avg_temp_diff.withColumn("TempDiff", col("JulTemp") - col("JanTemp"))
avg_temp_diff_city = jan_jul_avg_temp_diff.groupBy("City").agg(avg("TempDiff").alias("AvgTempDiff"))
max_avg_temp_diff_city = avg_temp_diff_city.orderBy(col("AvgTempDiff").desc()).first()

max_avg_temp_diff_city


Row(City='Harbin', AvgTempDiff=41.99271875000001)

3. Наибольшее среднее количество месяцев с отрицательной температурой в году

In [46]:
df = df.withColumn("Month", month(col("dt")))

negative_temp_months = df.filter(col("AverageTemperature") < 0).groupBy("City", "Year").agg(countDistinct("Month").alias("NegativeMonths"))
avg_negative_months_city = negative_temp_months.groupBy("City").agg(avg("NegativeMonths").alias("AvgNegativeMonths"))
max_avg_negative_months_city = avg_negative_months_city.orderBy(col("AvgNegativeMonths").desc()).first()

max_avg_negative_months_city


Row(City='Harbin', AvgNegativeMonths=4.90625)