In [None]:
! pip install s3fs
! PYTHONWARNINGS="ignore"

In [None]:
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from collections import defaultdict
from datetime import datetime
import glob
import logging
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pathlib
import requests
import us
import shutil
from typing import Dict, List, Optional
import warnings
%matplotlib inline

warnings.filterwarnings("ignore")
pd.options.display.max_rows = 999

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

In [None]:
# Public S3 bucket (read anonymously below) and the key prefix of the covidcast JSON exports.
DELPHI_BUCKET_NAME = "covid19-lake"
COVIDCAST_PREFIX = 'covidcast/json'

In [None]:
def _get_unsigned_s3_client():
    """Return a boto3 S3 client that sends unsigned (anonymous) requests, for public buckets."""
    anonymous_config = Config(signature_version=UNSIGNED)
    return boto3.client('s3', config=anonymous_config)

def get_latest_delphi_files(bucket_name: str=DELPHI_BUCKET_NAME,
                            prefix: Optional[str]=COVIDCAST_PREFIX) -> List[str]:
    """
    List every object key in an S3 bucket, optionally restricted to a key prefix.

    Args:
        bucket_name: Name of the (public) S3 bucket to list.
        prefix: Only keys starting with this string are returned. ``None`` or ``""``
            lists the whole bucket.

    Returns:
        All matching object keys, in the order S3 returns them (empty list if none match).
    """
    s3 = _get_unsigned_s3_client()
    # One listing call returns at most 1000 keys; the paginator transparently
    # issues follow-up requests until every matching key has been fetched.
    paginator = s3.get_paginator('list_objects_v2')
    paginate_kwargs = {"Bucket": bucket_name}
    if prefix:
        # botocore rejects Prefix=None, so only pass it when a prefix was given.
        paginate_kwargs["Prefix"] = prefix
    s3_keys = []
    for page in paginator.paginate(**paginate_kwargs):
        # A page with no matching objects has no "Contents" entry at all.
        s3_keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return s3_keys

def group_covidcast_files_by_source(s3_keys: List[str]) -> Dict[str, List[str]]:
    """
    Group covidcast data-file keys by their data source.

    The delphi public s3 bucket contains files with paths that look like this,
    'covidcast/json/data/consensus/part-00000-64b3ef4a-f21d-4ff8-8993-80e9447b3e42-c000.json'
    The fourth path segment ('consensus' above) names the data source; all part
    files sharing it are collected into one list for further processing.

    Excluded keys:
      * the metadata file ('covidcast/json/metadata/metadata.json'),
      * "folder" placeholder keys ending in '/',
      * keys with fewer than four path segments (no data-source segment).

    Args:
        s3_keys: S3 object keys, e.g. as returned by get_latest_delphi_files.

    Returns:
        A defaultdict mapping data-source name -> list of keys, in input order.
    """
    files_by_type = defaultdict(list)
    for path in s3_keys:
        parts = path.split("/")
        # Guard: short keys would raise IndexError, and placeholders are not data files.
        if path.endswith("/") or len(parts) < 4:
            continue
        data_type = parts[3]
        if data_type != "metadata.json":
            files_by_type[data_type].append(path)
    return files_by_type

def get_delphi_covidcast_metadata(bucket_name: str=DELPHI_BUCKET_NAME) -> pd.DataFrame:
    """
    Load the covidcast metadata file from S3 into a DataFrame.

    Args:
        bucket_name: S3 bucket holding 'covidcast/json/metadata/metadata.json'.

    Returns:
        One row per JSON line of the metadata file, with ``min_time``/``max_time``
        parsed from YYYYMMDD and ``last_update`` parsed from Unix seconds.
    """
    # Honour the bucket_name argument (previously the global was always used).
    # Reading an s3:// URL with pandas requires s3fs.
    # NOTE(review): without configured AWS credentials, anonymous access may need
    # storage_options={"anon": True} (pandas >= 1.2) — confirm for your environment.
    covidcast_metadata_path = f"s3://{bucket_name}/covidcast/json/metadata/metadata.json"
    metadata_df = pd.read_json(covidcast_metadata_path, lines=True)

    metadata_df["min_time"] = pd.to_datetime(metadata_df["min_time"], format="%Y%m%d")
    metadata_df["max_time"] = pd.to_datetime(metadata_df["max_time"], format="%Y%m%d")
    metadata_df["last_update"] = pd.to_datetime(metadata_df["last_update"], unit='s')
    return metadata_df

