# Getting Started with Data Pipelines for ETL

Data pipelines are everywhere! More than ever, data practitioners find themselves needing to extract, transform, and load data to power the work they do. During this code-along, I'll walk through the basics of building a data pipeline using Python, `pandas`, and `sqlite`. 

Throughout the tutorial, I'll be using the "Google Play Store Apps" dataset, available in DataCamp Workspaces. The two datasets we'll be using are made available as `.csv` files, and will be transformed throughout the code-along before being loaded into a `sqlite` database.

# Extracting Data

Extracting data is almost always the first step when building a data pipeline. Data can be extracted from sources of many shapes and sizes. Here are just a few:
- APIs
- SFTP sites
- Relational databases
- NoSQL databases (columnar, document, key-value)
- Flat-files

In this code-along, we'll focus on extracting data from flat files. A flat file might be something like a `.csv` or a `.json` file. The two files that we'll be extracting data from are `apps_data.csv` and `review_data.csv`. To do this, we'll use `pandas`. Let's take a closer look!

1. After importing `pandas`, read the `apps_data.csv` file into memory as a DataFrame. Print the head of the DataFrame.
2. Similar to before, read in the DataFrame stored in the `review_data.csv` file. Take a look at the first few rows of this DataFrame.
3. Print the column names, shape, and data types of the `apps` DataFrame.

In [1]:
# Import libraries
import pandas as pd

In [2]:
# Read both datasets into memory (store as apps and reviews)
apps = pd.read_csv("apps_data.csv")
reviews = pd.read_csv("review_data.csv")

In [4]:
# Print out the head of the DataFrame
apps.head()

Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,Photo Editor & Candy Camera & Grid & ScrapBook,ART_AND_DESIGN,4.1,159,19M,"10,000+",Free,0,Everyone,Art & Design,"January 7, 2018",1.0.0,4.0.3 and up
1,Coloring book moana,ART_AND_DESIGN,3.9,967,14M,"500,000+",Free,0,Everyone,Art & Design;Pretend Play,"January 15, 2018",2.0.0,4.0.3 and up
2,"U Launcher Lite – FREE Live Cool Themes, Hide ...",ART_AND_DESIGN,4.7,87510,8.7M,"5,000,000+",Free,0,Everyone,Art & Design,"August 1, 2018",1.2.4,4.0.3 and up
3,Sketch - Draw & Paint,ART_AND_DESIGN,4.5,215644,25M,"50,000,000+",Free,0,Teen,Art & Design,"June 8, 2018",Varies with device,4.2 and up
4,Pixel Draw - Number Art Coloring Book,ART_AND_DESIGN,4.3,967,2.8M,"100,000+",Free,0,Everyone,Art & Design;Creativity,"June 20, 2018",1.1,4.4 and up


In [5]:
# Print out the head of the DataFrame
reviews.head()

Unnamed: 0,App,Translated_Review,Sentiment,Sentiment_Polarity,Sentiment_Subjectivity
0,10 Best Foods for You,I like eat delicious food. That's I'm cooking ...,Positive,1.0,0.533333
1,10 Best Foods for You,This help eating healthy exercise regular basis,Positive,0.25,0.288462
2,10 Best Foods for You,,,,
3,10 Best Foods for You,Works great especially going grocery store,Positive,0.4,0.875
4,10 Best Foods for You,Best idea us,Positive,1.0,0.3


In [6]:
# Perform some basic checks (column names, number of records, types, etc)
print(apps.columns)
print(apps.shape)
print(apps.dtypes)

Index(['App', 'Category', 'Rating', 'Reviews', 'Size', 'Installs', 'Type',
       'Price', 'Content Rating', 'Genres', 'Last Updated', 'Current Ver',
       'Android Ver'],
      dtype='object')
(10841, 13)
App                object
Category           object
Rating            float64
Reviews            object
Size               object
Installs           object
Type               object
Price              object
Content Rating     object
Genres             object
Last Updated       object
Current Ver        object
Android Ver        object
dtype: object


The code above works perfectly well, but this time let's apply the DRY (don't repeat yourself) principle and build a function to extract data.

1. Create a function called `extract`, with a single parameter of name `file_path`.
2. Print the number of rows and columns in the DataFrame, as well as the data type of each column. Provide instructions about how to use the value that will eventually be returned by this function.
3. Return the variable `data`.
4. Call the `extract` function twice, once passing in the `apps_data.csv` file path, and another time with the `review_data.csv` file path. Output the first few rows of the `apps_data` DataFrame.

In [8]:
# Create a function to extract the data, and print some important information
def extract(file_path):
    # Read the file into memory
    data = pd.read_csv(file_path)
    
    # Now, print the details about the file
    print(f"Here is a little bit of information about the data stored in {file_path}:")
    print(f"\nThere are {data.shape[0]} rows and {data.shape[1]} columns in this DataFrame.")
    print("\nThe columns in this DataFrame take the following types: ")
    
    # Print the type of each column
    print(data.dtypes)
    
    # Finally, print a message before returning the DataFrame
    print(f"\nTo view the DataFrame extracted from {file_path}, display the value returned by this function!\n\n")
    
    return data

In [10]:
# Call the function (create apps_data and reviews_data)
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")

# Take a peek at one of the DataFrames
apps_data

Here is a little bit of information about the data stored in apps_data.csv:

There are 10841 rows and 13 columns in this DataFrame.

The columns in this DataFrame take the following types: 
App                object
Category           object
Rating            float64
Reviews            object
Size               object
Installs           object
Type               object
Price              object
Content Rating     object
Genres             object
Last Updated       object
Current Ver        object
Android Ver        object
dtype: object

To view the DataFrame extracted from apps_data.csv, display the value returned by this function!


Here is a little bit of information about the data stored in review_data.csv:

There are 64295 rows and 5 columns in this DataFrame.

The columns in this DataFrame take the following types: 
App                        object
Translated_Review          object
Sentiment                  object
Sentiment_Polarity        float64
Sentiment_Subjectivity    float64

Unnamed: 0,App,Category,Rating,Reviews,Size,Installs,Type,Price,Content Rating,Genres,Last Updated,Current Ver,Android Ver
0,Photo Editor & Candy Camera & Grid & ScrapBook,ART_AND_DESIGN,4.1,159,19M,"10,000+",Free,0,Everyone,Art & Design,"January 7, 2018",1.0.0,4.0.3 and up
1,Coloring book moana,ART_AND_DESIGN,3.9,967,14M,"500,000+",Free,0,Everyone,Art & Design;Pretend Play,"January 15, 2018",2.0.0,4.0.3 and up
2,"U Launcher Lite – FREE Live Cool Themes, Hide ...",ART_AND_DESIGN,4.7,87510,8.7M,"5,000,000+",Free,0,Everyone,Art & Design,"August 1, 2018",1.2.4,4.0.3 and up
3,Sketch - Draw & Paint,ART_AND_DESIGN,4.5,215644,25M,"50,000,000+",Free,0,Teen,Art & Design,"June 8, 2018",Varies with device,4.2 and up
4,Pixel Draw - Number Art Coloring Book,ART_AND_DESIGN,4.3,967,2.8M,"100,000+",Free,0,Everyone,Art & Design;Creativity,"June 20, 2018",1.1,4.4 and up
...,...,...,...,...,...,...,...,...,...,...,...,...,...
10836,Sya9a Maroc - FR,FAMILY,4.5,38,53M,"5,000+",Free,0,Everyone,Education,"July 25, 2017",1.48,4.1 and up
10837,Fr. Mike Schmitz Audio Teachings,FAMILY,5.0,4,3.6M,100+,Free,0,Everyone,Education,"July 6, 2018",1.0,4.1 and up
10838,Parkinson Exercices FR,MEDICAL,,3,9.5M,"1,000+",Free,0,Everyone,Medical,"January 20, 2017",1.0,2.2 and up
10839,The SCP Foundation DB fr nn5n,BOOKS_AND_REFERENCE,4.5,114,Varies with device,"1,000+",Free,0,Mature 17+,Books & Reference,"January 19, 2015",Varies with device,Varies with device
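
The dtype listings above show that count-like columns such as `Reviews` and `Installs` are read in as `object` (strings), which is why they can't be compared or sorted numerically straight away. Below is a minimal sketch of one way to convert them, using a few toy rows that mimic the string formats seen in the output. The `clean_counts` helper and the toy DataFrame are illustrative assumptions, not part of the code-along.

```python
import pandas as pd

# Toy rows that mimic the string formats visible in the apps data (assumed sample)
toy_apps = pd.DataFrame({
    "App": ["A", "B", "C"],
    "Reviews": ["159", "967", "87510"],
    "Installs": ["10,000+", "500,000+", "100+"],
})


def clean_counts(df):
    """Hypothetical helper: return a copy with Reviews and Installs as integers."""
    cleaned = df.copy()

    # Values that cannot be parsed become missing instead of raising an error
    cleaned["Reviews"] = pd.to_numeric(cleaned["Reviews"], errors="coerce").astype("Int64")

    # Strip the thousands separators and the trailing "+" before converting
    installs = cleaned["Installs"].str.replace(r"[,+]", "", regex=True)
    cleaned["Installs"] = pd.to_numeric(installs, errors="coerce").astype("Int64")

    return cleaned


cleaned_apps = clean_counts(toy_apps)
print(cleaned_apps.dtypes)
```

Using `errors="coerce"` with the nullable `Int64` dtype is a design choice: unexpected values turn into missing entries that can be inspected later, rather than stopping the pipeline.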


# Transforming Data

We're interested in working with the apps and their corresponding reviews in the `"FOOD_AND_DRINK"` category. We'd like to do the following:

1. Define a function with name `transform`. This function will have five parameters: `apps`, `reviews`, `category`, `min_rating`, and `min_reviews`.
2. Drop duplicates from both DataFrames.
3. For each of the apps in the desired category, find the average sentiment polarity of its reviews, and filter the columns.
4. Join this back to the `apps` dataset, only keeping the following columns:
    - `App`
    - `Rating`
    - `Reviews`
    - `Installs`
    - `Sentiment_Polarity`
5. Keep only the records with a rating above `min_rating` and more than `min_reviews` reviews.
6. Order by the rating and number of reviews, both in descending order.
7. Call the function for the `"FOOD_AND_DRINK"` category, with a `min_rating` of 4.0 and a `min_reviews` of 1000.

Alright, let's give it a shot!


In [11]:
# Define a function to transform data
def transform(apps, reviews, category, min_rating, min_reviews):
    # Print statement for observability
    print(f"Transforming data to curate a dataset with all {category} apps and their "
          f"corresponding reviews with a rating of at least {min_rating} and "
          f"{min_reviews} reviews\n")
    
    # Drop any duplicates from both DataFrames (also have the option to do this in-place)
    reviews = reviews.drop_duplicates()
    apps = apps.drop_duplicates(["App"])
    
    # Find all of the apps and reviews in the requested category
    subset_apps = apps.loc[apps["Category"] == category, :]
    subset_reviews = reviews.loc[reviews["App"].isin(subset_apps["App"]), ["App", "Sentiment_Polarity"]]
    
    # Aggregate the subset_reviews DataFrame
    aggregated_reviews = subset_reviews.groupby(by="App").mean()
    
    # Join it back to the subset_apps table
    joined_apps_reviews = subset_apps.join(aggregated_reviews, on="App", how="left")
    
    # Keep only the needed columns
    filtered_apps_reviews = joined_apps_reviews.loc[:, ["App", "Rating", "Reviews", "Installs", "Sentiment_Polarity"]]
    
    # Convert Reviews to integers, then keep apps rated above min_rating with more than min_reviews reviews
    filtered_apps_reviews = filtered_apps_reviews.astype({"Reviews": "int32"})
    top_apps = filtered_apps_reviews.loc[(filtered_apps_reviews["Rating"] > min_rating) & (filtered_apps_reviews["Reviews"] > min_reviews), :]
    
    # Sort by rating and review count (both descending), then reset the index.
    # Reassigning instead of using inplace=True avoids modifying a filtered slice.
    top_apps = top_apps.sort_values(by=["Rating", "Reviews"], ascending=False)
    top_apps = top_apps.reset_index(drop=True)
     
    # Persist this DataFrame as top_apps.csv file
    top_apps.to_csv("top_apps.csv")
    
    print(f"The transformed DataFrame, which includes {top_apps.shape[0]} rows "
          f"and {top_apps.shape[1]} columns has been persisted, and will now be "
          f"returned")
    
    # Return the transformed DataFrame
    return top_apps


# Call the function
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=4.0,
    min_reviews=1000
)

# Show
top_apps_data


Transforming data to curate a dataset with all FOOD_AND_DRINK apps and their corresponding reviews with a rating of at least 4.0 and 1000 reviews

The transformed DataFrame, which includes 54 rows and 5 columns has been persisted, and will now be returned


Unnamed: 0,App,Rating,Reviews,Installs,Sentiment_Polarity
0,SarashpazPapion (Cooking with Chef Bowls),4.8,1250,"50,000+",
1,Domino's Pizza USA,4.7,1032935,"10,000,000+",0.226971
2,Tastely,4.7,611136,"10,000,000+",
3,Delicious Recipes,4.7,129737,"1,000,000+",
4,BeyondMenu Food Delivery,4.7,51517,"1,000,000+",0.408743
5,Recipes Pastries and homemade pies More than 5...,4.7,14065,"500,000+",
6,Pastry & Cooking (Without Net),4.7,6118,"1,000,000+",
7,Simple Recipes,4.7,3803,"500,000+",
8,Easy Recipes,4.7,2707,"100,000+",0.284777
9,OpenTable: Restaurants Near Me,4.6,90242,"5,000,000+",
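
Several rows in the output above have an empty `Sentiment_Polarity`. That follows from the left join: an app with no matching rows in the reviews data is kept, but its aggregated value is missing. Here is a small sketch of the same group-then-join pattern on made-up data. The app names and values are invented for illustration only.

```python
import pandas as pd

# Invented toy data: only "Pizza Now" has any reviews
toy_apps = pd.DataFrame({"App": ["Pizza Now", "Easy Cakes"], "Rating": [4.7, 4.5]})
toy_reviews = pd.DataFrame({
    "App": ["Pizza Now", "Pizza Now"],
    "Sentiment_Polarity": [0.2, 0.4],
})

# Average the polarity per app; the result is indexed by App
mean_polarity = toy_reviews.groupby(by="App").mean()

# A left join keeps every app, filling NaN where no reviews were found
joined = toy_apps.join(mean_polarity, on="App", how="left")

# Flag which apps actually had reviews, rather than guessing a fill value
joined["Has_Reviews"] = joined["Sentiment_Polarity"].notna()
print(joined)
```

Whether to leave these values missing, fill them, or drop the rows depends on how the dataset will be used downstream; this sketch only makes the gap explicit.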


# Loading Data

Next, we'd like to load the transformed dataset into a SQL database. We'll be using `pandas` along with `sqlite` to do just that!

1. After importing `sqlite3`, create a function with name `load`. The function will have three parameters: `dataframe`, `database_name`, and `table_name`.
2. Connect to the database using the `connect()` function.
3. Write the DataFrame to the provided table name. Replace the table if it exists, and do not include the index.
4. Now, we'll validate that the data was loaded correctly. Use the `read_sql()` function to return the DataFrame that was just loaded.
5. Assert that the number of rows and columns match in the original and loaded DataFrame.
6. Return the DataFrame read from the `sqlite` database.
7. Call the function for the `top_apps_data` DataFrame, for the `"market_research"` database and the `top_apps` table.

In [12]:
import sqlite3

# Now, create a function to do this
def load(dataframe, database_name, table_name):
    # Create a connection object
    con = sqlite3.connect(database_name)
    
    # Write the data to the specified table (table_name)
    dataframe.to_sql(name=table_name, con=con, if_exists="replace", index=False)
    print("Original DataFrame has been loaded to sqlite\n")
    
    # Read the data, and return the result (it is to be used)
    loaded_dataframe = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=con)
    print("The loaded DataFrame has been read from sqlite for validation\n")
    
    try:
        assert dataframe.shape == loaded_dataframe.shape
        print(f"Success! The data in the {table_name} table have successfully been "
              f"loaded and validated")

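    except AssertionError:
        print("DataFrame shape is not consistent before and after loading. Take a closer look!")

    # Close the connection and return the DataFrame that was read back
    con.close()
    return loaded_dataframe


# Call the function (store the DataFrame read back from sqlite)
loaded_top_apps = load(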
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)
    

Original DataFrame has been loaded to sqlite

The loaded DataFrame has been read from sqlite for validation

Success! The data in the top_apps table have successfully been loaded and validated
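
Once a table has been written, it can be queried like any other SQL table. The sketch below runs the same write-then-read round trip against a temporary in-memory database, so it doesn't touch the `market_research` file. The toy DataFrame and the rating threshold are assumptions made for illustration.

```python
import sqlite3
from contextlib import closing

import pandas as pd

# Invented toy data standing in for the transformed DataFrame
toy_top_apps = pd.DataFrame({"App": ["Pizza Now", "Easy Cakes"], "Rating": [4.7, 4.5]})

# ":memory:" creates a temporary database that disappears when the connection closes;
# closing() makes sure the connection is closed even if an error occurs
with closing(sqlite3.connect(":memory:")) as con:
    toy_top_apps.to_sql(name="top_apps", con=con, if_exists="replace", index=False)

    # Pass values as query parameters instead of formatting them into the SQL string
    best = pd.read_sql(
        sql="SELECT App, Rating FROM top_apps WHERE Rating > ? ORDER BY Rating DESC",
        con=con,
        params=(4.6,),
    )

print(best)
```

Table names can't be supplied as query parameters, which is why a function like `load` formats `table_name` into the statement; that is reasonable only as long as the table name comes from trusted code rather than user input.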


# Running the Pipeline

Now that our functions have been defined and tested, we'll run this pipeline end-to-end!

1. For completeness, import `pandas` and `sqlite3`.
2. Extract data from the `apps_data.csv` and `review_data.csv` files.
3. Transform the data by passing in the following:
    - `category="FOOD_AND_DRINK"`
    - `min_rating=3.0`
    - `min_reviews=1000`
4. Load the transformed DataFrame to the `top_apps` table in the `market_research` database.
5. Check out the output!



In [20]:
# Import modules
import pandas as pd
import sqlite3

# Extract the data
apps_data = extract("apps_data.csv")
reviews_data = extract("review_data.csv")

Here is a little bit of information about the data stored in apps_data.csv:

There are 10841 rows and 13 columns in this DataFrame.

The columns in this DataFrame take the following types: 
App                object
Category           object
Rating            float64
Reviews            object
Size               object
Installs           object
Type               object
Price              object
Content Rating     object
Genres             object
Last Updated       object
Current Ver        object
Android Ver        object
dtype: object

To view the DataFrame extracted from apps_data.csv, display the value returned by this function!


Here is a little bit of information about the data stored in review_data.csv:

There are 64295 rows and 5 columns in this DataFrame.

The columns in this DataFrame take the following types: 
App                        object
Translated_Review          object
Sentiment                  object
Sentiment_Polarity        float64
Sentiment_Subjectivity    float64

In [21]:
# Transform the data
top_apps_data = transform(
    apps=apps_data,
    reviews=reviews_data,
    category="FOOD_AND_DRINK",
    min_rating=3.0,
    min_reviews=1000
)

Transforming data to curate a dataset with all FOOD_AND_DRINK apps and their corresponding reviews with a rating of at least 3.0 and 1000 reviews

The transformed DataFrame, which includes 69 rows and 5 columns has been persisted, and will now be returned


In [22]:
# Load the data
load(
    dataframe=top_apps_data,
    database_name="market_research",
    table_name="top_apps"
)


Original DataFrame has been loaded to sqlite

The loaded DataFrame has been read from sqlite for validation

Success! The data in the top_apps table have successfully been loaded and validated
