Save diff dataframe to table

In [2]:
import requests
import datetime
import time
import os
import csv
import pandas as pd
# from os import environ
from sqlalchemy import MetaData, create_engine, asc
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError

import psycopg2
from psycopg2 import Error

from flask import Flask, jsonify, request
from flask_cors import cross_origin, CORS
from models.models import Base

from models.models import ExpensesRaw, Filenames
# from endpointClasses.resources import Resources
#
from sqlalchemy import Column, Integer, BigInteger, String, Text, DateTime, \
    Float, Boolean, func, ForeignKeyConstraint, Index, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from datetime import datetime, timedelta, timezone
from geoalchemy2 import Geography, Geometry


In [3]:
# Gates the exploratory "missing records" cells further down, which only run
# their body when this flag is True.
run_all_flag = True

In [4]:
### Set up the Flask application
app = Flask(__name__)

# Wrap CORS around the app so that the server does not block machine-to-machine
# or browser-based requests.
# As the last expression in the cell, the returned CORS object is echoed as output.
CORS(app)

<flask_cors.extension.CORS at 0x21407476310>

In [6]:
# Engine for Google Cloud PostgreSQL access.
# The connection URL (which embeds the credentials) is read from the
# environment so that no password is stored in the notebook, e.g.
#   TRANSGOV_DATABASE_URL=postgresql+psycopg2://<user>:<password>@<host>/transgov
database_url = os.environ.get('TRANSGOV_DATABASE_URL')
if not database_url:
    raise RuntimeError(
        'Set the TRANSGOV_DATABASE_URL environment variable to the '
        'PostgreSQL connection URL before running this notebook.')
engine = create_engine(database_url)

# Create the tables declared on Base that are not yet present in the database.
Base.metadata.create_all(engine)

# One session is shared by every cell below.
Session = sessionmaker(bind=engine)
session = Session()

In [7]:
# Column labels applied to the expenses CSV when it is read, in file order.
col_names = [
    "Ministry",
    "Position",
    "Name",
    "Type",
    "Category",
    "Date",
    "Amount",
    "Description",
    "Receipt 1",
    "Receipt 2",
    "Receipt 3",
]

In [8]:
# Download the expenses CSV and keep a dated local copy.
# Only do this once per day.
print('Downloading .....')
df = pd.read_csv(r'https://expenses.alberta.ca/DownloadData.aspx?type=csv'
                 r'&d=IsVE/OcdpNZJ5rBbvji3qw', names=col_names,
                 low_memory=False, parse_dates=['Date'])

# Saving the dataframe under expenses/<YYYYMMDD>.csv
print('Saving to CSV ...')
filename = 'expenses/' + time.strftime('%Y%m%d') + ".csv"
# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(filename), exist_ok=True)
df.to_csv(filename)
print('done...')

Downloading .....
Saving to CSV ...
done...


In [9]:
# Load the saved copy again, this time skipping the first two lines so that
# the heading rows are left out of the frame.
print('Reading from file...', filename)
df = pd.read_csv(
    filename,
    names=col_names,
    skiprows=2,
    low_memory=False,
)
df_size = df.shape[0]
print('Done...', df_size)

Reading from file... expenses/20210224.csv
Done... 539390


In [10]:
# Compare the current download with the previous download so that we can
# identify the changes and only process those.
# 1. Check file sizes
# 2. Iterate through DF1 and look up each record in DF2
# 3.   If the record exists, do nothing
# 4.   Otherwise add the record to the database
#      (Is it a new record or a changed one?  Hard to determine; we may have
#      to check manually whether duplicates become a problem.)

# Fetch only the most recent entry in the Filenames table (highest id).
prevFile = pd.read_sql(
    session.query(Filenames)
    .order_by(Filenames.id.desc())
    .limit(1)
    .statement,
    session.bind)

if prevFile.empty:
    raise RuntimeError('The filenames table is empty; there is no previous '
                       'download to compare against.')

previousFile = prevFile.at[0, 'name']
print('Reading from previous file...', previousFile)

# Open the previous file as a dataframe, read the same way as the current one.
prev_df = pd.read_csv(previousFile, names=col_names, low_memory=False, skiprows=2)
prev_df_size = len(prev_df.index)
print('Done...', prev_df_size)

Reading from previous file... expenses/20210223.csv
Done... 539390


In [11]:
# Record this download (path, row count, timestamp) in the filenames table.
filetosave = Filenames(
    name=filename,
    records=df_size,
    created_at=datetime.now()
)
try:
    session.add(filetosave)
    session.commit()
