**Python Version Requirement:** Python 3.6

In [2]:
from d3m import container
import datamart
import datamart_nyu
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, mean_squared_error, \
    mean_squared_log_error, median_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pathlib import Path
import subprocess
import os
import shutil
import json
import time

In [3]:
# Machine-specific base directory; get_materialize_info() looks for datasets
# without a direct URL under <DATAMART_PATH>/volumes/datasets/<id>/main.csv.
DATAMART_PATH = '/Users/fchirigati/projects/d3m/datamart'

In [4]:
def train_and_test_model(data, target_variable_name):
    """Fit a random forest on a train split of ``data`` and score the held-out split.

    The frame is split 67/33 with a fixed seed. Features and target are
    standardized with statistics learned from the training part only, and a
    ``RandomForestRegressor`` is trained on the standardized values.

    Returns a dict with the keys ``mean_absolute_error``,
    ``mean_squared_error``, ``median_absolute_error`` and ``r2_score``, all
    computed against the standardized test target.
    """
    features = data.drop(target_variable_name, axis=1)
    target = data[target_variable_name]
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.33, random_state=42)

    # both scalers learn their statistics from the training split only
    train_target_column = y_train.values.reshape(-1, 1)
    feature_scaler = StandardScaler().fit(X_train)
    target_scaler = StandardScaler().fit(train_target_column)

    X_train = feature_scaler.transform(X_train)
    y_train = target_scaler.transform(train_target_column)
    X_test = feature_scaler.transform(X_test)
    y_test = target_scaler.transform(y_test.values.reshape(-1, 1))

    # tree depth is capped at (number of columns in data) - 1
    regressor = RandomForestRegressor(
        n_estimators=100,
        max_depth=len(data.columns) - 1,
        n_jobs=-1,
        random_state=42,
    )
    regressor.fit(X_train, y_train.ravel())
    predictions = regressor.predict(X_test)

    return {
        'mean_absolute_error': mean_absolute_error(y_test, predictions),
        'mean_squared_error': mean_squared_error(y_test, predictions),
        'median_absolute_error': median_absolute_error(y_test, predictions),
        'r2_score': r2_score(y_test, predictions),
    }

In [5]:
def get_performance_scores(data, target_variable_name, missing_value_imputation):
    """Score a model that predicts the target variable from ``data``.

    When ``missing_value_imputation`` is truthy, NaN entries are first
    replaced by the mean of their column; otherwise ``data`` is used as is.
    The metrics dict produced by ``train_and_test_model`` is returned.
    """
    if not missing_value_imputation:
        return train_and_test_model(data, target_variable_name)

    # mean imputation, restoring the original column labels and row index
    mean_imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
    imputed = pd.DataFrame(mean_imputer.fit_transform(data))
    imputed.columns = data.columns
    imputed.index = data.index

    return train_and_test_model(imputed, target_variable_name)

In [6]:
def print_results(results):
    """Print a short summary of every search result.

    For each result the score and the dataset name are printed, followed by
    the join columns (as ``(resource_id, column_index)`` pairs) when the
    result has an augment hint, or by the result id otherwise. Nothing is
    printed when ``results`` is empty or None.
    """
    def describe(column_groups):
        # one list of (resource_id, column_index) tuples per column group
        return [[(col.resource_id, col.column_index) for col in group]
                for group in column_groups]

    for result in results or ():
        print(result.score())
        print(result.get_json_metadata()['metadata']['name'])
        if not result.get_augment_hint():
            print(result.id())
        else:
            print("Left Columns: %s" % str(describe(result.get_augment_hint().left_columns)))
            print("Right Columns: %s" % str(describe(result.get_augment_hint().right_columns)))
        print("-------------------")

