### Notes
I would normally use a configuration file, environment variables, or command-line arguments for the variables in the "Credentials" cell. However, given the format of the exercise, I feel that keeping the variables in their own notebook cell is the simplest approach for everyone involved.

In [None]:
# Imports
import os, re
import numpy
import pandas
import boto3
from enum import Enum
from botocore.exceptions import ClientError, ReadTimeoutError, ConnectTimeoutError
import sqlalchemy
from sqlalchemy.sql import text

In [None]:
# Credentials
# Fill these in before running; the S3 and Postgres cells below read them.

# AWS S3 access
aws_access_key = ""
aws_secret_key = ""
aws_s3_bucket = ""

# Postgres connection and target table
postgres_host = ""
postgres_port = 5432
postgres_user = ""
postgres_password = ""
postgres_database = "postgres"
postgres_table = ""

In [None]:
class MIND_S3:
    """Holds AWS S3 credentials and a client, and performs actions against S3."""
    aws_access_key = None
    aws_secret_key = None
    s3_client = None

    # Defaults
    max_keys = 1000  # Page size passed as MaxKeys to list_objects_v2()

    def __init__(self, access_key: str, secret_key: str) -> None:
        """
        Initialize the S3 client.

        Args:

        access_key -- Your AWS Access Key ID

        secret_key -- Your corresponding AWS Secret Key
        """
        self.aws_access_key = access_key
        self.aws_secret_key = secret_key
        self.__login()

    def __login(self, retries=0):
        """
        Create the boto3 S3 client from the saved credentials.

        A timeout is retried; once `retries` exceeds 2 the timeout is re-raised.
        Any other exception propagates immediately.

        Args:

        retries -- Number of attempts that have already timed out.
        """
        try:
            self.s3_client = boto3.client(
                "s3",
                aws_access_key_id=self.aws_access_key,
                aws_secret_access_key=self.aws_secret_key
            )
        # A tuple is required here: `A or B or C` evaluates to just `A`, which
        # would leave the botocore timeout errors uncaught.
        except (TimeoutError, ReadTimeoutError, ConnectTimeoutError):
            if retries > 2:
                raise
            self.__login(retries + 1)

    def list_all_objects(self, bucket: str, prefix="") -> list:
        """
        Paginate through all of the objects in a given bucket which contain a given prefix.

        Warning: This can become an expensive process on large buckets.

        Returns: list(object) -- Objects in format specified by boto3 list_objects_v2()

        Args:

        bucket -- The S3 bucket to perform the list on.

        prefix -- The prefix to use when filtering results.
        """
        objects = []
        request = {
            "Bucket": bucket,
            "MaxKeys": self.max_keys,
            "Prefix": prefix,
        }

        while True:
            response = self.s3_client.list_objects_v2(**request)
            # A page is not guaranteed to carry "Contents"; treat that as empty
            # instead of raising KeyError halfway through the listing.
            objects.extend(response.get("Contents", []))

            if "NextContinuationToken" not in response:
                break
            request["ContinuationToken"] = response["NextContinuationToken"]

        return objects

    def pull_objects(self, bucket: str, destination: str, objects: list) -> list:
        """
        Attempts to download the given list of objects from the given bucket and place them in a
        directory on the local filesystem.

        Keys ending in "/" are treated as directory placeholders: the directory is created
        locally but nothing is downloaded for them. Objects that cannot be retrieved are
        reported with print() and skipped.

        Returns: list(str) -- The paths to each of the retrieved files.

        Args:

        bucket -- The S3 bucket to pull from.

        destination -- The target directory to store the retrieved objects. (Will be created if it doesn't already exist.)

        objects -- A list of objects to pull. Must be in format [{"Key":"object_key"}]. MIND_S3.list_all_objects() returns this format.
        """
        files = []
        os.makedirs(destination, exist_ok=True)
        for item in objects:
            key = item["Key"]
            file_path = "%s/%s" % (destination, key)

            if key.endswith("/"):
                # Opening such a path for writing would fail because it names
                # a directory, so only mirror the directory itself.
                os.makedirs(file_path, exist_ok=True)
                continue

            try:
                obj = self.s3_client.get_object(Bucket=bucket, Key=key)
            except ClientError as e:
                if e.response['Error']['Code'] == 'NoSuchKey':
                    print("Object %s not found." % (key))
                else:
                    # Still best-effort, but no longer silent.
                    print("Failed to retrieve object %s: %s" % (key, e))
                continue

            if "Body" not in obj:
                continue

            # Keys may contain "/" separators; make sure the parent exists.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, 'wb') as file:
                file.write(obj["Body"].read())

            files.append(file_path)

        return files

