# Data Analytics - Bus Data Understanding and Prep

## Audit

Author: Danning

Last Modified By: Adam

Module: COMP47630

DC:     2021-06-20

DLM:    2021-06-23

Desc:   This file contains an analysis of the historic bus data

Dict:   The Data Dictionary for the Data Set is available in Brightspace

## Table of Contents

00. Introduction

01. Exec Summary and Results

02. Modules

03. Constants

04. Ingestion

05. Cleansing


## 00. Introduction

### 00.01 Background
(here)

### 00.02 Problem Scope
(here)

### 00.03 Data
(here)

### 00.04 Approach
(here)

## 01. Exec Summary

(here)

---
# ------BEGIN---------- #

## 01. Static

### 01.01 Modules
Import all modules here

In [2]:
import sys
print(sys.executable)

/home/team10/miniconda3/bin/python3


In [3]:
####--------------------------------------
#00.Import Modules
####--------------------------------------

######---------BEGIN
#      SUPPRESS DEPRECIATION WARNINGS: Applicable to datetime_is_numeric=True
######--------END

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

######---------BEGIN
#      ML
######--------END

#import nltk as nl
import sklearn as sk
import matplotlib as mp
#import xgboost as xg
#import pymc3 as pymc
#import sympy as sym



######---------BEGIN
#      SQL/API
######--------END


#import requests as rq
#import sqlalchemy as sqla
#import pyodbc
#import cx_oracle as cx


######---------BEGIN
#     GENERAL
######--------END

import pandas as pd
import datetime as dt
import numpy as np
import sys
import os
from dask import dataframe as dask_df
#import pyspark as spk
#import json
#import time
#import socket
#import traceback as tb
#import platform
#from psutil import virtual_memory
import pickle as pck


######---------BEGIN
#     VISUALISATIONS
######--------END


import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.dates as mdates


#For showing plots directly in the notebook run the command below
#%matplotlib inline


###HTML Output Hiding
#Install hide input extension
#!pip install jupyter_contrib_nbextensions
#!jupyter contrib nbextension install --user
#!jupyter nbextension enable hide_input_all/main
#!jupyter nbextension enable hide_input/main
#!jupyter nbextension enable codefolding/main

#!jupyter nbextension disable hide_input_all/main
#!jupyter nbextension disable hide_input/main
#!jupyter nbextension disable codefolding/main

#Update with Filename: Run in Terminal, post completion, after hiding all cells for report
#!jupyter nbconvert --to=html bus_Data.ipynb


In [4]:
plt.style.use('ggplot')

### 01.02 Constants
Import all Constants here

In [5]:
data_year='2018'
bus_leavetimes_filepath="./data/rt_leavetimes_DB_{}.txt".format(data_year)
bus_trips_filepath="./data/rt_trips_DB_{}.txt".format(data_year)

bus_leavetimes_sep=";"
bus_trips_sep=";"

bus_leavetimes_data_dictionary={
        'DATASOURCE':['Description','category']
        ,'DAYOFSERVICE':['Description','datetime']
        ,'TRIPID':['Description','category']
        ,'PROGRNUMBER':['Description','category']
        ,'STOPPOINTID':['Description','category']
        ,'PLANNEDTIME_ARR':['Description','int64']
        ,'PLANNEDTIME_DEP':['Description','int64']
        ,'ACTUALTIME_ARR':['Description','int64']
        ,'ACTUALTIME_DEP':['Description','int64']
        ,'VEHICLEID':['Description','category']
        ,'PASSENGERS':['Description','float64']
        ,'PASSENGERSIN':['Description','float64']
        ,'PASSENGERSOUT':['Description','float64']
        ,'DISTANCE': ['Description','float64']
        ,'SUPPRESSED':['Description','category']
        ,'JUSTIFICATIONID':['Description','category']
        ,'LASTUPDATE':['Description','datetime']
        ,'NOTE': ['Description','string']
                        }


lt_datetime_columns=[column_headers for column_headers, column_desc_array in bus_leavetimes_data_dictionary.items() if column_desc_array[1] == 'datetime']
lt_categorical_columns=[column_headers for column_headers, column_desc_array in bus_leavetimes_data_dictionary.items() if column_desc_array[1] == 'category']
lt_num_columns=[column_headers for column_headers, column_desc_array in bus_leavetimes_data_dictionary.items() if column_desc_array[1] in ('numeric','int64','float64')]



bus_leavetimes_metadata_dictionary={}

for column_headers, column_desc_array in bus_leavetimes_data_dictionary.items():
    bus_leavetimes_metadata_dictionary[column_headers]=column_desc_array[1]
    
    
bus_trips_data_dictionary={
        'DATASOURCE':['Description','category']
        ,'DAYOFSERVICE':['Description','datetime']
        ,'TRIPID':['Description','category']
        ,'LINEID':['Description','category']
        ,'ROUTEID':['Description','category']
    
        ,'DIRECTION':['Description','category']
        
        ,'PLANNEDTIME_ARR':['Description','int64']
        ,'PLANNEDTIME_DEP':['Description','int64']
        ,'ACTUALTIME_ARR':['Description','int64']
        ,'ACTUALTIME_DEP':['Description','int64']
    
        ,'BASIN':['Description','category']
        ,'TENDERLOT':['Description','float64']
    
        ,'SUPPRESSED':['Description','category']
        ,'JUSTIFICATIONID':['Description','category']
        ,'LASTUPDATE':['Description','datetime']
        ,'NOTE': ['Description','string']
                        }

