In [1]:
!pip install pyspark




In [2]:

from pyspark.sql import SparkSession
import urllib.request
from io import StringIO

# SparkSession үүсгэж байна
spark = SparkSession.builder.appName("ETL with PySpark").getOrCreate()

# Spark-ийн хувийн конфигурацийг шалгах
print(f"PySpark version: {spark.version}")

PySpark version: 3.5.5


In [3]:
import urllib.request

# URL болон хадгалах зам
url = "https://raw.githubusercontent.com/datasets/covid-19/master/data/countries-aggregated.csv"
file_path = "/content/countries-aggregated.csv"

# Файлыг татаж авах
urllib.request.urlretrieve(url, file_path)

('/content/countries-aggregated.csv',
 <http.client.HTTPMessage at 0x7e5ae477a410>)

In [11]:
# Өгөгдөл унших (CSV форматаар)

# DataFrame-д хөрвүүлэх
df = spark.read.csv(file_path, header=True, inferSchema=True)



# Эхний 5 мөрийг харах
df.show(5)

# ДатаFrame-ийн бүтцийг шалгах
df.printSchema()

# Статистик мэдээллийг харах
df.describe().show()

+----------+-----------+---------+---------+------+
|      Date|    Country|Confirmed|Recovered|Deaths|
+----------+-----------+---------+---------+------+
|2020-01-22|Afghanistan|        0|        0|     0|
|2020-01-23|Afghanistan|        0|        0|     0|
|2020-01-24|Afghanistan|        0|        0|     0|
|2020-01-25|Afghanistan|        0|        0|     0|
|2020-01-26|Afghanistan|        0|        0|     0|
+----------+-----------+---------+---------+------+
only showing top 5 rows

root
 |-- Date: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)

+-------+-----------+-----------------+------------------+-----------------+
|summary|    Country|        Confirmed|         Recovered|           Deaths|
+-------+-----------+-----------------+------------------+-----------------+
|  count|     161568|           161568|            161568|           16156

In [31]:

# Шинэ багана нэмж байна: 'Total' гэдэг баганад нийт тоо гаргах
df = df.withColumn("Total", df["Confirmed"] + df["Recovered"])
df_filtered.show(11)

# Батлагдсан тохиолдол 1000-аас их Монголын үзүүлэлт
df_filtered = df.filter(df["Country"] == 'Mongolia').filter(df["Confirmed"] > 1000)
df_filtered.show(11)

# Монгол эдгэрсэн хүний нийт тоо
df_grouped = df_filtered.groupBy("Country").sum("Recovered").orderBy("sum(Recovered)", ascending=False)
df_grouped.show(11)


+----------+--------+---------+---------+------+-----+
|      Date| Country|Confirmed|Recovered|Deaths|Total|
+----------+--------+---------+---------+------+-----+
|2020-12-23|Mongolia|     1006|      584|     0| 1590|
|2020-12-24|Mongolia|     1063|      584|     0| 1647|
|2020-12-25|Mongolia|     1069|      711|     0| 1780|
|2020-12-26|Mongolia|     1075|      711|     0| 1786|
|2020-12-27|Mongolia|     1082|      711|     0| 1793|
|2020-12-28|Mongolia|     1121|      745|     0| 1866|
|2020-12-29|Mongolia|     1137|      824|     1| 1961|
|2020-12-30|Mongolia|     1175|      830|     1| 2005|
|2020-12-31|Mongolia|     1195|      837|     1| 2032|
|2021-01-01|Mongolia|     1220|      855|     1| 2075|
|2021-01-02|Mongolia|     1242|      869|     1| 2111|
+----------+--------+---------+---------+------+-----+
only showing top 11 rows

+----------+--------+---------+---------+------+-----+
|      Date| Country|Confirmed|Recovered|Deaths|Total|
+----------+--------+---------+--------

In [18]:
# Өгөгдлийг CSV болон Parquet хэлбэрээр хадгалах
df.write.csv("/content/covid_aggregated.csv", header=True)
df.write.parquet("/content/covid_aggregated.parquet")

In [19]:
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Filter for Mongolia
df_mongolia = df.filter(df['Country'] == 'Mongolia')

# Show the data for Mongolia
df_mongolia.show()

# For Mongolian translation, you can set up the labels or display in Mongolian
df_grouped = df_mongolia.groupBy("Country").sum("Confirmed").orderBy("sum(Confirmed)", ascending=False)
df_grouped.show()

# Display result in Mongolian
print("Хамгийн их батлагдсан тохиолдолтой улсууд:")
df_grouped.show()

+----------+--------+---------+---------+------+
|      Date| Country|Confirmed|Recovered|Deaths|
+----------+--------+---------+---------+------+
|2020-01-22|Mongolia|        0|        0|     0|
|2020-01-23|Mongolia|        0|        0|     0|
|2020-01-24|Mongolia|        0|        0|     0|
|2020-01-25|Mongolia|        0|        0|     0|
|2020-01-26|Mongolia|        0|        0|     0|
|2020-01-27|Mongolia|        0|        0|     0|
|2020-01-28|Mongolia|        0|        0|     0|
|2020-01-29|Mongolia|        0|        0|     0|
|2020-01-30|Mongolia|        0|        0|     0|
|2020-01-31|Mongolia|        0|        0|     0|
|2020-02-01|Mongolia|        0|        0|     0|
|2020-02-02|Mongolia|        0|        0|     0|
|2020-02-03|Mongolia|        0|        0|     0|
|2020-02-04|Mongolia|        0|        0|     0|
|2020-02-05|Mongolia|        0|        0|     0|
|2020-02-06|Mongolia|        0|        0|     0|
|2020-02-07|Mongolia|        0|        0|     0|
|2020-02-08|Mongolia