In [7]:
def get_materialize_info(results):
    """Collect, per dataset id, where the dataset can be obtained from.

    Returns a dict mapping each distinct dataset id found in ``results`` to a
    dict with the keys ``has_info``, ``url`` and ``path``. ``url`` is filled
    from the ``direct_url`` materialization entry when present; otherwise
    ``path`` is filled when ``main.csv`` exists for that id under
    ``DATAMART_PATH/volumes/datasets``. Only the first result seen for an id
    is inspected. Returns None when ``results`` is empty or None.
    """
    if not results:
        return

    materialize_by_id = {}
    for result in results:
        dataset_id = result.get_json_metadata()['id']
        if dataset_id in materialize_by_id:
            continue

        entry = {'has_info': False, 'url': None, 'path': None}
        materialize_by_id[dataset_id] = entry

        if 'direct_url' not in result.get_json_metadata()['metadata']['materialize']:
            # no URL: fall back to a copy on the local volumes, if there is one
            volume_file = os.path.join(DATAMART_PATH, 'volumes/datasets', dataset_id, 'main.csv')
            if os.path.exists(volume_file):
                entry['path'] = volume_file
                entry['has_info'] = True
            continue

        entry['url'] = result.get_json_metadata()['metadata']['materialize']['direct_url']
        entry['has_info'] = True

    return materialize_by_id

In [8]:
def download_datasets_and_generate_training_records(results, supplied_data, supplied_data_path, target,
                                                    id_to_materialize, dir_):
    """Materialize candidate datasets, join them with the query data and score both.

    Working inside ``dir_``, every dataset in ``id_to_materialize`` is fetched
    (``wget`` for a URL, file copy for a local path). Each search result is
    then augmented against ``supplied_data``, numeric-only versions of the
    candidate and joined tables are written to disk, and a model is scored on
    the query data alone and on the joined data.

    Parameters:
        results: search results; each must provide ``get_json_metadata()``,
            ``augment()`` and ``get_augment_hint()``.
        supplied_data: query dataset; ``supplied_data['learningData']`` is the
            query table.
        supplied_data_path: path of the query CSV file. It is read after the
            working directory has been changed to ``dir_``, so a relative
            path is resolved against ``dir_``.
        target: name of the target column.
        id_to_materialize: mapping of dataset id to a dict with ``url``,
            ``path`` and ``has_info`` keys. ``has_info`` is set to False in
            place for datasets whose download yields no usable file.
        dir_: existing directory that receives the downloaded files and the
            ``joined-datasets`` sub-directory.

    Returns:
        A list with one record (dict) per usable result, or None when
        ``results`` is empty. The working directory is restored on exit,
        also when an exception propagates.
    """
    if not results:
        return

    training_records = list()

    current_working_dir = os.getcwd()
    os.chdir(dir_)
    try:
        # downloading candidate datasets
        for id_, info in id_to_materialize.items():
            if not (info['url'] or info['path']):
                print('%s has no materialization information for download.' % id_)
                continue

            if info['url']:
                try:
                    # argument list instead of a shell string: URLs commonly
                    # contain '&', '?' or ';', which a shell would interpret
                    subprocess.call(['wget', '-O', id_, info['url']])
                except OSError:
                    # wget could not be started; the check below then flags
                    # the dataset unless a usable file is already present
                    pass
            else:
                shutil.copyfile(info['path'], id_)

            if (not os.path.exists(id_)) or os.stat(id_).st_size <= 0:
                print('%s has no valid materialization information for download.' % id_)
                info['has_info'] = False

        # tolerate a directory left over from a previous run
        os.makedirs('joined-datasets', exist_ok=True)
        for i, result in enumerate(results):
            # pause between results (presumably to go easy on the server -- TODO confirm)
            time.sleep(2)
            metadata = result.get_json_metadata()
            id_ = metadata['id']
            if not id_to_materialize[id_]['has_info']:
                continue

            try:
                join_ = result.augment(
                    supplied_data=supplied_data,
                    connection_url='http://localhost:8002/'
                )
            except Exception as e:
                # best effort: a failing augmentation only skips this result
                print('%s could not be augmented: %s' % (id_, e))
                continue

            # excluding d3mIndex
            join_['learningData'].drop(['d3mIndex'], axis=1, inplace=True)

            # query and candidate keys (only the first column pair of the hint is used)
            augment_hint = result.get_augment_hint()
            left_column_index = augment_hint.left_columns[0][0].column_index
            right_column_index = augment_hint.right_columns[0][0].column_index
            query_key = list(supplied_data['learningData'].columns)[left_column_index]
            candidate_key = metadata['metadata']['columns'][right_column_index]['name']

            # paths (path separators in the key name would otherwise create sub-directories)
            join_path = 'joined-datasets/%d.csv' % i
            candidate_path = '%s_%s' % (id_, candidate_key.replace(os.path.sep, '_'))

            if not os.path.exists(candidate_path):
                companion_data = pd.read_csv(id_)
                # collecting candidate key column
                candidate_key_column = companion_data[candidate_key]
                # excluding categorical / textual attributes
                companion_data = companion_data.select_dtypes(exclude=['object'])
                # the key is kept even when it is textual
                if candidate_key not in companion_data.columns:
                    companion_data[candidate_key] = candidate_key_column
                # excluding columns with all NaN values
                companion_data.dropna(axis=1, how='all', inplace=True)
                # if the final dataset has only the key, ignore
                if len(companion_data.columns) < 2:
                    continue
                # saving candidate dataset
                companion_data.to_csv(candidate_path, index=False)

            # need to load and save again to exclude categorical / textual attributes
            join_['learningData'].to_csv(join_path, index=False)
            joined_data = pd.read_csv(join_path)
            # collecting key column
            key_column = joined_data[query_key]
            # if key column is not unique, this means that aggregation is necessary
            #   so we ignore
            key_values = key_column.tolist()
            if len(set(key_values)) != len(key_values):
                os.remove(candidate_path)
                os.remove(join_path)
                continue
            # excluding categorical / textual attributes
            joined_data = joined_data.select_dtypes(exclude=['object'])
            if query_key not in joined_data.columns:
                joined_data[query_key] = key_column
            # excluding columns with all NaN values
            joined_data.dropna(axis=1, how='all', inplace=True)
            # if number of columns in joined dataset is the same as in query data
            #   (minus the dropped d3mIndex), it means that there was no join
            #   (no intersection), and we ignore
            if len(supplied_data['learningData'].columns) - 1 == len(joined_data.columns):
                os.remove(candidate_path)
                os.remove(join_path)
                continue
            joined_data.to_csv(join_path, index=False)

            # scores before augmentation
            scores_query = get_performance_scores(
                pd.read_csv(supplied_data_path).drop([query_key], axis=1),
                target,
                True
            )

            # scores after augmentation
            scores_query_candidate = get_performance_scores(
                joined_data.drop([query_key], axis=1),
                target,
                True
            )

            # each metric is stored as [before augmentation, after augmentation]
            training_records.append(dict(
                query_dataset=supplied_data_path,
                query_key=query_key,
                target=target,
                candidate_dataset=os.path.abspath(candidate_path),
                candidate_key=candidate_key,
                joined_dataset=os.path.abspath(join_path),
                imputation_strategy='mean',
                mean_absolute_error=[scores_query['mean_absolute_error'],
                                     scores_query_candidate['mean_absolute_error']],
                mean_squared_error=[scores_query['mean_squared_error'],
                                    scores_query_candidate['mean_squared_error']],
                median_absolute_error=[scores_query['median_absolute_error'],
                                       scores_query_candidate['median_absolute_error']],
                r2_score=[scores_query['r2_score'],
                          scores_query_candidate['r2_score']]
            ))
    finally:
        # always return to the caller's working directory
        os.chdir(current_working_dir)

    return training_records