bus_trips_metadata_dictionary={}

for column_headers, column_desc_array in bus_trips_data_dictionary.items():
    bus_trips_metadata_dictionary[column_headers]=column_desc_array[1]

    
tp_datetime_columns=[column_headers for column_headers, column_desc_array in bus_trips_data_dictionary.items() if column_desc_array[1] == 'datetime']
tp_categorical_columns=[column_headers for column_headers, column_desc_array in bus_trips_data_dictionary.items() if column_desc_array[1] == 'category']
tp_num_columns=[column_headers for column_headers, column_desc_array in bus_trips_data_dictionary.items() if column_desc_array[1] in ('numeric','int64','float64')]




    

#Dates for File Additions if needed
today_date=dt.datetime.now()

#DateTime objects
today_year=today_date.year
today_month=today_date.month
today_day=today_date.day

#Convert to ISO Standard for Filename
str_year=str(today_date.year)

#Month should have two digits
str_month=str(today_date.month)
if len(str_month)==1:
    str_month="0{}".format(str_month)

#Day should have two digits
str_day=str(today_date.day)
if len(str_day)==1:
    str_day="0{}".format(str_day)


str_today_date="{}-{}-{}".format(str_year,str_month,str_day)

datetime_format='%d-%b-%Y %H:%M:%S'

## 02. Exploration

In [6]:
def dsk_describe(dask_df_iter):
    """A function to describe dask df"""
    
    
    desc_df=dask_df_iter.describe(include='all')  
    desc_df=desc_df.compute().T
    display(type(desc_df))
    
    return desc_df

In [7]:

def missing_check(row):
    """Highlight rows with potential missing_values"""

    #Configuration Values
    col_to_check=len(row)-1
    default_colour = 'white'
    flag_colour=''
    high_flag_colour_val='red'
    med_flag_colour_val='orange'
    low_flag_colour_val='yellow'
    val_to_check=0

    #Row length valid
    if len(row)>=col_to_check:

        #
        if row.values[col_to_check] == 'High':
            flag_colour = high_flag_colour_val
            
        elif row.values[col_to_check] == 'Medium':
            flag_colour = med_flag_colour_val
            
        elif row.values[col_to_check] == 'Low':
            flag_colour = low_flag_colour_val

        if flag_colour=='':
            colour=default_colour
        else:
            colour=flag_colour

        return ['background-color: {}'.format(colour)]*len(row.values)

    else:
        print('Row too short - Reconfigure Column Number')
        return ['background-color: {}'.format(default_colour)]*len(row.values)
    
def dt_missing_check(row):
    """Highlight rows with potential missing_values"""

    #Configuration Values
    col_to_check=len(row)-1
    default_colour = 'white'
    flag_colour=''
    high_flag_colour_val='red'
    med_flag_colour_val='orange'
    low_flag_colour_val='yellow'
    val_to_check=0

    #Row length valid
    if len(row)>=col_to_check:

        #
        if row.values[col_to_check] == 'High':
            flag_colour = high_flag_colour_val
            
        elif row.values[col_to_check] == 'Medium':
            flag_colour = med_flag_colour_val
            
        elif row.values[col_to_check] == 'Low':
            flag_colour = low_flag_colour_val

        if flag_colour=='':
            colour=default_colour
        else:
            colour=flag_colour

        return ['background-color: {}'.format(colour)]*len(row.values)

    else:
        print('Row too short - Reconfigure Column Number')
        return ['background-color: {}'.format(default_colour)]*len(row.values)
    

