## Case Study - Data ingestion

### Objectives

1. Given a business opportunity, gather relevant data from multiple sources
2. Clean the gathered data
3. Create a script that gathers and cleans the data

**The following files are needed to complete this case study**

* this notebook
* aavail-customers.db
* aavail-streams.csv

The data for this case study come from two sources.

1. A database ([SQLite](https://www.sqlite.org/index.html)) of `customer` data
2. A [CSV file](https://en.wikipedia.org/wiki/Comma-separated_values) of `stream` level data

> You will create a simple data pipeline that
> (1) simplifies the data for future analysis and
> (2) performs quality assurance checks.

The process of building *the data ingestion pipeline* entails extracting data, transforming it, and loading it into an appropriate data storage technology.  When constructing a pipeline, keep in mind that pipelines generally work in batches: data may be compiled during the day and the batch processed during the night.  A data pipeline may also be optimized to execute as a streaming computation, that is, every event is handled as it occurs.
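
The difference between the two modes can be illustrated with a minimal sketch. The function names and the `add_flag` transform below are hypothetical and are not part of the case study; the point is only that a batch job transforms an accumulated collection at once, while a streaming job handles each event as it arrives.

```python
from typing import Callable, Iterable, Iterator, List


def process_batch(events: List[dict], transform: Callable[[dict], dict]) -> List[dict]:
    """Batch mode: transform a whole collection of accumulated events at once."""
    return [transform(e) for e in events]


def process_stream(events: Iterable[dict], transform: Callable[[dict], dict]) -> Iterator[dict]:
    """Streaming mode: transform and yield each event as soon as it arrives."""
    for e in events:
        yield transform(e)


def add_flag(event: dict) -> dict:
    """Hypothetical transform: mark an event as processed."""
    return {**event, "processed": True}


events = [{"stream_id": 1}, {"stream_id": 2}]
batch_result = process_batch(events, add_flag)
stream_result = list(process_stream(iter(events), add_flag))
```

Both modes apply the same transformation; they differ only in when the work happens.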

### PART 1: Gathering the data

In [1]:
# Imports needed for this case study
import os
import pandas as pd
import numpy as np
import scipy.stats as stats
from datetime import datetime
import sqlite3

Much of the data exists in a database.  You can connect to it using the `sqlite3` package with the following function.  Note that it is good practice to wrap your connect functions in a `try-except` statement to cleanly handle exceptions.

In [2]:
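def connect_db(file_path):
    """Return a connection to the SQLite database, or None on failure."""
    conn = None  # defined up front so the return cannot raise a NameError
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db\n")
    except sqlite3.Error as e:
        print("...unsuccessful connection\n", e)
    return conn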
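
One caveat: `sqlite3.connect` creates a new, empty database file when the path does not exist (provided the directory is writable), so a `try-except` around the call will not catch a mistyped file name. A minimal sketch of one way to guard against this is shown below; `connect_existing_db` is a hypothetical helper, not part of the case study.

```python
import os
import sqlite3
import tempfile


def connect_existing_db(file_path):
    """Connect only if the database file already exists (hypothetical helper)."""
    if not os.path.exists(file_path):
        raise FileNotFoundError("no database file at {}".format(file_path))
    return sqlite3.connect(file_path)


# Demonstrate with a path that does not exist inside a throwaway directory
with tempfile.TemporaryDirectory() as tmp_dir:
    missing_path = os.path.join(tmp_dir, "missing.db")
    try:
        connect_existing_db(missing_path)
        found = True
    except FileNotFoundError:
        found = False
```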

In [3]:
# Make the connection to the database
conn = connect_db('data/aavail-customers.db')
# Print the table names
tables = [t[0] for t in conn.execute("SELECT name FROM sqlite_master WHERE type='table';")]
print(tables)

...successfully connected to db

['CUSTOMERS', 'INVOICES', 'INVOICE_ITEMS']


In [4]:
# List columns in a table
cursor = conn.execute("PRAGMA table_info(CUSTOMERS)")
for item in cursor.fetchall():
    print(item[1])  # position 1 of each PRAGMA row holds the column name

generated_id
customer_id
last_name
first_name
gender
age
DOB
city
state
country


#### 1. Extract the relevant data from the DB.

Query the database and extract the following data into a Pandas DataFrame. An online viewer for .DB files is available at https://inloop.github.io/sqlite-viewer/. The `CUSTOMERS` and `INVOICES` tables both have a 'customer_id' field while the `INVOICES` and `INVOICE_ITEMS` share the 'invoice_item_id' field.

From the `CUSTOMERS` table extract:
* Customer ID
* Last name
* First name
* DOB
* City
* State
* Country
* Gender

From the `INVOICES` table extract:
* Invoice_item_id

In [5]:
## Extract data from CUSTOMERS table
query = """
SELECT cu.customer_id, cu.last_name, cu.first_name, cu.DOB,
       cu.city, cu.state, cu.gender, cu.country, inv.invoice_item_id
       FROM CUSTOMERS cu
       INNER JOIN INVOICES inv
       ON cu.customer_id = inv.customer_id;
"""

_data = [d for d in conn.execute(query)]
columns = ["customer_id","last_name","first_name","DOB","city","state","gender","country","invoice_item_id"]
df_db = pd.DataFrame(_data,columns=columns)
df_db.head()

Unnamed: 0,customer_id,last_name,first_name,DOB,city,state,gender,country,invoice_item_id
0,1,Todd,Kasen,07/30/98,Rock Hill,South Carolina,m,united_states,3
1,2,Garza,Ensley,04/12/89,singapore,,f,singapore,3
2,3,Carey,Lillian,09/12/97,Auburn,Alabama,f,united_states,2
3,4,Christensen,Beau,01/28/99,Hempstead,New York,m,united_states,3
4,5,Gibson,Ernesto,03/23/98,singapore,,m,singapore,1
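
As an alternative to building the DataFrame from a list of rows, `pd.read_sql_query` runs a query and returns a DataFrame whose column names are taken from the result set. A minimal sketch follows. It uses a made-up in-memory table rather than the AAVAIL database.

```python
import sqlite3

import pandas as pd

# Toy in-memory database; the table contents are made up for illustration
toy_conn = sqlite3.connect(":memory:")
toy_conn.execute("CREATE TABLE CUSTOMERS (customer_id INTEGER, country TEXT)")
toy_conn.executemany(
    "INSERT INTO CUSTOMERS VALUES (?, ?)",
    [(1, "united_states"), (2, "singapore")],
)

# Column names come from the SELECT list, so no separate `columns` list is needed
df_toy = pd.read_sql_query(
    "SELECT customer_id, country FROM CUSTOMERS ORDER BY customer_id", toy_conn
)
toy_conn.close()
```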


#### 2. Extract the relevant data from the CSV file.

For each ```customer_id``` determine if a customer has stopped their subscription or not and save it in a data container.

In [6]:
df_streams = pd.read_csv(r"data/aavail-streams.csv")
print(df_streams.shape)
df_streams.head()

(19032, 4)


Unnamed: 0,customer_id,stream_id,date,subscription_stopped
0,1,1356,2018-12-01,0
1,1,1540,2018-12-04,0
2,1,1395,2018-12-11,0
3,1,1255,2018-12-22,0
4,1,1697,2018-12-23,0


In [7]:
customer_ids = df_streams['customer_id'].values
unique_ids = np.unique(df_streams['customer_id'].values)

streams = df_streams['subscription_stopped'].values

# For each unique ID (uid) determine subscription status:
# 1 if the customer never stopped the subscription, 0 otherwise
is_subscriber = [0 if streams[customer_ids==uid].max() > 0 else 1 for uid in unique_ids]
df_churn = pd.DataFrame({"customer_id": unique_ids, "is_subscriber": is_subscriber})
df_churn.head()

Unnamed: 0,customer_id,is_subscriber
0,1,1
1,2,0
2,3,1
3,4,1
4,5,1
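
The same per-customer status can also be computed with a `groupby`, which avoids the explicit loop over unique IDs. This is a sketch on made-up toy data, not the case study's own solution.

```python
import pandas as pd

# Toy stream data: customer 2 stopped the subscription, customers 1 and 3 did not
toy_streams = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3],
    "subscription_stopped": [0, 0, 0, 1, 0],
})

# The max of the flag per customer is 1 if any stream marks a stopped subscription
stopped = toy_streams.groupby("customer_id")["subscription_stopped"].max()
toy_churn = pd.DataFrame({
    "customer_id": stopped.index.values,
    "is_subscriber": (stopped.values == 0).astype(int),
})
```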


### PART 2: Cleaning the data

Sometimes it is known in advance which types of data integrity issues to expect, but other times it is during the Exploratory Data Analysis (EDA) process that these issues are identified.  After extracting data it is important to include checks for quality assurance even on the first pass through the AI workflow.  Here, we will combine the data into a single structure and provide a couple of checks for quality assurance.

#### 3. Implement checks for quality assurance.

1. Remove any repeat customers based on ```customer_id```
2. Remove stream data that do not have an associated ```stream_id```

In [8]:
print("\nCleaning Summary\n{}".format("-"*35))

duplicate_rows = df_db.duplicated()
# `True in series` tests the index, not the values, so use .any()
if duplicate_rows.any():
    df_db = df_db[~duplicate_rows].reset_index(drop=True)
print("Removed {} duplicate rows".format(duplicate_rows.sum()))

missing_stream_ids = df_streams['stream_id'].isnull()
if missing_stream_ids.any():
    df_streams = df_streams[~missing_stream_ids].reset_index(drop=True)
print("Removed {} missing stream ids".format(missing_stream_ids.sum()))

print("\nMissing Value Summary\n{}".format("-"*35))
print("\ndf_db\n{}".format("-"*15))
print(df_db.isnull().sum(axis = 0))
print("\ndf_streams\n{}".format("-"*15))
print(df_streams.isnull().sum(axis = 0))


Cleaning Summary
-----------------------------------
Removed 7 duplicate rows
Removed 0 missing stream ids

Missing Value Summary
-----------------------------------

df_db
---------------
customer_id          0
last_name            0
first_name           0
DOB                  0
city                 0
state              300
gender               0
country              0
invoice_item_id      0
dtype: int64

df_streams
---------------
customer_id             0
stream_id               0
date                    0
subscription_stopped    0
dtype: int64
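
Note that `duplicated()` with no arguments flags only rows that are identical in every column. To remove repeat customers strictly by ```customer_id```, a `subset` can be passed. The toy data below is made up to show the difference.

```python
import pandas as pd

# Customer 2 appears twice, but the rows differ in the spelling of the city
toy_customers = pd.DataFrame({
    "customer_id": [1, 2, 2, 3],
    "city": ["Auburn", "singapore", "Singapore", "Hempstead"],
})

exact_dupes = int(toy_customers.duplicated().sum())                   # full-row matches
id_dupes = int(toy_customers.duplicated(subset="customer_id").sum())  # repeated IDs

# Keep the first row seen for each customer_id
deduped = (
    toy_customers
    .drop_duplicates(subset="customer_id", keep="first")
    .reset_index(drop=True)
)
```

Which row to keep when the IDs repeat is a judgment call; `keep="first"` is only one option.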


#### 4: Combine the data into a single data structure

For this task, the two most convenient structures are Pandas dataframes and NumPy arrays.  At a minimum, ensure that your structure accommodates the following.

1. A column for `customer_id`
2. A column for `country`
3. A column for ```age``` that is created from ```DOB```
4. A column ```customer_name``` that is created from ```first_name``` and ```last_name```
5. A column to indicate churn called ```is_subscriber```
6. A column that indicates ```subscriber_type``` that comes from ```invoice_items```
7. A column to indicate the total ```num_streams```

In [9]:
# Get unique IDs
df_clean = df_churn.copy()
df_clean = df_clean[np.isin(df_clean['customer_id'].values, df_db['customer_id'].values)]
unique_ids = df_clean['customer_id'].values

In [10]:
# Ensure df_clean and df_db list the same customer_ids in the same order
if not np.array_equal(df_clean['customer_id'], df_db['customer_id']): 
    raise Exception("customer_ids are out of order or unmatched and need to be fixed")

In [11]:
## Query the db to create an invoice item map
query = """
SELECT i.invoice_item_id, i.invoice_item
FROM INVOICE_ITEMS i;
"""
## Variables for new df creation
invoice_item_map = {d[0]:d[1] for d in conn.execute(query)}
print(invoice_item_map)

{1: 'aavail_basic', 2: 'aavail_premium', 3: 'aavail_unlimited'}


The dataframe `df_db` contains 1000 rows with unique customer IDs and associated demographic data as well as the `invoice_item_id` associated with each customer ID. The dataframe `df_streams` contains the stream IDs, the date on which each stream occurred, and whether the subscription was stopped for each `customer_id`. Note that there can be multiple rows for each customer. The dataframe `df_churn` contains customer IDs and a binary `is_subscriber` value (1 if the customer is still subscribed, 0 if the subscription was stopped).

In [12]:
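# Create the new df
# df_db keeps its original index after filtering, so assign by position with .values
# (the check above guarantees that the customer_ids are in the same order)
df_clean['country'] = df_db['country'].values
# pd.to_datetime parses the DOB strings and works across pandas versions
age = pd.Timestamp('today').normalize() - pd.to_datetime(df_db['DOB'])
# Convert age to whole years (365.2425 days per year on average)
df_clean['age'] = (age.dt.days / 365.2425).astype(int).values
df_clean['customer_name'] = (df_db['first_name'] + " " + df_db['last_name']).values
df_clean['subscriber_type'] = df_db['invoice_item_id'].map(invoice_item_map).values
df_clean['num_streams'] = df_streams.groupby(['customer_id']).count()['stream_id'].values
df_clean.head()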

Unnamed: 0,customer_id,is_subscriber,country,age,customer_name,subscriber_type,num_streams
0,1,1,united_states,21,Kasen Todd,aavail_unlimited,21
1,2,0,singapore,30,Ensley Garza,aavail_unlimited,16
2,3,1,united_states,22,Lillian Carey,aavail_premium,25
3,4,1,united_states,20,Beau Christensen,aavail_unlimited,18
4,5,1,singapore,21,Ernesto Gibson,aavail_basic,21
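
Column-by-column assignment relies on every source being in the same row order. A merge on ```customer_id``` is an alternative that matches rows by key instead of by position. The sketch below uses made-up toy frames. The `validate` argument asks pandas to raise an error if a key is repeated on either side.

```python
import pandas as pd

# Toy inputs; toy_db is deliberately out of order
toy_db = pd.DataFrame({
    "customer_id": [3, 1, 2],
    "country": ["united_states", "united_states", "singapore"],
})
toy_churn = pd.DataFrame({"customer_id": [1, 2, 3], "is_subscriber": [1, 0, 1]})
toy_counts = pd.DataFrame({"customer_id": [1, 2, 3], "num_streams": [21, 16, 25]})

# Rows are matched on customer_id, so the order of toy_db does not matter
toy_clean = (
    toy_churn
    .merge(toy_db, on="customer_id", how="inner", validate="one_to_one")
    .merge(toy_counts, on="customer_id", how="inner", validate="one_to_one")
)
```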


### PART 3: Automating the process.

To ensure that your code can be used to automate this process, first save the dataframe (or NumPy array) as a CSV file.

#### 5: Take the initial steps towards automation.

1. Save cleaned, combined data as a CSV file.
2. From the code above create a function or class that performs all of the steps given a database file and a streams CSV file.
3. Run the function in batches and write a check to ensure you get the same result as compared to the code above.

Shown below is some code that will split your streams file into two batches. 

In [13]:
## Code to split streams.csv into batches
data_dir = "data"
df_all = pd.read_csv(os.path.join(data_dir,"aavail-streams.csv"))

half = int(round(df_all.shape[0] * 0.5))

df_part1 = df_all[:half]
df_part2 = df_all[half:]

df_part1.to_csv(os.path.join(data_dir,"aavail-streams-1.csv"),index=False)
df_part2.to_csv(os.path.join(data_dir,"aavail-streams-2.csv"),index=False)
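
Because the split is by row, a customer's streams can land in both batches, so per-batch results need to be combined before they are compared with a single full run. The sketch below shows one way to write such a check for the stream counts. It uses made-up toy data, and `count_streams` is a hypothetical helper.

```python
import pandas as pd

# Toy stream data; customer 2 straddles the split point
toy_streams = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 2, 3],
    "stream_id": [10, 11, 12, 13, 14, 15],
})


def count_streams(df):
    """Count streams per customer for one batch (hypothetical helper)."""
    return df.groupby("customer_id")["stream_id"].count()


half = len(toy_streams) // 2
batches = [toy_streams[:half], toy_streams[half:]]

# Combine the per-batch counts by summing over customer_id
combined = pd.concat([count_streams(b) for b in batches]).groupby(level=0).sum()
full = count_streams(toy_streams)

# Raises an AssertionError if the batched result differs from the full run
pd.testing.assert_series_equal(combined, full)
```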

The following cell demonstrates how to save the function as a file from within a notebook. 

In [14]:
%%writefile aavail-data-ingestor.py
#!/usr/bin/env python

import os
import sys
import getopt
import scipy.stats as stats
import pandas as pd
import numpy as np
import sqlite3

DATA_DIR = "data"

def connect_db(file_path):
    """
    Function to connect to the aavail database
    """
    conn = None
    try:
        conn = sqlite3.connect(file_path)
        print("...successfully connected to db")
    except sqlite3.Error as e:
        print("...unsuccessful connection",e)
    return(conn)

def ingest_db_data(conn):
    """
    Load and clean the db data
    """
    
    query = """
            SELECT cu.customer_id, cu.last_name, cu.first_name, cu.DOB,
            cu.city, cu.state, cu.gender, cu.country, inv.invoice_item_id
            FROM CUSTOMERS cu
            INNER JOIN INVOICES inv
            ON cu.customer_id = inv.customer_id;
            """
    _data = [d for d in conn.execute(query)]
    columns = ["customer_id","last_name","first_name","DOB","city","state","gender","country","invoice_item_id"]
    df_db = pd.DataFrame(_data,columns=columns)
    
    duplicate_rows = df_db.duplicated()
    
    ## `True in series` tests the index, not the values, so use .any()
    if duplicate_rows.any():
        df_db = df_db[~duplicate_rows].reset_index(drop=True)
    print("... removed {} duplicate rows in db data".format(duplicate_rows.sum()))
    return(df_db)

def ingest_stream_data(file_path):
    """
    Load and clean the stream data
    """
    
    df_streams = pd.read_csv(file_path)
    
    customer_ids = df_streams['customer_id'].values
    unique_ids = np.unique(df_streams['customer_id'].values)
    streams = df_streams['subscription_stopped'].values
    has_churned = [0 if streams[customer_ids==uid].max() > 0 else 1 for uid in unique_ids]
    df_churn = pd.DataFrame({"customer_id": unique_ids, "is_subscriber": has_churned})
    
    ## remove rows with missing stream ids (.any() tests the values, not the index)
    missing_stream_ids = df_streams['stream_id'].isnull()
    if missing_stream_ids.any():
        df_streams = df_streams[~missing_stream_ids].reset_index(drop=True)
    print("... removed {} missing stream ids".format(missing_stream_ids.sum()))
    
    return(df_streams, df_churn)

def process_dataframes(df_db, df_streams, df_churn, conn):
    """
    Add data to target csv
    """
    df_clean = df_churn.copy()
    df_db = df_db[np.isin(df_db['customer_id'].values,df_clean['customer_id'].values)]
    ## reset_index returns a new frame, so the result must be assigned
    df_db = df_db.reset_index(drop=True)
    unique_ids = df_clean['customer_id'].values

    ## ensure we are working with correctly ordered customer_ids df_db
    if not np.array_equal(df_clean['customer_id'],df_db['customer_id']):
        raise Exception("indexes are out of order or unmatched---needs to fix")

    ## query the db to create an invoice item map
    query = """
    SELECT i.invoice_item_id, i.invoice_item
    FROM INVOICE_ITEMS i;
    """
    invoice_item_map = {d[0]:d[1] for d in conn.execute(query)}
    
    ## variables for new df creation
    ## assign by position with .values (the check above guarantees matching order)
    df_clean['country'] = df_db['country'].values
    ## pd.to_datetime parses the DOB strings and works across pandas versions
    age = pd.Timestamp('today').normalize() - pd.to_datetime(df_db['DOB'])
    ## convert age to whole years (365.2425 days per year on average)
    df_clean['age'] = (age.dt.days / 365.2425).astype(int).values
    df_clean['customer_name'] = (df_db['first_name'] + " " + df_db['last_name']).values
    df_clean['subscriber_type'] = df_db['invoice_item_id'].map(invoice_item_map).values
    df_clean['num_streams'] = df_streams.groupby(['customer_id']).count()['stream_id'].values
    
    return(df_clean)

def update_target(target_file, df_clean, overwrite=False):
    """
    Write df_clean to the target csv, appending if the file already exists
    """

    if overwrite or not os.path.exists(target_file):
        df_clean.to_csv(target_file, index=False)   
    else:
        ## append only the new rows, without repeating the header
        df_clean.to_csv(target_file, mode='a', header=False, index=False)
        
if __name__ == "__main__":
  
    ## collect args
    arg_string = "%s -d db_filepath -s streams_filepath"%sys.argv[0]
    try:
        optlist, args = getopt.getopt(sys.argv[1:],'d:s:')
    except getopt.GetoptError as err:
        print(err)
        raise Exception(arg_string)

    ## handle args
    streams_file = None
    db_file = None
    for o, a in optlist:
        if o == '-d':
            db_file = a
        if o == '-s':
            streams_file = a
    ## both arguments are required
    if streams_file is None or db_file is None:
        raise Exception(arg_string)
    streams_file = os.path.join(DATA_DIR, streams_file)
    db_file = os.path.join(DATA_DIR, db_file)
    target_file = os.path.join(DATA_DIR, "aavail-target.csv")
    
    ## make the connection to the database
    conn = connect_db(db_file)

    ## ingest data base data
    df_db = ingest_db_data(conn)
    df_streams, df_churn = ingest_stream_data(streams_file)
    df_clean = process_dataframes(df_db, df_streams, df_churn, conn)
    
    ## write
    update_target(target_file, df_clean, overwrite=False)
    print("done")

Overwriting aavail-data-ingestor.py


We can run the script just created using:

In [None]:
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams-1.csv
!python aavail-data-ingestor.py -d aavail-customers.db -s aavail-streams-2.csv

In [None]:
df_clean.shape