# Shread train-dev-test csv datasets progressively using BigQuery 

In [2]:
import google.datalab.bigquery as bq
import pandas as pd
import numpy as np
import seaborn as sns
import shutil

In [3]:
def test_sample(a,b):
  basequery = """
  SELECT MAX(farmhash) as max_farmhash, COUNT(answer_count) as count
  FROM
  (
  SELECT 
    MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100)  as farmhash, answer_count
  FROM 
    `bigquery-public-data.stackoverflow.posts_questions`
  """
  sampler = "WHERE MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) < 20 AND  MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) >= 10  "
  sampler2 = "AND {0} >= {1}\n AND {0} < {2} )".format(
           "MOD(ABS(FARM_FINGERPRINT(CAST(id AS STRING))), EVERY_N * 100) * {}".format(10),
           a, b
          )
    
  return "{}\n{}\n{}".format(basequery, sampler, sampler2)

EVERY_N = 100
query_maxhash = test_sample(0,70).replace("EVERY_N", str(EVERY_N))
df_maxhash = bq.Query(query_maxhash).execute().result().to_dataframe()
print(df_maxhash)


def test_sample2(a,b):
  basequery = """
  SELECT MIN(farmhash10) as min_farmhash10, MAX(farmhash10) as max_farmhash10, COUNT(answer_count) as count
  FROM
  (  
  SELECT 
    MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100)*10  as farmhash10, answer_count
  FROM 
    `bigquery-public-data.stackoverflow.posts_questions`
  """
  sampler = "WHERE MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) < 20 AND  MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) >= 10  "
  sampler2 = "AND {0} >= {1}\n AND {0} < {2})".format(
           "MOD(ABS(FARM_FINGERPRINT(CAST(id AS STRING))), EVERY_N * 100) * {}".format(10),
           (10*10)+a, (10*10)+b
          )
    
  return "{}\n{}\n{}".format(basequery, sampler, sampler2)
  #return "{}\n{}".format(basequery, sampler)


EVERY_N = 100
queryhash = test_sample2(0,60).replace("EVERY_N", str(EVERY_N))
df_hash = bq.Query(queryhash).execute().result().to_dataframe()
print(df_hash.head())


  max_farmhash  count
0         None      0
   min_farmhash10  max_farmhash10  count
0             100             150  10333


In [4]:

def sample_between(a, b, shredstart):
  basequery = """
  SELECT 
    answer_count, comment_count, favorite_count,  score, view_count,
    TIMESTAMP_DIFF(last_activity_date, creation_date, DAY) as days_posted,
    IF(accepted_answer_id IS NULL , 0, 1 ) as accepted
  FROM 
    `bigquery-public-data.stackoverflow.posts_questions`
  """
  
  # Use sampling for initial model development. Once model is developed, shread the entire dataset into  .csv files based on condition in the sampler.
  sampler = "WHERE MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) < {1} AND MOD(ABS(FARM_FINGERPRINT(CAST(id as STRING))), EVERY_N * 100) >= {0}".format(
            shredstart, shredstart + 10
            )
  sampler2 = "AND {0} >= {1}\n AND {0} < {2}".format(
           "MOD(ABS(FARM_FINGERPRINT(CAST(id AS STRING))), EVERY_N * 100) * {}".format(10),
           (shredstart*10)+a, (shredstart*10)+b
          )
  return "{}\n{}\n{}".format(basequery, sampler, sampler2)


def create_query(phase, EVERY_N, shredstart):
  """Phase: train (70%) valid (15%) or test (15%)"""
  query = ""
  if phase == 'train':
    query = sample_between(0,60, shredstart)
  elif phase == 'valid':
    query = sample_between(60,75, shredstart)
  else:
    query = sample_between(75, 100, shredstart)
  return query.replace("EVERY_N", str(EVERY_N))

#print(create_query('train', 100))
#(answer_count - AVG(answer_count)) / STDDEV_POP(answer_count)  as answer_count,
#IF(accepted_answer_id IS NULL , cast(0 as int64), cast(1 as int64)) as accepted