def ingest_data(fp,delim,data_dictionary,chunks=10000000,pandas=False):
    """A function to read in CSV Data and Validate.
    
    Memory error after 50M rows"""

    print("Inside ingest_data({},dictionary)".format(fp))
    
    def print_shape(raw_df):
        """A function to print the shape of a dataframe"""
        #row_column data
        shape_of_df=raw_df.shape
        row_count=shape_of_df[0]
        column_count=shape_of_df[1]

        #print info to user
        row_column_print_statement='Your file contains: \n{} rows x {} columns.\n\n'
        row_column_print_statement=row_column_print_statement.format(row_count,column_count)
        print(row_column_print_statement)
        header_statement='The following columns are present:\n'

        #print the headers
        for header in raw_df.columns:
            header_statement+='"{}"\n'.format(header)

        print(header_statement)
        return
        
    def verify_schema(raw_df,data_dictionary):
        """A function to validate the schema of a dataframe"""
        
        match=False
        #check if the schema is correct
        if set(raw_df.columns)==set(data_dictionary.keys()) and len(raw_df.columns)==len(data_dictionary.keys()):
            print('The columns in this data sample match the schema')
            match=True

        else:
            print('The columns in this data sample do not match the schema')
            
        return match
    
    def unique_values(df):
        """A function to print the unique values in each column"""
        
        print_statement="""\n\n-----\n\nColumn: {}\n\nValues: {}\n\n"""
        
        for column in df.columns:
            print_statement.format(column,df[column].unique())
            
        return

    def descriptive_stats(df,num_datetime=False,pandas=True):
        """A function to get descriptive stats for a dataframe"""
        
        #Format Dictionary:
        non_numeric_format_dictionary={
                              '% Populated': "{:.0f}%"
                              ,'% Missing': "{:.0f}%"
                                ,'% Top Value':"{:.0f}%"}
        
        numeric_format_dictionary={'count':"{:.0f}"
                 ,'% Populated': "{:.0f}%"
                ,'% Missing': "{:.0f}%"
                ,'% Top Value':"{:.0f}%"}




        row_count=df.shape[0]
        
        #Produce a summary table - Note the continuous features in this dataset are dates
        #Note: As of now, pandas allows using describe with dates so keeping it in, but this will be depreciated
        if pandas:
            category_summary_df=df.describe(datetime_is_numeric=num_datetime).T
            
        else:
            category_summary_df=dsk_describe(dask_df_iter=df)
            
        #Add what Percent is populated
        category_summary_df['% Populated']=100*(category_summary_df['count']/row_count)

        #Percent missing
        category_summary_df['% Missing']=100-category_summary_df['% Populated']

        #Prevelance of top vlaue
        category_summary_df['% Top Value']=100*(category_summary_df['freq']/row_count)

        #Give a warning depending on quartile of missing data - upper quartiles are high
        category_summary_df['Missing Warning']=np.select([(category_summary_df['% Missing']==0),(category_summary_df['% Missing']>0) & (category_summary_df['% Missing']<25),(category_summary_df['% Missing']>=25) & (category_summary_df['% Missing']<50),category_summary_df['% Missing']>=50],['None','Low','Medium','High'])

        category_summary_df=category_summary_df.reset_index()
        category_summary_df=category_summary_df.rename(columns={"index": "feature"})

        if num_datetime:
            print('Datetime: Numeric')
            display((category_summary_df.style.apply(missing_check, axis=1)
                                                 .format(non_numeric_format_dictionary)))
        
        else:
            print('Datetime: NotNumeric')
            display((category_summary_df.style.apply(dt_missing_check, axis=1)
                                         .format(numeric_format_dictionary)))
        
        
        return category_summary_df
        

    if pandas==True:
        #Valid Filepath
        if os.path.isfile(fp):

            raw_df = pd.DataFrame()
            chunk_count=0
            
            for chunk in pd.read_csv(fp,sep=delim,dtype=str,chunksize=chunks):
                chunk_count+=1
                print('On Chunk: {}'.format(chunk_count))
                raw_df = pd.concat([raw_df,chunk])
                
            display(raw_df)

            print_shape(raw_df)

            verify_schema(raw_df,data_dictionary)
            
            unique_values(raw_df)
            
            descriptive_stats(raw_df,num_datetime=False)

            print("\n\n\nSample Data:\n\n\n")
            display(raw_df.head())
            
            return raw_df

        #Not Valid Filepath
        else:
            print("Invalid filepath - Correct the filepath and re-ingest")

            return
        
    elif pandas==False:
        
        #Valid Filepath
        if os.path.isfile(fp):

            #read_csv - Do Not Let Pandas Manipulate the Data First - Auto-assign is more memory intensive.
            raw_df=dask_df.read_csv(fp, sep=delim)
            display(raw_df)

            print_shape(raw_df)

            verify_schema(raw_df,data_dictionary)
            
            unique_values(raw_df)

            descriptive_stats(raw_df,num_datetime=False,pandas=False)
            descriptive_stats(raw_df,num_datetime=True,pandas=False)
            
            print("\n\n\nSample Data:\n\n\n")
            display(raw_df.head())

            return raw_df

        #Not Valid Filepath
        else:
            print("Invalid filepath - Correct the filepath and re-ingest")

            return
        
    else:
        print('No opinion on using Dask or Pandas - Defaulting to Dask')
        
        #Valid Filepath
        if os.path.isfile(fp):

            #read_csv - Do Not Let Pandas Manipulate the Data First - Auto-assign is more memory intensive.
            raw_df=dask_df.read_csv(fp, sep=delim)
            display(raw_df)

            print_shape(raw_df)

            verify_schema(raw_df,data_dictionary)
            
            unique_values(raw_df)
            
            descriptive_stats(raw_df,num_datetime=False)
            descriptive_stats(raw_df,num_datetime=True)
            
            

            print("\n\n\nSample Data:\n\n\n")
            display(raw_df.head())
            
            return raw_df
        
        #Not Valid Filepath
        else:
            print("Invalid filepath - Correct the filepath and re-ingest")

            return

In [8]:
def data_convert(df,types,columnlist,dt_format):
    """A function to convert all columns in a list into the appropriate type"""
    
    print("Inside data_convert()")
    
    ###Check if empty
    if len(df.index) != 0:
        
        ##Check if datetime or other
        if types=='datetime':
       
            ###Check if 0
            if len(columnlist)>0:
                print('Converting to {}'.format(types))
                
                for column in columnlist:
                    df[column]=df[column].apply(pd.to_datetime,format=dt_format,errors='ignore')
                
            else:
                print('No need to convert to: {}'.format(types))
              
        ###Numeric type
        elif types=='category':
            ###Check if 0
            if len(columnlist)>0:
                print('Converting to {}'.format(types))
                
                for column in columnlist:
                    df[column]=df[column].astype('category')
                
            ###Nothing to convert
            else:
                print('No need to convert')
                
        ###Numeric type
        elif types=='numeric':
            
            ###Check if 0
            if len(columnlist)>0:
                print('Converting to Numerical')
                
                for column in columnlist:
                    df[column]=df[column].apply(pd.to_numeric, errors='ignore')
                
            else:
                print('No need to convert')
                
        ###Other type - e.g. Boolean, string - Dont do anything - force the above types.
        else:
            print('Unknown type')
                
    ###Empty data set          
    else:
        print("Empty dataframe")

