Beefing up command line dynamic data handling functionality #11

Merged
36 commits, merged Feb 28, 2021
Commits
2c94edc
xlrd added to deps for data fetch methods using pandas.read_excel
prakaa Feb 11, 2020
c2bd1f1
removed dates from static tables and bug fix to static_xl
prakaa Feb 11, 2020
0cc50ee
remove all provision of start and end time to static tables, includin…
prakaa Feb 11, 2020
5cf8f65
updated URL for FCAS variables table
prakaa Feb 12, 2020
28ca541
added user-agent header to requests and error handling for request ex…
prakaa Feb 12, 2020
9e259a2
add all HTTP 400 error codes to handler
prakaa Feb 12, 2020
1d0843b
bugfix: bitwise operators converted to boolean
prakaa Feb 12, 2020
7ce3895
Merge branch 'master' of https://github.com/UNSW-CEEM/NEMOSIS into UN…
prakaa Feb 12, 2020
c5c2ed9
modify fetch methods to output multiple formats and clean up downloade…
prakaa Feb 17, 2020
6bb4795
fixed bugs for HTTP 400 error catching
prakaa Feb 17, 2020
fcba777
print warning not raise http error as intermediate fcas times do not …
prakaa Feb 19, 2020
e86f080
add option in data fetch methods to not concatenate data
prakaa Feb 19, 2020
43c0d9b
option to pass kwargs to data format writer (pd.to_*). Detect .csv an…
prakaa Mar 2, 2020
5246897
filter function on date now omits data with invalid dates
prakaa Mar 2, 2020
05ce336
xlrd added to deps for data fetch methods using pandas.read_excel
prakaa Feb 11, 2020
d57c459
removed dates from static tables and bug fix to static_xl
prakaa Feb 11, 2020
31d072d
bug fix for static_xl
prakaa Mar 16, 2020
e59e73a
updated URL for FCAS variables table
prakaa Feb 12, 2020
c3cffa0
added user-agent header to requests and error handling for request ex…
prakaa Feb 12, 2020
01da430
add all HTTP 400 error codes to handler
prakaa Feb 12, 2020
f5b7e1e
bugfix: bitwise operators converted to boolean
prakaa Feb 12, 2020
11ea674
modify fetch methods to output multiple formats and clean up downloade…
prakaa Feb 17, 2020
677d076
fixed bugs for HTTP 400 error catching
prakaa Feb 17, 2020
6576e1c
print warning not raise http error as intermediate fcas times do not …
prakaa Feb 19, 2020
5519004
add option in data fetch methods to not concatenate data
prakaa Feb 19, 2020
823d72b
option to pass kwargs to data format writer (pd.to_*). Detect .csv an…
prakaa Mar 2, 2020
8e69420
filter function on date now omits data with invalid dates
prakaa Mar 2, 2020
5e5491e
read function infers dtypes, does not cast to string
prakaa Mar 6, 2020
de0d97d
add start and end time back into static_xl
prakaa Mar 16, 2020
2ebea07
merge to keep start and end time in fetch methods
prakaa Mar 16, 2020
aaac2df
extract fcas providers table from registration and exemption list
prakaa Mar 22, 2020
d24abad
Merge branch 'pocket-rocket-nemosis' of github.com:prakaa/NEMOSIS int…
prakaa Mar 22, 2020
5a7f38b
add start and end dates to data-fetch-methods. need to update tests, etc
prakaa Mar 30, 2020
fbc8777
start+end times in static table fn calls and tests
prakaa Apr 2, 2020
5f6e5e4
fix fetching when csv fformat specified
prakaa Apr 20, 2020
2d5548e
for parquet and csv (where possible) remove index from file write
prakaa Apr 30, 2020
3 changes: 2 additions & 1 deletion nemosis/custom_tables.py
@@ -14,7 +14,8 @@ def fcas4s_scada_match(start_time, end_time, table_name, raw_data_location, sele
start_time, end_time, table_name_fcas4s, raw_data_location)
# Pull in the 4 second fcas variable types.
table_name_variable_types = 'VARIABLES_FCAS_4_SECOND'
fcas4s_variable_types = data_fetch_methods.static_table(start_time, end_time, table_name_variable_types,
fcas4s_variable_types = data_fetch_methods.static_table(start_time, end_time,
table_name_variable_types,
raw_data_location)

# Select the variable types that measure MW on an interconnector and Gen_MW from a dispatch unit.
209 changes: 144 additions & 65 deletions nemosis/data_fetch_methods.py
@@ -1,13 +1,37 @@
import os
from datetime import datetime, timedelta
import glob
import pandas as pd
import feather
from nemosis import filters, downloader, processing_info_maps, defaults, custom_tables
print(custom_tables.__file__)
from datetime import datetime
from nemosis import filters, downloader, \
processing_info_maps, defaults, custom_tables


def dynamic_data_compiler(start_time, end_time, table_name, raw_data_location, select_columns=None, filter_cols=None,
filter_values=None):
def dynamic_data_compiler(start_time, end_time, table_name, raw_data_location,
select_columns=None, filter_cols=None,
filter_values=None, fformat='feather',
keep_csv=True, data_merge=True, **kwargs):
"""
Downloads and compiles data for all dynamic tables.
Refer to README for tables
Args:
start_time (str): format 'yyyy/mm/dd HH:MM:SS'
end_time (str): format 'yyyy/mm/dd HH:MM:SS'
table_name (str): table as per documentation
raw_data_location (str): directory to download and cache data to.
existing data will be used if in this dir.
select_columns (list): return select columns
filter_cols (list): filter on columns
filter_values (list): values to filter on; the nth filter_values entry
is applied to the nth filter_cols entry
fformat (string): "feather" or "parquet" for storage and access
keep_csv (bool): retains CSVs in cache
data_merge (bool): concatenate DataFrames.
**kwargs: additional arguments passed to the pd.to_{fformat}() function

Returns:
all_data (pd.DataFrame): all data concatenated.
"""

print('Compiling data for table {}.'.format(table_name))
# Generic setup common to all tables.
if select_columns is None:
@@ -16,7 +40,6 @@ def dynamic_data_compiler(start_time, end_time, table_name, raw_data_location, s
# Pre loop setup, done at table type basis.
date_filter = processing_info_maps.filter[table_name]
setup_function = processing_info_maps.setup[table_name]

if setup_function is not None:
start_time, end_time = setup_function(start_time, end_time)

@@ -33,72 +56,115 @@ def dynamic_data_compiler(start_time, end_time, table_name, raw_data_location, s
end_time = datetime.strptime(end_time, '%Y/%m/%d %H:%M:%S')
start_search = datetime.strptime(start_search, '%Y/%m/%d %H:%M:%S')

data_tables = dynamic_data_fetch_loop(start_search, start_time, end_time, table_name, raw_data_location,
select_columns, date_filter, search_type)
data_tables = dynamic_data_fetch_loop(start_search, start_time, end_time,
table_name, raw_data_location,
select_columns, date_filter,
search_type, fformat=fformat,
keep_csv=keep_csv,
data_merge=data_merge,
write_kwargs=kwargs)

all_data = pd.concat(data_tables, sort=False)
if data_merge:
all_data = pd.concat(data_tables, sort=False)

finalise_data = processing_info_maps.finalise[table_name]
if finalise_data is not None:
for function in finalise_data:
all_data = function(all_data, start_time, table_name)
finalise_data = processing_info_maps.finalise[table_name]
if finalise_data is not None:
for function in finalise_data:
all_data = function(all_data, start_time, table_name)

if filter_cols is not None:
all_data = filters.filter_on_column_value(all_data, filter_cols, filter_values)
if filter_cols is not None:
all_data = filters.filter_on_column_value(all_data, filter_cols,
filter_values)

return all_data
return all_data


def dynamic_data_fetch_loop(start_search, start_time, end_time, table_name, raw_data_location, select_columns,
date_filter, search_type):
def dynamic_data_fetch_loop(start_search, start_time, end_time, table_name,
raw_data_location, select_columns,
date_filter, search_type, fformat='feather',
keep_csv=True, data_merge=True, write_kwargs=None):
data_tables = []
read_function = {'feather': pd.read_feather,
'csv': pd.read_csv,
'parquet': pd.read_parquet}
table_type = defaults.table_types[table_name]
date_gen = processing_info_maps.date_gen[table_type](start_search, end_time)
date_gen = processing_info_maps.date_gen[table_type](start_search,
end_time)

for year, month, day, index in date_gen:
data = None
# Write the file names and paths for where the data is stored in the cache.
filename_full, path_and_name, filename_full_feather, path_and_name_feather = \
processing_info_maps.write_filename[table_type](table_name, month, year, day, index, raw_data_location)
# Write the file names and paths
# for where the data is stored in the cache.
filename_stub, path_and_name = \
processing_info_maps.write_filename[table_type](table_name, month,
year, day, index,
raw_data_location)
full_filename = path_and_name + f'.{fformat}'

# If the data needed is not in the cache then download it.
if not os.path.isfile(path_and_name):
if not glob.glob(full_filename):
if day is None:
print('Downloading data for table {}, year {}, month {}'.format(table_name, year, month))
print(f'Downloading data for table {table_name}, '
+ f'year {year}, month {month}')
else:
print('Downloading data for table {}, year {}, month {}, day {}, time {}'.
format(table_name, year, month, day, index))
print(f'Downloading data for table {table_name}, '
+ f'year {year}, month {month}, day {day},'
+ f'time {index}.')

processing_info_maps.downloader[table_type](year, month, day, index, filename_full, raw_data_location)
processing_info_maps.downloader[table_type](year, month, day,
index, filename_stub,
raw_data_location)

# If the data exists in feather format the read in the data. If it only exists as a csv then read in from the
# csv and save to feather.
if os.path.isfile(path_and_name_feather) and os.stat(path_and_name_feather).st_size > 2000:
data = feather.read_dataframe(path_and_name_feather, select_columns)
elif os.path.isfile(path_and_name):
# If the data exists in the desired format, read it in
# If it does not, then read in from the csv and save to desired format
if glob.glob(full_filename) and fformat != 'csv':
data = read_function[fformat](full_filename,
columns=select_columns)
elif not glob.glob(path_and_name + '.[cC][sS][vV]'):
continue
else:
csv_file = glob.glob(path_and_name + '.[cC][sS][vV]')[0]
if day is None:
print('Creating feather file for faster future access of table {}, year {}, month {}.'.
format(table_name, year, month))
print(f'Creating {fformat} file for '
+ f'{table_name}, {year}, {month}.')
else:
print('Creating feather file for faster future access of table {}, year {}, month {}, day {}, time {}.'.
format(table_name, year, month, day, index))
print(f'Creating {fformat} file for '
+ f'{table_name}, {year}, {month}, {day}, {index}.')
# Check what headers the data has.
headers = pd.read_csv(path_and_name, skiprows=[0], nrows=1).columns
if defaults.table_types[table_name] == 'MMS':
# Remove columns from the table column list if they are not in the header, this deals with the fact AEMO
# has added and removed columns over time.
columns = [column for column in defaults.table_columns[table_name] if column in headers]
# Read the data from a csv.
data = pd.read_csv(path_and_name, skiprows=[0], dtype=str, usecols=columns)
headers = read_function['csv'](csv_file,
skiprows=[0],
nrows=1).columns.tolist()
# Remove columns from the table column list
# if they are not in the header, this deals with the fact
# AEMO has added and removed columns over time.
columns = [column for column in
defaults.table_columns[table_name]
if column in headers]
data = read_function['csv'](csv_file, skiprows=[0],
usecols=columns)
data = data[:-1]
elif defaults.table_types[table_name] == 'FCAS':
else:
columns = defaults.table_columns[table_name]
data = pd.read_csv(path_and_name, skiprows=[0], dtype=str, names=columns)
data = read_function['csv'](csv_file, skiprows=[0],
names=columns)

# Remove files of the same name
# Deals with case of corrupted files.
if os.path.isfile(full_filename):
os.unlink(full_filename)
# Write to required format
to_function = {'feather': data.to_feather,
'csv': data.to_csv,
'parquet': data.to_parquet}
if fformat == 'feather':
to_function[fformat](full_filename, **write_kwargs)
else:
to_function[fformat](full_filename, index=False,
**write_kwargs)
if not keep_csv:
os.remove(csv_file)

# Remove feather files of the same name, deals with case of corrupted files.
if os.path.isfile(path_and_name_feather):
os.unlink(path_and_name_feather)
# Write to feather file.
data.to_feather(path_and_name_feather)
# Delete any columns in data that were not explicitly selected.
if select_columns is not None:
for column in columns:
@@ -109,46 +175,59 @@ def dynamic_data_fetch_loop(start_search, start_time, end_time, table_name, raw_
# Filter by the start and end time.
if date_filter is not None:
data = date_filter(data, start_time, end_time)

data_tables.append(data)
if data_merge:
data_tables.append(data)

return data_tables


def static_table(start_time, end_time, table_name, raw_data_location, select_columns=None, filter_cols=None,
filter_values=None):
def static_table(start_time, end_time, table_name,
raw_data_location, select_columns=None,
filter_cols=None, filter_values=None):
print('Retrieving static table {}.'.format(table_name))
path_and_name = raw_data_location + '/' + defaults.names[table_name]
if not os.path.isfile(path_and_name):
print('Downloading data for table {}.'.format(table_name))
downloader.download_csv(defaults.static_table_url[table_name], raw_data_location, path_and_name)
downloader.download_csv(defaults.static_table_url[table_name],
raw_data_location, path_and_name)

table = pd.read_csv(raw_data_location + '/' + defaults.names[table_name], dtype=str,
table = pd.read_csv(raw_data_location + '/' + defaults.names[table_name],
dtype=str,
names=defaults.table_columns[table_name])
if select_columns is not None:
table = table.loc[:, select_columns]
for column in table.select_dtypes(['object']).columns:
table[column] = table[column].map(lambda x: x.strip())

if filter_cols is not None:
table = filters.filter_on_column_value(table, filter_cols, filter_values)
table = filters.filter_on_column_value(table, filter_cols,
filter_values)

return table


def static_table_xl(start_time, end_time, table_name, raw_data_location, select_columns=None, filter_cols=None,
filter_values=None):
path_and_name = raw_data_location + '/' + defaults.names[table_name] + '.xls'
def static_table_xl(start_time, end_time, table_name, raw_data_location,
select_columns=None, filter_cols=None, filter_values=None):
path_and_name = (raw_data_location + '/'
+ defaults.names[table_name] + '.xls')
print('Retrieving static table {}.'.format(table_name))
if not os.path.isfile(path_and_name):
print('Downloading data for table {}.'.format(table_name))
downloader.download_xl(defaults.static_table_url[table_name], raw_data_location, path_and_name)
downloader.download_xl(defaults.static_table_url[table_name],
raw_data_location, path_and_name)
xls = pd.ExcelFile(path_and_name)
table = pd.read_excel(xls, 'Generators and Scheduled Loads', dtype=str)
table = table.loc[:, select_columns]
table = pd.read_excel(xls, defaults.reg_exemption_list_tabs[table_name], dtype=str)
if select_columns is not None:
table = table.loc[:, select_columns]
if filter_cols is not None:
table = filters.filter_on_column_value(table, filter_cols, filter_values)
table = table.drop_duplicates(['DUID'])
table = filters.filter_on_column_value(table, filter_cols,
filter_values)
if table_name in defaults.table_primary_keys.keys():
primary_keys = defaults.table_primary_keys[table_name]
table = table.drop_duplicates(primary_keys)

table.dropna(axis=0, how='all', inplace=True)
table.dropna(axis=1, how='all', inplace=True)

return table

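A minimal usage sketch of the dynamic_data_compiler options introduced in this pull request (fformat, keep_csv, data_merge and the pass-through **kwargs). The table name, date range, cache directory and compression keyword below are illustrative assumptions, not values taken from the diff.

from nemosis import data_fetch_methods

raw_data_cache = '/tmp/nemosis_cache'  # hypothetical cache directory, assumed to exist

# Compile data to parquet, drop the intermediate CSVs, and forward a keyword
# argument through **kwargs to pandas.DataFrame.to_parquet.
price_data = data_fetch_methods.dynamic_data_compiler(
    '2020/01/01 00:00:00', '2020/01/02 00:00:00',
    'DISPATCHPRICE', raw_data_cache,
    fformat='parquet', keep_csv=False,
    compression='snappy')

# With data_merge=False the per-file DataFrames are still written to the cache
# in the chosen format, but they are not concatenated and no DataFrame is
# returned by the compiler.
data_fetch_methods.dynamic_data_compiler(
    '2020/01/01 00:00:00', '2020/01/02 00:00:00',
    'DISPATCHPRICE', raw_data_cache,
    fformat='parquet', data_merge=False)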
2 changes: 1 addition & 1 deletion nemosis/date_generators.py
@@ -54,4 +54,4 @@ def year_month_day_index_gen(start_time, end_time):
continue
for minute in range(55, -1, -5):
index = str(hour).zfill(2) + str(minute).zfill(2)
yield str(year), month, str(day).zfill(2), index
yield str(year), month, str(day).zfill(2), index