In [5]:
def to_csv(df, filename):
  outdf = df.copy(deep = True)
  #outdf.loc[:, 'key'] = np.arange(0, len(outdf)) # rownumber as key
  # Reorder columns so that target is first column
  #print(outdf.head())
  #print(df.head())
  cols = outdf.columns.tolist()
  #print(cols)
  cols.remove('accepted')
  cols.insert(0, 'accepted')
  #print(cols)
  outdf = outdf[cols]  
  
  
  #Normalizing input columns  and replace NaN or null
  normalize_cols = outdf.columns.tolist()
  normalize_cols.remove('accepted')
  for normalize_cols_name in normalize_cols:
    outdf[normalize_cols_name].fillna(0, inplace = True)
    outdf[normalize_cols_name] = (outdf[normalize_cols_name] - outdf[normalize_cols_name].mean())  / outdf[normalize_cols_name].std() 
  #print(outdf)
  #print(outdf['answer_count'] )
  outdf.to_csv(filename,  header = False, index_label = False, index = False)
  print("Wrote {} to {}".format(len(outdf), filename))

In [6]:

for phase in ['train', 'valid', 'test']:
  #for x in range(2):
  for x in range(10):
    query = create_query(phase, 100, x*10)
    #print(query)
    df = bq.Query(query).execute().result().to_dataframe()
    #print(df.head())
    to_csv(df, 'stackoverflow-{}-{}.csv'.format(phase,(x+1)*10))

Wrote 10186 to stackoverflow-train-10.csv
Wrote 10333 to stackoverflow-train-20.csv
Wrote 10426 to stackoverflow-train-30.csv
Wrote 10260 to stackoverflow-train-40.csv
Wrote 10298 to stackoverflow-train-50.csv
Wrote 10401 to stackoverflow-train-60.csv
Wrote 10276 to stackoverflow-train-70.csv
Wrote 10249 to stackoverflow-train-80.csv
Wrote 10291 to stackoverflow-train-90.csv
Wrote 10332 to stackoverflow-train-100.csv
Wrote 3500 to stackoverflow-valid-10.csv
Wrote 3453 to stackoverflow-valid-20.csv
Wrote 3367 to stackoverflow-valid-30.csv
Wrote 3573 to stackoverflow-valid-40.csv
Wrote 3482 to stackoverflow-valid-50.csv
Wrote 3476 to stackoverflow-valid-60.csv
Wrote 3431 to stackoverflow-valid-70.csv
Wrote 3496 to stackoverflow-valid-80.csv
Wrote 3469 to stackoverflow-valid-90.csv
Wrote 3354 to stackoverflow-valid-100.csv
Wrote 3352 to stackoverflow-test-10.csv
Wrote 3476 to stackoverflow-test-20.csv
Wrote 3439 to stackoverflow-test-30.csv
Wrote 3408 to stackoverflow-test-40.csv
Wrote 35

##### Refactor by not using sampling and creating large shreaded datasets (check size)

In [7]:
!ls -l *.csv

-rw-r--r-- 1 root root  420995 May 16 08:59 stackoverflow-test-100.csv
-rw-r--r-- 1 root root  414392 May 16 08:59 stackoverflow-test-10.csv
-rw-r--r-- 1 root root  428369 May 16 08:59 stackoverflow-test-20.csv
-rw-r--r-- 1 root root  427660 May 16 08:59 stackoverflow-test-30.csv
-rw-r--r-- 1 root root  418436 May 16 08:59 stackoverflow-test-40.csv
-rw-r--r-- 1 root root  440083 May 16 08:59 stackoverflow-test-50.csv
-rw-r--r-- 1 root root  424308 May 16 08:59 stackoverflow-test-60.csv
-rw-r--r-- 1 root root  429799 May 16 08:59 stackoverflow-test-70.csv
-rw-r--r-- 1 root root  444289 May 16 08:59 stackoverflow-test-80.csv
-rw-r--r-- 1 root root  427058 May 16 08:59 stackoverflow-test-90.csv
-rw-r--r-- 1 root root 1289503 May 16 08:58 stackoverflow-train-100.csv
-rw-r--r-- 1 root root 1272389 May 16 08:57 stackoverflow-train-10.csv
-rw-r--r-- 1 root root 1276705 May 16 08:57 stackoverflow-train-20.csv
-rw-r--r-- 1 root root 1304776 May 16 08:57 stackoverflow-train-30.csv


In [8]:
%bash
head stackoverflow-test-100.csv