except SQLAlchemyError as e:
    # Roll back so the session stays usable, and report why the save failed.
    session.rollback()
    print('Could not save the new filename %s: %s' % (filename, e))

In [12]:
# compare the two dataframes
# https://hackersandslackers.com/compare-rows-pandas-dataframes/
def dataframe_difference(df1, df2, which=None, output_path='expenses/diff.csv'):
    """Find rows which are different between two DataFrames.

    The frames are outer-merged on their common columns with an indicator
    column named ``_merge``.

    Parameters
    ----------
    df1, df2 : pandas.DataFrame
        The "left" and "right" frames to compare.
    which : str or None
        ``None`` keeps every row that is not present in both frames.
        Otherwise only rows whose ``_merge`` value equals ``which`` are kept
        (``'left_only'``, ``'right_only'`` or ``'both'``).
    output_path : str or None
        CSV file the result is written to.  Defaults to the path this
        function has always written; pass ``None`` to skip writing.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the selected rows, including ``_merge``.
    """
    comparison_df = df1.merge(
        df2,
        indicator=True,
        how='outer'
    )
    if which is None:
        selected = comparison_df['_merge'] != 'both'
    else:
        selected = comparison_df['_merge'] == which
    # Copy so that callers can assign into the result without touching (or
    # being warned about) the intermediate merged frame.
    diff_df = comparison_df[selected].copy()
    if output_path is not None:
        diff_df.to_csv(output_path)
    return diff_df

In [13]:
# Rows that appear in only one of the two downloads.  `df` is the left frame
# and `prev_df` the right one, so `_merge` is 'left_only' for rows found only in
# the current file and 'right_only' for rows found only in the previous file.
df_diff_df = dataframe_difference(df, prev_df)

In [14]:
# Display the diff; an empty frame means the two downloads are identical.
df_diff_df

Unnamed: 0,Ministry,Position,Name,Type,Category,Date,Amount,Description,Receipt 1,Receipt 2,Receipt 3,_merge


In [None]:
# If the diff dataframe has no rows, the two files are identical and there is nothing to process.

In [None]:
# Distinct indicator values present in the diff.
df_diff_df['_merge'].unique()

In [None]:
# Save the diff dataframe under expenses/diff_<YYYYMMDD>.csv.
# A separate name is used so that `filename` keeps pointing at the downloaded
# file that earlier cells read and record.
diff_filename = 'expenses/' + 'diff_' + time.strftime('%Y%m%d') + ".csv"
df_diff_df.to_csv(diff_filename)

In [None]:
# Insert every row of the diff into the expensesraw table.
# Rows marked 'left_only' are stored with changed=False and rows marked
# 'right_only' with changed=True.  Each row is committed on its own so that a
# failing row does not discard the rows saved before it.
start_time = time.time()
length = len(df_diff_df)
ctr = 0

for index, row in df_diff_df.iterrows():

    newdate = datetime.strptime(row['Date'], '%m/%d/%Y')
    # Strip the currency symbol and thousands separators before converting.
    newAmount = float(row['Amount'].replace('$', '').replace(',', ''))

    # Receipt values whose text form is five characters or fewer are stored
    # as '' (this covers missing values, whose text form is 'nan').
    newReceipt1, newReceipt2, newReceipt3 = [
        row[col] if len(str(row[col])) > 5 else ''
        for col in ('Receipt 1', 'Receipt 2', 'Receipt 3')
    ]

    # Decide the flag afresh for every row so a value from a previous
    # iteration can never carry over.
    changed = bool(row['_merge'] == 'right_only')

    expense = ExpensesRaw(
        ministry=row['Ministry'],
        position=row['Position'],
        name=row['Name'],
        type=row['Type'],
        category=row['Category'],
        expense_date=newdate,
        amount=newAmount,
        description=row['Description'],
        receipt1=newReceipt1,
        receipt2=newReceipt2,
        receipt3=newReceipt3,
        changed=changed,
        date_last_found=datetime.now(),
        created_at=datetime.now(),
        updated_at=datetime.now(),
    )

    # Progress report every 100 rows.
    if ctr % 100 == 0:
        perc = "{:.3f}".format(ctr/length)
        elapsed = "{:.2f}".format(time.time() - start_time)
        print('Count: %s of %s percentage %s elapsed %s ' %(ctr, length, perc, elapsed))

    ctr = ctr + 1

    try:
        session.add(expense)
        session.commit()
    except SQLAlchemyError as e:
        # Roll back so the session stays usable, and report the reason so
        # unsaved rows can be diagnosed afterwards.
        session.rollback()
        print('Could not save the new expense %s: %s' % (expense, e))


