<font size="+3"><strong>Extract, Transform, Load</strong></font>

In Data Science and Data Engineering, the process of taking data from a source, changing it, and then loading it into a database is called **ETL**, which is short for **extract, transform, load**. ETL tends to be more programming-intensive than other data science tasks like visualization, so we'll also spend time in this lesson exploring Python as an **object-oriented programming** language. Specifically, we'll create our own Python **class** to contain our ETL processes.

In [2]:
import random
import pandas as pd
#from teaching_tools.ab_test.reset import Reset
from pymongo import MongoClient

#r = Reset()
#r.reset_database()

In [None]:
client = MongoClient(host="localhost", port=27017)
db = client["wqu-abtest"]
ds_app = db["ds-applicants"]
print("client:", type(client))
print("ds_app:", type(ds_app))

## Extract: Developing the Hypothesis
Now that we've connected to the data, we need to pull out the information we need. One aspect of our applicant pool that we didn't explore in the last lesson is how many applicants actually complete the DS Lab admissions quiz.

Use the [`aggregate`](https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.aggregate) method to calculate the number of applicants that completed and did not complete the admissions quiz.

In [None]:
result = ds_app.aggregate([{"$group":
                            {"_id": "$admissionsQuiz",
                             "count": {"$count": {} }}}])

In [None]:
result

In [None]:
list(result)
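
To make the `$group` stage concrete, here is a minimal pure-Python sketch of the same tally. The `docs` list is made up for illustration and is not data from the `ds-applicants` collection, and `collections.Counter` only stands in for the `$count` accumulator.

```python
from collections import Counter

# Hypothetical documents; only the `admissionsQuiz` field matters here.
docs = [
    {"admissionsQuiz": "complete"},
    {"admissionsQuiz": "incomplete"},
    {"admissionsQuiz": "complete"},
    {"admissionsQuiz": "complete"},
]

# Tally documents by their `admissionsQuiz` value, as `$group` with `$count` does.
tally = Counter(doc["admissionsQuiz"] for doc in docs)

# Reshape into the one-document-per-group form that the pipeline returns.
grouped = [{"_id": key, "count": n} for key, n in tally.items()]
print(grouped)
```

Note that the cursor returned by `aggregate` can only be iterated once, so after `list(result)` the query has to be run again to read the counts a second time.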

In [None]:
# How many applicants completed the admissions quiz?

result = ds_app.aggregate([{"$group": {"_id": "$admissionsQuiz",
                                       "count": {"$count":{}}}}])
for r in result:
    if r["_id"] == "complete":
        complete = r["count"]
    else:
        incomplete = r["count"]

In [None]:
print("Complete quiz:", complete)
print("Did not complete quiz:", incomplete)

In [None]:
# Proportion of new users who have not completed the quiz
total = complete + incomplete
prop_incomplete = incomplete / total
print("Proportion of users who do not complete the admissions quiz:",
      round(prop_incomplete, 2))
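
The proportion of incomplete quizzes is the incomplete count divided by the total number of applicants. The sketch below shows that arithmetic on hypothetical counts (the real numbers come from the database), and it indexes the counts by their `_id` label, which is one way to avoid depending on the order or number of groups the cursor yields.

```python
# Hypothetical aggregation output; the real counts come from the database.
result = [
    {"_id": "incomplete", "count": 25},
    {"_id": "complete", "count": 75},
]

# Index the counts by group label so no branch depends on document order.
counts = {r["_id"]: r["count"] for r in result}

total = sum(counts.values())
prop_incomplete = counts["incomplete"] / total
print(round(prop_incomplete, 2))  # 0.25
```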

Now that we know that around a quarter of the applicants don't complete the admissions quiz, is there anything we can do to improve the completion rate?

A **hypothesis** is an informed guess about what we think is going to happen in an experiment. We probably hope that whatever we're trying out is going to work, but it's important to maintain a healthy degree of skepticism. Science experiments are designed to demonstrate what *does* work, not what doesn't, so we always start out by assuming that whatever we're about to do won't make a difference (even if we hope it will). The idea that an experimental intervention won't change anything is called a **null hypothesis** ($H_0$), and every experiment either rejects the null hypothesis (meaning the intervention worked), or fails to reject the null hypothesis (meaning it didn't). 

The mirror image of the null hypothesis is called an **alternate hypothesis** ($H_a$), and it proceeds from the idea that whatever we're about to do actually *will* work. If I'm trying to figure out whether exercising is going to help me lose weight, the null hypothesis says that if I exercise, I won't lose any weight. The alternate hypothesis says that if I exercise, I will lose weight. 

It's important to keep both types of hypothesis in mind as you work through your experimental design.

In [None]:
null_hypothesis = """
There is no relationship between receiving an email and completing the admissions quiz, i.e.
    Sending an email to 'no-quiz applicants' does not increase the rate of completion.
"""
alternate_hypothesis = """
There is a relationship between receiving an email and completing the admissions quiz, i.e.
    Sending an email to 'no-quiz applicants' increases the rate of completion.
"""

print("Null Hypothesis:", null_hypothesis)
print("Alternate Hypothesis:", alternate_hypothesis)

In [None]:
# Create a function that searches `ds-applicants` and returns all the no-quiz applicants from a specific date
collection = ds_app
date_string = "2022-05-04"

In [None]:
def find_by_date(collection, date_string):
    #convert 'date_string' to a datetime object
    start = pd.to_datetime(date_string, format='%Y-%m-%d')
    # Offset start by 1 day
    end = start + pd.DateOffset(days=1)
    # Create pymongo query for no-quiz applicant b/t start and end
    query = {"createdAt": {"$gte": start, "$lt": end}, "admissionsQuiz": "incomplete"}
    #Query collection, get result
    result = collection.find(query)
    observations = list(result)

    return observations

In [None]:
observations = find_by_date(ds_app, date_string="2022-05-05")
print("observations type:", type(observations))
print("observations len:", len(observations))
observations[0]
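
The query in `find_by_date` relies on a half-open window: `$gte` includes the start of the day and `$lt` excludes the start of the next one. Here is a small sketch of that window logic, using the standard-library `datetime` module in place of pandas so it runs without a database. The helper name `day_window` and the sample timestamps are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def day_window(date_string):
    """Return the half-open interval [start, end) covering one calendar day."""
    start = datetime.strptime(date_string, "%Y-%m-%d")
    end = start + timedelta(days=1)
    return start, end


start, end = day_window("2022-05-04")

# Hypothetical timestamps to test against the window.
just_before_midnight = datetime(2022, 5, 4, 23, 59, 59)
next_midnight = datetime(2022, 5, 5, 0, 0, 0)

print(start <= just_before_midnight < end)  # True
print(start <= next_midnight < end)  # False
```

Because the upper bound is exclusive, an account created exactly at midnight belongs to one day only, never two.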

## Transform: Designing the Experiment
Okay! Now that we've extracted the data we'll need for the experiment, it's time to get our hands dirty. 

The **transform** stage of ETL involves manipulating the data we just extracted. In this case, we're going to be figuring out which students didn't take the quiz, and assigning them to different experimental groups. To do that, we'll need to *transform* each document in the database by creating a new attribute for each record.

Now we can split the students who didn't take the quiz into two groups: one that will receive a reminder email, and one that will not. Let's make another function that'll do that for us.


Create a function `assign_to_groups` that takes a list of new user documents as input and adds two keys to each document. The first key should be `"inExperiment"`, and its value should always be `True`. The second key should be `"group"`, with half of the records in `"email (treatment)"` and the other half in `"no email (control)"`.

In [None]:
def assign_to_groups(observations):
    # Shuffle `observations`
    random.seed(42)
    random.shuffle(observations)
    # Get index position of item at observations halfway point
    idx = len(observations) // 2

    # Assign first half of observations to control group
    for doc in observations[:idx]:
        doc["inExperiment"] = True
        doc["group"] = "no email (control)"
    # Assign second half of observations to treatment group
    for doc in observations[idx:]:
        doc["inExperiment"] = True
        doc["group"] = "email (treatment)"

    return observations

In [None]:
observations_assigned = assign_to_groups(observations)

print("observations_assigned type:", type(observations_assigned))
print("observations_assigned len:", len(observations_assigned))
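
One way to sanity-check a split like this is to run it on made-up documents and count the groups. The sketch below is a variation rather than the lesson's function: the name `split_into_groups` is hypothetical, it uses a local `random.Random` generator instead of seeding the global one, and it copies the documents instead of changing them in place.

```python
import random


def split_into_groups(docs, seed=42):
    """Shuffle a copy of `docs`, then label each half with a group."""
    rng = random.Random(seed)  # local generator; global random state is untouched
    shuffled = [dict(doc) for doc in docs]  # copy so the inputs are not mutated
    rng.shuffle(shuffled)
    idx = len(shuffled) // 2
    for doc in shuffled[:idx]:
        doc.update(inExperiment=True, group="no email (control)")
    for doc in shuffled[idx:]:
        doc.update(inExperiment=True, group="email (treatment)")
    return shuffled


# Hypothetical documents with an odd count, to show where the extra one lands.
docs = [{"_id": i} for i in range(7)]
assigned = split_into_groups(docs)
n_control = sum(doc["group"] == "no email (control)" for doc in assigned)
n_treatment = sum(doc["group"] == "email (treatment)" for doc in assigned)
print(n_control, n_treatment)  # 3 4
```

With an odd number of documents, floor division makes the second slice one longer, so the extra applicant ends up in the treatment group.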

Create a function `export_treatment_emails` that takes a list of documents (like `observations_assigned`) as input, creates a DataFrame with the emails of all observations in the treatment group, and saves the DataFrame as a CSV file. Then use your function to create a CSV file in the current directory.


In [None]:
def export_treatment_emails(observations_assigned, directory="."):
    # Put `observations_assigned` docs into DataFrame
    df = pd.DataFrame(observations_assigned)

    # Add `"tag"` column
    df["tag"] = "ab-test"

    # Create mask for treatment group only
    mask = df["group"] == "email (treatment)"

    # Create filename with date
    date_string = pd.Timestamp.now().strftime(format="%Y-%m-%d")
    filename = directory + "/" + date_string + "_ab-test.csv"

    # Save DataFrame to directory (email and tag only)
    df[mask][["email", "tag"]].to_csv(filename, index=False)


export_treatment_emails(observations_assigned=observations_assigned)
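
To see what the exported file should contain, here is a rough sketch that writes the same two columns to an in-memory buffer. It swaps pandas for the standard-library `csv` module so it runs on its own, and the email addresses are invented for illustration.

```python
import csv
import io

# Hypothetical assigned documents; real ones come from `assign_to_groups`.
assigned = [
    {"email": "a@example.com", "group": "email (treatment)"},
    {"email": "b@example.com", "group": "no email (control)"},
    {"email": "c@example.com", "group": "email (treatment)"},
]

# Write to a string buffer instead of a file so nothing touches the disk.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["email", "tag"], lineterminator="\n")
writer.writeheader()
for doc in assigned:
    # Keep the treatment group only, and tag each row for the mailing tool.
    if doc["group"] == "email (treatment)":
        writer.writerow({"email": doc["email"], "tag": "ab-test"})

print(buffer.getvalue())
```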

Assign the first item in the `observations_assigned` list to the variable `updated_applicant`. Then assign that applicant's ID to the variable `applicant_id`.

In [None]:
updated_applicant = observations_assigned[0]
applicant_id = updated_applicant["_id"]
print("applicant type:", type(updated_applicant))
print(updated_applicant)
print("applicant_id type:", type(applicant_id))
print(applicant_id)

Use the `find_one` method together with the `applicant_id` from the previous task to locate the original record in the `"ds-applicants"` collection.


In [None]:
# Find original record for `applicant_id`
ds_app.find_one({"_id": applicant_id})

Use the `update_one` method to update the record with the new information in `updated_applicant`. Once you're done, rerun your query from the previous task to see if the record has been updated. 


In [None]:
result = ds_app.update_one(
    filter={"_id": applicant_id},
    update={"$set": updated_applicant}
)
print("result type:", type(result))

 Use the [`dir`](https://docs.python.org/3/library/functions.html#dir) function to inspect `result`. Once you see some of the attributes, try to access them. For instance, what does the `raw_result` attribute tell you about the success of your record update?

In [None]:
# Access methods and attributes using `dir`
print(dir(result))
# Access `raw_result` attribute
print(result.raw_result)
# Find the record for `applicant_id` to confirm the update
ds_app.find_one({"_id": applicant_id})

Create a function `update_applicants` that takes a list of documents like `observations_assigned` as input, updates the corresponding documents in a collection, and returns a dictionary with the results of the update. Then use your function to update `"ds-applicants"` with `observations_assigned`.


In [None]:
def update_applicants(collection, observations_assigned):
    n = 0
    n_modified = 0
    # Iterate through applicants
    for doc in observations_assigned:
        # Use the `collection` argument rather than the global `ds_app`
        result = collection.update_one(
            filter={"_id": doc["_id"]},
            update={"$set": doc}
        )
        # Update counters
        n += result.matched_count
        n_modified += result.modified_count
    # Create results
    transaction_result = {"n": n, "nModified": n_modified}
    return transaction_result

In [None]:
result = update_applicants(ds_app, observations_assigned)
print("result type:", type(result))
result
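
The difference between `n` and `nModified` is easiest to see when the same update runs twice. The sketch below is only an in-memory model of that bookkeeping, not PyMongo itself: `apply_set` and `run_updates` are hypothetical helpers, and a plain dictionary keyed by `_id` stands in for the collection.

```python
def apply_set(store, doc):
    """Mimic `$set` on an in-memory dict keyed by `_id`.

    Returns (matched, modified), loosely analogous to `matched_count`
    and `modified_count`.
    """
    current = store.get(doc["_id"])
    if current is None:
        return 0, 0  # no document with that `_id`
    changed = any(current.get(key) != value for key, value in doc.items())
    current.update(doc)
    return 1, int(changed)


def run_updates(store, updates):
    """Apply every update and total the matched and modified counts."""
    n = n_modified = 0
    for doc in updates:
        matched, modified = apply_set(store, doc)
        n += matched
        n_modified += modified
    return {"n": n, "nModified": n_modified}


# Hypothetical stored documents and the updates to apply to them.
store = {1: {"_id": 1}, 2: {"_id": 2}}
updates = [{"_id": 1, "inExperiment": True}, {"_id": 2, "inExperiment": True}]

first = run_updates(store, updates)
second = run_updates(store, updates)
print(first)  # {'n': 2, 'nModified': 2}
print(second)  # {'n': 2, 'nModified': 0}
```

The second pass still matches both documents but changes nothing, which is why a repeated update can report `nModified` of 0 without anything having gone wrong.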

## Python Classes: Building the Repository

We've managed to extract data from our database using our `find_by_date` function, transform it using our `assign_to_groups` function, and load it using our `update_applicants` function. Does that mean we're done? Not yet! There's an issue we need to address: distraction.

What do we mean when we say distraction? Think about it this way: Do you need to know the exact code that makes `df.describe()` work? No, you just need to calculate summary statistics! Going into more detail would distract you from the work you need to get done. The same is true of the tools you've created in this lesson. Others will want to use them in future experiments without worrying about your implementation. The solution is to **abstract** the details of your code away.

To do this, we're going to create a [Python class](https://docs.python.org/3/tutorial/classes.html). Python classes contain both information and ways to interact with that information. An example of a class is a pandas `DataFrame`. Not only does it hold data (like the size of an apartment in Buenos Aires or the income of a household in the United States); it also provides methods for inspecting it (like `DataFrame.head()` or `DataFrame.info()`) and manipulating it (like `DataFrame.sum()` or `DataFrame.replace()`).

In the case of this project, we want to create a class that will hold information about the documents we want (like the name and location of the collection) and provide tools for interacting with those documents (like the functions we've built above). Let's get started!
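
Before building the real thing, it may help to see the idea at its smallest. The sketch below defines a toy class, hypothetically named `ApplicantList`, that holds some made-up documents (the information) and offers one method for querying them (the interaction). It is an illustration only and not part of the lesson's repository.

```python
class ApplicantList:
    """Hold a list of applicant documents and offer a way to query it."""

    def __init__(self, documents):
        # Information the object carries around
        self.documents = documents

    def count_by_quiz_status(self, status):
        # A way to interact with that information
        return sum(doc["admissionsQuiz"] == status for doc in self.documents)


# Hypothetical documents for illustration.
applicants = ApplicantList(
    [
        {"admissionsQuiz": "complete"},
        {"admissionsQuiz": "incomplete"},
        {"admissionsQuiz": "complete"},
    ]
)
print(applicants.count_by_quiz_status("complete"))  # 2
```

Someone calling `count_by_quiz_status` never needs to know how the documents are stored, which is the kind of abstraction we're after.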

Define a `MongoRepository` class with an `__init__` method. The `__init__` method should accept three arguments: `client`, `db`, and `collection`. Use the docstring below as a guide.

In [None]:
class MongoRepository:
    # Task 7.2.14
    def __init__(self, client=MongoClient(host="localhost", port=27017), db="wqu-abtest", collection="ds-applicants"):
        """Store a handle to `collection` in database `db`, reached through `client`."""
        self.collection = client[db][collection]

    # Task 7.2.17
    def find_by_date(self, date_string):
        # Convert `date_string` to datetime object
        start = pd.to_datetime(date_string, format='%Y-%m-%d')
        # Offset `start` by 1 day
        end = start + pd.DateOffset(days=1)
        # Create PyMongo query for no-quiz applicants b/t `start` and `end`
        query = {"createdAt": {"$gte": start, "$lt": end}, "admissionsQuiz": "incomplete"}
        # Query collection, get result
        result = self.collection.find(query)
        # Convert `result` to list
        observations = list(result)

        return observations
    # Task 7.2.18
    def update_applicants(self, observations_assigned):
        n = 0
        n_modified = 0
        #iterate through applicants
        for doc in observations_assigned:
            result = self.collection.update_one(
                filter={"_id": doc["_id"]},
                update={"$set": doc}
                )
            #updated counter
            n += result.matched_count
            n_modified += result.modified_count
        #create results
        transaction_result = {"n": n, "nModified": n_modified}
        return transaction_result
    
    # Task 7.2.19
    def assign_to_groups(self, date_string):
        # Get observations
        observations = self.find_by_date(date_string)

        # Shuffle `observations`
        random.seed(42)
        random.shuffle(observations)

        # Get index position of item at observations halfway point
        # (floor division: with an odd count, the extra document goes to treatment)
        idx = len(observations) // 2

        # Assign first half of observations to control group
        for doc in observations[:idx]:
            doc["inExperiment"] = True
            doc["group"] = "no email (control)"

        # Assign second half of observations to treatment group
        for doc in observations[idx:]:
            doc["inExperiment"] = True
            doc["group"] = "email (treatment)"

        # Load the updated documents into the collection
        result = self.update_applicants(observations)
        return result
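
One design detail in `__init__` is worth a note: Python evaluates a default argument once, when the `def` statement runs, so every `MongoRepository()` created without an explicit `client` shares the same default object. The sketch below illustrates that behavior with a hypothetical `make_client` stand-in instead of a real `MongoClient`, and contrasts it with the common `None`-default alternative.

```python
import itertools

_ids = itertools.count(1)


def make_client():
    """Hypothetical stand-in for creating a database client."""
    return f"client-{next(_ids)}"


class EagerRepository:
    # The default is built once, when this `def` statement runs.
    def __init__(self, client=make_client()):
        self.client = client


class LazyRepository:
    # The default is built on each call that omits `client`.
    def __init__(self, client=None):
        self.client = client if client is not None else make_client()


print(EagerRepository().client == EagerRepository().client)  # True
print(LazyRepository().client == LazyRepository().client)  # False
```

Sharing one client across repositories is often fine; the point is only that it happens at definition time, whether or not a repository is ever created.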


In [None]:
repo = MongoRepository()
print("repo type:", type(repo))
repo

Extract the `collection` attribute from `repo`, and assign it to the variable `c_test`. Is `c_test` the correct data type?


In [None]:
c_test = repo.collection
print("c_test type:", type(c_test))
c_test

In [3]:
#dir(repo)

Our class is built, and now we need to take the ETL functions we created and turn them into **class methods**. Think back to the beginning of the course, where we learned how to work with DataFrames. If we call a DataFrame `df`, we can use methods designed by other people to figure out what's inside. We've learned lots of those methods already (`df.head()`, `df.info()`, etc.), but we can also create our own. Let's give it a try.

Using your function as a model, create a `find_by_date` method for your `MongoRepository` class. It should take only one argument: `date_string`. Once you're done, test your method by extracting all the users who created an account on 15 May 2022.

In [None]:
may_15_users = repo.find_by_date(date_string="2022-05-15")
print("may_15_users type", type(may_15_users))
print("may_15_users len", len(may_15_users))
may_15_users[:3]

Using your function as a model, create an `update_applicants` method for your `MongoRepository` class. It should take one argument: `observations_assigned`. To test your method, use it to update the documents in `observations_assigned`.

In [None]:
result = repo.update_applicants(observations_assigned)
print("result type:", type(result))
result

Create an `assign_to_groups` method for your `MongoRepository` class. Note that it should work differently than your original function. It will take one argument: `date_string`. It should find users from that date, assign them to groups, update the database, and return the results of the transaction. Once you're done, use your method to assign all the users who created an account on **14 May 2022** to groups.

In [None]:
result = repo.assign_to_groups(date_string="2022-05-14")
print("result type:", type(result))
result