1,-0.3683210043696039,0.0008645057730129897,-0.16657359952844625,-0.08359529668588077,-0.16237617798053855,-0.25354894024689006
1,-0.3683210043696039,0.0008645057730129897,-0.16657359952844625,-0.08359529668588077,-0.16115509227933522,-0.035211876510711476
0,-0.3683210043696039,-0.735046033504261,-0.16657359952844625,-0.08359529668588077,-0.1956272809209988,-0.36481686695859644
0,-0.3683210043696039,0.7367750450502869,-0.16657359952844625,-0.08359529668588077,-0.12997044206398833,-0.2955368371192321
0,-0.3683210043696039,0.0008645057730129897,-0.16657359952844625,0.36269191751202456,-0.005043981863954096,2.1502581556946914
1,-0.3683210043696039,-0.367090763865624,-0.16657359952844625,-0.08359529668588077,-0.19506370290505878,-0.36901565664583064
1,-0.3683210043696039,-0.735046033504261,-0.16657359952844625,-0.17285273952546185,-0.18942792274565876,-0.35851868242774515
1,0.3044971393420383,2.944506662882109,0.14370162278794293,3.8437321882556863,1.7170625755100517,-0.36901565664583064
0

# tf.estimator modeling

In [9]:
# Ensure that we have TensorFlow 1.13.1 installed.
!pip3 freeze | grep tensorflow==1.13.1 || pip3 install tensorflow==1.13.1

tensorflow==1.13.1


In [10]:
import tensorflow as tf
import pandas as pd
import shutil

print(tf.__version__)

  from ._conv import register_converters as _register_converters


1.13.1


In [3]:
#tf.enable_eager_execution()

In [11]:
!ls -l *.csv

-rw-r--r-- 1 root root  420995 May 16 08:59 stackoverflow-test-100.csv
-rw-r--r-- 1 root root  414392 May 16 08:59 stackoverflow-test-10.csv
-rw-r--r-- 1 root root  428369 May 16 08:59 stackoverflow-test-20.csv
-rw-r--r-- 1 root root  427660 May 16 08:59 stackoverflow-test-30.csv
-rw-r--r-- 1 root root  418436 May 16 08:59 stackoverflow-test-40.csv
-rw-r--r-- 1 root root  440083 May 16 08:59 stackoverflow-test-50.csv
-rw-r--r-- 1 root root  424308 May 16 08:59 stackoverflow-test-60.csv
-rw-r--r-- 1 root root  429799 May 16 08:59 stackoverflow-test-70.csv
-rw-r--r-- 1 root root  444289 May 16 08:59 stackoverflow-test-80.csv
-rw-r--r-- 1 root root  427058 May 16 08:59 stackoverflow-test-90.csv
-rw-r--r-- 1 root root 1289503 May 16 08:58 stackoverflow-train-100.csv
-rw-r--r-- 1 root root 1272389 May 16 08:57 stackoverflow-train-10.csv
-rw-r--r-- 1 root root 1276705 May 16 08:57 stackoverflow-train-20.csv
-rw-r--r-- 1 root root 1304776 May 16 08:57 stackoverflow-train-30.csv


In [None]:
'''
# Before reading csv was incorporated using pandas dataframe
# But now reading csv is incorporated using tensorflow and so it's in the graps and also it reads progressively the shreaded files

df_train = pd.read_csv(filepath_or_buffer = "./stackoverflow-train.csv")
df_valid = pd.read_csv(filepath_or_buffer = "./stackoverflow-valid.csv")
df_test = pd.read_csv(filepath_or_buffer = "./stackoverflow-test.csv")

CSV_COLUMNNAMES = list(df_train) # CSV_COLUMNNAMES = df_train.columns.tolist()
print(CSV_COLUMNNAMES)

FEATURE_NAMES = CSV_COLUMNNAMES[1:]
LABEL_NAME = CSV_COLUMNNAMES[0]
'''

In [12]:
# Debugging  issue:  Field 0 in record 0 is not a valid int32: accepted
#	 [[{{node DecodeCSV}}]]
#	 [[node IteratorGetNext (defined at /usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow_estimator/python/estimator/util.py:110) ]]

# Solution :  Skip the first line (header row) 
# skip(count)  :  Creates a Dataset that skips count elements from this dataset.

df_train = pd.read_csv(filepath_or_buffer = "stackoverflow-train-20.csv")
df_train.info()

df_test = pd.read_csv(filepath_or_buffer = "stackoverflow-test-10.csv")
df_test.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10332 entries, 0 to 10331
Data columns (total 7 columns):
1                       10332 non-null int64
6.453257689081337       10332 non-null float64
-0.35101482510366283    10332 non-null float64
6.353505428401191       10332 non-null float64
16.54424381238194       10332 non-null float64
3.201635366264088       10332 non-null float64
5.228021343993179       10332 non-null float64
dtypes: float64(6), int64(1)
memory usage: 565.1 KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3351 entries, 0 to 3350
Data columns (total 7 columns):
1                       3351 non-null int64
0.12470310007233937     3351 non-null float64
2.2582277091259537      3351 non-null float64
-0.06564592296475838    3351 non-null float64
-0.18404444736230954    3351 non-null float64
-0.14508480486243064    3351 non-null float64
-0.3741223082495156     3351 non-null float64
dtypes: float64(6), int64(1)
memory usage: 183.3 KB


