This is based on the idea that users initialize the database with 3 files (a routes file, a driver file, and a driver assignment file). Once these files are loaded, the user cannot append to the stored data with an additional CSV file. For example, if a user initialized the DB with ./routes_file_1, they cannot also append routes from ./routes_file_2. This matters for a number of reasons that I won't write out here, but we can talk about them if we want. We could, however, add a "clear DB" button that lets the user re-initialize.
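A minimal sketch of the "load once, clear to re-initialize" rule described above. `SingleLoadStore`, `load`, and `clear` are hypothetical names that are not part of the current code, and the sketch only tracks which file type has been loaded, not the data itself.

```python
class SingleLoadStore:
    """Hypothetical guard for the 'initialize once' rule.

    Each file type may be loaded exactly once; loading it again raises
    until clear() is called (the "clear DB" button), after which the
    user can re-initialize.
    """

    FILE_TYPES = ("routes", "drivers", "assignments")

    def __init__(self):
        # file_type -> path of the CSV it was initialized from
        self._loaded = {}

    def load(self, file_type, csv_path):
        if file_type not in self.FILE_TYPES:
            raise ValueError(f"unknown file type: {file_type!r}")
        if file_type in self._loaded:
            raise RuntimeError(
                f"{file_type} already initialized from {self._loaded[file_type]}; "
                "clear the DB before loading another file"
            )
        # A real implementation would read and validate the CSV here
        self._loaded[file_type] = csv_path

    def clear(self):
        # Forget everything so the user can re-initialize from scratch
        self._loaded.clear()
```

With this rule, `store.load('routes', './routes_file_1')` succeeds, a second `store.load('routes', './routes_file_2')` raises `RuntimeError`, and after `store.clear()` the second call succeeds.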

We probably want to refactor to use parquet files, as in this answer: https://stackoverflow.com/questions/61920105/dask-applying-a-function-over-a-large-dataframe-which-is-more-than-ram

It's not a huge change, and the current code should mostly work when we do that. I'm holding off for now, though, because some of the checks we need depend on the whole dataframe and I'm still learning Dask.

Some of the implementation could be done much more efficiently, but for now I have deliberately left it this way so that it is clear how it operates. We can adjust later.
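One way to make the per-value checks faster later is to replace the element-wise `apply` calls with vectorized pandas string methods. A sketch, assuming pandas 1.1 or newer (for `Series.str.fullmatch`); `valid_id_mask` and `valid_int_mask` are hypothetical helpers. Note that the regex accepts only ASCII letters and digits, whereas `str.isalnum()` also accepts other Unicode letters.

```python
import pandas as pd


def valid_id_mask(s: pd.Series) -> pd.Series:
    """True where the value is exactly 5 ASCII letters/digits; missing -> False."""
    return s.str.fullmatch(r"[A-Za-z0-9]{5}", na=False)


def valid_int_mask(s: pd.Series, lo: int, hi: int, max_len: int) -> pd.Series:
    """True where the value is 1..max_len ASCII digits and lo <= int(value) <= hi."""
    is_digits = s.str.fullmatch(rf"[0-9]{{1,{max_len}}}", na=False)
    # Values that are not plain digits become NaN, and NaN fails between()
    values = pd.to_numeric(s.where(is_digits), errors="coerce")
    return is_digits & values.between(lo, hi)
```

In Dask, a helper like this could be applied once per partition, e.g. `ddf[ddf['route_id'].map_partitions(valid_id_mask, meta=('route_id', 'bool'))]`, instead of calling a Python function once per value.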



**Don't use dask.compute() on full dataframes: it puts everything in main memory!**

**Use pip to install Dask if you want to install it yourself. I had issues with conda.**

**I added a .yml file so that we can all use the same conda environment.**

# Use dask or sframe

In [1]:
import multiprocessing
n_cpus = multiprocessing.cpu_count()
n_cpus

16

In [2]:
import dask.dataframe as dd
import dask
import numpy as np
from pathlib import Path
import os
import math
import pandas as pd

In [53]:
class DaskReader():
    '''Base class for reading csv files with Dask.'''

    def __init__(self, csv_path):
        # Verify that the path extension is .csv and fail early if it is not
        if not self._verify_csv_format(csv_path):
            raise ValueError(f'Expected a .csv file, got: {csv_path}')
        self.csv_path = csv_path
        
        self.state_codes = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT",
                            "DC", "DE", "FL", "GA", "HI", "ID", "IL", 
                            "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
                            "MA", "MI", "MN", "MS", "MO", "MT", "NE",
                            "NV", "NH", "NJ", "NM", "NY", "NC", "ND",
                            "OH", "OK", "OR", "PA", "RI", "SC", "SD", 
                            "TN", "TX", "UT", "VT", "VA", "WA", "WV", 
                            "WI", "WY"]
        
    def _verify_csv_format(self, file):
        if Path(file).suffix == '.csv':
            return True
        return False

    def _is_nan(self, x):
        if x != x or x is None:
            return True
        return False
    
    def _verify_str_len(self, x, min_len, max_len):
        if self._is_nan(x):
            return False
        
        if len(x) < min_len or len(x) > max_len:
            return False
        
        return True
    
    def _verify_id(self, x):
        if self._is_nan(x):
            return False
        
        if not x.isalnum() or len(x) != 5:
            return False
        
        return True
    
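    def _verify_int_value(self, x, min_x, max_x):
        # make sure x is present and is an int; isdecimal() (unlike isdigit())
        # only accepts characters that int() can parse
        if self._is_nan(x) or not x.isdecimal():
            return False
        
        int_x = int(x)
        if int_x < min_x or int_x > max_x:
            return False
        return True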
    
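    def _is_empty(self, df):
        # Note: for a Dask dataframe, len() runs the whole task graph built so far.
        # Only a count comes back to main memory, but the call is not free.
        if len(df.index) == 0:
            return True
        return False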
                
    def _read_df(self, file_type, names=None):
        '''Get a Dask dataframe from the csv file.

        The csv is written out as a group of parquet files and read back, so
        that later operations work on partitions and do not cause memory issues.

        Parameters
        ----------
        file_type : str
            Subdirectory name under ./parquet_processing/ (e.g. 'routes', 'drivers').
        names : list of str
            Names for the columns in the csv file. This assumes that the correct
            number of names is passed in so that it matches the csv file, and that
            the csv file does not have a header row with column names.
        '''
        df = dd.read_csv(self.csv_path, header=None, dtype='str', names=names)
        # repartition returns a new dataframe, so the result must be reassigned
        df = df.repartition(partition_size="100MB")
        
        # save as parquet files; overwrite=True removes the output of a previous
        # load so that leftover part files cannot be mixed into the new data
        parquet_path = './parquet_processing/' + file_type + '/'
        df.to_parquet(parquet_path, overwrite=True)
        return dd.read_parquet(parquet_path)
    
# class AssignmentReader(DaskReader):
#     def __init__(self, csv_path):
#         super().__init__(csv_path)
        
class DriverReader(DaskReader):
    ''' Reads driver csv files.
    
    Expected Columns
    ----------------
    driver_id (unique)
    last_name
    first_name
    age (years)
    home_city
    home_state (standard US state code, 2 characters)
    
    It is assumed that the company only hires drivers who live in a city
    that a bus route goes to.  Based on this assumption from the assignment,
    there is no need to verify that a route goes to the home city.
    
    "The company only hire drivers from cities where there is a route that 
    serves as its destination (destination ONLY, not departure city)."
    '''
    
    def __init__(self, csv_path):
        super().__init__(csv_path)
        
        self.column_names = ['driver_id', 'last_name', 'first_name', 'age', 'home_city',
                            'home_state']
        
        self.df = self.read_df()
        
    def read_df(self):
        return self._read_df(file_type='drivers', names=self.column_names)
    
    def verify_df(self):
        self._verify_attributes()
        
    def _check_duplicates_by(self, column_name):
        # adapted from: https://github.com/dask/dask/issues/2952
        self.df = self.df.reset_index().drop_duplicates([column_name]).set_index("index")
    
    def _verify_attributes(self):
        '''Verifies basic attributes in the table. 
        
        We verify that each of the attributes follows the constraints we are placing on
        them, drop the rows that do not, and then drop rows with a duplicate driver_id.
        Note: we must check if the dataframe is empty so that we avoid KeyErrors.
        '''
        
        if self._is_empty(self.df):
            return self.df
        
        # driver_id must be 5 characters and alphanumeric
        # (meta describes what apply returns here: a boolean mask)
        self.df = self.df[self.df['driver_id'].apply(
            self._verify_id, meta=('driver_id', 'bool'))]

        if self._is_empty(self.df):
            return self.df
        
        # last_name, first_name are a max of 80 characters
        self.df = self.df[self.df['last_name'].apply(
            lambda x: self._verify_str_len(x, 1, 80), meta=('last_name', 'bool'))]

        if self._is_empty(self.df):
            return self.df
        
        self.df = self.df[self.df['first_name'].apply(
            lambda x: self._verify_str_len(x, 1, 80), meta=('first_name', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # age is up to 3 characters and is an integer from 16-100
        self.df = self.df[self.df['age'].apply(
            lambda x: self._verify_str_len(x, 2, 3) and self._verify_int_value(x, 16, 100),
            meta=('age', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # home_state must be 2 characters and a valid US state code
        self.df = self.df[self.df['home_state'].apply(
            lambda x: self._verify_str_len(x, 2, 2) and x in self.state_codes,
            meta=('home_state', 'bool'))]
        
        # driver_id is unique: keep only the first row for each ID
        self._check_duplicates_by('driver_id')
        
        
        

class RouteReader(DaskReader):
    ''' Reads route csv files.
    
    Expected Columns
    ----------------
    Route number
    Route name (left empty if not present)
    Departure city name
    Departure city code (standard US state code, 2 characters)
    Destination city name
    Destination city code
    Route type code (0 for daily, 1 for weekdays only, 2 for weekend only)
    Departure time (hours)
    Departure time (minutes)
    Travel time (hours)
    Travel time (minutes)
    
    I renamed them below so it is harder to accidentally use the wrong
    name (e.g. departure is now src and destination is now dest).
    
    Helpful link: https://stackoverflow.com/questions/47125665/simple-dask-map-partitions-example
    '''
    def __init__(self, csv_path):
        super().__init__(csv_path)
        
        self.column_names = ['route_id', 'route_name', 'src_city_name', 'src_state_code',
                            'dest_city_name', 'dest_state_code', 'route_type', 'dep_time_hr',
                             'dep_time_min', 'travel_time_hr', 'travel_time_min']
        
        self.df = self.read_df()
        
        self.user_time = 0
        self.current_id = ''
        
    def read_df(self):
        return self._read_df(file_type='routes', names=self.column_names)
    
    def verify_df(self):
        self._verify_attributes()
    
    def _verify_time_less_than(self, hr, min_, max_minutes):
        hr, min_ = int(hr), int(min_)
        
        if (hr*60) + min_ > max_minutes:
            return False
        return True
    
    def _check_duplicates_by(self, column_name):
        # adapted from: https://github.com/dask/dask/issues/2952
        self.df = self.df.reset_index().drop_duplicates([column_name]).set_index("index")
    
    def _verify_attributes(self):
        '''Verifies basic attributes in the table. 
        
        We verify that each of the attributes follows the constraints we are placing on
        them, drop the rows that do not, and then drop rows with a duplicate route_id.
        Note: we must check if the dataframe is empty so that we avoid KeyErrors.
        
        Note:  All of this could be applied more efficiently, but I'm leaving it this way
        for now for clarity.  Duplicates with the same ID are removed at the end by
        keeping the first occurrence; a groupby that removes all indices after the first
        occurrence would also work.  I'm looking into better ways to handle the
        out-of-memory operations, like the link I referenced at the beginning of the
        notebook.
        '''
        # dropna() (https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.dropna)
        # is not used: most columns must contain a value but some (route_name) may be
        # empty, and dropping NaN rows caused issues for the masking down the line.
        # Instead, the helper functions treat a missing value as invalid where one is required.
        
        if self._is_empty(self.df):
            return self.df
        
        # route_id must be 5 characters and alphanumeric
        # this could probably be improved, but I am still learning Dask
        # (meta describes what apply returns here: a boolean mask)
        self.df = self.df[self.df['route_id'].apply(
            self._verify_id, meta=('route_id', 'bool'))]

        if self._is_empty(self.df):
            return self.df

        # route_name is optional but a max of 80 characters
        self.df = self.df[self.df['route_name'].apply(
            lambda x: self._is_nan(x) or self._verify_str_len(x, 0, 80),
            meta=('route_name', 'bool'))]

        if self._is_empty(self.df):
            return self.df
        
        # src_city_name, dest_city_name are a max of 80 characters
        self.df = self.df[self.df['src_city_name'].apply(
            lambda x: self._verify_str_len(x, 1, 80), meta=('src_city_name', 'bool'))]

        if self._is_empty(self.df):
            return self.df
        
        self.df = self.df[self.df['dest_city_name'].apply(
            lambda x: self._verify_str_len(x, 1, 80), meta=('dest_city_name', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
            
        # route_type is 1 character and can be the integers 0, 1, or 2
        self.df = self.df[self.df['route_type'].apply(
            lambda x: self._verify_str_len(x, 1, 1) and x in ['0', '1', '2'],
            meta=('route_type', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # dep_time_hr is up to 2 characters and is an integer from 0-23
        self.df = self.df[self.df['dep_time_hr'].apply(
            lambda x: self._verify_str_len(x, 1, 2) and self._verify_int_value(x, 0, 23),
            meta=('dep_time_hr', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # dep_time_min is up to 2 characters and is an integer from 0-59
        self.df = self.df[self.df['dep_time_min'].apply(
            lambda x: self._verify_str_len(x, 1, 2) and self._verify_int_value(x, 0, 59),
            meta=('dep_time_min', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # travel_time_hr is up to 2 characters and is an integer from 0-72
        self.df = self.df[self.df['travel_time_hr'].apply(
            lambda x: self._verify_str_len(x, 1, 2) and self._verify_int_value(x, 0, 72),
            meta=('travel_time_hr', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # travel_time_min is up to 2 characters and is an integer from 0-59
        self.df = self.df[self.df['travel_time_min'].apply(
            lambda x: self._verify_str_len(x, 1, 2) and self._verify_int_value(x, 0, 59),
            meta=('travel_time_min', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        # total travel time must not exceed 72 hrs (exactly 72:00 is allowed)
        self.df = self.df[self.df[['travel_time_hr', 'travel_time_min']].apply(
            lambda x: self._verify_time_less_than(*x, 72*60),
            meta=(None, 'bool'), axis=1)]
        
        if self._is_empty(self.df):
            return self.df
        
        # src_state_code and dest_state_code must be 2 characters and valid state codes
        self.df = self.df[self.df['src_state_code'].apply(
            lambda x: self._verify_str_len(x, 2, 2) and x in self.state_codes,
            meta=('src_state_code', 'bool'))]
        
        if self._is_empty(self.df):
            return self.df
        
        self.df = self.df[self.df['dest_state_code'].apply(
            lambda x: self._verify_str_len(x, 2, 2) and x in self.state_codes,
            meta=('dest_state_code', 'bool'))]
        
        # remove duplicate IDs (keeps the first row for each route_id)
        self._check_duplicates_by('route_id')
        
        
    ''' All functions below should be removed at a later time if we do not 
        need them.
    '''
    def grouper(self, column_name):
        a = self.df.groupby(column_name)
#         t = a.apply(self.checker, meta=pd.Series([], dtype='str', name='x'))
        
        t = a.apply(lambda x: self.test(x), meta=pd.Series([], dtype='str', name='x'))
#         t = a.applymap(lambda x: self.test(x), meta=pd.Series([], dtype='str', name='x'))
        return t
    
    def test(self, x):
        self.user_time = 0
        
        # pass the method itself; `lambda a: self.a` returned the method without calling it
        t = x.applymap(self.a)
        return t
    
    def a(self, t):
        print(t)
    
    def checker(self, x):
        user_time = 0
#         print(f'user time: {user_time} x: {x}')
#         print(f'user time: {user_time} x: {x}\n\n')
        return x.applymap(self.row_check)
#         user_time +=1
        
        
    def row_check(self, row):
        print(row)
        return 0
        
    
test_file = './test_csvs/routes/edited_Lin_Routes.csv'
dr = RouteReader(test_file)
# dr.df.head()
dr.verify_df()
dr.df.head()
# print(dr.df.head())
# dr.grouper('route_id').head()

# v = dr._check_duplicates_by('route_id')



| index | route_id | route_name | src_city_name | src_state_code | dest_city_name | dest_state_code | route_type | dep_time_hr | dep_time_min | travel_time_hr | travel_time_min |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | A1345 | | Waco | TX | Dallas | TX | 0 | 20 | 0 | 72 | 0 |
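The 72-hour rule is checked row by row above with `apply(..., axis=1)`. A vectorized sketch of the same rule, assuming the hour and minute columns are still strings as read from the CSV; `travel_time_ok` and `MAX_TRAVEL_MINUTES` are hypothetical names. The row shown above (72 h 0 min) sits exactly on the boundary and is kept, because totals equal to 72 hours are allowed.

```python
import pandas as pd

MAX_TRAVEL_MINUTES = 72 * 60  # the 72-hour limit, in minutes


def travel_time_ok(df: pd.DataFrame) -> pd.Series:
    """True where travel_time_hr * 60 + travel_time_min <= 72 hours.

    Values that are missing or not numeric become NaN and fail the check.
    """
    hours = pd.to_numeric(df["travel_time_hr"], errors="coerce")
    minutes = pd.to_numeric(df["travel_time_min"], errors="coerce")
    total = hours * 60 + minutes
    return total.le(MAX_TRAVEL_MINUTES)  # comparisons with NaN give False
```

With Dask, the same function could be run per partition, e.g. `ddf[ddf.map_partitions(travel_time_ok, meta=(None, 'bool'))]`.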


In [28]:
df = pd.DataFrame({
    'a': ['a', 'b', 'a', 'a', 'b'],
    'b': [0, 1, 0, 2, 5],
    })
ddf = dd.from_pandas(df, 2)
df

| | a | b |
|---|---|---|
| 0 | a | 0 |
| 1 | b | 1 |
| 2 | a | 0 |
| 3 | a | 2 |
| 4 | b | 5 |


In [29]:
df['c'] = [1, 2, 1, 1, 2]
ddf = dd.from_pandas(df, 2)
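# Custom "count distinct values per group" aggregation:
#   chunk    - within each partition, collect each group's unique values as a list
#   agg      - combine partitions by summing (i.e. concatenating) the lists per group
#   finalize - count the distinct values in each group's combined list
nunique = dd.Aggregation(
    name="nunique",
    chunk=lambda s: s.apply(lambda x: list(set(x))),
    agg=lambda s0: s0.obj.groupby(level=list(range(s0.obj.index.nlevels))).sum(),
    finalize=lambda s1: s1.apply(lambda final: len(set(final))),
    )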
ddf.groupby('a').agg({'b':nunique, 'c':nunique})

| | b | c |
|---|---|---|
| npartitions=1 | | |
| | int64 | int64 |
| | ... | ... |
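The output above is only Dask's lazy structure. Since the toy frame is small, the custom aggregation can be cross-checked against pandas' built-in `nunique`; for a reduced result this tiny, calling `.compute()` on the Dask version is also harmless.

```python
import pandas as pd

df = pd.DataFrame({
    "a": ["a", "b", "a", "a", "b"],
    "b": [0, 1, 0, 2, 5],
    "c": [1, 2, 1, 1, 2],
})

# pandas' built-in distinct count per group, for comparison with the Dask result
expected = df.groupby("a").agg({"b": "nunique", "c": "nunique"})
```

Both groups have 2 distinct `b` values and 1 distinct `c` value, which is what `ddf.groupby('a').agg({'b': nunique, 'c': nunique}).compute()` should also return.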


Possible time overlap solution?

https://stackoverflow.com/questions/57876479/efficient-way-to-compute-difference-of-all-rows-in-dask-dataframe
https://stackoverflow.com/questions/60721290/how-to-apply-a-custom-function-to-groups-in-a-dask-dataframe-using-multiple-col
https://docs.dask.org/en/latest/array.html
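A sketch of one possible overlap check, written in pandas first so the logic is easy to test. It assumes a hypothetical table with `driver_id`, `start_min`, and `end_min` (minutes from a common reference, e.g. the start of the week); the assignment file does not have these columns, so they would have to be derived by joining assignments to routes. Within each driver's trips sorted by start time, a trip overlaps if it starts before the latest end time of any earlier trip; back-to-back trips (one ends exactly when the next starts) are not flagged, and trips that wrap past the end of the reference period are not handled.

```python
import pandas as pd


def flag_overlaps(assignments: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: mark trips that start before an earlier trip of the
    same driver has ended. Expects driver_id, start_min, end_min columns."""
    a = assignments.sort_values(["driver_id", "start_min"]).copy()
    # Latest end time among all EARLIER trips of the same driver
    # (cummax, not just the previous row, so a long earlier trip is still caught)
    prev_max_end = a.groupby("driver_id")["end_min"].transform(
        lambda s: s.cummax().shift()
    )
    # The first trip of each driver has NaN here, and NaN comparisons are False
    a["overlaps_previous"] = a["start_min"] < prev_max_end
    return a
```

In Dask, the same per-driver function could be run with `groupby('driver_id').apply(..., meta=...)`, as discussed in the second Stack Overflow link; that shuffles all of a driver's rows into one partition, which costs a full pass over the data.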

In [9]:
import dask.dataframe as dd
import dask

assignment_csv_path = './test_csvs/assignments/Lin_Assignment.csv'

df = dask.dataframe.read_csv(assignment_csv_path, dtype='str', header=None)
# df = dd.read_csv(assignment_csv_path, dtype='str')
print(type(df))
df.head(26)

<class 'dask.dataframe.core.DataFrame'>


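| | 0 | 1 | 2 |
|---|---|---|---|
| 0 | 100A | 1 | M |
| 1 | 100B | 1 | T |
| 2 | 100C | 1 | W |
| 3 | 100A | 1 | U |
| 4 | 100B | 1 | F |
| 5 | 100C | 1 | S |
| 6 | 100A | 1 | s |
| 7 | 200A | 2 | M |
| 8 | 100A | 2 | T |
| 9 | 100B | 2 | W |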
