refactoring
eisber committed Jan 27, 2017
1 parent bff3046 commit b351234
Showing 6 changed files with 127 additions and 80 deletions.
34 changes: 17 additions & 17 deletions DataScience/DataScience.py
@@ -14,7 +14,7 @@
from multiprocessing.dummy import Pool
from shutil import rmtree
from vowpalwabbit import pyvw

import gzip

# m = CheckpointedModel(1, 'd:/Data/TrackRevenue', 'onlinetrainer', '20161204\\000052\\')

@@ -76,16 +76,16 @@ def load_data(ts, blob):
print('Found {0} events. Sorting data files by time...'.format(len(global_idx)))
data.sort(key=lambda jd: jd.ts)

#ordered_joined_events = open(os.path.join(cache_folder, 'data_' + start_date_string + '-' + end_date_string + '.json'), 'w', encoding='utf8')
#for jd in data:
# print (jd.filename)
# file = open(jd.filename, 'r', encoding='utf8')
# for line in file:
# line = line.strip() + ('\n')
# _ = ordered_joined_events.write(line)
#ordered_joined_events.close()
ordered_joined_events = gzip.open(os.path.join(cache_folder, 'data_' + start_date_string + '-' + end_date_string + '.json.gz'), 'wt', encoding='utf8')
for jd in data:
print (jd.filename)
file = open(jd.filename, 'r', encoding='utf8')
for line in file:
line = line.strip() + ('\n')
_ = ordered_joined_events.write(line)
ordered_joined_events.close()

#sys.exit(0)
sys.exit(0)

# concatenate file ordered by time
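For reference, a minimal standalone sketch of the pattern this hunk switches to: concatenate the per-blob joined-event files, ordered by timestamp, into one gzipped output. The JoinedFile tuple and the cache_folder/date-string names are illustrative stand-ins mirroring the surrounding code, not part of the commit.

import gzip
import os
from collections import namedtuple

JoinedFile = namedtuple('JoinedFile', ['ts', 'filename'])

def concat_sorted(data, cache_folder, start_date_string, end_date_string):
    data.sort(key=lambda jd: jd.ts)
    out_path = os.path.join(cache_folder,
                            'data_' + start_date_string + '-' + end_date_string + '.json.gz')
    # 'wt' opens the gzip stream in text mode so plain str lines can be written
    with gzip.open(out_path, 'wt', encoding='utf8') as out:
        for jd in data:
            with open(jd.filename, 'r', encoding='utf8') as f:
                for line in f:
                    out.write(line.strip() + '\n')
    return out_path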

@@ -174,13 +174,13 @@ def tabulate_metrics(metrics, top = None):
num_valid_events += 1
else:
missing_events_counter += 1
if num_valid_events > 0:
scoring_model_filename = os.path.join(scoring_dir,
m.ts.strftime('%Y'),
m.ts.strftime('%m'),
m.ts.strftime('%d'),
m.modelid + '.model')
_ = ordered_joined_events.write(json.dumps({'_tag':'save_{0}'.format(scoring_model_filename)}) + ('\n'))
if num_valid_events > 0:
scoring_model_filename = os.path.join(scoring_dir,
m.ts.strftime('%Y'),
m.ts.strftime('%m'),
m.ts.strftime('%d'),
m.model_id + '.model')
_ = ordered_joined_events.write(json.dumps({'_tag':'save_{0}'.format(scoring_model_filename)}) + ('\n'))
ordered_joined_events.close()

# Commenting out debugging prints
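The second hunk in this file renames modelid to model_id and emits a control record after each checkpoint's events. A sketch of that record follows, with scoring_dir, the timestamp, and the model id as illustrative inputs; VW treats an example whose tag starts with "save" as a request to write the current model to the path after "save_", which is presumably why the tag is spliced into the ordered event stream.

import json
import os
from datetime import datetime

def save_tag_line(scoring_dir, ts, model_id):
    scoring_model_filename = os.path.join(scoring_dir,
                                          ts.strftime('%Y'),
                                          ts.strftime('%m'),
                                          ts.strftime('%d'),
                                          model_id + '.model')
    # the consumer interprets this record as "save the current model to this path"
    return json.dumps({'_tag': 'save_{0}'.format(scoring_model_filename)}) + '\n'

# e.g. save_tag_line('scoring', datetime(2016, 12, 4), 'abc123') yields, with POSIX separators,
# '{"_tag": "save_scoring/2016/12/04/abc123.model"}\n'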
3 changes: 3 additions & 0 deletions DataScience/DataScience.pyproj
@@ -25,6 +25,9 @@
<Compile Include="common.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="Complex-FeatureEng.py">
<SubType>Code</SubType>
</Compile>
<Compile Include="DataScience.py" />
<Compile Include="Eval.py">
<SubType>Code</SubType>
101 changes: 51 additions & 50 deletions DataScience/Experimentation.py
@@ -7,6 +7,7 @@
import threading
from datetime import datetime
import configparser
import gzip

class Command:
def __init__(self, base, learning_rate="", cb_type="", marginal_list="", ignore_list="", interaction_list="", regularization=""):
@@ -22,34 +23,28 @@ def __init__(self, base, learning_rate="", cb_type="", marginal_list="", ignore_
full_command = "{0} -l {1}".format(full_command, learning_rate)
if cb_type != "":
full_command = "{0} --cb_type {1}".format(full_command, cb_type)
if marginal_list != "":
if len(marginal_list) != 0:
full_command = "{0} --marginal {1}".format(full_command, ''.join(marginal_list))
if ignore_list != "":
if len(ignore_list) != 0:
for ignored_namespace in ignore_list:
full_command = "{0} --ignore {1}".format(full_command, ignored_namespace)
if interaction_list != "":
if len(interaction_list) != 0:
for interaction in interaction_list:
full_command = "{0} -q {1}".format(full_command, interaction)
if regularization != "":
full_command = "{0} --l1 {1}".format(full_command, regularization)
self.full_command = full_command
print(full_command)
self.loss = None
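A worked example (hypothetical parameter values) of the command string the constructor above assembles, inlined here so it runs standalone:

base = "vw --cb_adf -d data.json --json -c --power_t 0"
full_command = base
full_command = "{0} -l {1}".format(full_command, 0.5)
full_command = "{0} --cb_type {1}".format(full_command, "dr")
full_command = "{0} --marginal {1}".format(full_command, ''.join(['J', 'K']))
for ignored_namespace in ['B']:
    full_command = "{0} --ignore {1}".format(full_command, ignored_namespace)
for interaction in ['UG']:
    full_command = "{0} -q {1}".format(full_command, interaction)
full_command = "{0} --l1 {1}".format(full_command, 1e-06)
print(full_command)
# -> vw --cb_adf -d data.json --json -c --power_t 0 -l 0.5 --cb_type dr --marginal JK --ignore B -q UG --l1 1e-06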

def result_writer(queue):
def result_writer(command_list):
experiment_file = open("experiments.tsv", "a")
# Read from queue and write results to file until done
while True:
msg = queue.get()
if (msg == 'complete'):
break
command = msg
for command in command_list:
line = "{0:7f}\t{1}\t{2:7f}\t{3}\t{4}\t{5}\t{6}\t{7}\t{8}".format(float(command.loss), \
command.base, command.learning_rate, command.cb_type, str(command.marginal_list), \
str(command.ignore_list), str(command.interaction_list), str(command.regularization), str(datetime.now()))
experiment_file.write(line + "\n")
experiment_file.flush()

experiment_file.close()
experiment_file.flush()

def run_experiment(command, timeout=1000):
while True:
Expand All @@ -62,61 +57,65 @@ def run_experiment(command, timeout=1000):
m = re.search('average loss = (.+)\n', str(results))
loss = m.group(1)
command.loss = float(loss)
result_queue.put(command)
return command
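The body of run_experiment is mostly collapsed in this view. A generic sketch of the step it ends with, running a VW command with a timeout and scraping the reported loss, might look like the following; the subprocess handling here is an assumption for illustration, not the committed code:

import re
import subprocess

def run_vw_and_get_loss(full_command, timeout=1000):
    # VW prints its progress report, including "average loss = ...", to stderr
    output = subprocess.check_output(full_command.split(),
                                     stderr=subprocess.STDOUT,
                                     timeout=timeout)
    m = re.search('average loss = (.+)\n', output.decode('utf8'))
    return float(m.group(1))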

def run_experiment_set(command_list):
# Run the experiments in parallel using 5 processes
p = Pool(5)
p = Pool(20)
results = p.map(run_experiment, command_list)
results.sort(key=lambda result: result.loss)
del p
result_writer(results)
return results

if __name__ == '__main__':
# Identify namespaces and detect marginal features
if len(sys.argv) < 2:
print("Usage: python experimenter.py {file_name}. Where file_name is the merged Decision Service logs in JSON format")
sys.exit()
data = open(sys.argv[1], 'r', encoding="utf8")
counter = 0
shared_features = []
action_features = []
marginal_features = []
for line in data:
counter += 1
event = json.loads(line)
for feature in event.keys():
if feature[0] != '_' and feature[0] not in shared_features:
shared_features.append(feature[0])
action_set = event['_multi']
for action in action_set:
for feature in action.keys():
if feature[0] != '_' and feature[0] not in action_features:
action_features.append(feature[0])
if action[feature].get('constant', 0) == 1 and 'id' in action[feature]:
marginal_features.append(feature[0])
# We are assuming the schema is consistent throughout the file, so we don't need to read all of it
if counter >= 50:
break


file_name = sys.argv[1]
# a if condition else b
with gzip.open(file_name, 'rt', encoding='utf8') if file_name.endswith('.gz') else open(file_name, 'r', encoding="utf8") as data:
counter = 0
shared_features = []
action_features = []
marginal_features = []
for line in data:
counter += 1
event = json.loads(line)
for feature in event.keys():
if feature[0] != '_' and feature[0] not in shared_features:
shared_features.append(feature[0])
action_set = event['_multi']
for action in action_set:
for feature in action.keys():
if feature[0] != '_' and feature[0] not in action_features:
action_features.append(feature[0])
if action[feature].get('constant', 0) == 1 and 'id' in action[feature]:
marginal_features.append(feature[0])
# We are assuming the schema is consistent throughout the file, so we don't need to read all of it
if counter >= 50:
break

# disable auto discovery
# shared_features = []
# action_features = []
# marginal_features = []
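The data file is now opened transparently as gzip or plain text depending on its extension (the inline conditional above). A small helper expressing the same pattern, shown here only as a sketch:

import gzip

def open_json_log(file_name):
    # open a Decision Service log whether it is gzipped or plain JSON lines
    if file_name.endswith('.gz'):
        return gzip.open(file_name, 'rt', encoding='utf8')
    return open(file_name, 'r', encoding='utf8')

# usage:
# with open_json_log(sys.argv[1]) as data:
#     for line in data:
#         event = json.loads(line)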

# Read config file to get certain parameter values for experimentation
config = configparser.ConfigParser()
config.read('ds.config')
experiments_config = config['Experimentation']
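For context, a minimal ds.config sketch for the section read above. The [Experimentation] section name comes from the diff; the option names below are hypothetical placeholders only:

# ds.config
# [Experimentation]
# vw_path = vw
# timeout = 3600

import configparser

config = configparser.ConfigParser()
config.read('ds.config')
experiments_config = config['Experimentation'] if 'Experimentation' in config else {}
vw_path = experiments_config.get('vw_path', 'vw')        # hypothetical option
timeout = int(experiments_config.get('timeout', 3600))   # hypothetical option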

data.close()
print("Shared feature namespaces: " + str(shared_features))
print("Action feature namespaces: " + str(action_features))
print("Marginal feature namespaces: " + str(marginal_features))

# Start the writing process for experiment results
result_queue = Queue()
writer_p = Process(target=result_writer, args=((result_queue),))
writer_p.daemon = True
writer_p.start()

# Base parameters and setting up the cache file
base_command = "vw --cb_adf -d %s --json -c --power_t 0" % sys.argv[1] # TODO: VW location should be a command line parameter.
# -q LG -q TG
# base_command += " --quadratic UG --quadratic RG --quadratic AG --ignore B --ignore C --ignore D --ignore E --ignore F --marginal JK"
initial_command = Command(base_command, learning_rate=0.5)
run_experiment(initial_command, timeout=3600)

@@ -153,6 +152,9 @@ def run_experiment_set(command_list):
command = Command(base_command, learning_rate=best_learning_rate, cb_type=best_cb_type, marginal_list=marginal_list)
command_list.append(command)

if len(command_list) == 0:
break

results = run_experiment_set(command_list)
if results[0].loss < best_loss:
best_loss = results[0].loss
@@ -169,7 +171,7 @@ def run_experiment_set(command_list):
all_features = shared_features + action_features
while True:
command_list = []
for features in all_features:
for features in shared_features: # all_features:
for action_feature in action_features:
interaction_list = list(best_interaction_list)
interaction = '{0}{1}'.format(features, action_feature)
@@ -179,6 +181,9 @@ def run_experiment_set(command_list):
command = Command(base_command, learning_rate=best_learning_rate, cb_type=best_cb_type, marginal_list=best_marginal_list, interaction_list=interaction_list)
command_list.append(command)

if len(command_list) == 0:
break

results = run_experiment_set(command_list)
if results[0].loss < best_loss:
best_loss = results[0].loss
@@ -198,10 +203,6 @@ def run_experiment_set(command_list):
best_regularization = results[0].regularization

# TODO: Repeat above process of tuning parameters and interactions until convergence / no more improvements.

# Wait for writing thread to finish
result_queue.put('complete')
writer_p.join()

print("Best parameters:")
print("Best learning rate: {0}".format(best_learning_rate))
38 changes: 28 additions & 10 deletions DataScience/common.py
@@ -27,14 +27,22 @@ def get_checkpoint_models(block_blob_service, start_date, end_date):
yield (ts, 'onlinetrainer', time_container.name)

class CachedBlob:
def __init__(self, block_blob_service, root, container, name):
def __init__(self, block_blob_service, root, container, name, expected_size):
self.filename = os.path.join(str(root), str(container), str(name))

# TODO: validate the size
if not os.path.exists(self.filename):
print(self.filename)
dn = ntpath.dirname(self.filename)
if not os.path.exists(dn):
os.makedirs(dn)
block_blob_service.get_blob_to_path(container, name,self. filename)
block_blob_service.get_blob_to_path(container, name, self.filename)
else:
actual_size = os.stat(self.filename).st_size
if actual_size != expected_size:
print('{0} mismatch in size. Expected: {1} vs {2}'.format(self.filename, expected_size, actual_size))
os.remove(self.filename)
block_blob_service.get_blob_to_path(container, name, self.filename)
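CachedBlob now validates the size of an already-cached download and re-fetches on mismatch. A self-contained sketch of that flow; block_blob_service stands for the Azure block-blob client used in the diff, exposing get_blob_to_path(container, name, path):

import os

def ensure_cached(block_blob_service, container, name, filename, expected_size):
    if os.path.exists(filename):
        actual_size = os.stat(filename).st_size
        if actual_size == expected_size:
            return filename                       # cache hit, size matches
        # stale or truncated download: discard and re-fetch
        print('{0} mismatch in size. Expected: {1} vs {2}'.format(filename, expected_size, actual_size))
        os.remove(filename)
    else:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
    block_blob_service.get_blob_to_path(container, name, filename)
    return filename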

class JoinedDataReader:
def __init__(self, joined_data):
@@ -76,7 +84,7 @@ def __init__(self, ids):
# single joined data blob
class JoinedData(CachedBlob):
def __init__(self, block_blob_service, root, joined_examples_container, ts, blob):
super(JoinedData,self).__init__(block_blob_service, root, joined_examples_container, blob.name)
super(JoinedData,self).__init__(block_blob_service, root, joined_examples_container, blob.name, blob.properties.content_length)
self.blob = blob
self.ts = ts
self.blob = blob
@@ -91,13 +99,23 @@ def index(self):
self.ids.append(Event(event_and_model_id))
f.close()
else:
f = open(self.filename, 'r', encoding='utf8')
for line in f:
js = json.loads(line)
evt_id = js['_eventid']
model_id = js['_modelid']
self.ids.append(Event(evt_id, model_id))
f.close()
with open(self.filename + '.ids', 'w', encoding='utf8') as f_id:
with open(self.filename, 'r', encoding='utf8') as f:
for line in f:
js = json.loads(line)

evt_id = js['_eventid']
_ = f_id.write(evt_id)

# model id might be missing in older data sets
model_id = None
if '_modelid' in js:
model_id = js['_modelid']
_ = f_id.write(' ')
_ = f_id.write(model_id)
_ = f_id.write('\n')

self.ids.append(Event([evt_id, model_id]))
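index() now also writes a .ids sidecar file with one "<event id> <model id>" pair per joined event. A sketch of that output format; unlike the committed loop, this version simply omits the separator and model id when _modelid is absent from older data sets:

import json

def write_ids_sidecar(joined_filename):
    with open(joined_filename + '.ids', 'w', encoding='utf8') as f_id:
        with open(joined_filename, 'r', encoding='utf8') as f:
            for line in f:
                js = json.loads(line)
                evt_id = js['_eventid']
                model_id = js.get('_modelid')     # may be missing in older logs
                if model_id is None:
                    f_id.write(evt_id + '\n')
                else:
                    f_id.write(evt_id + ' ' + model_id + '\n')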

def ips(self, policies):
f = open(self.filename, 'r')
25 changes: 25 additions & 0 deletions LogCookig/LogCookig.vcxproj.filters
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<Filter Include="Source Files">
<UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
<Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
</Filter>
<Filter Include="Header Files">
<UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
<Extensions>h;hh;hpp;hxx;hm;inl;inc;xsd</Extensions>
</Filter>
<Filter Include="Resource Files">
<UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
<Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
</Filter>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="main.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions LogCookig/main.cpp
@@ -15,10 +15,10 @@ using namespace std;
struct EventReaderHandler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, EventReaderHandler>
{
bool _found_event_id_key = false;
bool _found_model_id = false;
bool _found_model_id = false;

string event_id;
string model_id;
string model_id;

bool Key(const char* str, SizeType len, bool copy)
{
@@ -73,7 +73,7 @@ int main(int argc, char* argv[])
cerr << "Missing event id on line " << line_number << endl;
return -1;
}
output << handler.event_id << endl; // << " " << handler.model_id << endl;
output << handler.event_id << " " << handler.model_id << endl;

line_number++;
}