In [9]:
def group_over_single_categories(df,categorical_columns,pdf_fn, save_fig=True):
    """A function to group over the categories"""
    
    print("Inside group_over_single_categories()")
    row_count=len(df)

    grouping_type={}
    timestamp_now=dt.datetime.timestamp(dt.datetime.now())
    
    #Dataframe is not empty, and there are categorical columns to group over:
    if df.empty==False and len(categorical_columns)>0:
        with PdfPages(pdf_fn) as pp: #lab
            column=''

            #Let's go through the category column type
            for column in categorical_columns:

                #Separator
                print('\n\n----------------------\n\n')
                agg_df=df.groupby([column]).agg({df.columns[0]:"count"})
                print(agg_df)
                agg_df=agg_df.rename(columns={df.columns[0]:'Rows'})
                agg_df=agg_df.reset_index()

                #Note: Could also do value_counts but I prefer that for graphing.
                agg_df['% Frequency']=100*(agg_df['Rows']/row_count)

                #Be explicit over what we're displaying
                print('Grouping over {} results in:\n'.format(column))

                #Display the result
                display(agg_df)
                
                print('Non Zero Data:')
                non_zero_df=agg_df[agg_df['Rows']>0]
                display(non_zero_df)
                #Graphing Section:

                if len(agg_df)<300:
                    figure = (
                                df[column]
                                  .value_counts(dropna=True, normalize=True)
                                  .plot(kind='bar'
                                        ,title='Count of values for {}'.format(column)
                                        , xlabel='Field Values'
                                        , ylabel='Count of Values'
                                        , figsize=(35,35)
                                       )
                     )
                    #This grid style is from the sample Lab5 as I like how it looks
                    plt.ylim([0,1])
                    plt.grid(b=True, which='major', color='#666666', linestyle='-')
                    plt.setp(figure.get_xticklabels(), ha="right", rotation=0)
                    plt.minorticks_on()
                    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
                    plt.legend(loc='upper left', bbox_to_anchor=(1,1), ncol=1)
                    plt.show()
                    grouping_type[column]=agg_df

                    if save_fig:
                        pp.savefig(figure.get_figure())
            else:
                print('Too Many Categories to plot')


                





    
    return grouping_type

def group_over_multi_categories(df,categorical_columns,pdf_fn,save_output=False,save_fig=False):
    """A function to group over all pairs of categories
    
    Warning: This can be memory intensive as we have (columnCount)C(2) pairings, so only run this if your device is able!"""
    
    print("Inside group_over_multi_categories()")
    row_count=len(df)
    grouping_type={}
    timestamp_now=dt.datetime.timestamp(dt.datetime.now())
    
    #Try run this
    try:
    
        #Dataframe is not empty, and there are categorical columns to group over:
        if df.empty==False and len(categorical_columns)>0:
            with PdfPages(pdf_fn) as pp:
                column=''
                second_column=''
                
                #Let's go through the category column type
                for column in categorical_columns:

                    #Second index, n^2
                    for second_column in categorical_columns:
                        multi_column=[column]


                        #Create a key to access - pipe delimited as columns contain _
                        grouping_key="{}|{}"

                        #No point in grouping the same column twice
                        if second_column!=column and  column not in ('TRIPID','DATASOURCE') and second_column not in ('TRIPID','DATASOURCE'):
                            multi_column+=[second_column]
                            grouping_key=grouping_key.format(column,second_column)

                            #Separator
                            print('\n\n----------------------\n\n')
                            agg_df=df.groupby(multi_column).agg({df.columns[0]:"count"})
                            agg_df=agg_df.rename(columns={df.columns[0]:'Rows'})
                            agg_df['% Frequency']=100*(agg_df['Rows']/row_count)
                            agg_df=agg_df.reset_index()

                            #Be explicit over what we're displaying
                            print('Grouping over {} results in:\n'.format(grouping_key))

                            #Display the result
                            display(agg_df)
                            
                            print('Non Zero Data:')
                            non_zero_df=agg_df[agg_df['Rows']>0]
                            display(non_zero_df)
                            
                            #Graph
                            
                            if len(agg_df)<50000:
                                figure = (
                                            (df[multi_column]
                                                  .dropna()
                                                  .value_counts(normalize=True)
                                                  .reset_index()
                                                  .pivot_table(index=column,columns=second_column)
                                                  .fillna(0))[0]
                                                          .plot(kind='bar'
                                                            , stacked=True
                                                            , title='Count of values for {} vs {}'.format(second_column,column)
                                                            , xlabel='Field Values'
                                                            , ylabel='Count of Values'
                                                            , figsize=(35,35)


                                                               )
                                         )

                                #This grid style is from the sample Lab5 as I like how it looks
                                plt.ylim([0,1])
                                plt.grid(b=True, which='major', color='#666666', linestyle='-')
                                plt.setp(figure.get_xticklabels(), ha="right", rotation=0)
                                plt.minorticks_on()
                                plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
                                plt.legend(loc='upper left', bbox_to_anchor=(1,1), ncol=1)
                                plt.show()



                                if save_fig:
                                    pp.savefig(figure.get_figure())

                                #Only save if explicitly passed - This could kill your memory.
                            if save_output:
                                grouping_type[grouping_key]=agg_df

                        
    #Catch exceptions
    except Exception as exc:
        print("Function exception:\n")
        #check exception is memory error
        if exc==MemoryError:
            print("Sorry, your device is not able to run this function as you have hit a memory limit")
            
        print(exc)
        

    return grouping_type