In [28]:
class MIND_Postgres:
    """Connects to a Postgres database and maintains the connection for future executions."""
    pg_host = None
    pg_port = None
    pg_username = None
    pg_password = None
    pg_database = None

    # SQLAlchemy engine, created by __connect()
    pgsql = None

    # Matches every character that is not an ASCII letter or digit.
    column_character_filter = re.compile(r"[^a-zA-Z0-9]")

    def __init__(self, host: str, port: int, username: str, password: str, database: str) -> None:
        """
        Store the connection settings and create the engine.

        Args:

        host -- Postgres DB host

        port -- Postgres DB connection port

        username -- Postgres DB Username

        password -- Password for the Postgres DB Username

        database -- Which database to connect to on the Postgres host.
        """
        self.pg_host = host
        self.pg_port = port
        self.pg_username = username
        self.pg_password = password
        self.pg_database = database
        self.__connect()

    def __connect(self):
        """Create the SQLAlchemy engine from the stored connection settings."""
        # Build the URL from its parts rather than by string formatting so a
        # password containing "&", "%", "#" etc. cannot corrupt the URL.
        url = sqlalchemy.engine.URL.create(
            "postgresql",
            username=self.pg_username,
            password=self.pg_password,
            host=self.pg_host,
            port=self.pg_port,
            database=self.pg_database,
        )
        self.pgsql = sqlalchemy.create_engine(url)

    def list_columns(self, table_name: str) -> list:
        """
        Lists the columns and their data types from the desired table.

        Returns -- List of rows in format [('column_name', 'data_type')]

        Args:

        table_name -- The name of the table which to list columns for.
        """
        # The table name is compared as a value, so it is passed as a bound
        # parameter instead of being formatted into the SQL text.
        smt = text(
            "select column_name, data_type from INFORMATION_SCHEMA.COLUMNS "
            "where table_name = :table_name"
        )
        with self.pgsql.connect() as con:
            # Read the rows while the connection is still open.
            return list(con.execute(smt, {"table_name": table_name}))

    def clear_table(self, table_name: str) -> int:
        """
        Clear the table of all rows.

        Returns -- Number of rows deleted.

        Args:

        table_name -- The name of the table which to clear. It is formatted directly into
        the statement (identifiers cannot be bound parameters), so it must come from a
        trusted source.
        """
        smt = text("WITH deleted AS (DELETE FROM %s RETURNING *) SELECT count(*) FROM deleted;" % (table_name))
        with self.pgsql.connect() as con:
            # Fetch the count before committing, while the result is open.
            deleted = con.execute(smt).scalar()
            con.commit()
        return int(deleted or 0)

    def sanitize_column_name(self, column_name: str) -> str:
        """
        Removes erroneous or otherwise ill-advised characters from a column name.

        Every run of characters other than ASCII letters and digits is dropped. The first
        remaining character is lower-cased and each character that followed a dropped run
        is upper-cased, e.g. "Smith Yards" becomes "smithYards".

        Returns -- Column name, sanitized for use with SQL type databases.

        Args:

        column_name -- Original name of the column. Non-string labels are converted with str().
        """
        pieces = self.column_character_filter.split(str(column_name))
        words = [piece for piece in pieces if piece]
        if not words:
            return ""

        head, *tail = words
        return head[0].lower() + head[1:] + "".join(
            word[0].upper() + word[1:] for word in tail
        )

    def add_column(self, table_name: str, column_name: str, data_type: str, constraint=""):
        """
        Adds a column of the specified data type to a given table.

        All four arguments are formatted directly into the ALTER TABLE statement, so they
        must come from a trusted source.

        Args:

        table_name -- Name of the table which to add the column

        column_name -- The name of the column to add.

        data_type -- The Postgres data type which the column stores.

        constraint -- Optional addition of a constraint, such as "NOT NULL" or "PRIMARY KEY"
        """
        smt = text("ALTER TABLE %s ADD COLUMN \"%s\" %s %s" % (table_name, column_name, data_type, constraint))
        with self.pgsql.connect() as con:
            con.execute(smt)
            con.commit()

    def push_data_frame(self, frame: pandas.DataFrame, table_name: str):
        """
        Inserts a pandas DataFrame into a table.

        Column labels are passed through sanitize_column_name() before the rows are
        appended; the caller's frame is not modified.

        Args:

        frame -- Pandas DataFrame to push

        table_name -- Name of table to push DataFrame to.
        """
        sanitized_columns = {
            column: self.sanitize_column_name(column) for column in frame.columns
        }
        frame = frame.rename(columns=sanitized_columns)
        frame.to_sql(table_name, self.pgsql, if_exists='append', index=False)

