<a href="https://colab.research.google.com/github/ThanaVi/THCovid19_Report/blob/master/ETL_THCovid19_Sum.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Target

1. Count the number of Covid19 patients from the prepared Covid19 patient data
2. Load the resulting data into BigQuery

# Result

Data from the source
![alt text](https://drive.google.com/uc?id=1kIi-suj7SuI0R9yBTCfgHvFxQqT_pSPt)

Data after the transformation
![alt text](https://drive.google.com/uc?id=1wfI2JRsBVawv8Ho6fMHs8pZlVvdDd26e)



# Install PySpark


In [0]:
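# Install Java 8, which Spark 2.4.x requires
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack Spark 2.4.5 (older releases are eventually moved from
# downloads.apache.org to archive.apache.org, so this URL may need updating)
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xzvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Point the environment at the Java and Spark installations
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

!pip install pyspark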

spark-2.4.5-bin-hadoop2.7/
spark-2.4.5-bin-hadoop2.7/licenses/
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-jtransforms.html
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd-jni.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-xmlenc.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-vis.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-spire.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sorttable.js.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-slf4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scopt.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scala.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sbt-launch-lib.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-respond.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-reflectasm.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pyrolite.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-py4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-protobuf.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pmml-model
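
Before starting Spark, it can help to confirm that `JAVA_HOME` and `SPARK_HOME` point at directories that actually exist. The following is a minimal sketch; the helper name `missing_spark_paths` is hypothetical and not part of the original notebook.

```python
import os

def missing_spark_paths(env=None, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or point at a non-existent directory."""
    env = os.environ if env is None else env
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# An empty list means both directories were found.
print(missing_spark_paths())
```

If the list is not empty, the download or the `apt-get` step probably did not finish as expected.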

# Import library

In [0]:
from pyspark.sql import SparkSession, Row, functions as fn
from pyspark.sql.window import Window

import pandas as pd

# Create spark session

In [0]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Extract

The data used here comes from the ETL done at https://colab.research.google.com/drive/145h5YYKzitwuB6uR-LcNFkiIZUGHiK_N?usp=sharing

In [0]:
covid19_df = spark.read.csv('/content/drive/My Drive/Project/Covid19/single_covid19_data.csv/part-00000-6916f067-9e47-48aa-8d4c-86a4c2571a16-c000.csv', header=True, inferSchema=True)
covid19_df.show()

+---+----+----+-----------+-------------+-----------------+
| no| age| sex|nationality|Announce Date|Province of onset|
+---+----+----+-----------+-------------+-----------------+
|  1|61.0|หญิง|        จีน|     12-01-20|    กรุงเทพมหานคร|
|  2|74.0|หญิง|        จีน|     17-01-20|    กรุงเทพมหานคร|
|  3|73.0|หญิง|        ไทย|     22-01-20|           นครปฐม|
|  4|68.0| ชาย|        จีน|     22-01-20|    กรุงเทพมหานคร|
|  5|66.0|หญิง|        จีน|     24-01-20|    กรุงเทพมหานคร|
|  6|33.0|หญิง|        จีน|     25-01-20|    กรุงเทพมหานคร|
|  7|57.0|หญิง|        จีน|     26-01-20|    กรุงเทพมหานคร|
|  8|73.0|หญิง|        จีน|     26-01-20|  ประจวบคีรีขันธ์|
|  9|63.0| ชาย|        จีน|     28-01-20|    กรุงเทพมหานคร|
| 10|28.0|หญิง|        จีน|     28-01-20|          นนทบุรี|
| 11|33.0| ชาย|        จีน|     28-01-20|    กรุงเทพมหานคร|
| 12|61.0| ชาย|        จีน|     28-01-20|    กรุงเทพมหานคร|
| 13| 6.0| ชาย|        จีน|     28-01-20|    กรุงเทพมหานคร|
| 14|32.0|หญิง|        จีน|     28-01-20

In [0]:
covid19_df.count()

2947

# Transform


Count the number of patients per day.

In [0]:
count_covid19_df = covid19_df.groupBy('Announce Date').count()
count_covid19_df.show()

+-------------+-----+
|Announce Date|count|
+-------------+-----+
|     17-04-20|   28|
|     05-04-20|  102|
|     27-04-20|    9|
|     28-01-20|    6|
|     03-04-20|  103|
|     28-02-20|    1|
|     27-03-20|   91|
|     12-04-20|   33|
|     24-01-20|    1|
|     07-03-20|    2|
|     11-03-20|    6|
|     12-01-20|    1|
|     11-02-20|    1|
|     20-03-20|   50|
|     26-04-20|   15|
|     06-04-20|   51|
|     18-03-20|   35|
|     06-03-20|    1|
|     15-04-20|   30|
|     13-03-20|    3|
+-------------+-----+
only showing top 20 rows
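
The aggregation above amounts to counting how often each `Announce Date` value occurs. Here is a plain-Python sketch of the same logic. The sample values are the first five dates from the source table, and `count_per_day` is a hypothetical helper name used only for illustration.

```python
from collections import Counter

def count_per_day(announce_dates):
    """Count how many patients share each announce date (the idea behind groupBy().count())."""
    return Counter(announce_dates)

sample = ["12-01-20", "17-01-20", "22-01-20", "22-01-20", "24-01-20"]
counts = count_per_day(sample)
print(counts["22-01-20"])  # 2
```

As with `groupBy`, the result carries no meaningful order, which is why the Spark output lists the dates in a seemingly random sequence.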



Convert the date format so that the rows can be sorted by day.

In [0]:
newDate_count_covid19_df = count_covid19_df.withColumn('date', fn.unix_timestamp('Announce Date', 'dd-MM-yy').cast('timestamp'))
newDate_count_covid19_df.show()

+-------------+-----+-------------------+
|Announce Date|count|               date|
+-------------+-----+-------------------+
|     17-04-20|   28|2020-04-17 00:00:00|
|     05-04-20|  102|2020-04-05 00:00:00|
|     27-04-20|    9|2020-04-27 00:00:00|
|     28-01-20|    6|2020-01-28 00:00:00|
|     03-04-20|  103|2020-04-03 00:00:00|
|     28-02-20|    1|2020-02-28 00:00:00|
|     27-03-20|   91|2020-03-27 00:00:00|
|     12-04-20|   33|2020-04-12 00:00:00|
|     24-01-20|    1|2020-01-24 00:00:00|
|     07-03-20|    2|2020-03-07 00:00:00|
|     11-03-20|    6|2020-03-11 00:00:00|
|     12-01-20|    1|2020-01-12 00:00:00|
|     11-02-20|    1|2020-02-11 00:00:00|
|     20-03-20|   50|2020-03-20 00:00:00|
|     26-04-20|   15|2020-04-26 00:00:00|
|     06-04-20|   51|2020-04-06 00:00:00|
|     18-03-20|   35|2020-03-18 00:00:00|
|     06-03-20|    1|2020-03-06 00:00:00|
|     15-04-20|   30|2020-04-15 00:00:00|
|     13-03-20|    3|2020-03-13 00:00:00|
+-------------+-----+-------------
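
The conversion matters because `dd-MM-yy` strings sort by day first, which is not chronological. The sketch below shows the difference in plain Python, assuming that `%d-%m-%y` in `datetime.strptime` is the counterpart of the Spark pattern `dd-MM-yy`. The name `parse_announce_date` is hypothetical.

```python
from datetime import datetime

def parse_announce_date(text):
    """Parse a 'dd-MM-yy' string such as '12-01-20' into a datetime."""
    return datetime.strptime(text, "%d-%m-%y")

raw = ["17-04-20", "05-04-20", "28-01-20"]
# Sorting the raw strings compares the day first, which is not chronological.
print(sorted(raw))                           # ['05-04-20', '17-04-20', '28-01-20']
# Sorting by the parsed value gives chronological order.
print(sorted(raw, key=parse_announce_date))  # ['28-01-20', '05-04-20', '17-04-20']
```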

In [0]:
ordered_df = newDate_count_covid19_df.select('date', 'count').orderBy('date')
ordered_df.show()

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2020-01-12 00:00:00|    1|
|2020-01-17 00:00:00|    1|
|2020-01-22 00:00:00|    2|
|2020-01-24 00:00:00|    1|
|2020-01-25 00:00:00|    1|
|2020-01-26 00:00:00|    2|
|2020-01-28 00:00:00|    6|
|2020-01-31 00:00:00|    5|
|2020-02-04 00:00:00|    6|
|2020-02-08 00:00:00|    7|
|2020-02-11 00:00:00|    1|
|2020-02-15 00:00:00|    1|
|2020-02-17 00:00:00|    1|
|2020-02-25 00:00:00|    2|
|2020-02-26 00:00:00|    3|
|2020-02-28 00:00:00|    1|
|2020-02-29 00:00:00|    1|
|2020-03-02 00:00:00|    1|
|2020-03-05 00:00:00|    4|
|2020-03-06 00:00:00|    1|
+-------------------+-----+
only showing top 20 rows



The dates are not contiguous, so the missing dates are added using Pandas.

In [0]:
ordered_df.collect()[-1][0]

datetime.datetime(2020, 4, 29, 0, 0)

In [0]:
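# Collect once and reuse the rows; ordered_df is sorted by date,
# so the first and last rows bound the date range
rows = ordered_df.collect()
start_date = rows[0][0]
end_date = rows[-1][0]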

In [0]:
start_date, end_date

(datetime.datetime(2020, 1, 12, 0, 0), datetime.datetime(2020, 4, 29, 0, 0))

In [0]:
date_series = pd.date_range(start_date, end_date)

In [0]:
date_series

DatetimeIndex(['2020-01-12', '2020-01-13', '2020-01-14', '2020-01-15',
               '2020-01-16', '2020-01-17', '2020-01-18', '2020-01-19',
               '2020-01-20', '2020-01-21',
               ...
               '2020-04-20', '2020-04-21', '2020-04-22', '2020-04-23',
               '2020-04-24', '2020-04-25', '2020-04-26', '2020-04-27',
               '2020-04-28', '2020-04-29'],
              dtype='datetime64[ns]', length=109, freq='D')
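
As a cross-check on `length=109`, the same inclusive daily range can be built with the standard library alone. This is only a sketch, and `daily_range` is a hypothetical helper name.

```python
from datetime import date, timedelta

def daily_range(start, end):
    """Return every calendar day from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]

days = daily_range(date(2020, 1, 12), date(2020, 4, 29))
print(len(days))  # 109
```

The count is 20 days in January, 29 in February (2020 is a leap year), 31 in March and 29 in April.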

In [0]:
pd_date_df = pd.DataFrame({'date': date_series})
pd_date_df

          date
0   2020-01-12
1   2020-01-13
2   2020-01-14
3   2020-01-15
4   2020-01-16
..         ...
104 2020-04-25
105 2020-04-26
106 2020-04-27
107 2020-04-28


Convert the Pandas DataFrame into a Spark DataFrame.

In [0]:
spark_date_df = spark.createDataFrame(pd_date_df)
spark_date_df.show()

+-------------------+
|               date|
+-------------------+
|2020-01-12 00:00:00|
|2020-01-13 00:00:00|
|2020-01-14 00:00:00|
|2020-01-15 00:00:00|
|2020-01-16 00:00:00|
|2020-01-17 00:00:00|
|2020-01-18 00:00:00|
|2020-01-19 00:00:00|
|2020-01-20 00:00:00|
|2020-01-21 00:00:00|
|2020-01-22 00:00:00|
|2020-01-23 00:00:00|
|2020-01-24 00:00:00|
|2020-01-25 00:00:00|
|2020-01-26 00:00:00|
|2020-01-27 00:00:00|
|2020-01-28 00:00:00|
|2020-01-29 00:00:00|
|2020-01-30 00:00:00|
|2020-01-31 00:00:00|
+-------------------+
only showing top 20 rows



In [0]:
spark_date_df.count()

109

Join the two DataFrames and set count to 0 for the dates that were added.

In [0]:
add_time_df = ordered_df.join(spark_date_df, on='date', how='full').fillna(0).orderBy('date')
add_time_df.show()

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2020-01-12 00:00:00|    1|
|2020-01-13 00:00:00|    0|
|2020-01-14 00:00:00|    0|
|2020-01-15 00:00:00|    0|
|2020-01-16 00:00:00|    0|
|2020-01-17 00:00:00|    1|
|2020-01-18 00:00:00|    0|
|2020-01-19 00:00:00|    0|
|2020-01-20 00:00:00|    0|
|2020-01-21 00:00:00|    0|
|2020-01-22 00:00:00|    2|
|2020-01-23 00:00:00|    0|
|2020-01-24 00:00:00|    1|
|2020-01-25 00:00:00|    1|
|2020-01-26 00:00:00|    2|
|2020-01-27 00:00:00|    0|
|2020-01-28 00:00:00|    6|
|2020-01-29 00:00:00|    0|
|2020-01-30 00:00:00|    0|
|2020-01-31 00:00:00|    5|
+-------------------+-----+
only showing top 20 rows
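
The join-and-fill step can be pictured as looking up every calendar day in the observed counts and falling back to 0 when no row exists. Below is a minimal plain-Python sketch of that idea. `fill_missing_days` is a hypothetical name, and the sample uses the first three dates from the ordered table.

```python
from datetime import date, timedelta

def fill_missing_days(counts):
    """Return (day, count) pairs for every day in the range, using 0 where no row exists."""
    start, end = min(counts), max(counts)
    n_days = (end - start).days + 1
    all_days = [start + timedelta(days=i) for i in range(n_days)]
    return [(day, counts.get(day, 0)) for day in all_days]

observed = {date(2020, 1, 12): 1, date(2020, 1, 17): 1, date(2020, 1, 22): 2}
filled = fill_missing_days(observed)
print(len(filled))  # 11
```

Because the calendar runs exactly from the first to the last observed date, every observed date already appears in it, so a left join from the calendar side would likely give the same rows as the full join.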



Compute the cumulative number of patients by adding each day's count to the counts of all preceding days.

In [0]:
# Order the window by date so the running total is accumulated chronologically
w = Window.orderBy('date').rowsBetween(
    Window.unboundedPreceding,
    Window.currentRow
)

In [0]:
sum_covid19_df = add_time_df.withColumn('sum', fn.sum('count').over(w))
sum_covid19_df.show()

+-------------------+-----+---+
|               date|count|sum|
+-------------------+-----+---+
|2020-01-12 00:00:00|    1|  1|
|2020-01-13 00:00:00|    0|  1|
|2020-01-14 00:00:00|    0|  1|
|2020-01-15 00:00:00|    0|  1|
|2020-01-16 00:00:00|    0|  1|
|2020-01-17 00:00:00|    1|  2|
|2020-01-18 00:00:00|    0|  2|
|2020-01-19 00:00:00|    0|  2|
|2020-01-20 00:00:00|    0|  2|
|2020-01-21 00:00:00|    0|  2|
|2020-01-22 00:00:00|    2|  4|
|2020-01-23 00:00:00|    0|  4|
|2020-01-24 00:00:00|    1|  5|
|2020-01-25 00:00:00|    1|  6|
|2020-01-26 00:00:00|    2|  8|
|2020-01-27 00:00:00|    0|  8|
|2020-01-28 00:00:00|    6| 14|
|2020-01-29 00:00:00|    0| 14|
|2020-01-30 00:00:00|    0| 14|
|2020-01-31 00:00:00|    5| 19|
+-------------------+-----+---+
only showing top 20 rows
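
A cumulative sum adds each day's count to the total of all earlier days, so it only makes sense when the rows are taken in date order. The sketch below reproduces the first eleven values of the `sum` column in plain Python, using the daily counts for 12 to 22 January from the table above.

```python
from itertools import accumulate

# Daily counts for 2020-01-12 through 2020-01-22, in date order.
daily = [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2]
running = list(accumulate(daily))
print(running)  # [1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 4]
```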



Create an index.

In [0]:
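# row_number() over a date-ordered window yields consecutive ids starting at 1;
# monotonically_increasing_id() is only consecutive while the data sits in a single partition
index_sum_covid19_df = sum_covid19_df.withColumn('id', fn.row_number().over(Window.orderBy('date')))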
index_sum_covid19_df.show()

+-------------------+-----+---+---+
|               date|count|sum| id|
+-------------------+-----+---+---+
|2020-01-12 00:00:00|    1|  1|  1|
|2020-01-13 00:00:00|    0|  1|  2|
|2020-01-14 00:00:00|    0|  1|  3|
|2020-01-15 00:00:00|    0|  1|  4|
|2020-01-16 00:00:00|    0|  1|  5|
|2020-01-17 00:00:00|    1|  2|  6|
|2020-01-18 00:00:00|    0|  2|  7|
|2020-01-19 00:00:00|    0|  2|  8|
|2020-01-20 00:00:00|    0|  2|  9|
|2020-01-21 00:00:00|    0|  2| 10|
|2020-01-22 00:00:00|    2|  4| 11|
|2020-01-23 00:00:00|    0|  4| 12|
|2020-01-24 00:00:00|    1|  5| 13|
|2020-01-25 00:00:00|    1|  6| 14|
|2020-01-26 00:00:00|    2|  8| 15|
|2020-01-27 00:00:00|    0|  8| 16|
|2020-01-28 00:00:00|    6| 14| 17|
|2020-01-29 00:00:00|    0| 14| 18|
|2020-01-30 00:00:00|    0| 14| 19|
|2020-01-31 00:00:00|    5| 19| 20|
+-------------------+-----+---+---+
only showing top 20 rows
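
A note on `fn.monotonically_increasing_id()`: it guarantees increasing, unique ids, but not consecutive ones. The Spark documentation describes the layout as the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below mimics that layout in plain Python to show where a gap would appear. `monotonic_id` is a hypothetical stand-in, not the Spark function.

```python
def monotonic_id(partition_index, row_in_partition):
    """Mimic the documented layout: partition ID in the upper 31 bits, record number in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition

# One partition: ids are consecutive (0, 1, 2 before adding 1).
print([monotonic_id(0, r) + 1 for r in range(3)])  # [1, 2, 3]
# A second partition starts at 2**33, so the sequence jumps.
print(monotonic_id(1, 0) + 1)                      # 8589934593
```

The ids shown in the table run from 1 upward without gaps, which is what one would expect when all rows end up in a single partition.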



In [0]:
final_df = index_sum_covid19_df.select('id', 'date', 'count', 'sum')

# Final

In [0]:
final_df.show()

+---+-------------------+-----+---+
| id|               date|count|sum|
+---+-------------------+-----+---+
|  1|2020-01-12 00:00:00|    1|  1|
|  2|2020-01-13 00:00:00|    0|  1|
|  3|2020-01-14 00:00:00|    0|  1|
|  4|2020-01-15 00:00:00|    0|  1|
|  5|2020-01-16 00:00:00|    0|  1|
|  6|2020-01-17 00:00:00|    1|  2|
|  7|2020-01-18 00:00:00|    0|  2|
|  8|2020-01-19 00:00:00|    0|  2|
|  9|2020-01-20 00:00:00|    0|  2|
| 10|2020-01-21 00:00:00|    0|  2|
| 11|2020-01-22 00:00:00|    2|  4|
| 12|2020-01-23 00:00:00|    0|  4|
| 13|2020-01-24 00:00:00|    1|  5|
| 14|2020-01-25 00:00:00|    1|  6|
| 15|2020-01-26 00:00:00|    2|  8|
| 16|2020-01-27 00:00:00|    0|  8|
| 17|2020-01-28 00:00:00|    6| 14|
| 18|2020-01-29 00:00:00|    0| 14|
| 19|2020-01-30 00:00:00|    0| 14|
| 20|2020-01-31 00:00:00|    5| 19|
+---+-------------------+-----+---+
only showing top 20 rows



In [0]:
final_df.orderBy(fn.col('id').desc()).show()

+---+-------------------+-----+----+
| id|               date|count| sum|
+---+-------------------+-----+----+
|109|2020-04-29 00:00:00|    9|2947|
|108|2020-04-28 00:00:00|    7|2938|
|107|2020-04-27 00:00:00|    9|2931|
|106|2020-04-26 00:00:00|   15|2922|
|105|2020-04-25 00:00:00|   53|2907|
|104|2020-04-24 00:00:00|   15|2854|
|103|2020-04-23 00:00:00|   13|2839|
|102|2020-04-22 00:00:00|   15|2826|
|101|2020-04-21 00:00:00|   19|2811|
|100|2020-04-20 00:00:00|   27|2792|
| 99|2020-04-19 00:00:00|   32|2765|
| 98|2020-04-18 00:00:00|   33|2733|
| 97|2020-04-17 00:00:00|   28|2700|
| 96|2020-04-16 00:00:00|   29|2672|
| 95|2020-04-15 00:00:00|   30|2643|
| 94|2020-04-14 00:00:00|   34|2613|
| 93|2020-04-13 00:00:00|   28|2579|
| 92|2020-04-12 00:00:00|   33|2551|
| 91|2020-04-11 00:00:00|   45|2518|
| 90|2020-04-10 00:00:00|   50|2473|
+---+-------------------+-----+----+
only showing top 20 rows



# Load to BigQuery


In [0]:
final_df.coalesce(1).write.csv('./THCovid19_sum.csv', header=True)
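
Spark writes `./THCovid19_sum.csv` as a directory containing one `part-*.csv` file whose name includes a generated identifier, so the name changes on every run. Rather than copying that name by hand, it could be looked up programmatically. The following is a minimal sketch, and `find_part_file` is a hypothetical helper name.

```python
import glob
import os

def find_part_file(output_dir):
    """Return the single part-*.csv file that Spark wrote into output_dir."""
    matches = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]
```

Calling `find_part_file('./THCovid19_sum.csv')` after the write should return the path to pass to `gsutil cp`. The check for exactly one match relies on `coalesce(1)` having produced a single output file.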

In [0]:
! gcloud auth login

In [0]:
! gcloud config set project cobalt-bond-272904

Updated property [core/project].


In [0]:
! gsutil cp ./THCovid19_sum.csv/part-00000-c98aae8c-a3ed-477e-ae94-8ee343ca66ed-c000.csv gs://thanavi_project_data

Copying file://./THCovid19_sum.csv/part-00000-c98aae8c-a3ed-477e-ae94-8ee343ca66ed-c000.csv [Content-Type=text/csv]...
/ [1 files][  3.7 KiB/  3.7 KiB]                                                
Operation completed over 1 objects/3.7 KiB.                                      


In [0]:
! gsutil mv gs://thanavi_project_data/part-00000-c98aae8c-a3ed-477e-ae94-8ee343ca66ed-c000.csv gs://thanavi_project_data/THCovid19_sum.csv

Copying gs://thanavi_project_data/part-00000-c98aae8c-a3ed-477e-ae94-8ee343ca66ed-c000.csv [Content-Type=text/csv]...
/ [0 files][    0.0 B/  3.7 KiB]
/ [1 files][  3.7 KiB/  3.7 KiB]
Removing gs://thanavi_project_data/part-00000-c98aae8c-a3ed-477e-ae94-8ee343ca66ed-c000.csv...

Operation completed over 1 objects/3.7 KiB.                                      


In [0]:
! gsutil ls gs://thanavi_project_data

gs://thanavi_project_data/THCovid19_Data.csv
gs://thanavi_project_data/THCovid19_sum.csv


In [0]:
! bq ls


Welcome to BigQuery! This script will walk you through the 
process of initializing your .bigqueryrc configuration file.

First, we need to set up your credentials if they do not 
already exist.

Credential creation complete. Now we will select a default project.

List of projects:
  #       projectId          friendlyName    
 --- -------------------- ------------------ 
  1   cobalt-bond-272904   My First Project  
  2   r2de-275802          R2DE              
Found multiple projects. Please enter a selection for 
which should be the default, or leave blank to not 
set a default.

Enter a selection (1 - 2): 1

BigQuery configuration complete! Type "bq" to get started.

     datasetId     
 ----------------- 
  Starbuck_Stores  
  covid19          


In [0]:
! bq load --autodetect covid19.THCovid19_Sum gs://thanavi_project_data/THCovid19_sum.csv

Waiting on bqjob_r795e5a5f8e4512d1_00000171da44777f_1 ... (3s) Current status: DONE   


In [0]:
! bq head -n 10 covid19.THCovid19_Sum

+----+---------------------+-------+-----+
| id |        date         | count | sum |
+----+---------------------+-------+-----+
|  2 | 2020-01-13 00:00:00 |     0 |   1 |
|  3 | 2020-01-14 00:00:00 |     0 |   1 |
|  4 | 2020-01-15 00:00:00 |     0 |   1 |
|  5 | 2020-01-16 00:00:00 |     0 |   1 |
|  7 | 2020-01-18 00:00:00 |     0 |   2 |
|  8 | 2020-01-19 00:00:00 |     0 |   2 |
|  9 | 2020-01-20 00:00:00 |     0 |   2 |
| 10 | 2020-01-21 00:00:00 |     0 |   2 |
| 12 | 2020-01-23 00:00:00 |     0 |   4 |
| 16 | 2020-01-27 00:00:00 |     0 |   8 |
+----+---------------------+-------+-----+


In [0]:
! bq help head

Python script for interacting with BigQuery.


USAGE: bq.py [--global_flags] <command> [--command_flags] [args]


head            Displays rows in a table.

                Examples:
                bq head dataset.table
                bq head -j job
                bq head -n 10 dataset.table
                bq head -s 5 -n 10 dataset.table

                Flags for head:

                  /tools/google-cloud-sdk/platform/bq/bq.py:
                    -j,--[no]job: Reads the results of a query job.
                      (default: 'false')
                    -n,--max_rows: The number of rows to print when showing
                      table data.
                      (default: '100')
                      (an integer)
                    -c,--selected_fields: A subset of fields (including nested
                      fields) to return when showing table data. If not
                      specified, full row will be retrieved. For example,
                      "-c:a,b".
          