In [119]:
from typing import Dict

# STANDARD_TYPES
INT = "INT"
FLOAT = "FLOAT"
NUMERIC = "NUMERIC"
TEXT = "TEXT"
TIMESTAMP = "TIMESTAMP"
DATETIME_PANDAS = "DATETIME64[NS]"

# STANDARD_DATA_SOURCES
PANDAS = "PANDAS"

DIALECT_MAPPING: Dict[str, Dict[str, str]] = dict(
    PANDAS=dict(
        OBJECT=TEXT,
        DATETIME_PANDAS=TIMESTAMP,
        INT64=TEXT,
        FLOAT64=TEXT,
    )
)

In [131]:
import os
from typing import List, Optional
import pandas as pd
from pandas import DataFrame

def get_tables_by_prefix_separator(data_directory, prefix_separator: str = "_") -> List[str]:
    potential_tables = [pt.split(prefix_separator)[0] for pt in os.listdir("/dataset")]
    tables = set(potential_tables)
    return list(tables)

def get_csv_data_files_for_table(dataset_directory: str, table: str, prefix_separator: str) -> List[str]:
    data_files_glob = f"{table}{prefix_separator}*.csv"
    data_files = glob.glob(os.path.join(dataset_directory, data_files_glob))
    return data_files

class DataFrameTools(object):
    def __init__(self):
        pass

    def get_typed_dataframe_from_file(self, data_file: str, date_columns: List[str]):
        df = pd.read_csv(data_file, parse_dates=date_columns)
        return df

    def get_dtypes_dict_from_typed_dataframe(self, typed_df: DataFrame):
        dtypes_dict = typed_df.dtypes.to_dict()
        for key, value in dtypes_dict.items():
            dtypes_dict[key] = str(value).upper()
        return dtypes_dict


In [132]:
import codecs
import glob
from io import StringIO
from jinja2 import Environment, FileSystemLoader
import os
import psycopg2
import sys
import traceback
from typing import Any, Dict

DEFAULT_USER = "postgres"
DEFAULT_PASSWORD = "password"
DEFAULT_DATABASE = "take_home"
DEFAULT_SCHEMA = "eric_meadows"
DEFAULT_HOST = "database"



TEMPLATE_DIR = os.path.join(os.path.dirname("./jinja/"), "templates")
TEMPLATE_LOADER = FileSystemLoader(searchpath=TEMPLATE_DIR)
TEMPLATE_ENV = Environment(loader=TEMPLATE_LOADER)


TABLE_DATE_COLUMNS: Dict[str, List[str]] = dict(
    user=["event_ts",],
    marketing=["event_ts",]
)

DROP_AND_CREATE_TABLE_WITH_SCHEMA = "drop_and_create_table_with_schema.jinja2"
CREATE_SCHEMA_IF_NOT_EXISTS = "create_schema_if_not_exists.jinja2"

class DatabaseTools(object):
    def __init__(self, *args, **kwargs):
        pass

    def load_jinja_query_template(
        self,
        template_file: str,
        params: Dict[str, Any]
    ) -> str:
        template = TEMPLATE_ENV.get_template(template_file)
        rendered_query = template.render(params)
        return rendered_query
    