The following code cleans the original dataframe `df`.

In [None]:
# Select the rows of df whose Date value starts with '$'.
# na=False makes rows with a missing Date count as "no match" instead of
# producing a NaN mask, which boolean indexing rejects.
shift_df = df[df.Date.str.startswith('$', na=False)]
shift_df

In [None]:
# Number of rows selected for repair.
len(shift_df)

In [None]:
# Shift each selected row one column to the right: keep the first value, put
# None in the second column, and move the remaining values over by one (the
# last value of the row is dropped).
TOTAL_COLS = df.shape[1] -1
for idx in shift_df.index:
    # idx is an index label, so read and write with .loc; integer positions
    # only coincide with labels for a default 0..n-1 index.
    old_values = df.loc[idx].tolist()
    df.loc[idx] = [old_values[0], None] + old_values[1:TOTAL_COLS]


In [None]:
# Test: re-run the selection; an empty result means no row of df still has a
# Date starting with '$'.  na=False treats a missing Date as "no match".
shift_df = df[df.Date.str.startswith('$', na=False)]
shift_df

In [None]:
# Select the rows of df whose Date value starts with 'H'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_2 = df[df.Date.str.startswith('H', na=False)]
shift_df_2

In [None]:
# Number of rows whose Date starts with 'H'.
len(shift_df_2)

In [None]:
# Rewrite each row selected in shift_df_2.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# The row is read by position (iloc) but written by label (loc), which only
# address the same row when labels equal positions.  Confirm the intended repair.
TOTAL_COLS = df.shape[1] -1
shift_df_2
for idx, row in shift_df_2.iterrows():
    new_line = []
    for i in df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df.loc[idx] = new_line

In [None]:
# Test: list the rows of df whose Date still starts with 'H'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df[df.Date.str.startswith('H', na=False)]
shift_df

In [None]:
# Select the rows of df whose Date value starts with 'O'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_3 = df[df.Date.str.startswith('O', na=False)]
shift_df_3

In [None]:
# Number of rows whose Date starts with 'O'.
len(shift_df_3)

In [None]:
# Rewrite each row selected in shift_df_3.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# The row is read by position (iloc) but written by label (loc), which only
# address the same row when labels equal positions.  Confirm the intended repair.
TOTAL_COLS = df.shape[1] -1
shift_df_3
for idx, row in shift_df_3.iterrows():
    new_line = []
    for i in df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df.loc[idx] = new_line

In [None]:
# Test: list the rows of df whose Date still starts with 'O'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df[df.Date.str.startswith('O', na=False)]
shift_df

In [None]:
# Select the rows of df whose Date value starts with 'M'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_4 = df[df.Date.str.startswith('M', na=False)]
shift_df_4

In [None]:
# Rewrite each row selected in shift_df_4.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# The row is read by position (iloc) but written by label (loc), which only
# address the same row when labels equal positions.  Confirm the intended repair.
TOTAL_COLS = df.shape[1] -1
shift_df_4
for idx, row in shift_df_4.iterrows():
    new_line = []
    for i in df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df.loc[idx] = new_line

In [None]:
# Test: list the rows of df whose Date still starts with 'M'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df[df.Date.str.startswith('M', na=False)]
shift_df

The following code cleans the diff dataframe `df_diff_df`.

In [None]:
# Select the rows of the diff whose Date value starts with '$'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask,
# which boolean indexing rejects.
shift_df = df_diff_df[df_diff_df.Date.str.startswith('$', na=False)]
shift_df

In [None]:
# Number of diff rows selected for repair.
len(shift_df)

In [None]:
# Shift each selected diff row one column to the right: keep the first value,
# put None in the second column, and move the remaining values over by one
# (the last data value of the row is dropped).
# Only the data columns take part; the `_merge` indicator must keep its value
# because later processing reads it.
data_cols = [col for col in df_diff_df.columns if col != '_merge']
for idx in shift_df.index:
    # idx is an index label of the (filtered) diff frame, not an integer
    # position, so both the read and the write use .loc.
    old_values = df_diff_df.loc[idx, data_cols].tolist()
    df_diff_df.loc[idx, data_cols] = [old_values[0], None] + old_values[1:-1]

In [None]:
# Select the rows of the diff whose Date value starts with 'H'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_2 = df_diff_df[df_diff_df.Date.str.startswith('H', na=False)]
shift_df_2

