Perform and schedule an ETL process that transforms raw course rating data, into actionable course recommendations for students!
* Fetch data from multiple sources
* Transform to form recommendations
* Load into target database

## Course ratings

### Exploring the schema
the diagram of the database schema of `datacamp_application`:
![image.png](attachment:image.png)

### Querying the table
The goal is to get a feeling for the data in this exercise. You'll get the rating data for three sample users and then use a predefined helper function, `print_user_comparison()`, to compare the sets of course ids these users rated.

In [None]:
# Complete the connection URI
# The database is called datacamp_application. The host is localhost with port 5432.
# The username is repl and the password is password.
connection_uri = "postgresql://repl:password@localhost:5432/datacamp_application" 
db_engine = sqlalchemy.create_engine(connection_uri)

# Get user with id 4387
user1 = pd.read_sql("SELECT * FROM rating WHERE user_id=4387", db_engine)

# Get user with id 18163
user2 = pd.read_sql("SELECT * FROM rating WHERE user_id=18163", db_engine)

# Get user with id 8770
user3 = pd.read_sql("SELECT * FROM rating WHERE user_id=8770", db_engine)

# Use the helper function to compare the 3 users
print_user_comparison(user1, user2, user3)

In [None]:
output:
    
Course id overlap between users:
================================
User 1 and User 2 overlap: {32, 96, 36, 6, 7, 44, 95}
User 1 and User 3 overlap: set()
User 2 and User 3 overlap: set()

### Average rating per course
complete a transformation function `transform_avg_rating()` that aggregates the rating data using the pandas DataFrame's `.groupby() method.` The goal is to get a DataFrame with two columns, a course id and its average rating:
![image.png](attachment:image.png)
helper function `extract_rating_data()` which extracts course ratings from the rating table.

In [None]:
# Complete the transformation function
def transform_avg_rating(rating_data):
    # Group by course_id and extract average rating per course
    avg_rating = rating_data.groupby('course_id').rating.mean()
    # Return sorted average ratings per course
    sort_rating = avg_rating.sort_values(ascending=False).reset_index()
    return sort_rating

# Extract the rating data into a DataFrame    
# Use extract_rating_data() to extract raw ratings data. 
# It takes in as argument the database engine db_engines.
rating_data = extract_rating_data(db_engines)

# Use transform_avg_rating on the extracted data and print results
avg_rating_data = transform_avg_rating(rating_data)
print(avg_rating_data) 

In [None]:
output:
    
    course_id    rating
0          46  4.800000
1          23  4.800000
2          96  4.692765
3          56  4.661765
4          24  4.653061
..        ...       ...
94         54  4.238095
95         92  4.222222
96         29  4.208333
97         17  4.147059
98         42  4.107570

[99 rows x 2 columns]

## From ratings to recommendations

### Filter out corrupt data
inspect the DataFrame and make sure there are no missing values by using the pandas DataFrame's `.isnull().sum()` methods. You will find that the `programming_language` column has some missing values.

As such, you will complete the `transform_fill_programming_language()` function by using the `.fillna()` method to fill missing values.
![image.png](attachment:image.png)

In [None]:
course_data = extract_course_data(db_engines)

# Print out the number of missing values per column
print(course_data.isnull().sum())

# The transformation should fill in the missing values
# Missing values of the programming_language should be the language "R".
def transform_fill_programming_language(course_data):
    imputed = course_data.fillna({"programming_language": "R"})
    return imputed

transformed = transform_fill_programming_language(course_data)

# Print out the number of missing values per column of transformed
print(transformed.isnull().sum())

In [None]:
output:
    
course_id               0
title                   0
description             0
programming_language    3
dtype: int64
course_id               0
title                   0
description             0
programming_language    0
dtype: int64