In [None]:
# List every covidcast object key in the public bucket and group the data files by source.
# (The metadata file is loaded in its own cell below; loading it here too was a duplicate read.)
all_s3_keys = get_latest_delphi_files()
files_by_source = group_covidcast_files_by_source(all_s3_keys)

In [None]:
# Load the covidcast metadata file into a DataFrame.
covidcast_metadata = get_delphi_covidcast_metadata(bucket_name=DELPHI_BUCKET_NAME)

# What data is there?
## Inspect the dedicated metadata file

In [None]:
# First 20 metadata rows.
covidcast_metadata.iloc[:20]

In [None]:
# Distinct (data source, signal, time resolution, geo resolution) combinations.
covidcast_metadata.loc[:, ['data_source', 'signal', 'time_type', 'geo_type']].drop_duplicates()

Observations:
* Multiple data sources are available.
* Each data source provides multiple signals.
* Each data source appears to have both county- and state-level data.

In [None]:
def cache_data_locally(s3_keys: List[str], local_folder: str, bucket_name=DELPHI_BUCKET_NAME) -> None:
    """
    Download the given S3 objects into a freshly emptied local folder.

    Any existing contents of ``local_folder`` are deleted first. Each key is then
    saved directly under ``local_folder``, named after the key's last path component.

    Args:
        s3_keys: Object keys to download from ``bucket_name``.
        local_folder: Destination directory; recreated from scratch on every call.
        bucket_name: S3 bucket the keys live in.
    """
    target_dir = pathlib.Path(local_folder)
    # Start from an empty directory so files from a previous run don't linger.
    shutil.rmtree(target_dir, ignore_errors=True)
    target_dir.mkdir(parents=True)

    client = _get_unsigned_s3_client()
    for s3_key in s3_keys:
        log.info(f"Downloading file s3://{bucket_name}/{s3_key}")
        destination = target_dir / pathlib.Path(s3_key).name
        client.download_file(bucket_name, s3_key, str(destination))


# Cache all data locally (JHU sources are skipped)

In [None]:
for data_source, keys in files_by_source.items():
    # NOTE(review): JHU sources are deliberately skipped here; the reason is not stated.
    if data_source.startswith("jhu"):
        continue
    log.info(f"Caching {len(keys)} {data_source} files locally.")
    cache_data_locally(keys, f"covidcast/{data_source}")

In [None]:
# List the data-source folders cached by the previous cell.
! ls covidcast

In [None]:
# Load data from source into dataframes

def construct_combined_dataframe(local_folder: str) -> pd.DataFrame:
    """
    Read every ``*.json`` part file in ``local_folder`` into one DataFrame.

    Each file is parsed as JSON Lines, and its ``time_value`` column (YYYYMMDD)
    is converted to datetimes. All parts are stacked with a fresh 0..n-1 index.

    Args:
        local_folder: Directory containing the cached part files.

    Returns:
        The combined rows. Returns an empty DataFrame (no columns) when the folder
        contains no ``*.json`` files or does not exist.
    """
    # Sort so the row order is reproducible; glob returns files in arbitrary order.
    part_paths = sorted(glob.glob(f"{local_folder}/*.json"))
    if not part_paths:
        # pd.concat([]) raises ValueError; keep returning an empty frame instead.
        return pd.DataFrame()

    parts = []
    for path in part_paths:
        part_df = pd.read_json(path, lines=True)
        part_df["time_value"] = pd.to_datetime(part_df["time_value"], format="%Y%m%d")
        parts.append(part_df)
    # A single concat replaces the repeated DataFrame.append of the previous version:
    # linear instead of quadratic, and DataFrame.append was removed in pandas 2.0.
    return pd.concat(parts, ignore_index=True)