In [9]:
# One download directory per problem. makedirs(exist_ok=True) creates the
# 'companion-datasets' parent as needed and is a no-op for directories that
# already exist, so no separate existence check is required.
for p in ['taxi-vehicle-collision', 'ny-taxi-demand', 'college-debt', 'poverty-estimation']:
    os.makedirs('companion-datasets/%s' % p, exist_ok=True)

In [10]:
# client bound to the DataMart endpoint on localhost, port 8002
client = datamart_nyu.NYUDatamart('http://localhost:8002/')

## NY Taxi and Vehicle Collision Problem

In [25]:
# query dataset for this problem, located under the user's home directory
taxi_vehicle_collision_path = '/'.join([
    str(Path.home()),
    'projects/dataset-ranking/use-cases/data/taxi-vehicle-collision',
    'taxi-vehicle-collision-v2.csv',
])
taxi_vehicle_collision = container.Dataset.load('file://%s' % taxi_vehicle_collision_path)

In [26]:
# search driven by the supplied dataset alone: no query is passed
cursor = client.search_with_data(query=None, supplied_data=taxi_vehicle_collision)

In [27]:
# drain the cursor page by page until it returns an empty page
taxi_vehicle_collision_results = []
while True:
    results = cursor.get_next_page()
    if not results:
        break
    taxi_vehicle_collision_results.extend(results)