In [None]:
# Number of diff rows whose Date starts with 'H'.
len(shift_df_2)

In [None]:
# Test: list the diff rows whose Date starts with 'H'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df_diff_df[df_diff_df.Date.str.startswith('H', na=False)]
shift_df

In [None]:
# Rewrite each diff row selected in shift_df_2.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# idx is a label of the filtered diff frame, so iloc[idx] may address a
# different row than loc[idx] (or be out of bounds).  Confirm the intended repair.
TOTAL_COLS =df_diff_df.shape[1] -1
shift_df_2
for idx, row in shift_df_2.iterrows():
    new_line = []
    for i in df_diff_df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df_diff_df.loc[idx] = new_line

In [None]:
# Test: list the diff rows whose Date still starts with 'H'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df_diff_df[df_diff_df.Date.str.startswith('H', na=False)]
shift_df

In [None]:
# Select the rows of the diff whose Date value starts with 'O'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_3 = df_diff_df[df_diff_df.Date.str.startswith('O', na=False)]
shift_df_3

In [None]:
# Number of diff rows whose Date starts with 'O'.
len(shift_df_3)

In [None]:
# Rewrite each diff row selected in shift_df_3.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# idx is a label of the filtered diff frame, so iloc[idx] may address a
# different row than loc[idx] (or be out of bounds).  Confirm the intended repair.
TOTAL_COLS = df_diff_df.shape[1] -1
shift_df_3
for idx, row in shift_df_3.iterrows():
    new_line = []
    for i in df_diff_df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df_diff_df.loc[idx] = new_line

In [None]:
# Test: list the diff rows whose Date still starts with 'O'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df_diff_df[df_diff_df.Date.str.startswith('O', na=False)]
shift_df

In [None]:
# Select the rows of the diff whose Date value starts with 'M'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df_4 = df_diff_df[df_diff_df.Date.str.startswith('M', na=False)]
shift_df_4

In [None]:
# Rewrite each diff row selected in shift_df_4.
# NOTE(review): `: TOTAL_COLS+1` spans every column, so the values written back
# are the complete row read at integer position idx; no column is shifted.
# idx is a label of the filtered diff frame, so iloc[idx] may address a
# different row than loc[idx] (or be out of bounds).  Confirm the intended repair.
TOTAL_COLS = df_diff_df.shape[1] -1
shift_df_4
for idx, row in shift_df_4.iterrows():
    new_line = []
    for i in df_diff_df.iloc[idx, : TOTAL_COLS+1]:
        new_line.append(i)
    df_diff_df.loc[idx] = new_line

In [None]:
# Test: list the diff rows whose Date still starts with 'M'.
# na=False treats a missing Date as "no match" instead of yielding a NaN mask.
shift_df = df_diff_df[df_diff_df.Date.str.startswith('M', na=False)]
shift_df

Save the diff dataframe here.

The next cells identify around 10,000 records that were not saved in the initial run. The cause is unknown; it was probably a DB connection issue.

In [None]:
if run_all_flag:
    # An expression nested inside `if` is not echoed by the notebook, so it
    # is displayed explicitly.
    display(df.query('Name == "Filevich,Patricia"'))

In [None]:
if run_all_flag:
    # Rows labelled 509620 through 509630.  The result is displayed explicitly
    # because an expression nested inside `if` is not echoed by the notebook.
    display(df.loc[509620:509630])

In [None]:
# New dataframe of missing records.
# Label-based slice of df (.loc slices include both endpoints).  The bounds are
# hard-coded for the records that were not saved in the initial run.
if run_all_flag:
    missing_records_df = df.loc[498839: 509627]

In [None]:
if run_all_flag:
    # First rows of the missing-records slice, displayed explicitly because an
    # expression nested inside `if` is not echoed by the notebook.
    display(missing_records_df.head())

In [None]:
if run_all_flag:
    # Last rows of the missing-records slice, displayed explicitly because an
    # expression nested inside `if` is not echoed by the notebook.
    display(missing_records_df.tail())

In [None]:
if run_all_flag:
    # Number of missing records, displayed explicitly because an expression
    # nested inside `if` is not echoed by the notebook.
    display(len(missing_records_df))

Duplicates

In [None]:
# Collect every row of df that has at least one exact duplicate.
# keep=False flags all members of a duplicate group (identifies 850);
# the default, df.duplicated(), skips the first occurrence (identifies 431).
duplicate_mask = df.duplicated(keep=False)
dups_df = df[duplicate_mask]

In [None]:
# Display the duplicated rows.
dups_df