# Quick plots of all signals 

In [None]:
# Columns of interest across covidcast sources (presumably common to all of them — confirm).
common_target_columns = ["time_value", "geo_value", "geo_type", "signal", "value"]
# Each sub-folder of ./covidcast/ is a cached data source. Sort them so the plot order
# is reproducible; iterdir() order depends on the filesystem.
target_data_sources = sorted(x.name for x in pathlib.Path("./covidcast/").iterdir() if x.is_dir())
target_data_sources

In [None]:
# One figure per cached data source, overlaying every signal for a single state.
state='ny'
for source in target_data_sources:
    source_df = construct_combined_dataframe(f"covidcast/{source}")
    if source_df.empty:
        # A folder with no *.json parts yields a column-less frame; `.geo_value` would raise.
        log.warning(f"No cached files found for {source}; skipping plot.")
        continue
    fig, ax = plt.subplots()
    # Rows come from concatenated part files with no date ordering guaranteed,
    # so sort by date before drawing lines.
    source_df = source_df[source_df.geo_value == state].sort_values("time_value")
    groups = source_df.groupby("signal")
    for name, group in groups:
        group.plot(x='time_value', y='value', label=f"{source}_{name}", ax=ax, kind='line')
    ax.set_title(f"{source} signals for {state}")


# Consensus data source

In [None]:
# Combine all cached consensus part files into one frame.
consensus_df = construct_combined_dataframe(local_folder="covidcast/consensus")

In [None]:
# Which signals does the consensus source provide?
consensus_df["signal"].unique()

In [None]:
# State-level consensus rows only.
consensus_df.loc[consensus_df["geo_type"] == 'state']

In [None]:
# Plot each consensus signal for "ny" as its own line. Grouping by signal keeps multiple
# signals from being joined into one line, and sorting by date avoids zigzag lines.
fig, ax = plt.subplots()
ny_consensus = consensus_df[consensus_df.geo_value == "ny"].sort_values("time_value")
for signal_name, signal_df in ny_consensus.groupby("signal"):
    signal_df.plot(x="time_value", y="value", label=signal_name, ax=ax, title="ny consensus")

In [None]:
# Plot each consensus signal for "ca" as its own line. Grouping by signal keeps multiple
# signals from being joined into one line, and sorting by date avoids zigzag lines.
fig, ax = plt.subplots()
ca_consensus = consensus_df[consensus_df.geo_value == "ca"].sort_values("time_value")
for signal_name, signal_df in ca_consensus.groupby("signal"):
    signal_df.plot(x="time_value", y="value", label=signal_name, ax=ax, title="ca consensus")

# Doctor Visits

In [None]:

# Combine all cached doctor-visits part files into one frame.
doctor_visits_df = construct_combined_dataframe(local_folder="covidcast/doctor-visits")

In [None]:
# Which signals does the doctor-visits source provide?
doctor_visits_df["signal"].unique()

In [None]:
# Overlay every doctor-visits signal for one state. Rows are sorted by date first,
# because the concatenated part files are not guaranteed to be in time order.
state="ca"
state_visits = doctor_visits_df[doctor_visits_df.geo_value == state].sort_values("time_value")
groups = state_visits.groupby("signal")
fig, ax = plt.subplots()
for name, group in groups:
    group.plot(x="time_value", y="value", title=f"{state} doctor visits", label=name, ax=ax)

# Indicator Combination

In [None]:
# Combine all cached indicator-combination part files into one frame.
indicator_combination_df = construct_combined_dataframe(local_folder="covidcast/indicator-combination")