def cat_missing_check_cleanse(row):
    """Highlight rows with potential missing_values"""

    #Configuration Values
    col_to_check=8
    default_colour = 'green'
    flag_colour=''
    high_flag_colour_val='red'
    med_flag_colour_val='orange'
    low_flag_colour_val='yellow'
    val_to_check=0

    #Row length valid
    if len(row)>=col_to_check:

        #
        if row.values[col_to_check] == 'High':
            flag_colour = high_flag_colour_val
            
        elif row.values[col_to_check] == 'Medium':
            flag_colour = med_flag_colour_val
            
        elif row.values[col_to_check] == 'Low':
            flag_colour = low_flag_colour_val

        if flag_colour=='':
            colour=default_colour
        else:
            colour=flag_colour

        return ['background-color: {}'.format(colour)]*len(row.values)

    else:
        print('Row too short - Reconfigure Column Number')
        return ['background-color: {}'.format(default_colour)]*len(row.values)
    
def stacked_group_over_multi_categories(df,categorical_columns,pdf_fn,save_output=False,save_fig=False):
    """A function to group over all pairs of categories
    
    Warning: This can be memory intensive as we have (columnCount)C(2) pairings, so only run this if your device is able!"""
    
    print("Inside group_over_multi_categories()")
    row_count=len(df)
    grouping_type={}
    timestamp_now=dt.datetime.timestamp(dt.datetime.now())
    
    #Try run this
    try:
    
        #Dataframe is not empty, and there are categorical columns to group over:
        if df.empty==False and len(categorical_columns)>0:
            with PdfPages(pdf_fn) as pp:
                column=''
                second_column=''
                
                #Let's go through the category column type
                for column in categorical_columns:

                    #Second index, n^2
                    for second_column in categorical_columns:
                        multi_column=[column]


                        #Create a key to access - pipe delimited as columns contain _
                        grouping_key="{}|{}"

                        #No point in grouping the same column twice
                        if second_column!=column:
                            multi_column+=[second_column]
                            grouping_key=grouping_key.format(column,second_column)

                            #Separator
                            print('\n\n----------------------\n\n')
                            agg_df=df.groupby(multi_column).agg({df.columns[0]:"count"})
                            agg_df=agg_df.rename(columns={df.columns[0]:'Rows'})
                            agg_df['% Frequency']=100*(agg_df['Rows']/row_count)
                            
                            

                            #Be explicit over what we're displaying
                            print('Grouping over {} results in:\n'.format(grouping_key))

                            #Display the result
                            display(agg_df)
                            
                            print('Non Zero Data:')
                            non_zero_df=agg_df[agg_df['Rows']>0]
                            display(non_zero_df)
                            
                            agg_df=agg_df.reset_index()

                            sagg_df=(
                                    df
                                    .groupby([column])
                                    .agg({df.columns[0]:"count"})
                                    .reset_index()
                                    .rename(columns={df.columns[0]:'TotalRows'})
                                    )

                            join_df=agg_df.merge(sagg_df,left_on=column,right_on=column,suffixes=('_subbed','_group'))
                            join_df['% Stacked']=join_df['Rows']/join_df['TotalRows']

                            figure=((join_df
                                  .pivot_table(index=column,columns=second_column,values='% Stacked')
                                  .fillna(0))

                                          .plot(kind='bar'
                                            , stacked=True
                                            , title='Distribution of values for {} vs {}'.format(second_column,column)
                                            , xlabel='Field Values'
                                            , ylabel='Makeup of Values'
                                            , figsize=(35,35)

                            ))

                            #This grid style is from the sample Lab5 as I like how it looks
                            plt.ylim([0,1])
                            plt.grid(b=True, which='major', color='#666666', linestyle='-')
                            plt.setp(figure.get_xticklabels(), ha="right", rotation=0)
                            plt.minorticks_on()
                            plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
                            plt.legend(loc='upper left', bbox_to_anchor=(1,1), ncol=1)
                            plt.show()

                            

                            if save_fig:
                                pp.savefig(figure.get_figure())

                            #Only save if explicitly passed - This could kill your memory.
                            if save_output:
                                grouping_type[grouping_key]=agg_df

                        
    #Catch exceptions
    except Exception as exc:
        print("Function exception:\n")
        #check exception is memory error
        if exc==MemoryError:
            print("Sorry, your device is not able to run this function as you have hit a memory limit")
            
        print(exc)
        

    return grouping_type