In [28]:
len(taxi_vehicle_collision_results)

1067

In [29]:
# print_results(taxi_vehicle_collision_results)

In [30]:
# per dataset id: direct URL or local file from which the dataset can be obtained
taxi_vehicle_collision_info = get_materialize_info(taxi_vehicle_collision_results)

In [31]:
# target column: 'n. trips'; files go under companion-datasets/taxi-vehicle-collision/
taxi_vehicle_collision_training_records = download_datasets_and_generate_training_records(
    results=taxi_vehicle_collision_results,
    supplied_data=taxi_vehicle_collision,
    supplied_data_path=taxi_vehicle_collision_path,
    target='n. trips',
    id_to_materialize=taxi_vehicle_collision_info,
    dir_='companion-datasets/taxi-vehicle-collision/',
)

datamart.url.0a41288b3f9256e9906062a5fd75169a has no valid materialization information for download.
datamart.upload.a031bc4968cb4838967e4709e63a0ddc has no materialization information for download.
datamart.upload.83ee6db44a3f434aa0031dc4eb266094 has no materialization information for download.
datamart.upload.469f627ada7349f285ad22d3028bc38d has no materialization information for download.
datamart.upload.c90cd58ac0c54b169580b49b387cc59e has no materialization information for download.
datamart.upload.4095f5182de54d2fb4ceb5bc6268f627 has no materialization information for download.
datamart.upload.4eb2156e6a994f33ba71dd59f44c4c59 has no materialization information for download.


  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
Error from DataMart: 500 Internal Server Error
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  i

  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=asy)):
Error from DataMart: 500 Internal Server Error
  if (await self.run_code(code, result,  async_=as

## NY Taxi Demand Problem

In [17]:
# query dataset for this problem, located under the user's home directory
ny_taxi_demand_path = '/'.join([
    str(Path.home()),
    'projects/dataset-ranking/use-cases/data/ny-taxi-demand',
    'yellow-taxi-2017-v2.csv',
])
ny_taxi_demand = container.Dataset.load('file://%s' % ny_taxi_demand_path)

## College Debt Problem

In [11]:
# query dataset for this problem, located under the user's home directory
college_debt_path = '/'.join([
    str(Path.home()),
    'projects/dataset-ranking/use-cases/data/college-debt',
    'college-debt-v2.csv',
])
college_debt = container.Dataset.load('file://%s' % college_debt_path)

In [12]:
# search driven by the supplied dataset alone: no query is passed
cursor = client.search_with_data(query=None, supplied_data=college_debt)

In [13]:
# drain the cursor page by page until it returns an empty page
college_debt_results = []
while True:
    results = cursor.get_next_page()
    if not results:
        break
    college_debt_results.extend(results)

In [14]:
len(college_debt_results)

3

In [15]:
print_results(college_debt_results)

0.9575737
Most- Recent- Cohorts- Scorecard- Elements
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 0)]]
-------------------
0.9575737
College Scorecard Data - Most Recent
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 0)]]
-------------------
0.8439322
Most- Recent- Cohorts- Scorecard- Elements
Left Columns: [[('0', 12)]]
Right Columns: [[('0', 83)]]
-------------------


In [16]:
# per dataset id: direct URL or local file from which the dataset can be obtained
college_debt_info = get_materialize_info(college_debt_results)

In [17]:
# target column: 'DEBT_EARNINGS_RATIO'; files go under companion-datasets/college-debt/
college_debt_training_records = download_datasets_and_generate_training_records(
    results=college_debt_results,
    supplied_data=college_debt,
    supplied_data_path=college_debt_path,
    target='DEBT_EARNINGS_RATIO',
    id_to_materialize=college_debt_info,
    dir_='companion-datasets/college-debt/',
)

  if (await self.run_code(code, result,  async_=asy)):


## Poverty Estimation Problem

