## COVID-19 Ingestion Pipeline

### Step 1: Get the COVID-19 dataset and copy the daily report files to a Cloud Storage (GCS) bucket (skip this step)

In [None]:
# Cloud Storage bucket that the gsutil cells below copy the daily report CSVs into.
bucket_name = 'covid-19-csv'

In [None]:
# Clone the dataset repository into ./COVID-19 (the directory the gsutil cells read from).
!git clone https://github.com/CSSEGISandData/COVID-19.git COVID-19

In [None]:
# Copy the January, February and March 1-21 daily reports into the
# 01-22-to-03-21/ folder of the bucket.
# The destination ends with "/" so gsutil always treats it as a folder, including
# for the single-file copies further down, instead of relying on a bucket listing.
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/01-*.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/02-*.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-0*.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-1*.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-20-2020.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-21-2020.csv gs://{bucket_name}/01-22-to-03-21/

In [None]:
# Copy the daily reports from March 22 onward into the 03-22-to-now/ folder.
# The trailing "/" matters here: a single-file copy to a destination without it is
# written as an object named "03-22-to-now" when nothing exists under that prefix
# yet, so the file would never match the 03-22-to-now/*.csv pattern used by the load.
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-22-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-23-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-24-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-25-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-26-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-27-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-28-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-29-2020.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/03-3*.csv gs://{bucket_name}/03-22-to-now/

In [None]:
!gsutil cp COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/04-*.csv gs://{bucket_name}/03-22-to-now/

### Step 2: Load the files into BigQuery (start from here)

In [None]:
# BigQuery dataset used by the bq cells below. The %%bigquery cells reference
# covid_19_staging by name, so keep them in sync if this value changes.
dataset_id = 'covid_19_staging'

In [None]:
# Create the staging dataset in the US location.
# --force=true makes the command succeed when the dataset already exists (it does
# not overwrite it), so the notebook can be re-run from this step.
!bq --location=US mk --force=true --dataset {dataset_id}

In [None]:
# Load every CSV under 01-22-to-03-21/ into Winter_Cases using the inline schema
# (header row skipped, rows with missing trailing columns allowed).
# --replace=true overwrites the table; without it each re-run of this cell appends
# the same files again and inflates the row counts checked in Step 3.
# The bucket name is spelled out here, so this cell does not depend on Step 1.
!bq --location=US load --replace=true --skip_leading_rows=1 --allow_jagged_rows=true --source_format=CSV {dataset_id}.Winter_Cases \
gs://covid-19-csv/01-22-to-03-21/*.csv \
state:STRING,country:STRING,last_update:STRING,confirmed:INTEGER,deaths:INTEGER,recovered:INTEGER,latitude:NUMERIC,longitude:NUMERIC

In [None]:
# Load every CSV under 03-22-to-now/ into Spring_Cases using the inline schema
# (header row skipped, rows with missing trailing columns allowed).
# --replace=true overwrites the table; without it each re-run of this cell appends
# the same files again and inflates the row counts checked in Step 3.
# NOTE(review): latitude/longitude are loaded as STRING here but as NUMERIC in
# Winter_Cases and in the merged Cases table - confirm this is deliberate.
!bq --location=US load --replace=true --skip_leading_rows=1 --allow_jagged_rows=true --source_format=CSV {dataset_id}.Spring_Cases \
gs://covid-19-csv/03-22-to-now/*.csv \
fips:INTEGER,admin2:STRING,state:STRING,country:STRING,last_update:STRING,latitude:STRING,longitude:STRING,confirmed:INTEGER,deaths:INTEGER,recovered:INTEGER,active:INTEGER,combined_key:STRING

### Step 3: Validate load

In [None]:
%%bigquery
-- Total number of rows loaded into the Winter_Cases staging table.
select
    count(*) as winter_cases
from
    covid_19_staging.Winter_Cases

In [None]:
%%bigquery
-- Total number of rows loaded into the Spring_Cases staging table.
select
    count(*) as spring_cases
from
    covid_19_staging.Spring_Cases

#### Task 1: Retrieve some sample records from both tables

In [None]:
%%bigquery
-- Smallest and largest last_update values in Winter_Cases.
-- NOTE(review): last_update is loaded as STRING, so min/max compare text rather
-- than dates; confirm the source files use a consistently sortable format.
select
    min(last_update) as oldest_timestamp,
    max(last_update) as newest_timestamp
from
    covid_19_staging.Winter_Cases

In [None]:
%%bigquery
-- Smallest and largest last_update values in Spring_Cases.
-- NOTE(review): last_update is loaded as STRING, so min/max compare text rather
-- than dates; confirm the source files use a consistently sortable format.
select
    min(last_update) as oldest_timestamp,
    max(last_update) as newest_timestamp
from
    covid_19_staging.Spring_Cases

#### List number of records per timestamp

In [None]:
%%bigquery
-- Record count per distinct last_update value in Winter_Cases,
-- limited to the first 10 values in string sort order.
select
    last_update as timestamp,
    count(*) as number_of_records
from
    covid_19_staging.Winter_Cases
group by
    last_update
order by
    last_update
limit 10

In [None]:
%%bigquery
-- Record count per distinct last_update value in Spring_Cases,
-- limited to the first 10 values in string sort order.
select
    last_update as timestamp,
    count(*) as number_of_records
from
    covid_19_staging.Spring_Cases
group by
    last_update
order by
    last_update
limit 10

#### Task 2: Describe the two table schemas

#### Task 3: Merge the two tables

In [None]:
%%bigquery
-- Table intended to hold the merged Winter_Cases and Spring_Cases rows.
-- Columns follow the Spring_Cases load schema in name and order; latitude and
-- longitude are NUMERIC, as in the Winter_Cases load schema.
-- "if not exists" lets this cell be re-run without failing once the table exists;
-- it leaves an existing table and its rows untouched.
create table if not exists covid_19_staging.Cases (
    fips INT64,
    admin2 STRING,
    state STRING,
    country STRING,
    last_update STRING,
    latitude NUMERIC,
    longitude NUMERIC,
    confirmed INT64,
    deaths INT64,
    recovered INT64,
    active INT64,
    combined_key STRING
);