In [None]:
class Result(Enum):
    """Game outcomes; each member's value is its numeric code."""

    WIN = 1.0   # a won game
    LOSS = 0.0  # a lost game

In [None]:
def parse_CSV_file(file_name: str) -> pandas.DataFrame:
    """
    Reads a CSV file into a pandas DataFrame and normalizes its columns.

    A file with a "Yards" column has its "Yards" and "TD" columns prefixed with a
    name taken from the file name (the capitalized text before the first "_").
    Otherwise, a file with a "Result" column has the numeric Result codes
    replaced by "Win" / "Loss". Any other file is returned as read.

    Returns -- The parsed (and possibly adjusted) DataFrame.

    Args:

    file_name -- path to the CSV file which should be parsed.
    """
    table = pandas.read_csv(file_name)

    if "Yards" in table:
        # e.g. "smith_stats.csv" -> "Smith"
        owner = os.path.basename(file_name).split("_")[0].capitalize()
        prefixed = {
            "Yards": f"{owner} Yards",
            "TD": f"{owner} TD",
        }
        table = table.rename(columns=prefixed)
    elif "Result" in table:
        labelled = table["Result"].replace(Result.WIN.value, "Win")
        table["Result"] = labelled.replace(Result.LOSS.value, "Loss")

    return table

In [None]:
# This cell is unused. Was used in testing to make sure CSV merging worked properly.
def generate_summary(input_frame: pandas.DataFrame) -> pandas.DataFrame:
    """
    Creates a summation of the raw input data.

    Returns -- new single-row DataFrame holding the win-loss record (as "wins-losses")
    for a "result" column and the total of every column whose name contains "yards"
    or "td". Columns keep the order they have in input_frame; the frame is empty
    when no column matches.

    Args:

    input_frame -- a Pandas DataFrame containing game and player statistics.
    """
    # Collect one-element lists and build the frame once at the end. Assigning
    # a scalar to an empty DataFrame creates a zero-row column, which made the
    # result depend on whether the "result" column happened to come first.
    summary = {}
    for column in input_frame.columns:
        lowered = column.lower()
        if lowered == "result":
            win_loss_counts = input_frame[column].value_counts()
            wins = int(win_loss_counts.get("Win", 0))
            losses = int(win_loss_counts.get("Loss", 0))
            summary[column] = ["%s-%s" % (wins, losses)]
        elif "yards" in lowered or "td" in lowered:
            try:
                summary[column] = [input_frame[column].sum(numeric_only=True)]
            except Exception as e:
                # Best-effort: a column that cannot be totalled is reported
                # and left out of the summary.
                print(e)

    return pandas.DataFrame(summary)

In [None]:
# Download every object in the configured bucket into ./temp
s3 = MIND_S3(access_key=aws_access_key, secret_key=aws_secret_key)
objects = s3.list_all_objects(bucket=aws_s3_bucket)
files = s3.pull_objects(
    bucket=aws_s3_bucket,
    destination="./temp",
    objects=objects,
)

In [None]:
# Parse each downloaded CSV and fold the results into one DataFrame.
input_frame = None
for file_name in files:
    parsed_frame = parse_CSV_file(file_name)

    # The first parsed file seeds the combined frame.
    if input_frame is None:
        input_frame = parsed_frame
        continue

    # Later files are outer-merged so that no rows are dropped.
    if parsed_frame is not None:
        input_frame = input_frame.merge(parsed_frame, how="outer")

In [30]:
# Connect to Postgres and look up the columns the target table already has.
pg = MIND_Postgres(
    postgres_host,
    postgres_port,
    postgres_user,
    postgres_password,
    postgres_database
)
existing_columns = pg.list_columns(postgres_table)
existing_names = {row[0] for row in existing_columns}

# Add columns if they don't exist
for column in input_frame.columns:
    sanitized_name = pg.sanitize_column_name(column)
    if sanitized_name in existing_names:
        continue

    print("Adding new column: %s"%(sanitized_name))
    # Pick the column type from the dtype. The previous int(sum()) probe sat
    # in the same try block as add_column(), so a database error was masked
    # and retried as VARCHAR, and concatenated numeric strings were mistaken
    # for numbers.
    if pandas.api.types.is_numeric_dtype(input_frame[column]):
        data_type = "NUMERIC"
    else:
        data_type = "VARCHAR(64)"
    pg.add_column(postgres_table, sanitized_name, data_type)

    # Two source columns can sanitize to the same name; add it only once.
    existing_names.add(sanitized_name)

# Clear table of existing data
pg.clear_table(postgres_table)

# Push new data
pg.push_data_frame(input_frame, postgres_table)