In [None]:
# FUTURE only process delta dataframe!!!

# Process the dataframe: insert every row of df into the expensesraw table.
# TODO: after the very first load, test whether a record with the same field
# values already exists and, if so, update its date_last_found instead of
# inserting it again.
# Each row is committed on its own so that a failing row does not discard the
# rows saved before it.

start_time = time.time()
length = len(df)
ctr = 0

for index, row in df.iterrows():

    newdate = datetime.strptime(row['Date'], '%m/%d/%Y')
    # Strip the currency symbol and thousands separators before converting.
    newAmount = float(row['Amount'].replace('$', '').replace(',', ''))

    # Receipt values whose text form is five characters or fewer are stored
    # as '' (this covers missing values, whose text form is 'nan').
    newReceipt1, newReceipt2, newReceipt3 = [
        row[col] if len(str(row[col])) > 5 else ''
        for col in ('Receipt 1', 'Receipt 2', 'Receipt 3')
    ]

    expense = ExpensesRaw(
        ministry=row['Ministry'],
        position=row['Position'],
        name=row['Name'],
        type=row['Type'],
        category=row['Category'],
        expense_date=newdate,
        amount=newAmount,
        description=row['Description'],
        receipt1=newReceipt1,
        receipt2=newReceipt2,
        receipt3=newReceipt3,
        date_last_found=datetime.now(),
        created_at=datetime.now(),
        updated_at=datetime.now(),
    )

    # Progress report every 100 rows.
    if ctr % 100 == 0:
        perc = "{:.3f}".format(ctr/length)
        elapsed = "{:.2f}".format(time.time() - start_time)
        print('Count: %s of %s percentage %s elapsed %s ' %(ctr, length, perc, elapsed))

    ctr = ctr + 1

    try:
        session.add(expense)
        session.commit()
    except SQLAlchemyError as e:
        # Roll back so the session stays usable, and report the reason so
        # unsaved rows can be diagnosed afterwards.
        session.rollback()
        print('Could not save the new expense %s: %s' % (expense, e))
#         print(error)
   

In [None]:
# For each row in dups_df, find the matching rows in the expensesraw table of
# the database and set their is_duplicated flag to True.
# NOTE(review): missing Position/Description values are matched as '' here,
# while the insert cell above passes the frame's values through unchanged.
# Confirm that the stored values actually compare equal to ''.
ctr = 1
for index, row in dups_df.iterrows():
    # Field values are taken from df at the same label as the duplicate row.
    record = df.loc[index]

    newdate = datetime.strptime(record['Date'], '%m/%d/%Y')
    # Strip the currency symbol and thousands separators before converting.
    newAmount = float(record['Amount'].replace('$', '').replace(',', ''))

    # Receipt values whose text form is five characters or fewer are matched
    # as '' (this covers missing values, whose text form is 'nan').
    newReceipt1, newReceipt2, newReceipt3 = [
        record[col] if len(str(record[col])) > 5 else ''
        for col in ('Receipt 1', 'Receipt 2', 'Receipt 3')
    ]

    print('Position : ',  record['Position'])
    newPosition = record['Position']

    # Test the single value instead of building a null mask of the whole frame.
    if pd.isna(newPosition):
        print('Position is None')
        newPosition = ''

    print('Description : ',  record['Description'])
    newDescription = record['Description']

    if pd.isna(newDescription):
        print('Description is None')
        newDescription = ''

    # All stored rows whose fields equal this duplicate row.
    qry = session.query(ExpensesRaw).filter(
        ExpensesRaw.ministry == record['Ministry'],
        ExpensesRaw.position == newPosition,
        ExpensesRaw.name == record['Name'],
        ExpensesRaw.type == record['Type'],
        ExpensesRaw.category == record['Category'],
        ExpensesRaw.expense_date == newdate,
        ExpensesRaw.amount == newAmount,
        ExpensesRaw.description == newDescription,
        ExpensesRaw.receipt1 == newReceipt1,
        ExpensesRaw.receipt2 == newReceipt2,
        ExpensesRaw.receipt3 == newReceipt3,
    ).all()
    for result in qry:
        result.is_duplicated = True
    try:
        session.commit()
    except SQLAlchemyError as e:
        # Without a rollback the session stays in a failed state and every
        # later commit fails as well.
        session.rollback()
        print('Could not update: %s' % e)
    print('Count', ctr)
    ctr = ctr + 1
    

In [None]:
# Show the first rows; as the cell's last expression the frame gets the
# notebook's rich table display instead of plain text.
df.head()