In [None]:
'''featcols = [ tf.feature_column.numeric_column(feat) for feat in  FEATURE_NAMES ]
#print(featcols) '''

In [16]:
CSV_COLUMNS = ['accepted', 'answer_count', 'comment_count', 'favorite_count', 'score', 'view_count', 'days_posted']
DEFAULTS = [[0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]

#DEFAULTS = [tf.constant([0], dtype=tf.int32),
#            tf.constant([0.0], dtype=tf.float32),
#            tf.constant([0.0], dtype=tf.float32),
#           tf.constant([0.0], dtype=tf.float32),
#            tf.constant([0.0], dtype=tf.float32),
#            tf.constant([0.0], dtype=tf.float32),
#            tf.constant([0.0], dtype=tf.float32) ]

#i=0
def read_dataset(filename, mode, batch_size = 512):
  def decode_line(row):
    #print(row)
    cols = tf.decode_csv(row, record_defaults = DEFAULTS)
    #print(cols)
    features = dict(zip(CSV_COLUMNS,cols))
    #print(i+1)
    label = features.pop('accepted')  # remove label from features and store
    #print("features: {} \n label: {}".format(features, label))
    return features, label
  
  # Create list of file names that match "glob" pattern (i.e. data_file_*.csv)
  filenames_dataset = tf.data.Dataset.list_files(filename, shuffle=False)
  # Read lines from text files
  #textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset).skip(1)
  textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset)
  # Parse text lines as comma-separated values (CSV)
  dataset = textlines_dataset.map(decode_line)
  
  # Note:
  # use tf.data.Dataset.flat_map to apply one to many transformations (here: filename -> text lines)
  # use tf.data.Dataset.map      to apply one to one  transformations (here: text line -> feature list)
  
  if(mode == tf.estimator.ModeKeys.TRAIN):
    num_epochs = 10  # loop indefinitely
    dataset = dataset.shuffle(buffer_size = 10*batch_size, seed=2)
  else:
    num_epochs = 1
  
  dataset = dataset.repeat(num_epochs).batch(batch_size)
  return dataset

  
def get_train_input_fn():
  dataset = read_dataset('./stackoverflow-train-*.csv', tf.estimator.ModeKeys.TRAIN)
  features1, label1 = dataset.make_one_shot_iterator().get_next()
  #print("Training set :  \nfeatures1 : {}\nlabel: {}".format(features1, label1))
  with tf.Session() as sess:
    print(sess.run(tf.shape(label1))) # output: [ 0.42116176  0.40666069]
  return features1, label1 

def get_valid_input_fn():
  dataset = read_dataset('./stackoverflow-valid-*.csv', tf.estimator.ModeKeys.EVAL)
  features1, label1 = dataset.make_one_shot_iterator().get_next()
  return features1, label1 

def get_test_input_fn():
  dataset = read_dataset('./stackoverflow-test-*.csv', tf.estimator.ModeKeys.PREDICT)
  features1, label1 = dataset.make_one_shot_iterator().get_next()
  with tf.Session() as sess:
    print(sess.run(tf.shape(label1))) # output: [ 0.42116176  0.40666069]
  return features1, label1 


#get_train_input_fn()

In [14]:
FEATURE_NAMES = CSV_COLUMNS[1:]
LABEL_NAME = CSV_COLUMNS[0]

featcols = [ tf.feature_column.numeric_column(feat) for feat in  FEATURE_NAMES ]
#print(featcols)

In [15]:
%%time
OUTDIR = "stackoverflow_model"

tf.logging.set_verbosity(tf.logging.INFO)
shutil.rmtree(path = OUTDIR, ignore_errors = True)

model = tf.estimator.DNNClassifier(
    hidden_units = [1024, 512, 128, 32],  # specify neural architecture
    feature_columns = featcols,
    n_classes=2,
    optimizer = tf.train.AdamOptimizer(learning_rate=0.001),
    model_dir = OUTDIR,
    config = tf.estimator.RunConfig(tf_random_seed = 1)  
  )

