# Imports and setup

In [1]:
import psycopg2
import numpy as np
import pandas as pd
import os
import datetime
import csv
import glob
import time

from mysecrets import username, password, hostname, port

In [2]:
HOME_PATH = os.getcwd()
print(HOME_PATH)
# pd.read_csv

/home/denny/Documents/mids/ev_charging


In [3]:
conn = psycopg2.connect(
    host=hostname,
    database="postgres",
    user=username,
    password=password)
conn

<connection object at 0x7f092de13740; dsn: 'user=postgres password=xxx dbname=postgres host=evdb.cjcyce60qgvj.us-east-1.rds.amazonaws.com', closed: 0>

In [4]:
def get_connection():
    return psycopg2.connect(
    host=hostname,
    database="postgres",
    user=username,
    password=password)

In [5]:
def run_query(query):
    conn = get_connection()
    with conn.cursor() as cur:
        cur.execute(query)
        try:
            results = cur.fetchall()
        except:
            print('query had no results to fetch')
            return 1
        
    return results

In [6]:
query = """select * from d_locations """
run_query(query)

[(1, 'caltech', 'California_Garage_01', 'CA'),
 (2, 'caltech', 'California_Garage_02', 'CA'),
 (3, 'caltech', 'LIGO_01', 'CA'),
 (4, 'caltech', 'N_Wilson_Garage_01', 'CA'),
 (5, 'caltech', 'S_Wilson_Garage_01', 'CA'),
 (6, 'jpl', 'Arroyo_Garage_01', 'CA'),
 (7, 'office_01', 'Parking_Lot_01', 'CA')]

In [7]:
conn = get_connection()
with conn.cursor() as cur:
    cur.execute('SELECT version()')
    result = cur.fetchall()
    
print(result)


[('PostgreSQL 15.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit',)]


## Postgres insert helpers
 - execute_values -> slow
 - copy_from_stringiq -> fast 

In [8]:
import psycopg2.extras as extras
def execute_values(conn, df, table):
    """
    Using psycopg2.extras.execute_values() to insert the dataframe
    """
    # Create a list of tupples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL quert to execute
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()

In [9]:
from io import StringIO
def copy_from_stringio(conn, df, table):
    """
    Here we are going save the dataframe in memory 
    and use copy_from() to copy it to the table
    """
    # save dataframe to an in memory buffer
    start_time = time.time()
    buffer = StringIO()
    df.to_csv(buffer,
              index=False, 
              sep=',',
              na_rep='NaN',
              header=False)
    
    buffer.seek(0)
    
    cursor = conn.cursor()
    try:
        cursor.copy_from(buffer, table, sep=",", columns=list(df.columns))
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(f'time to upload {df.shape[0]:,} records: {time.time()-start_time} or a rate of {np.round(df.shape[0]/(time.time()-start_time),-1):,}  records per second')
    print("copy_from_stringio() done")
    cursor.close()


# get locations