def stacked_group_over_target_categories(df,categorical_columns,pdf_fn,save_output=False,save_fig=False):
    """A function to group over all pairs of categories and the target death_yn
    
    Warning: This can be memory intensive as we have so only run this if your device is able!"""
    
    print("Inside group_over_multi_categories()")
    row_count=len(df)
    grouping_type={}
    timestamp_now=dt.datetime.timestamp(dt.datetime.now())
    
    #Try run this
    try:
    
        #Dataframe is not empty, and there are categorical columns to group over:
        if df.empty==False and len(categorical_columns)>0:
            with PdfPages(pdf_fn) as pp:
                column=''
                second_column=''
                
                #Let's go through the category column type
                for column in categorical_columns:

                    #Second index, n^2
                    for second_column in ['death_yn']:
                        multi_column=[column]


                        #Create a key to access - pipe delimited as columns contain _
                        grouping_key="{}|{}"

                        #No point in grouping the same column twice
                        if second_column!=column:
                            multi_column+=[second_column]
                            grouping_key=grouping_key.format(column,second_column)

                            #Separator
                            print('\n\n----------------------\n\n')
                            agg_df=df.groupby(multi_column).agg({df.columns[0]:"count"})
                            agg_df=agg_df.reset_index()
                            agg_df=agg_df.rename(columns={df.columns[0]:'Rows'})
                            agg_df['% Frequency']=100*(agg_df['Rows']/row_count)

                            #Be explicit over what we're displaying
                            print('Grouping over {} results in:\n'.format(grouping_key))

                            #Display the result
                            display(agg_df)
                            agg_df=agg_df.reset_index()

                            sagg_df=(
                                    df
                                    .groupby([column])
                                    .agg({df.columns[0]:"count"})
                                    .reset_index()
                                    .rename(columns={df.columns[0]:'TotalRows'})
                                    )

                            join_df=agg_df.merge(sagg_df,left_on=column,right_on=column,suffixes=('_subbed','_group'))
                            join_df['% Stacked']=join_df['Rows']/join_df['TotalRows']

                            display(join_df)
                            
                            if len(join_df)<10000:
                                figure=((join_df
                                      .pivot_table(index=column,columns=second_column,values='% Stacked')
                                      .fillna(0))

                                              .plot(kind='bar'
                                                , stacked=True
                                                , title='Distribution of values for {} vs {}'.format(second_column,column)
                                                , xlabel='Field Values'
                                                , ylabel='Makeup of Values'
                                                , figsize=(35,35)

                                ))

                                #This grid style is from the sample Lab5 as I like how it looks
                                plt.ylim([0,1])
                                plt.grid(b=True, which='major', color='#666666', linestyle='-')
                                plt.setp(figure.get_xticklabels(), ha="right", rotation=0)
                                plt.minorticks_on()
                                plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
                                plt.legend(loc='upper left', bbox_to_anchor=(1,1), ncol=1)
                                plt.show()



                                if save_fig:
                                    pp.savefig(figure.get_figure())

                            #Only save if explicitly passed - This could kill your memory.
                            if save_output:
                                grouping_type[grouping_key]=agg_df

                        
    #Catch exceptions
    except Exception as exc:
        print("Function exception:\n")
        #check exception is memory error
        if exc==MemoryError:
            print("Sorry, your device is not able to run this function as you have hit a memory limit")
            
        print(exc)
        

    return grouping_type

In [10]:
def group_by_column(df,groupby_columns,agg_dict):
    """A function to group by columns given and aggregate according to a dictionary.
    
    Input: df, columns to group by, agg_dictionary
    """
    
    print("inside group_by_column(df,{},{})".format(groupby_columns,agg_dict))
    
    #Possible Errors
    error_dictionary={0:'No Error'
                     ,1:'The dataframe is empty'
                     ,2:"The columns to group by is empty or not a list"
                     ,3: 'The dictionary is empty'
                     ,4: 'The dataframe does not contain the required columns'
                      ,999: 'Uncaught exception'
                     }
    
    #Set as empty
    summary_df=pd.DataFrame()
    required_columns=[]
    
    error_code=0
    
    try:

        #Dictionary is non-empty
        if len(agg_dict)>0 and type(agg_dict)==dict:

            #df not empty
            if len(df)>0:

                #List and non-empty
                if type(groupby_columns)==list and len(groupby_columns)>0:
                    required_columns=list(df.columns)+list(agg_dict.keys())

                    #Required columns found
                    if set(required_columns).issubset(set(df.columns)):

                        #begin groupby - note: not catching summary issues as they are plentiful
                        summary_df=(df
                                        .groupby(groupby_columns)
                                        .agg(agg_dict)
                                        .reset_index()
                                    )


                    #Required columns not found    
                    else:
                        error_code=4
                        error_message=error_dictionary[error_code]
                        print(error_message)

                #Not a list or empty
                else:
                    error_code=2
                    error_message=error_dictionary[error_code]
                    print(error_message)

            #df is not empty
            else:
                error_code=1
                error_message=error_dictionary[error_code]
                print(error_message)

        #empty Dictionary
        else:
            error_code=3
            error_message=error_dictionary[error_code]
            print(error_message)
            
    except Exception as e:
        error_code=999
        print("Uncaught exception: {}".format(e))
        
    return [error_code,summary_df]

### 02.02 Begin Looking at the Trips dataset

Using Pandas to ingest the dataset and display some summary statistics

In [11]:
bus_trips_df=ingest_data(fp=bus_trips_filepath
                ,delim=bus_trips_sep
                ,data_dictionary=bus_trips_data_dictionary
                ,chunks=10000000
                ,pandas=True)

Inside ingest_data(./data/rt_trips_DB_2018.txt,dictionary)
On Chunk: 1