model.train(
    input_fn = lambda : get_train_input_fn()
    #,steps = 200
  )

INFO:tensorflow:Using config: {'_train_distribute': None, '_save_summary_steps': 100, '_tf_random_seed': 1, '_evaluation_master': '', '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_save_checkpoints_secs': 600, '_save_checkpoints_steps': None, '_experimental_distribute': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fd446cb57f0>, '_task_id': 0, '_master': '', '_model_dir': 'stackoverflow_model', '_protocol': None, '_is_chief': True, '_keep_checkpoint_max': 5, '_num_ps_replicas': 0, '_service': None, '_eval_distribute': None, '_task_type': 'worker', '_global_id_in_cluster': 0, '_num_worker_replicas': 1, '_device_fn': None}
Instructions for updating:
Colocations handled automatically by placer.
Training set :  
features1 : {'answer_count': <tf.Tensor 'IteratorGetNext:0' shape=(?,) dtype=float32>, 'sc

In [17]:
def validate_rmse(model):
  metrices = model.evaluate(input_fn = lambda : get_valid_input_fn() )
  print("RMSE on dataset = {}".format(metrices["average_loss"]**.5))

#validate_rmse(model, df_train)
validate_rmse(model)

INFO:tensorflow:Calling model_fn.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-05-16T09:04:34Z
INFO:tensorflow:Graph was finalized.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
INFO:tensorflow:Restoring parameters from stackoverflow_model/model.ckpt-2013
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Finished evaluation at 2019-05-16-09:04:37
INFO:tensorflow:Saving dict for global step 2013: accuracy = 0.6735644, accuracy_baseline = 0.5301003, auc = 0.732806, auc_precision_recall = 0.7207757, average_loss = 0.55780685, global_step = 2013, label/mean = 0.5301003, loss = 283.83344, precision = 0.6406194, prediction/mean = 0.52562964, recall = 0.8751499
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 2013: stackoverflow_model/model.ckpt-2013
RMSE on dataset = 0.7468


validate_rmse for 10 epochs on 2 validate csvs:

INFO:tensorflow:Saving dict for global step 401: accuracy = 0.67553574, accuracy_baseline = 0.5273982, auc = 0.73186123, auc_precision_recall = 0.7124213, average_loss = 0.5643283, global_step = 401, label/mean = 0.5273982, loss = 280.26962, precision = 0.6314025, prediction/mean = 0.53914005, recall = 0.9244614
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 401: stackoverflow_model/model.ckpt-401
RMSE on dataset = 0.751217886417676



on training set evaluate

INFO:tensorflow:Saving dict for global step 500: accuracy = 0.6811679, accuracy_baseline = 0.5281519, auc = 0.7331483, auc_precision_recall = 0.7117538, average_loss = 0.56418276, global_step = 500, label/mean = 0.5281519, loss = 71.741234, precision = 0.6524123, prediction/mean = 0.48095724, recall = 0.8482496
RMSE on dataset = 0.7511210011251841

In [18]:
raw_predictions = model.predict( input_fn = lambda : get_test_input_fn() )

#print(next(raw_predictions))
#print(next(raw_predictions))
#print(next(raw_predictions))
# class_ids determine the prediction

predictions = [p['class_ids'][0] for p in raw_predictions]
#confusion_matrix = tf.confusion_matrix(df_test['accepted'], predictions)
#print(confusion_matrix)
with tf.Session() as sess:
  print(sess.run(tf.shape(predictions))) # output: [ 0.42116176  0.40666069]

[512]
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from stackoverflow_model/model.ckpt-2013
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
[34581]


# make_one_shot_iterator()  

##### Parse all the *.csv files using make_one_shot_iterator()  

##### How it works:

Suppose we have only 2.csv files having #entries 10185 and 10332

Total number of iterations when make_one_shot_iterator() is used:   401


= ( 10185*10 + 10332*10 ) / 512

= ( 101850 + 103320 ) / 512 =  400.72265625

where 
  - 10 = 10 epochs
  - 512 = batch size
  - 10185*10  = 1st csv total no of entries to be iterated over
  - 10332*10  = 2st csv total no of entries to be iterated over


In [39]:
df_test_predictions = df_test.copy(deep = True)
df_test_predictions['accepted'] = predictions

NameError: name 'df_test' is not defined

In [None]:
#sns.set(style="ticks", color_codes=True)
sns.pairplot(df_test, hue="accepted", palette="husl")

In [None]:
sns.pairplot(df_test_predictions, hue="accepted", palette="husl")