[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](
https://colab.research.google.com/drive/1KWV4ODsr30osaEt7KxfjhoMFop9JqpKh?usp=sharing)

# PySpark ETL Process

PySpark орчныг Jupyter Notebook дээр ашиглахын тулд хамгийн эхлээд PySpark-ийг суулгах хэрэгтэй:

In [2]:
!pip install pyspark



2. SparkSession үүсгэх:
PySpark орчин үүсгэж, SparkSession үүсгэх:

In [3]:
from pyspark.sql import SparkSession
import urllib.request
from io import StringIO

# SparkSession үүсгэж байна
spark = SparkSession.builder.appName("ETL with PySpark").getOrCreate()

# Spark-ийн хувийн конфигурацийг шалгах
print(f"PySpark version: {spark.version}")

PySpark version: 3.5.5


In [4]:
import urllib.request

# URL болон хадгалах зам
url = "https://raw.githubusercontent.com/datasets/covid-19/master/data/countries-aggregated.csv"
file_path = "/content/countries-aggregated.csv"

# Файлыг татаж авах
urllib.request.urlretrieve(url, file_path)


('/content/countries-aggregated.csv',
 <http.client.HTTPMessage at 0x7a589044a750>)

1. Өгөгдөл унших (Extract):
Өгөгдлийг CSV хэлбэрээр Google Colab-аас шууд унших:

In [5]:
# Өгөгдөл унших (CSV форматаар)

# DataFrame-д хөрвүүлэх
df = spark.read.csv(file_path, header=True, inferSchema=True)



# Эхний 5 мөрийг харах
df.show(5)

# ДатаFrame-ийн бүтцийг шалгах
df.printSchema()

# Статистик мэдээллийг харах
df.describe().show()

+----------+-----------+---------+---------+------+
|      Date|    Country|Confirmed|Recovered|Deaths|
+----------+-----------+---------+---------+------+
|2020-01-22|Afghanistan|        0|        0|     0|
|2020-01-23|Afghanistan|        0|        0|     0|
|2020-01-24|Afghanistan|        0|        0|     0|
|2020-01-25|Afghanistan|        0|        0|     0|
|2020-01-26|Afghanistan|        0|        0|     0|
+----------+-----------+---------+---------+------+
only showing top 5 rows

root
 |-- Date: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)

+-------+-----------+-----------------+------------------+-----------------+
|summary|    Country|        Confirmed|         Recovered|           Deaths|
+-------+-----------+-----------------+------------------+-----------------+
|  count|     161568|           161568|            161568|           16156

2. Өгөгдөл трансформ хийх (Transform):
Өгөгдлийг шүүх, шинэ багануудад өгөгдөл нэмэх:

In [6]:

# Шинэ багана нэмж байна: 'Total' гэдэг баганад нийт тоо гаргах
df = df.withColumn("Total", df["Confirmed"] + df["Deaths"])

# Батлагдсан тохиолдол 1000-аас их
df_filtered = df.filter(df["Confirmed"] > 1000)

# Өгөгдлийн дараалал болон агрегаци
df_grouped = df_filtered.groupBy("Country").sum("Confirmed").orderBy("sum(Confirmed)", ascending=False)

df_grouped.show(10)


+--------------+--------------+
|       Country|sum(Confirmed)|
+--------------+--------------+
|            US|   22963147887|
|         India|   14681423060|
|        Brazil|    9991733077|
|        France|    4588774768|
|United Kingdom|    4376486345|
|        Russia|    4055399602|
|        Turkey|    3505367544|
|       Germany|    3007328664|
|         Italy|    2914670880|
|         Spain|    2754457388|
+--------------+--------------+
only showing top 10 rows



🔑 3. Өгөгдөл хадгалах (Load):
Шинэ өгөгдлийг CSV болон Parquet форматад хадгалах:

In [7]:

# Өгөгдлийг CSV болон Parquet хэлбэрээр хадгалах
df.write.csv("/content/covid_aggregated.csv", header=True)
df.write.parquet("/content/covid_aggregated.parquet")


In [None]:
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Filter for Mongolia
df_mongolia = df.filter(df['Country'] == 'Mongolia')

# Show the data for Mongolia
df_mongolia.show()

# For Mongolian translation, you can set up the labels or display in Mongolian
df_grouped = df_mongolia.groupBy("Country").sum("Confirmed").orderBy("sum(Confirmed)", ascending=False)
df_grouped.show()

# Display result in Mongolian
print("Хамгийн их батлагдсан тохиолдолтой улсууд:")
df_grouped.show()


+----------+--------+---------+---------+------+
|      Date| Country|Confirmed|Recovered|Deaths|
+----------+--------+---------+---------+------+
|2020-01-22|Mongolia|        0|        0|     0|
|2020-01-23|Mongolia|        0|        0|     0|
|2020-01-24|Mongolia|        0|        0|     0|
|2020-01-25|Mongolia|        0|        0|     0|
|2020-01-26|Mongolia|        0|        0|     0|
|2020-01-27|Mongolia|        0|        0|     0|
|2020-01-28|Mongolia|        0|        0|     0|
|2020-01-29|Mongolia|        0|        0|     0|
|2020-01-30|Mongolia|        0|        0|     0|
|2020-01-31|Mongolia|        0|        0|     0|
|2020-02-01|Mongolia|        0|        0|     0|
|2020-02-02|Mongolia|        0|        0|     0|
|2020-02-03|Mongolia|        0|        0|     0|
|2020-02-04|Mongolia|        0|        0|     0|
|2020-02-05|Mongolia|        0|        0|     0|
|2020-02-06|Mongolia|        0|        0|     0|
|2020-02-07|Mongolia|        0|        0|     0|
|2020-02-08|Mongolia