<a href="https://colab.research.google.com/github/djfLtC0dr/python-playground/blob/main/DASC522/djfDASC522hw3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Sqlite3 Database & Helper Classes

In [1]:
import sqlparse
from sqlparse.sql import IdentifierList, Identifier,  Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools
import sqlite3
import os
import pandas as pd
from typing import List

class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    __slots__ = ()

    def has_alias(self):
        return self.alias is not None

    @property
    def is_query_alias(self):
        return self.name is None and self.alias is not None

    @property
    def is_table_alias(self):
        return self.name is not None and self.alias is not None and not self.is_function

    @property
    def full_name(self):
        if self.schema is None:
            return self.name
        else:
            return self.schema + '.' + self.name

def _is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                        'UPDATE', 'CREATE', 'DELETE'):
            return True
    return False


def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)


def _extract_from_part(parsed):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in _extract_from_part(item):
                yield x
        if tbl_prefix_seen:
            if _is_subselect(item):
                for x in _extract_from_part(item):
                    yield x
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword if the keyword
            # FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break

def _extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for ident in item.get_identifiers():
                try:
                    alias = ident.get_alias()
                    schema_name = ident.get_parent_name()
                    real_name = ident.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield Reference(schema_name, real_name,
                                    alias, _identifier_is_function(ident))
        elif isinstance(item, Identifier):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))
        elif isinstance(item, Function):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))

def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        stream = _extract_from_part(statement)
        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
    return list(itertools.chain(*extracted_tables))

class CSVDriver:
    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None

    @property
    def con(self) -> sqlite3.Connection:
        """Make a singleton connection to an in-memory SQLite database"""
        if not self._con:
            self._con = sqlite3.connect(":memory:")
        return self._con
    
    def _exists(self, table: str) -> bool:
        query = """
        SELECT name
        FROM sqlite_master 
        WHERE type ='table'
        AND name NOT LIKE 'sqlite_%';
        """
        tables = self.con.execute(query).fetchall()
        return table in tables

    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
        """
        Load a CSV into an in-memory SQLite database
        sep is set to None in order to force pandas to auto-detect the delimiter
        """
        if self._exists(table):
            return
        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s). 
        Tables are loaded from table_dir_path
        """
        tables = extract_tables(query)
        for table in tables:
            self._load_table_to_mem(table)
        cursor = self.con.cursor()
        cursor.execute(query)
        records = cursor.fetchall()
        return records

Data Prep => Need to get Dates & Timestamps in same format

In [14]:
import csv
from datetime import datetime

sleep = 'sleep'
ready = 'readiness'
workout = 'workout'
mdy_format = '%m/%d/%Y'
input_files = [sleep, ready, workout] 

def parse_date(input_file):
    with open(f"/{input_file}.csv", 'r') as infile, open(f"/t_{input_file}.csv", 'w') as outfile:
      reader = csv.reader(infile)
      headers = next(reader, None)  # returns the headers or `None` if the input is empty
      writer = csv.writer(outfile)
      if headers:
        writer.writerow(headers) # write the header line
      if input_file == sleep:
        for row in reader:
          dt = datetime.fromisoformat(row[1].replace('Z', '+00:00')) #parse the datetime 
          row[1] = dt.strftime(mdy_format)           #assign the revised format
          writer.writerow(row)  
      elif input_file == ready:
        for row in reader:      
          dt = datetime.strptime(row[0], '%Y-%m-%d')    #parse the datetime
          row[0] = dt.strftime(mdy_format)     #assign the revised format
          writer.writerow(row) 
      else: # workout
        for row in reader:    
          writer.writerow(row) # workout date is in mdy_format
    return     

for input_file in input_files:
    parse_date(input_file)
 



Load tables from database into Pandas dataframe object

In [20]:
db_path = r"/"
driver = CSVDriver(db_path)
sel_query = """
SELECT S.overall_score, S.composition_score, S.revitalization_score, S.duration_score, S.deep_sleep_in_minutes, S.restlessness,
   R.readiness_score_value,R.srl_normalized_score,R.ff_normalized_score,R.hrv_normalized_score,  
   W.notes AS rpe
FROM t_sleep S
LEFT JOIN t_readiness R
ON S.timestamp= R.date
LEFT JOIN t_workout W
ON R.date = W.date;
"""
data = pd.DataFrame.from_records(driver.query(sel_query))
print(data.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 517 entries, 0 to 516
Data columns (total 11 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   0       517 non-null    int64  
 1   1       517 non-null    int64  
 2   2       517 non-null    int64  
 3   3       517 non-null    int64  
 4   4       517 non-null    int64  
 5   5       517 non-null    float64
 6   6       450 non-null    float64
 7   7       450 non-null    float64
 8   8       450 non-null    float64
 9   9       450 non-null    float64
 10  10      379 non-null    object 
dtypes: float64(5), int64(5), object(1)
memory usage: 44.6+ KB
None


Data Prep => we need to subset data by mask where @RPE exists

In [23]:
mask = data[10].str.contains('@RPE', case=False, na=False)
print(data[mask].head(50))

     0   1   2   3    4         5      6     7      8     9   \
59   81  21  17  43   60  0.091130   89.0  84.0   89.0  40.0   
61   81  21  17  43   60  0.091130   89.0  84.0   89.0  40.0   
65   87  21  23  43   79  0.056695   97.0  94.0   97.0  52.0   
69   77  16  16  45   66  0.085714   46.0  98.0   46.0  42.0   
76   80  18  20  42   79  0.065265   64.0  96.0   64.0  62.0   
77   80  18  20  42   79  0.065265   64.0  96.0   64.0  62.0   
79   82  21  23  38   60  0.065183   68.0  86.0   68.0  68.0   
81   86  21  18  47   82  0.073544   94.0  97.0   94.0  51.0   
86   79  20  18  41   33  0.060976   59.0  71.0   59.0  55.0   
87   78  19  18  41   56  0.045632   79.0  84.0   79.0  45.0   
92   84  21  20  43   85  0.075812  100.0  83.0  100.0  39.0   
93   84  21  20  43   85  0.075812  100.0  83.0  100.0  39.0   
94   84  21  20  43   85  0.075812  100.0  83.0  100.0  39.0   
99   85  21  21  43   63  0.065990   84.0  91.0   84.0  68.0   
104  80  22  19  39   89  0.065301   60.