class PostgresTools(DatabaseTools):
    def __init__(
        self,
        host: str = DEFAULT_HOST,
        database: str = DEFAULT_DATABASE,
        user: str = DEFAULT_USER,
        password: str = DEFAULT_PASSWORD,
        *args,
        **kwargs
    ):
        super(PostgresTools, self).__init__(*args, **kwargs)
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.conn = psycopg2.connect(
            host=self.host,
            dbname=self.database,
            user=self.user,
            password=self.password
        )
        self.dataframe_tools = DataFrameTools()
    
    def _ensure_schema_present(
        self,
        schema,
    ):
        query_params = dict(
            schema=schema
        ) 
        query = self.load_jinja_query_template(
            CREATE_SCHEMA_IF_NOT_EXISTS,
            query_params
        )
        self._run_query(query)

    def _drop_and_create_table_sql(
        self,
        database: str,
        table: str,
        dialect: str,
        schema_map: Dict[str, str],
    ):
        dialect_map = DIALECT_MAPPING[dialect]
        postgres_table_name = f"{database}.{table}"

        query_params = dict(
            postgres_table_name=postgres_table_name,
            mapping=dialect_map,
            schema_map=schema_map,
        )
        query_sql = self.load_jinja_query_template(
            DROP_AND_CREATE_TABLE_WITH_SCHEMA,
            query_params
        )
        return query_sql

    def _drop_and_create_table(
        self,
        schema: str,
        table: str,
        dialect: str,
        schema_map: Dict[str, str],
    ):
        query = self._drop_and_create_table_sql(
            schema,
            table,
            dialect,
            schema_map
        )
        self._run_query(query)
    
    def _get_file_buffer_without_null_bytes(
        self,
        data_file
    ):
        writeable_csv_buffer = StringIO()
        # Circumvent the 
        pd.read_csv(data_file).to_csv(writeable_csv_buffer, index=False, header=False)
        writeable_csv_buffer.seek(0)
        return writeable_csv_buffer
    
    def _load_file_into_table(
        self,
        schema: str,
        table: str,
        data_file: str,
        separator=","
    ):
        cursor = self.conn.cursor()
        writeable_csv_buffer = self._get_file_buffer_without_null_bytes(data_file)
        try:
            cursor.copy_from(writeable_csv_buffer, f"{schema}.{table}", sep=separator, null="")
            self.conn.commit()
        except Exception as err:
            traceback.print_exc(file=sys.stdout)
            raise err
        finally:
            writeable_csv_buffer.close()
            cursor.close()
    
    def _run_query(self, query):
        
        cursor = self.conn.cursor()
        cursor.execute(query)
        self.conn.commit()
        cursor.close()
    
    def load_files_into_database(
        self,
        schema: str,
        table: str,
        data_files: List[str],
        dialect: str = PANDAS,
    ):
        first_data_file = data_files[0]
        df = self.dataframe_tools.get_typed_dataframe_from_file(first_data_file, [])
        schema_map = self.dataframe_tools.get_dtypes_dict_from_typed_dataframe(df)
        
        self._ensure_schema_present(schema)

        self._drop_and_create_table(
            schema,
            table,
            dialect,
            schema_map,
        )
        for data_file in data_files:
            self._load_file_into_table(schema, table, data_file)


In [151]:
from typing import Dict

TABLE_DATE_COLUMNS: Dict[str, List[str]] = dict(
    user=["event_ts",],
    marketing=["event_ts",]
)
DATASET_DIRECTORY = "/dataset"
PREFIX_SEPARATOR = "_"


def main():
    postgres_tools = PostgresTools()

    required_tables = get_tables_by_prefix_separator(DATASET_DIRECTORY, PREFIX_SEPARATOR)
    for table in required_tables:
        print(f"Loading data for table:  {table}")
        data_files = get_csv_data_files_for_table(DATASET_DIRECTORY, table, PREFIX_SEPARATOR)
        result = postgres_tools.load_files_into_database(DEFAULT_SCHEMA, table, data_files)

if __name__ == "__main__":
    main()


Loading data for table:  user
Loading data for table:  marketing


# Investigation

In [328]:
from sqlalchemy import create_engine
engine = create_engine(f'postgresql://{DEFAULT_USER}:{DEFAULT_PASSWORD}@{DEFAULT_HOST}:5432/{DEFAULT_DATABASE}')

USERS_TABLE = "user"
MARKETING_TABLE = "marketing"

users_df = pd.read_sql_query(
    f"select * from {DEFAULT_SCHEMA}.{USERS_TABLE}",
    con=engine,
    parse_dates=["event_ts"])\
    .dropna(subset=["event_ts"])