In [18]:
# query dataset for this problem, located under the user's home directory
poverty_estimation_path = '/'.join([
    str(Path.home()),
    'projects/dataset-ranking/use-cases/data/poverty-estimation',
    'poverty-estimation-v2.csv',
])
poverty_estimation = container.Dataset.load('file://%s' % poverty_estimation_path)

In [19]:
# search driven by the supplied dataset alone: no query is passed
cursor = client.search_with_data(query=None, supplied_data=poverty_estimation)

In [20]:
# drain the cursor page by page until it returns an empty page
poverty_estimation_results = []
while True:
    results = cursor.get_next_page()
    if not results:
        break
    poverty_estimation_results.extend(results)

In [21]:
len(poverty_estimation_results)

13

In [22]:
print_results(poverty_estimation_results)

1.0
SF Development Pipeline 2017 Q3
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 25)]]
-------------------
1.0
SF Development Pipeline 2017 Q2
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 31)]]
-------------------
1.0
SF Development Pipeline 2019 Q2
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 25)]]
-------------------
0.93730605
Zillow Median Listing Prices 2017
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 3)]]
-------------------
0.9362234
FIPS Population
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 0)]]
-------------------
0.9362234
Unemployment in the US
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 0)]]
-------------------
0.012638724
SF Development Pipeline 2016 Q3
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 23)]]
-------------------
0.0092448
SF Development Pipeline 2016 Q4
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 25)]]
-------------------
0.007953859
SF Development Pipeline 2016 Q2
Left Columns: [[('0', 1)]]
Right Columns: [[('0', 22)

In [23]:
# per dataset id: direct URL or local file from which the dataset can be obtained
poverty_estimation_info = get_materialize_info(poverty_estimation_results)

In [24]:
# target column: 'POVALL_2016'; files go under companion-datasets/poverty-estimation/
poverty_estimation_training_records = download_datasets_and_generate_training_records(
    results=poverty_estimation_results,
    supplied_data=poverty_estimation,
    supplied_data_path=poverty_estimation_path,
    target='POVALL_2016',
    id_to_materialize=poverty_estimation_info,
    dir_='companion-datasets/poverty-estimation/',
)

datamart.upload.a8241c91db1e4d75a4e4dd37cce12cd1 has no materialization information for download.
datamart.upload.2f6a998b4f5c4c589aaf990c867446b9 has no materialization information for download.


## Generating file with training records

In [32]:
# start from an empty output directory: delete any previous one, then recreate it
if os.path.exists('taxi-vehicle-collision-datamart-records/'):
    shutil.rmtree('taxi-vehicle-collision-datamart-records/')
os.mkdir('taxi-vehicle-collision-datamart-records/')

In [33]:
# One JSON document per line. The context manager closes the file even when
# serializing a record fails, and `or []` covers the None returned by
# download_datasets_and_generate_training_records for an empty result list.
with open('taxi-vehicle-collision-datamart-records/datamart-records', 'w') as training_records:
    for record in taxi_vehicle_collision_training_records or []:
        training_records.write(json.dumps(record) + "\n")

In [34]:
# start from an empty output directory: delete any previous one, then recreate it
if os.path.exists('college-debt-datamart-records/'):
    shutil.rmtree('college-debt-datamart-records/')
os.mkdir('college-debt-datamart-records/')

In [35]:
# One JSON document per line. The context manager closes the file even when
# serializing a record fails, and `or []` covers the None returned by
# download_datasets_and_generate_training_records for an empty result list.
with open('college-debt-datamart-records/datamart-records', 'w') as training_records:
    for record in college_debt_training_records or []:
        training_records.write(json.dumps(record) + "\n")

In [36]:
# start from an empty output directory: delete any previous one, then recreate it
if os.path.exists('poverty-estimation-datamart-records/'):
    shutil.rmtree('poverty-estimation-datamart-records/')
os.mkdir('poverty-estimation-datamart-records/')

In [37]:
# One JSON document per line. The context manager closes the file even when
# serializing a record fails, and `or []` covers the None returned by
# download_datasets_and_generate_training_records for an empty result list.
with open('poverty-estimation-datamart-records/datamart-records', 'w') as training_records:
    for record in poverty_estimation_training_records or []:
        training_records.write(json.dumps(record) + "\n")