In [10]:
ldf = pd.read_sql(
    'select * from d_locations', conn
)
ldf.head()

  ldf = pd.read_sql(


Unnamed: 0,id,major_name,minor_name,state
0,1,caltech,California_Garage_01,CA
1,2,caltech,California_Garage_02,CA
2,3,caltech,LIGO_01,CA
3,4,caltech,N_Wilson_Garage_01,CA
4,5,caltech,S_Wilson_Garage_01,CA


In [11]:
ldf['path'] = ldf[['major_name','minor_name']].apply(lambda x: '/'+'/'.join([x[0],x[1]])+'/', axis=1)

  ldf['path'] = ldf[['major_name','minor_name']].apply(lambda x: '/'+'/'.join([x[0],x[1]])+'/', axis=1)


# Get all the files

In [12]:
def populate_d_files(conn, HOME_PATH):
    root = HOME_PATH + '/data/'
    rows = []
    i=0
    print(f'walking {root} for csv files ...')
    for path, subdirs, files in os.walk(root):
        for name in files:
            if name.endswith('.csv'):
                i += 1
                rows.extend([[i,os.path.join(path, name)[43:], name]])
    
    fdf = pd.DataFrame(rows, columns=['id','full_path','filename'])
    print('walk complete. \nresetting db tables...')
    query = """drop table if exists d_files;
        create table d_files
        (
            id        serial
                primary key,
            full_path varchar,
            filename  varchar
        );"""
    
    run_query(query)
    print('d_files table reset.')
    print(f'inserting {fdf.shape[0]:,} records...')
    copy_from_stringio(conn, fdf, 'd_files')
    # copy_from_stringio(conn, fdf.set_index('id'), 'd_files')
    print('file table creation complete')
    return fdf
    
fdf = populate_d_files(conn, HOME_PATH)
    

walking /home/denny/Documents/mids/ev_charging/data/ for csv files ...
walk complete. 
resetting db tables...
query had no results to fetch
d_files table reset.
inserting 85,877 records...
time to upload 85,877 records: 8.594522714614868 or a rate of 9,990.0  records per second
copy_from_stringio() done
file table creation complete


# Insert time series charges

In [13]:
# # https://stackoverflow.com/a/75756749/6432367
# import csv
# import glob
# import pandas as pd
# 
# rows = []
# full_path = '/data/ACN-Data-Static-main/office_01/Parking_Lot_01/'
# file_names = []
# for filename in glob.glob("*.csv"):
#     with open(filename, newline="") as f:
#         reader = csv.reader(f)
#         columns = next(reader)
#         for row in reader:
#             if row:
#                 row += [filename]
#                 rows.append(row)
#         # columns = next(reader)
#         # rows.extend(reader)
#         # file_names += [filename] * reader.line_num
#         
# columns = ['','Charging Current (A)', 'Actual Pilot (A)', 'Voltage (V)','Charging State','Energy Delivered (kWh)', 'Power (kW)','filename']
# df = pd.DataFrame(rows, columns=columns)

In [14]:
def get_csvs(path):
    os.chdir(path)
    return [filename for filename in glob.glob("*.csv")]

    

In [15]:
def filename_csvs_to_df(path, filenames):
    start_time = time.time()
    new_time = start_time
    os.chdir(path)
    
    rows = []
    i = 0
    for filename in filenames:
        i +=1
        with open(filename, newline="") as f:
            reader = csv.reader(f)
            columns = next(reader)
            for row in reader:
                if row:
                    row += [filename]
                    rows.append(row)
            # columns = next(reader)
            # rows.extend(reader)
            # file_names += [filename] * reader.line_num
        if i%500==0: 
            print(f'processed {i} \t\t{time.time()-new_time}')
            new_time = time.time()
        
    print(f'time to read files:\t {time.time() - start_time}')
    new_time = time.time()
    columns = ['','Charging Current (A)', 'Actual Pilot (A)', 'Voltage (V)','Charging State','Energy Delivered (kWh)', 'Power (kW)','filename']
    df = pd.DataFrame(rows, columns=columns)
    print(f'time to make dataframe:\t {time.time() - new_time}')
    print(f'total time to parse csvs {time.time() - start_time} in {path}')
    return df

In [16]:
def dir_csvs_to_df(path):
    start_time = time.time()
    new_time = start_time
    os.chdir(path)
    
    rows = []
    i = 0
    for filename in glob.glob("*.csv"):
        i +=1
        with open(filename, newline="") as f:
            reader = csv.reader(f)
            columns = next(reader)
            for row in reader:
                if row:
                    row += [filename]
                    rows.append(row)
            # columns = next(reader)
            # rows.extend(reader)
            # file_names += [filename] * reader.line_num
        if i%500==0: print(f'processed {i}')
    print(f'time to read files:\t {time.time() - new_time}')
    new_time = time.time()
    columns = ['','Charging Current (A)', 'Actual Pilot (A)', 'Voltage (V)','Charging State','Energy Delivered (kWh)', 'Power (kW)','filename']
    df = pd.DataFrame(rows, columns=columns)
    print(f'time to make dataframe:\t {time.time() - new_time}')
    print(f'total time to parse csvs {time.time() - start_time} in {path}')
    return df

In [17]:
def format_charges(df):
    start_time = time.time()
    new_time = start_time
    
    # rename columns to match db
    df.rename(columns={"":"datetime",'Charging Current (A)':'charging_current_amps', 'Actual Pilot (A)':'actual_pilot_amps', 'Voltage (V)':'voltage_volts',
           'Charging State':'charging_state', 'Energy Delivered (kWh)':'energy_delivered_kwh', 'Power (kW)':'power_kw','filename':'file_id'}, inplace=True)
    print(f'rename columns:\t {time.time()-new_time}')
    new_time = time.time()
    
    # map location and files to foreign key ids
    # https://stackoverflow.com/a/46049755/6432367
    df['location_id'] = location_id
    
    df['file_id'] = df['file_id'].map(fdf.set_index('filename')['id'])
    print(f'mapping:\t {time.time()-new_time}')
    new_time = time.time()
    
    # add nans to empty numeric fields
    # df[['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']] = df[['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']].apply(pd.to_numeric, errors='coerce', axis=1)
    for col in ['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    
    print(f'to numeric:\t {time.time()-new_time}')
    new_time = time.time()
    
    # apply date transformations
    df['datetime'] = pd.to_datetime(df['datetime'], format='mixed')
    df['date'] = df['datetime'].dt.date
    df['time'] = df['datetime'].dt.time
    
    print(f'time manipulation:\t {time.time()-new_time}')
    new_time = time.time()
    
    # put columns in order
    df = df[['datetime', 'location_id', 'file_id', 'charging_current_amps', 'actual_pilot_amps', 'voltage_volts', 'charging_state', 'energy_delivered_kwh', 'power_kw', 'date', 'time']]
    print(f'reordering:\t {time.time()-new_time}')
    print(f'total formating time:\t {time.time() - start_time}')
    return df

In [18]:
# import concurrent.futures
# import urllib.request
# 
# URLS = ['http://www.foxnews.com/',
#         'http://www.cnn.com/',
#         'http://europe.wsj.com/',
#         'http://www.bbc.co.uk/',
#         'http://nonexistant-subdomain.python.org/']
# 
# # Retrieve a single page and report the URL and contents
# def load_url(url, timeout):
#     with urllib.request.urlopen(url, timeout=timeout) as conn:
#         return conn.read()
# 
# # We can use a with statement to ensure threads are cleaned up promptly
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
#     # Start the load operations and mark each future with its URL
#     future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
#     for future in concurrent.futures.as_completed(future_to_url):
#         url = future_to_url[future]
#         try:
#             data = future.result()
#         except Exception as exc:
#             print('%r generated an exception: %s' % (url, exc))
#         else:
#             print('%r page is %d bytes' % (url, len(data)))

In [19]:
# path = HOME_PATH+'/data/ACN-Data-Static-main/office_01/Parking_Lot_01/'
path = HOME_PATH + '/data/ACN-Data-Static-main' + ldf.loc[6,'path']
location_id = ldf.loc[6,'id']
print(location_id, path)
start_time = time.time()
for indx_loc in ldf.index:
    os.chdir(HOME_PATH)
    # if indx_loc == 0:
    path = HOME_PATH + '/data/ACN-Data-Static-main' + ldf.loc[indx_loc,'path']
    location_id = ldf.loc[indx_loc,'id']
    print(f'running location id {location_id} from path {path}...')
    filenames = get_csvs(path)
    
    print(f'num files {len(filenames):,}')
    chunksize = 100
    for chunk in range(0,len(filenames),chunksize):
        # print(chunk, len(filenames[chunk:chunk+chunksize]))
        df_unformated = filename_csvs_to_df(path, filenames[chunk:chunk+chunksize])
        df = format_charges(df_unformated)
        print('records', df.shape[0])
        copy_from_stringio(conn, df, 'f_charges')
    # df_unformated = dir_csvs_to_df(path)
    # df = format_charges(df_unformated)
    # copy_from_stringio(conn, df, 'f_charges')
    
print(f'total upload time: {time.time() - start_time}')
print('done with upload!')

7 /home/denny/Documents/mids/ev_charging/data/ACN-Data-Static-main/office_01/Parking_Lot_01/
running location id 1 from path /home/denny/Documents/mids/ev_charging/data/ACN-Data-Static-main/caltech/California_Garage_01/...
num files 31,860
time to read files:	 1.9893724918365479
time to make dataframe:	 0.26788878440856934
total time to parse csvs 2.25736927986145 in /home/denny/Documents/mids/ev_charging/data/ACN-Data-Static-main/caltech/California_Garage_01/
rename columns:	 0.0003027915954589844
mapping:	 0.03744220733642578
to numeric:	 0.4818296432495117
time manipulation:	 0.7951006889343262
reordering:	 0.04776406288146973
total formating time:	 1.3627381324768066
records 606080
time to upload 606,080 records: 44.66438698768616 or a rate of 13,570.0  records per second
copy_from_stringio() done
time to read files:	 1.3036041259765625
time to make dataframe:	 0.26853132247924805
total time to parse csvs 1.5723655223846436 in /home/denny/Documents/mids/ev_charging/data/ACN-Data-St

In [None]:
# df_unformated.head()

In [None]:
# copy_from_stringio(conn, df.iloc[:10,:], 'f_charges')

In [None]:
# # path = HOME_PATH+'/data/ACN-Data-Static-main/office_01/Parking_Lot_01/'
# path = HOME_PATH + '/data/ACN-Data-Static-main' + ldf.loc[6,'path']
# location_id = ldf.loc[6,'id']
# print(location_id, path)
# start_time = time.time()
# 
# import concurrent.futures
# import urllib.request
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
#         
#     for indx_loc in ldf.index:
#         os.chdir(HOME_PATH)
#         # if indx_loc == 0:
#         path = HOME_PATH + '/data/ACN-Data-Static-main' + ldf.loc[indx_loc,'path']
#         location_id = ldf.loc[indx_loc,'id']
#         print(f'running location id {location_id} from path {path}...')
#         filenames = get_csvs(path)
#         # 
#         print(f'num files {len(filenames):,}')
#         chunksize = 1000
#         for chunk in range(0,100,100):#len(filenames),chunksize):
#             # print(chunk, len(filenames[chunk:chunk+chunksize]))
#             df_unformated = filename_csvs_to_df(path, filenames[chunk:chunk+chunksize])
#             df = format_charges(df_unformated)
#             # copy_from_stringio(conn, df, 'f_charges')
#             break
#         break
#         # df_unformated = dir_csvs_to_df(path)
#         # df = format_charges(df_unformated)
#         # copy_from_stringio(conn, df, 'f_charges')
#         
#     print(f'total upload time: {time.time() - start_time}')
#     print('done with upload!')

In [None]:
# copy_from_stringio(conn, df.iloc[0:200,:], 'f_charges')

In [None]:
# df_unformated.shape

In [None]:
# pd.to_datetime([datetime.datetime.now(), datetime.datetime.now()], format='%Y-%m-%d %H:%M:%S')
# df.iloc[0:1,0]

In [None]:
# import time
# start_time = time.time()
# new_time = start_time
# 
# # rename columns to match db
# df.rename(columns={"":"datetime",'Charging Current (A)':'charging_current_amps', 'Actual Pilot (A)':'actual_pilot_amps', 'Voltage (V)':'voltage_volts',
#        'Charging State':'charging_state', 'Energy Delivered (kWh)':'energy_delivered_kwh', 'Power (kW)':'power_kw','filename':'file_id'}, inplace=True)
# print(f'rename columns:\t {time.time()-new_time}')
# new_time = time.time()
# 
# # map location and files to foreign key ids
# # https://stackoverflow.com/a/46049755/6432367
# df['location_id'] = location_id
# 
# df['file_id'] = df['file_id'].map(fdf.set_index('filename')['id'])
# print(f'mapping:\t {time.time()-new_time}')
# new_time = time.time()
# 
# # add nans to empty numeric fields
# # df[['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']] = df[['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']].apply(pd.to_numeric, errors='coerce', axis=1)
# for col in ['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']:
#     df[col] = pd.to_numeric(df[col], errors='coerce')
# 
# print(f'to numeric:\t {time.time()-new_time}')
# new_time = time.time()
# 
# # apply date transformations
# df['datetime'] = pd.to_datetime(df['datetime'], format='')
# pd.to_timestamp
# df['date'] = df['datetime'].dt.date
# df['time'] = df['datetime'].dt.time
# 
# print(f'time manipulation:\t {time.time()-new_time}')
# new_time = time.time()
# 
# # put columns in order
# df = df[['datetime', 'location_id', 'file_id', 'charging_current_amps', 'actual_pilot_amps', 'voltage_volts', 'charging_state', 'energy_delivered_kwh', 'power_kw', 'date', 'time']]
# print(f'reordering:\t {time.time()-new_time}')
# new_time = time.time()
# print(time.time()-start_time)

In [None]:
# start_time = time.time()
# new_time = start_time
# for col in ['charging_current_amps','actual_pilot_amps','voltage_volts','energy_delivered_kwh','power_kw']:
#     df[col] = pd.to_numeric(df[col], errors='coerce')
#     print(col, time.time()- new_time )
#     new_time = time.time()
# print(f'total time {time.time() - start_time}')

In [None]:
# copy_from_stringio(conn, df.iloc[0:20000,:], 'f_charges')
# copy_from_stringio(conn, df, 'f_charges')