In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [3]:
# Создание SparkSession
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("WordCount") \
    .getOrCreate()

In [28]:
# Чтение CSV файла
df = spark.read.csv("hdfs://namenode:8020/airports.dat", header=True, inferSchema=True)


In [29]:
# Вывод первых 5 строк
df.show(2)


+---+--------------------+-----------+----------------+---+----+------------------+------------------+----+---+---+--------------------+-------+-----------+
|  1|      Goroka Airport|     Goroka|Papua New Guinea|GKA|AYGA|-6.081689834590001|     145.391998291|5282| 10|  U|Pacific/Port_Moresby|airport|OurAirports|
+---+--------------------+-----------+----------------+---+----+------------------+------------------+----+---+---+--------------------+-------+-----------+
|  2|      Madang Airport|     Madang|Papua New Guinea|MAG|AYMD|    -5.20707988739|     145.789001465|  20| 10|  U|Pacific/Port_Moresby|airport|OurAirports|
|  3|Mount Hagen Kagam...|Mount Hagen|Papua New Guinea|HGU|AYMH|-5.826789855957031|144.29600524902344|5388| 10|  U|Pacific/Port_Moresby|airport|OurAirports|
+---+--------------------+-----------+----------------+---+----+------------------+------------------+----+---+---+--------------------+-------+-----------+
only showing top 2 rows



In [18]:
# Фильтрация данных
high_price_products = df.filter(col("_c3") == "Kazakhstan")
high_price_products.show()

+----+--------------------+---------------+----------+---+----+------------------+-----------------+----+---+----+--------------+-------+-----------+
| _c0|                 _c1|            _c2|       _c3|_c4| _c5|               _c6|              _c7| _c8|_c9|_c10|          _c11|   _c12|       _c13|
+----+--------------------+---------------+----------+---+----+------------------+-----------------+----+---+----+--------------+-------+-----------+
|2908|      Almaty Airport|       Alma-ata|Kazakhstan|ALA|UAAA| 43.35210037231445|77.04049682617188|2234|  6|   U|Asia/Qyzylorda|airport|OurAirports|
|2909|    Balkhash Airport|       Balkhash|Kazakhstan|BXH|UAAH|  46.8932991027832|75.00499725341797|1446|  6|   U|Asia/Qyzylorda|airport|OurAirports|
|2910|Astana Internatio...|    Tselinograd|Kazakhstan|TSE|UACC| 51.02220153808594|71.46690368652344|1165|  6|   U|Asia/Qyzylorda|airport|OurAirports|
|2911|       Taraz Airport|       Dzhambul|Kazakhstan|DMB|UADD|42.853599548339844|71.30359649658203|

In [19]:
# Присвоение имен колонкам
df = df.withColumnRenamed("_c0", "id") \
       .withColumnRenamed("_c1", "name") \
       .withColumnRenamed("_c2", "city") \
       .withColumnRenamed("_c3", "country") \
       .withColumnRenamed("_c4", "iata_code") \
       .withColumnRenamed("_c5", "icao_code") \
       .withColumnRenamed("_c6", "latitude") \
       .withColumnRenamed("_c7", "longitude") \
       .withColumnRenamed("_c8", "altitude") \
       .withColumnRenamed("_c9", "timezone") \
       .withColumnRenamed("_c10", "type") \
       .withColumnRenamed("_c11", "region") \
       .withColumnRenamed("_c12", "object_type") \
       .withColumnRenamed("_c13", "source")

In [20]:
# Вывод первых 5 строк
df.show(5)