marketing_df = pd.read_sql_query(
    f"select * from {DEFAULT_SCHEMA}.{MARKETING_TABLE}",
    con=engine,
    parse_dates=["event_ts"])\
    .dropna(subset=["event_ts"])
marketing_df.length = marketing_df.length.astype("float")


In [314]:
unique_user_count = len(users_df.user_id.unique())
print(f"1.  Unique users:  {unique_user_count}")


1.  Unique users:  2904


In [315]:
marketing_providers = ", ".join(list(filter(None, marketing_df.provider.unique())) )
print(f"2.  Marketing providers:  {marketing_providers}")
# df.head()


2.  Marketing providers:  Facebook, Instagram, Spotify, Snapchat, Inst


In [343]:
change_frequency = users_df.property.value_counts()
most_changed_attribute_name = change_frequency.idxmax().title()
most_changed_attribute_count = change_frequency.max()

print("3.  Most-changed Attribute - Name:  {most_changed_attribute_name}, Count:  {most_changed_attribute_count}")


3.  Most-changed Attribute - Name:  {most_changed_attribute_name}, Count:  {most_changed_attribute_count}


In [326]:
DATE = "2019-07-03"
PROVIDER = "Snapchat"
users_show_ad_on_provider_on_given_date = marketing_df\
    .loc[marketing_df.event_ts.dt.strftime("%Y-%m-%d") == DATE]\
    .loc[marketing_df.provider == PROVIDER]\
    .shape[0]
print(f"4.  Users shown an ad on {PROVIDER} on {DATE}:  {users_show_ad_on_provider_on_given_date}")

4.  Users shown an ad on Snapchat on 2019-07-03:  261


In [341]:
POLITICAL_AFFILIATION = "moderate"

phone_id_and_political_affiliation = users_df.loc[
    users_df.property == "politics",
    ["phone_id", "value"]]\
    .dropna(subset=["value"])\
    .rename(columns={"value": "political_affiliation"})
merged_df = pd.merge(
    marketing_df,
    phone_id_and_political_affiliation,
    how="inner",
    left_on="phone_id",
    right_on="phone_id")

most_shown_ad_to_specific_political_affliation = merged_df\
    .loc[merged_df.political_affiliation.str.lower() == POLITICAL_AFFILIATION]\
    .ad_id\
    .value_counts()\
    .idxmax()
print(f"5.  Ad shown most to {POLITICAL_AFFILIATION}s:  {most_shown_ad_to_specific_political_affliation}")

5.  Ad shown most to moderates:  4


In [344]:
marketing_df.groupby(by=["ad_id"])\
    .agg({"length": ["mean", "std"], "phone_id": "nunique"})\
    .sort_values([
        ('phone_id', "nunique"),
        ("length", "mean")],
    ascending=False)


Unnamed: 0_level_0,length,length,phone_id
Unnamed: 0_level_1,mean,std,nunique
ad_id,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2
1,1187.477852,698.685703,639
4,1194.655267,687.216389,635
2,1200.760522,680.341966,612
0,1251.282318,693.533536,605
3,1189.10104,689.61589,592
5,1129.005277,672.512072,351
8,1156.726519,688.105589,346
6,1166.478022,722.131942,337
9,1167.659341,712.691252,335
7,1146.465465,703.231707,313


# Examining the data above, it appears clear that there are 4 ad groups based upon the number of users an ad was shown to
## Because of this, there appears to be a winner for each group, and then we will select another
### Group 1:  >500 users shown
Winner:  ad_id = 0
Reason:  mean - 2 std. dev > every other in the group
### Group 2:  300-499 users shown
Winner:  ad_id = 5
Reason:  mean - 2 std. dev > every other in the group
### Group 4:  160-299 users shown
Winner:  ad_id = 12
Reason:  mean - 2 std. dev > every other in the group
### Group 3:  100-159 users shown
Winner:  ad_id = 14
Reason:  mean - 2 std. dev > every other in the group
### Group 4:  <100 users shown
Winner:  ad_id = 20
Reason:  mean - 2 std. dev > every other in the group