Unnamed: 0,DATASOURCE,DAYOFSERVICE,TRIPID,LINEID,ROUTEID,DIRECTION,PLANNEDTIME_ARR,PLANNEDTIME_DEP,ACTUALTIME_ARR,ACTUALTIME_DEP,BASIN,TENDERLOT,SUPPRESSED,JUSTIFICATIONID,LASTUPDATE,NOTE
0,DB,07-FEB-18 00:00:00,6253783,68,68_80,1,87245,84600,87524,84600,BasDef,,,,28-FEB-18 12:05:11,",2967409,"
1,DB,07-FEB-18 00:00:00,6262138,25B,25B_271,2,30517,26460,32752,,BasDef,,,,28-FEB-18 12:05:11,",2580260,"
2,DB,07-FEB-18 00:00:00,6254942,45A,45A_70,2,35512,32100,36329,32082,BasDef,,,,28-FEB-18 12:05:11,",2448968,"
3,DB,07-FEB-18 00:00:00,6259460,25A,25A_273,1,57261,54420,58463,54443,BasDef,,,,28-FEB-18 12:05:11,",3094242,"
4,DB,07-FEB-18 00:00:00,6253175,14,14_15,1,85383,81600,84682,81608,BasDef,,,,28-FEB-18 12:05:11,",2526331,"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2182632,DB,14-MAY-18 00:00:00,6765849,123,123_36,2,61560,57840,61365,57859,BasDef,,,,26-JUN-18 09:13:13,",3216350,"
2182633,DB,14-MAY-18 00:00:00,6765469,75,75_17,1,53416,48600,,48823,BasDef,,,,26-JUN-18 09:13:13,",2865284,"
2182634,DB,14-MAY-18 00:00:00,6765486,33D,33D_62,2,29460,26400,29904,,BasDef,,,,26-JUN-18 09:13:13,",3077688,"
2182635,DB,14-MAY-18 00:00:00,6764987,70,70_60,1,65277,60600,66341,,BasDef,,,,26-JUN-18 09:13:13,",3208841,"


Your file contains: 
2182637 rows x 16 columns.


The following columns are present:
"DATASOURCE"
"DAYOFSERVICE"
"TRIPID"
"LINEID"
"ROUTEID"
"DIRECTION"
"PLANNEDTIME_ARR"
"PLANNEDTIME_DEP"
"ACTUALTIME_ARR"
"ACTUALTIME_DEP"
"BASIN"
"TENDERLOT"
"SUPPRESSED"
"JUSTIFICATIONID"
"LASTUPDATE"
"NOTE"

The columns in this data sample match the schema
Datetime: NotNumeric


Unnamed: 0,feature,count,unique,top,freq,% Populated,% Missing,% Top Value,Missing Warning
0,DATASOURCE,2182637,1,DB,2182637.0,100%,0%,100%,
1,DAYOFSERVICE,2182637,360,12-FEB-18 00:00:00,7122.0,100%,0%,0%,
2,TRIPID,2182637,658964,7319095,19.0,100%,0%,0%,
3,LINEID,2182637,130,46A,76728.0,100%,0%,4%,
4,ROUTEID,2182637,588,46A_74,37182.0,100%,0%,2%,
5,DIRECTION,2182637,2,2,1100273.0,100%,0%,50%,
6,PLANNEDTIME_ARR,2182637,64461,31620,467.0,100%,0%,0%,
7,PLANNEDTIME_DEP,2182637,791,61200,26879.0,100%,0%,1%,
8,ACTUALTIME_ARR,2045430,68122,65646,69.0,94%,6%,0%,Low
9,ACTUALTIME_DEP,2018086,66771,84607,458.0,92%,8%,0%,Low





Sample Data:





Unnamed: 0,DATASOURCE,DAYOFSERVICE,TRIPID,LINEID,ROUTEID,DIRECTION,PLANNEDTIME_ARR,PLANNEDTIME_DEP,ACTUALTIME_ARR,ACTUALTIME_DEP,BASIN,TENDERLOT,SUPPRESSED,JUSTIFICATIONID,LASTUPDATE,NOTE
0,DB,07-FEB-18 00:00:00,6253783,68,68_80,1,87245,84600,87524,84600.0,BasDef,,,,28-FEB-18 12:05:11,",2967409,"
1,DB,07-FEB-18 00:00:00,6262138,25B,25B_271,2,30517,26460,32752,,BasDef,,,,28-FEB-18 12:05:11,",2580260,"
2,DB,07-FEB-18 00:00:00,6254942,45A,45A_70,2,35512,32100,36329,32082.0,BasDef,,,,28-FEB-18 12:05:11,",2448968,"
3,DB,07-FEB-18 00:00:00,6259460,25A,25A_273,1,57261,54420,58463,54443.0,BasDef,,,,28-FEB-18 12:05:11,",3094242,"
4,DB,07-FEB-18 00:00:00,6253175,14,14_15,1,85383,81600,84682,81608.0,BasDef,,,,28-FEB-18 12:05:11,",2526331,"


In [12]:
data_convert(bus_trips_df,'datetime',tp_datetime_columns,datetime_format)
data_convert(bus_trips_df,'category',tp_categorical_columns,datetime_format)
data_convert(bus_trips_df,'numeric',tp_num_columns,datetime_format)

Inside data_convert()
Converting to datetime
Inside data_convert()
Converting to category
Inside data_convert()
Converting to Numerical


In [None]:
group_over_single_categories(bus_trips_df,categorical_columns=tp_categorical_columns,pdf_fn="./data/rt_trips_single_DB_{}.pdf".format(today_date), save_fig=True)

In [None]:
group_over_multi_categories(bus_trips_df,categorical_columns=tp_categorical_columns,pdf_fn="./data/rt_multi_trips_DB_{}.pdf".format(today_date), save_fig=True)

### 02.02 Looking at the Leavetime dataset

I will use dask to look at the dataset as it can load more things and is supposed to be faster for larger datasets.

In [42]:
bus_leavetimes_df=ingest_data(fp=bus_leavetimes_filepath
                ,delim=bus_leavetimes_sep
                ,data_dictionary=bus_leavetimes_data_dictionary
                ,chunks=10000000
                ,pandas=False)

Inside ingest_data(./data/rt_leavetimes_DB_2018.txt,dictionary)