### Using the recommender transformation
here are the decision rules for producing recommendations:
* Use technology a student has rated the most.
* Exclude courses a student has already rated.
* Find the three top-rated courses from eligible courses.<br><br>
In order to produce the final recommendations, you will use the average course ratings, and the list of eligible recommendations per user, stored in `avg_course_ratings` and `courses_to_recommend` respectively. You will do this by completing the `transform_recommendations()` function which merges both DataFrames and finds the top 3 highest rated courses to recommend per user.

In [None]:
# Complete the transformation function
def transform_recommendations(avg_course_ratings, courses_to_recommend):
    # Merge both DataFrames
    merged = courses_to_recommend.merge(avg_course_ratings) 
    # Sort values by rating and group by user_id
    grouped = merged.sort_values("rating", ascending=False).groupby("user_id")
    # Produce the top 3 values and sort by user_id
    recommendations = grouped.head(3).sort_values("user_id").reset_index()
    final_recommendations = recommendations[["user_id", "course_id","rating"]]
    # Return final recommendations
    return final_recommendations

# Use the function with the predefined DataFrame objects
recommendations = transform_recommendations(avg_course_ratings, courses_to_recommend)

In [None]:
print(recommendations)
        user_id  course_id    rating
0             1         24  4.653061
1             1         23  4.800000
2             1         46  4.800000
3             2         46  4.800000
4             2         24  4.653061
...         ...        ...       ...
113452    38173         56  4.661765
113453    38173         86  4.608696
113454    38174         23  4.800000
113455    38174         24  4.653061
113456    38174         46  4.800000

[113457 rows x 3 columns]

## Scheduling daily jobs

### The target table
It's time to put this table into a database so that it can be used by several products like a recommendation engine or an emailing system.

Since it's a `pandas.DataFrame` object, you can use the `.to_sql()` method. Of course, you'll have to connect to the database using the connection URI first.

In [None]:
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine = sqlalchemy.create_engine(connection_uri)

def load_to_dwh(recommendations):
    recommendations.to_sql("recommendations", db_engine, if_exists="replace")

### Defining the DAG
you've completed the extract, transform and load phases separately. Now all of this is put together in one neat `etl()` function that you can discover in the console.
<br><br>
The `etl()` function extracts raw course and ratings data from relevant databases, cleans corrupt data and fills in missing value, computes average rating per course and creates recommendations based on the decision rules for producing recommendations, and finally loads the recommendations into a database.
<br><br>
As you might remember from the video, `etl()` accepts a single argument: `db_engines`. You can pass this to the task using `op_kwargs` in the `PythonOperator`. You can pass it a dictionary that will be filled in as kwargs in the callable.

In [None]:
# Define the DAG so it runs on a daily basis
dag = DAG(dag_id="recommendations",
          schedule_interval="0 0 * * *")

# Make sure `etl()` is called in the operator. Pass the correct kwargs.
task_recommendations = PythonOperator(
    task_id="recommendations_task",
    python_callable=etl,
    op_kwargs={"db_engines": db_engines},
)

### Enable the DAG
![image.png](attachment:image.png)

### Queriying the recommendations
Now that this `recommendations` table is in the data warehouse, you could also quickly join it with other tables in order to produce important features for students such as customized marketing emails, intelligent recommendations for students and other features.
<br><br>
In this exercise, you will get a taste of how the newly created `recommendations` table could be utilized by creating a function `recommendations_for_user()` which automatically gets the top recommended courses based per user ID for a particular rating threshold.

In [None]:
def recommendations_for_user(user_id, threshold=4.5):
    # Join with the courses table
    query = """
    SELECT title, rating FROM recommendations
    INNER JOIN courses ON courses.course_id = recommendations.course_id
    WHERE user_id=%(user_id)s AND rating>%(threshold)s
    ORDER BY rating DESC
    """
    # Add the threshold parameter
    predictions_df = pd.read_sql(query, db_engine, params = {"user_id": user_id, 
                                                             "threshold": threshold})
    return predictions_df.title.values

# Try the function you created
print(recommendations_for_user(12, 4.65))