+---+--------------------+------------+----------------+---------+---------+------------------+------------------+--------+--------+----+--------------------+-----------+-----------+
| id|                name|        city|         country|iata_code|icao_code|          latitude|         longitude|altitude|timezone|type|              region|object_type|     source|
+---+--------------------+------------+----------------+---------+---------+------------------+------------------+--------+--------+----+--------------------+-----------+-----------+
|  1|      Goroka Airport|      Goroka|Papua New Guinea|      GKA|     AYGA|-6.081689834590001|     145.391998291|    5282|      10|   U|Pacific/Port_Moresby|    airport|OurAirports|
|  2|      Madang Airport|      Madang|Papua New Guinea|      MAG|     AYMD|    -5.20707988739|     145.789001465|      20|      10|   U|Pacific/Port_Moresby|    airport|OurAirports|
|  3|Mount Hagen Kagam...| Mount Hagen|Papua New Guinea|      HGU|     AYMH|-5.826789

In [23]:
# Вывод только определенных колонок
selected_columns = df.select("name", "city", "country", "iata_code", "timezone")
selected_columns.show(5)

+--------------------+------------+----------------+---------+--------+
|                name|        city|         country|iata_code|timezone|
+--------------------+------------+----------------+---------+--------+
|      Goroka Airport|      Goroka|Papua New Guinea|      GKA|      10|
|      Madang Airport|      Madang|Papua New Guinea|      MAG|      10|
|Mount Hagen Kagam...| Mount Hagen|Papua New Guinea|      HGU|      10|
|      Nadzab Airport|      Nadzab|Papua New Guinea|      LAE|      10|
|Port Moresby Jack...|Port Moresby|Papua New Guinea|      POM|      10|
+--------------------+------------+----------------+---------+--------+
only showing top 5 rows



In [24]:
kazAirports = selected_columns.filter(col("_c3") == "Kazakhstan")
kazAirports .show()

+--------------------+---------------+----------+---------+--------+
|                name|           city|   country|iata_code|timezone|
+--------------------+---------------+----------+---------+--------+
|      Almaty Airport|       Alma-ata|Kazakhstan|      ALA|       6|
|    Balkhash Airport|       Balkhash|Kazakhstan|      BXH|       6|
|Astana Internatio...|    Tselinograd|Kazakhstan|      TSE|       6|
|       Taraz Airport|       Dzhambul|Kazakhstan|      DMB|       6|
|    Shymkent Airport|       Chimkent|Kazakhstan|      CIT|       6|
|      Uralsk Airport|         Uralsk|Kazakhstan|      URA|       5|
|    Pavlodar Airport|       Pavlodar|Kazakhstan|      PWQ|       6|
|Semipalatinsk Air...|   Semiplatinsk|Kazakhstan|      PLX|       6|
|      Aktobe Airport|     Aktyubinsk|Kazakhstan|      AKX|       5|
|      Atyrau Airport|         Atyrau|Kazakhstan|      GUW|       5|
|Kzyl-Orda Southwe...|      Kzyl-Orda|Kazakhstan|      KZO|       6|
|       Aktau Airport|          Ak

In [25]:
kazAirports = selected_columns.filter(col("_c3") == "Russia")
kazAirports .show()

+--------------------+-----------------+-------+---------+--------+
|                name|             city|country|iata_code|timezone|
+--------------------+-----------------+-------+---------+--------+
|     Yakutsk Airport|          Yakutsk| Russia|      YKS|       9|
|       Mirny Airport|           Mirnyj| Russia|      MJZ|       9|
|   Ignatyevo Airport|   Blagoveschensk| Russia|      BQS|       9|
|Khabarovsk-Novy A...|       Khabarovsk| Russia|      KHV|      10|
|Provideniya Bay A...|  Provideniya Bay| Russia|      PVS|      12|
|       Sokol Airport|          Magadan| Russia|      GDX|      11|
|       Pevek Airport|            Pevek| Russia|      PWE|      12|
|    Yelizovo Airport|    Petropavlovsk| Russia|      PKC|      12|
|Yuzhno-Sakhalinsk...|Yuzhno-sakhalinsk| Russia|      UUS|      11|
|Vladivostok Inter...|      Vladivostok| Russia|      VVO|      10|
|Chita-Kadala Airport|            Chita| Russia|      HTA|       9|
|      Bratsk Airport|           Bratsk| Russia|