<a href="https://colab.research.google.com/github/glincow/netology-spark-sql/blob/main/%D0%9D%D0%B5%D1%82%D0%BE%D0%BB%D0%BE%D0%B3%D0%B8%D1%8F_Spark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Установка pyspark с помощью pip

In [None]:
pip install --quiet pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession

# Create the Spark session (or reuse one that is already running) under the
# application name "Practice"; every later cell works through `spark`.
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

# Чтение из файла

In [None]:
from pyspark import SparkFiles

# Register the remote CSV with the Spark context, then read it back from the
# path that SparkFiles reports for the registered file.
covid_data_file_url = "https://raw.githubusercontent.com/glincow/netology-spark-sql/main/data/covid-data.csv"
spark.sparkContext.addFile(covid_data_file_url)

file_path = f"file://{SparkFiles.get('covid-data.csv')}"

# The first CSV line is the header; column types are inferred from the data.
df = (
    spark.read
    .option('inferSchema', 'true')
    .option('header', 'true')
    .csv(file_path)
)

# Основные операции

In [None]:
# Print the first rows of the DataFrame as a text table (all columns).
df.show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

In [None]:
# Display the DataFrame's schema object as the cell output.
df.schema

In [None]:
# The same schema, serialised to a JSON string.
df.schema.json()

In [None]:
# Print the schema in a human-readable tree layout.
df.printSchema()

# Выборка и фильтрация

In [None]:
# Show the unfiltered DataFrame once more, as a baseline for the projections
# and filters in the following cells.
df.show()

In [None]:
# Project only the four listed columns and print the first rows.
df.select('iso_code', 'location', 'date', 'total_cases').show()

In [None]:
from pyspark.sql import functions as F

# Keep four columns and only the rows whose `location` starts with "Rus".
df_russia = (
    df
    .select('iso_code', 'location', 'date', 'total_cases')
    .where(F.col('location').startswith("Rus"))
)
df_russia.show()

In [None]:
# Number of rows that passed the "Rus" prefix filter.
df_russia.count()

# Сортировка

In [None]:
# Sort newest-first by column name.
# `desc()` is a Column method, not a DataFrame method, so chaining it after
# sort() raises AttributeError; the direction is passed to sort() instead.
df_russia.sort('date', ascending=False).show()

In [None]:
# Sort newest-first using a Column expression: desc() is applied to the
# column itself, and the resulting ordering is passed to sort().
df_russia.sort(F.col('date').desc()).show()

# Операции над колонками

In [43]:
# Arithmetic on a column: divide total_deaths by 1000. The computed column is
# left unnamed here, so Spark labels it "(total_deaths / 1000)".
# The sort key `date` is not part of the projection; as the output shows,
# Spark still resolves it from the source DataFrame.
df_deaths = (
    df
    .select("location", F.col('total_deaths') / 1000)
    .sort(F.col('date').desc())
)
df_deaths.show()

+-------------------+---------------------+
|           location|(total_deaths / 1000)|
+-------------------+---------------------+
|    North Macedonia|                4.419|
|            Belgium|               23.718|
|              Samoa|                 NULL|
|           Colombia|               67.931|
|             Norway|                0.708|
|             Belize|                0.318|
|         Montenegro|                1.427|
|        Afghanistan|                2.539|
|            Oceania|                 1.02|
|              Benin|                0.095|
|            Namibia|                0.602|
|            Albania|                 2.34|
|               Oman|                1.821|
|             Bhutan|                0.001|
|            Myanmar|                3.206|
|            Andorra|                0.123|
|           Pakistan|               16.243|
|            Bolivia|               12.625|
|              Nepal|                3.075|
|Antigua and Barbuda|           

In [45]:
# Renaming a column of an existing DataFrame
# df.withColumnRenamed("<old_name>", "<new_name>")

# The result is only displayed; df_deaths itself is not reassigned here.
df_deaths.withColumnRenamed("(total_deaths / 1000)", "total_deaths_per_1000").show()

+-------------------+---------------------+
|           location|total_deaths_per_1000|
+-------------------+---------------------+
|    North Macedonia|                4.419|
|            Belgium|               23.718|
|              Samoa|                 NULL|
|           Colombia|               67.931|
|             Norway|                0.708|
|             Belize|                0.318|
|         Montenegro|                1.427|
|        Afghanistan|                2.539|
|            Oceania|                 1.02|
|              Benin|                0.095|
|            Namibia|                0.602|
|            Albania|                 2.34|
|               Oman|                1.821|
|             Bhutan|                0.001|
|            Myanmar|                3.206|
|            Andorra|                0.123|
|           Pakistan|               16.243|
|            Bolivia|               12.625|
|              Nepal|                3.075|
|Antigua and Barbuda|           

In [46]:
# Renaming a column "on the fly": alias the computed expression inside select()
# so the DataFrame is built with its final column name straight away.
df_deaths = (
    df
    .select(
        "location",
        (F.col('total_deaths') / 1000).alias("total_deaths_per_1000"),
    )
    .sort(F.col('date').desc())
)
df_deaths.show()