Unnamed: 0_level_0,DATASOURCE,DAYOFSERVICE,TRIPID,PROGRNUMBER,STOPPOINTID,PLANNEDTIME_ARR,PLANNEDTIME_DEP,ACTUALTIME_ARR,ACTUALTIME_DEP,VEHICLEID,PASSENGERS,PASSENGERSIN,PASSENGERSOUT,DISTANCE,SUPPRESSED,JUSTIFICATIONID,LASTUPDATE,NOTE
npartitions=337,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
,object,object,int64,int64,int64,int64,int64,int64,int64,int64,float64,float64,float64,float64,float64,float64,object,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Your file contains: 
Delayed('int-28d3a588-2053-4943-9a62-69b6f65f0dbd') rows x 18 columns.


The following columns are present:
"DATASOURCE"
"DAYOFSERVICE"
"TRIPID"
"PROGRNUMBER"
"STOPPOINTID"
"PLANNEDTIME_ARR"
"PLANNEDTIME_DEP"
"ACTUALTIME_ARR"
"ACTUALTIME_DEP"
"VEHICLEID"
"PASSENGERS"
"PASSENGERSIN"
"PASSENGERSOUT"
"DISTANCE"
"SUPPRESSED"
"JUSTIFICATIONID"
"LASTUPDATE"
"NOTE"

The columns in this data sample match the schema


ValueError: No non-trivial arrays found

Changing the datatypes of the dataset

In [21]:
data_convert(bus_leavetimes_df,'datetime',tp_datetime_columns,datetime_format)
data_convert(bus_leavetimes_df,'category',tp_categorical_columns,datetime_format)
data_convert(bus_leavetimes_df,'numeric',tp_num_columns,datetime_format)

NameError: name 'bus_leavetimes_df' is not defined

We can perform basic tasks to look at the dataset as a whole, so that we can look at the dataset as a whole

Looking for the max value and the min value will show us if there are extreme anomilies.

In [14]:
# find the max values of the dataset
%time bus_leavetimes_df.max().compute()

CPU times: user 6min 30s, sys: 45.4 s, total: 7min 15s
Wall time: 3min 3s


DATASOURCE                           DB
DAYOFSERVICE         31-OCT-18 00:00:00
TRIPID                          8592207
PROGRNUMBER                         109
STOPPOINTID                        7692
PLANNEDTIME_ARR                   91680
PLANNEDTIME_DEP                   91680
ACTUALTIME_ARR                    97177
ACTUALTIME_DEP                    97177
VEHICLEID                       3394131
PASSENGERS                         None
PASSENGERSIN                       None
PASSENGERSOUT                      None
DISTANCE                           None
SUPPRESSED                          1.0
JUSTIFICATIONID    484981386680422208.0
LASTUPDATE           31-JAN-18 21:17:42
NOTE                               None
dtype: object

In [None]:
# find the minimum values of the dataset
%time bus_leavetimes_df.min().compute()

In [15]:
bus_leavetimes_df['DATASOURCE'].unique()

Dask Series Structure:
npartitions=1
    object
       ...
Name: DATASOURCE, dtype: object
Dask Name: unique-agg, 1061 tasks

In [11]:
static_file_dict={'Data':bus_trips_df}

for key, value in static_file_dict.items():
    print(key)
    all_print=''
    print_statement="'{}':'{}'\n"
    
    for rkey, rvalue in value.dtypes.apply(lambda x: x.name).to_dict().items():
        all_print+=print_statement.format(rkey,rvalue)
    
    print(all_print)
        

Data
'DATASOURCE':'category'
'DAYOFSERVICE':'object'
'TRIPID':'category'
'LINEID':'category'
'ROUTEID':'category'
'DIRECTION':'category'
'PLANNEDTIME_ARR':'int64'
'PLANNEDTIME_DEP':'int64'
'ACTUALTIME_ARR':'float64'
'ACTUALTIME_DEP':'float64'
'BASIN':'category'
'TENDERLOT':'float64'
'SUPPRESSED':'category'
'JUSTIFICATIONID':'category'
'LASTUPDATE':'object'
'NOTE':'object'



In [30]:
! python -m pip install --upgrade dask
! python -m pip install fsspec
! python -m pip install --upgrade s3fs

Collecting s3fs
  Downloading s3fs-2021.6.1-py3-none-any.whl (25 kB)
Collecting aiobotocore>=1.0.1
  Downloading aiobotocore-1.3.1.tar.gz (48 kB)
[K     |████████████████████████████████| 48 kB 12.0 MB/s eta 0:00:01
[?25hCollecting botocore<1.20.50,>=1.20.49
  Downloading botocore-1.20.49-py2.py3-none-any.whl (7.4 MB)
[K     |████████████████████████████████| 7.4 MB 15.1 MB/s eta 0:00:01
[?25hCollecting aiohttp>=3.3.1
  Downloading aiohttp-3.7.4.post0-cp39-cp39-manylinux2014_x86_64.whl (1.4 MB)
[K     |████████████████████████████████| 1.4 MB 46.5 MB/s eta 0:00:01
[?25hCollecting wrapt>=1.10.10
  Using cached wrapt-1.12.1.tar.gz (27 kB)
Collecting aioitertools>=0.5.1
  Downloading aioitertools-0.7.1-py3-none-any.whl (20 kB)
Collecting async-timeout<4.0,>=3.0
  Downloading async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.6.3-cp39-cp39-manylinux2014_x86_64.whl (315 kB)
[K     |████████████████████████████████| 315 kB 55.4 MB/s eta 0:00:0