In [None]:
import os
from tqdm import tqdm
from smart_open import open
import random
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import numpy as np
from datetime import datetime
from functools import partial
from typing import Dict, Tuple, Any, List  # https://fastapi.tiangolo.com/python-types/
from pydantic import validate_arguments


# Ordered palette of twelve hex colour codes.
# NOTE(review): presumably the "peekd" chart colours — confirm against the
# plotting code that consumes this list.
peekd_c = [
    "#0210AA", "#07A8B2", "#F642FA", "#FF5F1D",
    "#007C1B", "#34C9B2", "#A80505", "#FFC003",
    "#39D996", "#6709CB", "#FF2740", "#FFEF29",
]

In [None]:
# Instantiate clients
# Module-level S3 client, created once and used by the S3 helper functions
# in this file for paginated object listing and for downloading files.
s3_client = boto3.client("s3")

# Load functions
@validate_arguments
def getListOfFiles(dirName: str) -> List[str]:
    """Recursively collect the paths of all non-directory entries below ``dirName``.

    Entries are visited in the order ``os.listdir`` reports them; when an
    entry is a directory, the files found inside it are inserted at that
    position in the result.

    Args:
        dirName: Directory to scan.

    Returns:
        Paths built by joining ``dirName`` with each entry name, so they are
        relative or absolute exactly as ``dirName`` is.
    """
    collected = []
    for name in os.listdir(dirName):
        candidate = os.path.join(dirName, name)
        if not os.path.isdir(candidate):
            collected.append(candidate)
            continue
        # Directory: descend and splice its files in at this position.
        collected.extend(getListOfFiles(candidate))
    return collected


@validate_arguments
def get_all_s3_filepaths(s3_path: str) -> List[str]:
    """List the full ``s3://`` paths of all objects stored under ``s3_path``.

    Args:
        s3_path: S3 location of the form ``s3://<bucket>/<prefix>``; everything
            after the bucket name is used as the listing prefix.

    Returns:
        One ``s3://<bucket>/<key>`` string per listed object, excluding paths
        that end in "/" (empty directory placeholders) and any path that
        contains "_SUCCESS". The list is empty when nothing matches the prefix.
    """
    bucket, key, _ = split_s3_filepath(s3_path)
    # Use paginator to overcome 1000 objects limit
    paginator = s3_client.get_paginator("list_objects_v2")
    s3_filepaths = []
    for page in paginator.paginate(Bucket=bucket, Prefix=key):
        # A page carries no "Contents" entry when the prefix matches no
        # objects, so fall back to an empty list instead of raising KeyError.
        for obj in page.get("Contents", []):
            # Recreate full s3 path
            filepath = f"s3://{bucket}/{obj['Key']}"
            # Remove empty directory strings (compare by value, not identity)
            if filepath.endswith("/"):
                continue
            # Remove "_SUCCESS" files
            if "_SUCCESS" in filepath:
                continue
            s3_filepaths.append(filepath)
    return s3_filepaths


@validate_arguments
def split_s3_filepath(s3_filepath: str) -> Tuple[str, str, str]:
    """Split an S3 path into its bucket, object key and file name.

    Args:
        s3_filepath: Path such as ``s3://bucket/some/prefix/file.csv``. The
            ``s3://`` scheme is optional.

    Returns:
        A ``(bucket, key, filename)`` tuple. ``key`` is everything after the
        first "/" following the bucket; ``filename`` is the last "/"-separated
        component of the key. ``filename`` is empty when the path ends in "/",
        and both ``key`` and ``filename`` are empty for a bucket-only path.
    """
    scheme = "s3://"
    # Strip the scheme only where it is a prefix; a blanket replace() would
    # also rewrite keys that happen to contain the text "s3://".
    if s3_filepath.startswith(scheme):
        path = s3_filepath[len(scheme):]
    else:
        path = s3_filepath
    # partition() yields an empty key for a bucket-only path instead of
    # failing on an empty component list.
    bucket, _, key = path.partition("/")
    filename = key.rsplit("/", 1)[-1]
    return bucket, key, filename


@validate_arguments
def pd_read_file(local_filename: str) -> pd.DataFrame:
    """Load a local CSV or parquet file into a DataFrame.

    The reader is chosen from the file name's extension (case-insensitive).
    A file name without any extension is tried as parquet.

    Args:
        local_filename: Path of the file to read.

    Returns:
        The file contents as a ``pandas.DataFrame``.

    Raises:
        Exception: If the extension is neither "csv" nor "parquet", or if an
            extension-less file cannot be read as parquet (the underlying
            error is attached as the cause).
    """
    # Inspect the final path component only, so a dot in a directory name
    # (e.g. "./data/file") is not mistaken for a file extension.
    basename = os.path.basename(local_filename)
    if "." not in basename:
        try:
            return pd.read_parquet(local_filename)
        except Exception as err:
            # Keep the original failure as the cause for easier debugging.
            raise Exception('filetype unknown.') from err

    filetype = basename.split(".")[-1]
    normalized = filetype.lower()
    if normalized == "csv":
        return pd.read_csv(local_filename)
    if normalized == "parquet":
        return pd.read_parquet(local_filename)
    raise Exception(f'filetype "{filetype}" is not supported at the moment')


@validate_arguments
def download_s3(s3_filepath: str) -> pd.DataFrame:
    """Download a single S3 object and load it into a DataFrame.

    The object is written to a temporary file named ``download_s3-<filename>``
    in the current working directory, read with ``pd_read_file`` and then
    removed again, whether or not reading succeeded.

    Args:
        s3_filepath: Full path of the object, e.g. ``s3://bucket/key/file.csv``.

    Returns:
        The object's contents as a ``pandas.DataFrame``. An empty DataFrame is
        returned (and a message printed) when the path has no file name.

    Raises:
        Exception: If the download fails (the underlying error is attached as
            the cause) or if ``pd_read_file`` cannot read the file.
    """
    # Get s3_bucket, s3_key, filename
    s3_bucket, s3_key, filename = split_s3_filepath(s3_filepath)
    if not filename:
        print(f"This is no file: {s3_filepath}")
        return pd.DataFrame()

    local_filename = "download_s3-" + filename
    # Download
    try:
        s3_client.download_file(s3_bucket, s3_key, local_filename)
    except Exception as err:
        # Re-raise with context rather than an empty, cause-less Exception.
        raise Exception(f"Failed to download {s3_filepath}") from err

    try:
        # Load result into memory
        return pd_read_file(local_filename)
    finally:
        # Remove downloaded result from local disk, even if reading failed
        if os.path.isfile(local_filename):
            os.remove(local_filename)