+-------------------+---------------------+
|           location|total_deaths_per_1000|
+-------------------+---------------------+
|    North Macedonia|                4.419|
|            Belgium|               23.718|
|              Samoa|                 NULL|
|           Colombia|               67.931|
|             Norway|                0.708|
|             Belize|                0.318|
|         Montenegro|                1.427|
|        Afghanistan|                2.539|
|            Oceania|                 1.02|
|              Benin|                0.095|
|            Namibia|                0.602|
|            Albania|                 2.34|
|               Oman|                1.821|
|             Bhutan|                0.001|
|            Myanmar|                3.206|
|            Andorra|                0.123|
|           Pakistan|               16.243|
|            Bolivia|               12.625|
|              Nepal|                3.075|
|Antigua and Barbuda|           

In [49]:
# Conditional expressions: add a "status" column that is "Red" when
# total_deaths_per_1000 exceeds 10 and "Green" otherwise.
# Rows where the value is NULL do not satisfy the condition and therefore
# fall into the otherwise() branch ("Green"), as the output shows.
df_deaths.select(
    "*",
    F.when(F.col("total_deaths_per_1000") > 10, "Red")
    .otherwise("Green")
    .alias("status"),
).show()

+-------------------+---------------------+------+
|           location|total_deaths_per_1000|status|
+-------------------+---------------------+------+
|    North Macedonia|                4.419| Green|
|            Belgium|               23.718|   Red|
|              Samoa|                 NULL| Green|
|           Colombia|               67.931|   Red|
|             Norway|                0.708| Green|
|             Belize|                0.318| Green|
|         Montenegro|                1.427| Green|
|        Afghanistan|                2.539| Green|
|            Oceania|                 1.02| Green|
|              Benin|                0.095| Green|
|            Namibia|                0.602| Green|
|            Albania|                 2.34| Green|
|               Oman|                1.821| Green|
|             Bhutan|                0.001| Green|
|            Myanmar|                3.206| Green|
|            Andorra|                0.123| Green|
|           Pakistan|          

# Группировки и агрегации

In [50]:
# Group rows by location and sum people_fully_vaccinated within each group;
# the result column is auto-named "sum(people_fully_vaccinated)".
# NOTE(review): if people_fully_vaccinated is a cumulative daily figure, max()
# may be the intended aggregate rather than sum() — confirm against the dataset.
df.groupBy("location").sum("people_fully_vaccinated").show()

+-------------+----------------------------+
|     location|sum(people_fully_vaccinated)|
+-------------+----------------------------+
|         Chad|                        NULL|
|     Anguilla|                        NULL|
|International|                        NULL|
|        Macao|                    255381.0|
|       Guyana|                        NULL|
|      Eritrea|                        NULL|
|       Jersey|                    163501.0|
|     Djibouti|                        NULL|
|     Malaysia|                   6895196.0|
|         Fiji|                        NULL|
|       Malawi|                        NULL|
|         Iraq|                        NULL|
|       Europe|               1.682348426E9|
|      Germany|                2.20678679E8|
|      Comoros|                        NULL|
|  Afghanistan|                        NULL|
|     Cambodia|                   7573974.0|
|       Jordan|                    609113.0|
|     Maldives|                    199271.0|
|       Fr

In [53]:
# Several aggregates per location in a single pass, each with an explicit
# output column name.
df_agg = (
    df
    .groupBy("location")
    .agg(
        F.max("new_deaths").alias("Max new deaths"),
        F.sum("new_cases").alias("Total cases"),
    )
)
df_agg.show()

+-------------+--------------+-----------+
|     location|Max new deaths|Total cases|
+-------------+--------------+-----------+
|         Chad|          10.0|     4691.0|
|     Anguilla|          NULL|       NULL|
|International|           3.0|      721.0|
|        Macao|          NULL|       NULL|
|       Guyana|           6.0|    11762.0|
|      Eritrea|           3.0|     3491.0|
|       Jersey|          NULL|       NULL|
|     Djibouti|           9.0|    10412.0|
|     Malaysia|          25.0|   372859.0|
|         Fiji|           1.0|       72.0|
|       Malawi|          73.0|    33919.0|
|         Iraq|         122.0|   970987.0|
|       Europe|        7554.0|4.2981282E7|
|      Germany|        1734.0|  3154305.0|
|      Comoros|          12.0|     3815.0|
|  Afghanistan|          46.0|    57721.0|
|     Cambodia|           4.0|     5771.0|
|       Jordan|         111.0|   683466.0|
|     Maldives|           2.0|    26145.0|
|       France|        1438.0|  5321176.0|
+----------

# Запись в файл

In [55]:
# Other output formats follow the same pattern:
# df.write.parquet("<path>")
# df.write.format("delta").save("<path>")

# Write the aggregated DataFrame as CSV. The writer's default save mode raises
# an error when the target path already exists, which makes the cell fail on
# every re-run; "overwrite" replaces the previous result instead.
df_agg.write.mode("overwrite").csv("/content/result2")