# Xử lí dữ liệu Covid19 theo ngày sử dụng Spark, AWS EMR
## Load dữ liệu ngày cần phân tích lên hệ thống Hadoop - AWS EMR
1. Chọn ngày cần xử lí

In [1]:
from datetime import date

day = date(2022,11,10)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1671214930074_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2. Định nghĩa nguồn dữ liệu và nơi lưu trữ file phân tán (AWS S3)

In [2]:
S3_DATA_SOURCE_PATH = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/'+ day.strftime('%m-%d-%Y')+'.csv'
S3_DATA_OUTPUT_PATH = 's3://nhom5-cloud/covid19/' + day.strftime('%d-%m-%Y')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3. Định nghĩa cấu trúc của dữ liệu

In [3]:
from pyspark.sql.types import *
schema = StructType([ 
    StructField("FIPS", StringType(), True),
    StructField("Admin2", StringType(), True),
    StructField("Province_State", StringType(), True),
    StructField("Country_Region", StringType(), True),
    StructField("Last_Update", StringType(), True),
    StructField("Lat", StringType(), True),
    StructField("Long_", StringType(), True),
    StructField("Confirmed", StringType(), True),
    StructField("Deaths", StringType(), True),
    StructField("Recovered", StringType(), True),
    StructField("Active", StringType(), True),
    StructField("Combined_Key", StringType(), True),
    StructField("Incident_Rate", StringType(), True),
    StructField("Case_Fatality_Ratio", StringType(), True),
 ])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4. Tạo phiên viết dữ liệu

In [4]:
from pyspark.sql import SparkSession

spark_write = SparkSession.builder.appName('Nhom5-Cloud-Write').getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5. Đọc dữ liệu và định nghĩa dữ liệu theo cấu trúc

In [5]:
import pandas as pd

df = pd.read_csv(S3_DATA_SOURCE_PATH).fillna('')
all_data = spark_write.createDataFrame(df,schema=schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

6. Lưu dữ liệu dưới dạng file phân tán lên S3

In [6]:
# ghi dữ liệu nếu chưa tồn tại
all_data.write.mode('ignore').parquet(S3_DATA_OUTPUT_PATH)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Lấy dữ liệu theo ngày và xử lí
1. Tạo phiên đọc dữ liệu và đọc dữ liệu từ file phân tán

In [7]:
spark_read = SparkSession.builder.appName('Nhom5-Cloud-Read').getOrCreate()
df_read = spark_read.read.parquet(S3_DATA_OUTPUT_PATH)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2. In ra một số dữ liệu đọc được, số dòng, số cột của tập dữ liệu 

In [8]:
df_read.show(truncate=False)

rows = df_read.count()
print(f"Data from {day} have {rows} rows.")

cols = len(df_read.columns)
print(f"Data from {day} have {cols} columns.")

print(df_read)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+--------------+--------------+-------------------+-----------+------------+---------+------+---------+------+--------------------+------------------+-------------------+
|FIPS   |Admin2   |Province_State|Country_Region|Last_Update        |Lat        |Long_       |Confirmed|Deaths|Recovered|Active|Combined_Key        |Incident_Rate     |Case_Fatality_Ratio|
+-------+---------+--------------+--------------+-------------------+-----------+------------+---------+------+---------+------+--------------------+------------------+-------------------+
|48001.0|Anderson |Texas         |US            |2022-11-11 04:21:22|31.81534745|-95.65354823|10628    |251   |         |      |Anderson, Texas, US |18408.24456568806 |2.361686112156568  |
|48003.0|Andrews  |Texas         |US            |2022-11-11 04:21:22|32.30468633|-102.6376548|4723     |73    |         |      |Andrews, Texas, US  |25249.93317294841 |1.545627778954055  |
|48005.0|Angelina |Texas         |US            |2022-1

3. Xử lí dữ liệu
* Top 10 đất nước có số lượng mắc covid nhiều nhất

In [9]:
from pyspark.sql import functions as func

# Chuyển kiểu dữ liệu của cột 'Confirmed' kiểu String sang Int
converDF = df_read.withColumn('Confirmed', df_read['Confirmed'].cast(IntegerType()))
# Chuyển các dòng dữ liệu bị null thành 0
formatDF = converDF.fillna(value=0,subset=['Confirmed'])
# Tính tổng dữ liệu cột 'Confirmed' theo 'Country_Region'
ConfirmedDF = formatDF.select('Country_Region','Confirmed').groupBy('Country_Region').agg(func.sum('Confirmed').alias('Total'))
ConfirmedDF.sort(ConfirmedDF['Total'].desc()).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------+
|Country_Region|   Total|
+--------------+--------+
|            US|97985646|
|         India|44664810|
|        France|37288432|
|       Germany|36005025|
|        Brazil|34889576|
|  Korea, South|26091539|
|United Kingdom|24179611|
|         Italy|23642011|
|         Japan|22981382|
|        Russia|21178202|
+--------------+--------+
only showing top 10 rows

* Top 10 đất nước có số lượng tử vong vì covid nhiều nhất

In [10]:
# Chuyển kiểu dữ liệu của cột 'Deaths' kiểu String sang Int
converDF = df_read.withColumn('Deaths', df_read['Deaths'].cast(IntegerType()))
# Chuyển các dòng dữ liệu bị null thành 0
formatDF = converDF.fillna(value=0,subset=['Deaths'])
# Tính tổng dữ liệu cột 'Deaths' theo 'Country_Region'
DeathsDF = formatDF.select('Country_Region','Deaths').groupBy('Country_Region').agg(func.sum('Deaths').alias('Total'))
DeathsDF.sort(DeathsDF['Total'].desc()).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+-------+
|Country_Region|  Total|
+--------------+-------+
|            US|1074743|
|        Brazil| 688607|
|         India| 530520|
|        Russia| 382949|
|        Mexico| 330430|
|          Peru| 217146|
|United Kingdom| 212810|
|         Italy| 179436|
|     Indonesia| 158989|
|        France| 158711|
+--------------+-------+
only showing top 10 rows

* Top 10 đất nước có số lượng hồi phục sau mắc covid nhiều nhất

In [11]:
# Chuyển kiểu dữ liệu của cột 'Recovered' kiểu String sang Int
converDF = df_read.withColumn('Recovered', df_read['Recovered'].cast(IntegerType()))
# Chuyển các dòng dữ liệu bị null thành 0
formatDF = converDF.fillna(value=0,subset=['Recovered'])
# Tính tổng dữ liệu cột 'Recovered' theo 'Country_Region'
RecoveredDF = formatDF.select('Country_Region','Recovered').groupBy('Country_Region').agg(func.sum('Recovered').alias('Total'))
RecoveredDF.sort(RecoveredDF['Total'].desc()).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-----+
| Country_Region|Total|
+---------------+-----+
|North Macedonia|    0|
|   Korea, South|    0|
|        Lebanon|    0|
|       Slovenia|    0|
|    New Zealand|    0|
|         Poland|    0|
|       Portugal|    0|
|      Mauritius|    0|
|          China|    0|
|        Somalia|    0|
+---------------+-----+
only showing top 10 rows