# Move YTS data to BigQuery

## TODO

- Move in and out
- ExpStart and NewStart

- upload csv -> gcs -> bq using API

## Costs

[BQ Pricing guide](https://cloud.google.com/bigquery/pricing)

Storage: \$0.02 per 1 GB per month.  
Query: \$5 per 1 TB, first TB per month free.
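
As a rough sanity check on these rates, the sketch below estimates monthly storage cost and per-query cost from byte counts. The function names and the binary unit convention (1 GB = 2^30 bytes, 1 TB = 2^40 bytes) are assumptions made for illustration, and the free tier is simplified to a single `free_bytes` allowance.

```python
GB = 1 << 30
TB = 1 << 40

def storage_cost_per_month(n_bytes, usd_per_gb=0.02):
    """Monthly storage cost in USD at the rate quoted above."""
    return n_bytes / GB * usd_per_gb

def query_cost(bytes_billed, usd_per_tb=5.0, free_bytes=0):
    """Query cost in USD; free_bytes is whatever is left of the free monthly TB."""
    billable = max(bytes_billed - free_bytes, 0)
    return billable / TB * usd_per_tb

print('storage: $%.2f per month' % storage_cost_per_month(50 * GB))
print('query:   $%.2f' % query_cost(10 * GB))
```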

## Links
Resources: [project](https://console.cloud.google.com/home/dashboard?project=info-group-162919) 
[storage](https://console.cloud.google.com/storage/browser?project=info-group-162919)
[bigquery](https://bigquery.cloud.google.com/dataset/info-group-162919:yts?pli=1)

[Variable definitions](https://docs.google.com/document/d/139zMJgQjDEwZLR40CiOMZIecklTKQ9Z4_pxWN7w9JZ0/edit)

Help:
[bq](https://cloud.google.com/bigquery/docs/how-to)
[bq sql](https://cloud.google.com/bigquery/docs/reference/standard-sql/)
[bq python api](https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/usage.html)

speed up queries: [bq partitions](https://cloud.google.com/bigquery/docs/partitioned-tables)

# Explore raw data

Greg's files use `\r` as the newline character. The `csv` module recognizes it, but `bash` tools usually don't, so convert the newline characters before viewing the files:
```bash
tr '\r' '\n' < YTS2_3_2.csv > YTS2_3_2_conv.csv
```
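
For processing in Python no conversion is needed. The sketch below writes a tiny made-up file with bare `\r` terminators and reads it back with `csv.reader`; the file name and contents are hypothetical. Opening with `newline=''` passes the raw line endings through to the `csv` module.

```python
import csv
import os
import tempfile

# write a small sample file that uses bare '\r' as the row terminator
path = os.path.join(tempfile.mkdtemp(), 'cr_only.csv')
with open(path, 'w', newline='') as fp:
    fp.write('ABI,Emp1997\r1,10\r2,20\r')

# newline='' disables newline translation; csv.reader treats '\r' as end of row
with open(path, newline='') as fp:
    rows = list(csv.reader(fp))

print(rows)
```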

Some cells contain vertical tabs (`\v` or `^K`). Extract all rows that have such cells:
```bash
head -n1 YTS2_3_2_conv.csv > YTS2_3_2_vtab.csv
grep -P '\x0B' YTS2_3_2_conv.csv >> YTS2_3_2_vtab.csv
```
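
A similar check can be done in Python to see which columns are affected. This is a minimal sketch: `count_vtab_cells` is a hypothetical helper and the sample rows are made up.

```python
import csv
from collections import Counter

def count_vtab_cells(lines):
    """Count cells that contain a vertical tab, keyed by column name."""
    reader = csv.reader(lines)
    header = next(reader)
    counts = Counter()
    for row in reader:
        for name, value in zip(header, row):
            if '\v' in value:
                counts[name] += 1
    return counts

# csv.reader accepts any iterable of strings, so a list stands in for a file here
sample_lines = [
    'ABI,Latitude,Longitude',
    '1,43.07\v43.08,-89.40',
    '2,44.50,-88.00',
]
counts = count_vtab_cells(sample_lines)
print(dict(counts))
```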

In [1]:
import csv, json
from time import time
import pandas as pd
from google.cloud import storage, bigquery

In [2]:
# BQ parameters
client = bigquery.Client()
wide_table_ref = client.dataset('yts').table('wide')
long_table_ref = client.dataset('yts').table('long')
lag_table_ref = client.dataset('yts').table('lag')
fips_table_ref = client.dataset('yts').table('fips')
state_table_ref = client.dataset('yts').table('state')
wide_table_path = wide_table_ref.dataset_id + '.' + wide_table_ref.table_id
long_table_path = long_table_ref.dataset_id + '.' + long_table_ref.table_id
lag_table_path = lag_table_ref.dataset_id + '.' + lag_table_ref.table_id
fips_table_path = fips_table_ref.dataset_id + '.' + fips_table_ref.table_id
state_table_path = state_table_ref.dataset_id + '.' + state_table_ref.table_id

In [3]:
def query_report(query_job, start_time):
    """Print elapsed time, data volume and estimated cost of a finished query job."""
    elapsed_time = time() - start_time
    mb_proc = query_job.total_bytes_processed / (1 << 20)
    mb_bill = query_job.total_bytes_billed / (1 << 20)
    tb_bill = mb_bill / (1 << 20)
    cost = 5 * tb_bill  # $5 per TB, ignoring the free first TB per month
    print('Query finished in %d seconds. Processed %d MB, billed %d MB, cost $%.2f.' % (elapsed_time, mb_proc, mb_bill, cost))

In [12]:
raw_filename = 'YTS10_23_17All.csv'
corrected_filename = 'YTS10_23_17All_corrected.csv'
corrections_filename = 'YTS10_23_17All_corrections.csv'
bq_schema_filename = 'bq_schema.json'

# Correct raw data
Keep only the last value in fields that contain vertical tabs.

In [None]:
with open(raw_filename, mode='r', errors='ignore', newline='') as fp_in, \
    open(corrected_filename, mode='w', newline='') as fp_out, \
    open(corrections_filename, mode='w', newline='') as fp_cor:

    reader = csv.reader(fp_in)
    writer_out = csv.writer(fp_out)
    writer_cor = csv.writer(fp_cor)
    
    # header
    field_names_in = next(reader)
    _ = writer_out.writerow(field_names_in)
    field_names_cor = ['line_number', 'column_number', 'abi', 'field_name', 'old_value', 'new_value']
    _ = writer_cor.writerow(field_names_cor)
    
    # data rows
    line_i = 2
    for row_in in reader:
        row_out = []
        for col_i, value in enumerate(row_in):
            value_split = value.split('\v')
            new_value = value_split[-1]
            row_out.append(new_value)
            if len(value_split) > 1:
                abi = row_in[0]
                row_cor = [line_i, col_i, abi, field_names_in[col_i], value, new_value]
                _ = writer_cor.writerow(row_cor)
        _ = writer_out.writerow(row_out)
        
        line_i += 1
        if line_i % 1000000 == 0: print(line_i)

In [19]:
# check which fields were corrected
df = pd.read_csv(corrections_filename, dtype='object', usecols=['field_name'])
df.field_name.value_counts()

Latitude      7732297
Longitude     7732297
HQFIPS2016          1
Name: field_name, dtype: int64

Some values cause problems: the auto-detected schema sets ABI to integer, and the load then fails on non-numeric values:
```
Errors:
gs://ig-anton/YTS10_23_17All_corrected.csv: CSV table encountered too many errors, giving up. Rows: 60519; errors: 1. (error code: invalid)
gs://ig-anton/YTS10_23_17All_corrected.csv: Could not parse 'IG4775007' as int for field ABI (position 0) starting at location 22211225052 (error code: invalid)
```
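
This is the kind of failure one would expect when a column type is inferred from a sample of rows and a non-numeric value such as `IG4775007` appears only far into the file. The sketch below illustrates the effect; `infer_type` is a hypothetical helper and not BigQuery's actual detection logic, and the numeric ABI values are taken from the sample rows in the appendix.

```python
def infer_type(values):
    """Return 'INTEGER' if every value parses as an int, otherwise 'STRING'."""
    try:
        for v in values:
            int(v)
    except ValueError:
        return 'STRING'
    return 'INTEGER'

abi_values = ['612580', '632026', '929265', 'IG4775007']

sample_type = infer_type(abi_values[:3])  # what a sample of leading rows suggests
full_type = infer_type(abi_values)        # what the whole column requires
print(sample_type, full_type)
```

Declaring ABI as `STRING` in an explicit schema avoids the problem.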

# Upload corrected CSV to GCS
```bash
gsutil -m cp YTS10_23_17All_corrected.csv gs://ig-anton
```

# Import CSV from GCS into BQ
Done via the Web UI, copy-pasting the JSON schema prepared below.

In [13]:
# prepare BQ schema
df = pd.read_csv(corrected_filename, nrows=100, dtype='object')

bq_schema = []
for field_name in df:
    if field_name == 'FirstYear' or field_name == 'LastYear' or field_name[:3] == 'Emp' or field_name[:5] == 'Sales':
        bq_type = 'INTEGER'
    elif field_name == 'Latitude' or field_name == 'Longitude':
        bq_type = 'FLOAT'
    else:
        bq_type = 'STRING'
    bq_field = {'name': field_name, 'type': bq_type}
    bq_schema.append(bq_field)

with open(bq_schema_filename, 'w') as fp:
    json.dump(bq_schema, fp, indent=2)

# Convert BQ table from wide to long format

In [4]:
# construct query: union of tables for every year
query_select_year = '''
SELECT
  {y} AS year,
  ABI AS abi,
  Emp{y} AS emp,
  Sales{y} AS sales,
  FIPS{y} AS fips,
  NAICS{y} AS naics,
  {startup} as startup
FROM
  `{table}`
WHERE
  Emp{y} IS NOT NULL
'''

query_list = []
for y in range(1997, 2017):
    startup = 'Startup%d' % y if y > 1997 else 'null'
    query_list.append(query_select_year.format(y=y, table=wide_table_path, startup=startup))
query = '\nUNION ALL\n'.join(query_list)

# configure job
job_id = 'yts_wide_to_long_%d' % time()
job_config = bigquery.QueryJobConfig()
job_config.destination = long_table_ref

# start job
t = time()
query_job = client.query(query, job_config=job_config, job_id=job_id)
_ = query_job.result()

query_report(query_job, t)

Query finished in 59 seconds. Processed 10517 MB, billed 10518 MB, cost $0.05.
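
To make the reshaping concrete, here is a small pure-Python sketch of what the generated `UNION ALL` query does to a single wide row. The row values are made up, only three years are shown, and `wide_to_long` is a hypothetical helper. The column naming (`Emp{y}`, `Sales{y}`, `FIPS{y}`, `NAICS{y}`, `Startup{y}`) follows the query above.

```python
def wide_to_long(row, years):
    """Yield one long-format record per year in which employment is present."""
    for y in years:
        emp = row.get('Emp%d' % y)
        if emp is None:  # mirrors WHERE Emp{y} IS NOT NULL
            continue
        yield {
            'year': y,
            'abi': row['ABI'],
            'emp': emp,
            'sales': row.get('Sales%d' % y),
            'fips': row.get('FIPS%d' % y),
            'naics': row.get('NAICS%d' % y),
            # there is no Startup column for the first year, so this is None there
            'startup': row.get('Startup%d' % y),
        }

wide_row = {
    'ABI': '1',
    'Emp1997': 5, 'Sales1997': 100, 'FIPS1997': '55025', 'NAICS1997': '11111111',
    'Emp1998': None,
    'Emp1999': 7, 'Sales1999': 120, 'FIPS1999': '55025', 'NAICS1999': '11111111',
    'Startup1999': 'New',
}
long_rows = list(wide_to_long(wide_row, range(1997, 2000)))
for r in long_rows:
    print(r)
```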


# Prepare lagged values

In [11]:
query = '''
SELECT
  * 
  EXCEPT (year_preceding)
  REPLACE (
    IF(year_preceding = year - 1, emp_lag, null) AS emp_lag
  ),
  IF(year_preceding = year - 1, emp - emp_lag, null) AS emp_change
FROM (
  SELECT
    abi,
    year,
    fips,
    emp,
    sales,
    naics,
    startup,
    LAG(emp) OVER (PARTITION BY abi ORDER BY year) AS emp_lag,
    LAG(FALSE, 1, TRUE) OVER (PARTITION BY abi ORDER BY year) AS birth,
    LEAD(FALSE, 1, TRUE) OVER (PARTITION BY abi ORDER BY year) AS death,
    LAG(year) OVER (PARTITION BY abi ORDER BY year) AS year_preceding,
    CASE
      WHEN emp = 1 THEN 1
      WHEN emp BETWEEN 2 AND 9 THEN 2
      WHEN emp BETWEEN 10 AND 99 THEN 10
      WHEN emp BETWEEN 100 AND 499 THEN 100
      WHEN emp >= 500 THEN 500
      ELSE -1
    END AS size
  FROM
    `{table}`
);
'''
query = query.format(table=long_table_path)

# prepare job
job_id = 'yts_long_to_lag_%d' % time()
job_config = bigquery.QueryJobConfig()
job_config.destination = lag_table_ref

# start job
t = time()
query_job = client.query(query, job_config=job_config, job_id=job_id)
_ = query_job.result()

query_report(query_job, t)

Query finished in 96 seconds. Processed 15136 MB, billed 15137 MB, cost $0.07.
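
The window logic can be traced on a single establishment. The sketch below is a pure-Python approximation of the query for one `abi`, using the employment history of ABI 632026 from the appendix. `size_class` and `add_lags` are hypothetical helpers, and because the history is an excerpt, the `birth` and `death` flags on its first and last rows are artifacts of the truncation.

```python
def size_class(emp):
    """Lower bound of the employment size class, -1 if outside all classes (e.g. emp == 0)."""
    if emp == 1:
        return 1
    if 2 <= emp <= 9:
        return 2
    if 10 <= emp <= 99:
        return 10
    if 100 <= emp <= 499:
        return 100
    if emp >= 500:
        return 500
    return -1

def add_lags(records):
    """records: (year, emp) pairs for one abi. Returns one dict per year, sorted by year."""
    records = sorted(records)
    out = []
    for i, (year, emp) in enumerate(records):
        prev = records[i - 1] if i > 0 else None
        # the lag is only meaningful if the preceding observation is exactly one year earlier
        consecutive = prev is not None and prev[0] == year - 1
        emp_lag = prev[1] if consecutive else None
        out.append({
            'year': year,
            'emp': emp,
            'emp_lag': emp_lag,
            'emp_change': emp - emp_lag if consecutive else None,
            'birth': i == 0,                  # first observation of this abi
            'death': i == len(records) - 1,   # last observation of this abi
            'size': size_class(emp),
        })
    return out

history = [(1999, 297), (2000, 297), (2001, 275), (2003, 170), (2004, 170)]
lagged = add_lags(history)
for r in lagged:
    print(r)
```

Note that a reappearance after a gap (2003 here) gets a null `emp_lag` and `emp_change` but is not flagged as a birth.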


# Compute aggregate stats

## Aggregate on FIPS level

In [14]:
# continuation and birth
query_cont_bir = '''
SELECT
  fips,
  year,
  size,
  COUNT(*) AS est,
  SUM(emp) AS emp,
  SUM(sales) AS sales,
  COUNTIF(birth) AS est_birth,
  COUNTIF(NOT birth AND emp_change > 0) AS est_expand,
  COUNTIF(NOT birth AND emp_change < 0) AS est_contract,
  SUM(IF(birth, emp, 0)) AS emp_birth,
  SUM(IF(emp_change > 0, emp_change, 0)) AS emp_expand,
  SUM(IF(emp_change < 0, -emp_change, 0)) AS emp_contract
FROM
  {table}
GROUP BY
  fips, year, size
'''.format(table=lag_table_path)

# death
query_dea = '''
SELECT
  fips,
  year + 1 AS year,
  size,
  COUNT(*) AS est_death,
  SUM(emp) AS emp_death
FROM
  {table}
WHERE
  death
GROUP BY
  fips, year, size
'''.format(table=lag_table_path)

# join continuation, birth and death into one table
query_join = '''
SELECT
  *
FROM
  ({q_cont_bir}) FULL OUTER JOIN ({q_dea}) USING(fips, year, size)
'''.format(q_cont_bir=query_cont_bir, q_dea=query_dea)

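# replace NULL flows with zero: cells where no such events occurred (typically deaths in large size classes)
# set flows to NULL where they are undefined: all flows in 1997, which has no preceding year;
# rows for 2017 contain only "deaths" of establishments last observed in 2016 and are dropped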
query = '''
SELECT
  *
  REPLACE (
    IF(year = 1997, NULL, IF(est_birth is NULL, 0, est_birth)) AS est_birth,
    IF(year = 1997, NULL, IF(est_expand is NULL, 0, est_expand)) AS est_expand,
    IF(year = 1997, NULL, IF(est_contract is NULL, 0, est_contract)) AS est_contract,
    IF(year = 1997, NULL, IF(emp_birth is NULL, 0, emp_birth)) AS emp_birth,
    IF(year = 1997, NULL, IF(emp_expand is NULL, 0, emp_expand)) AS emp_expand,
    IF(year = 1997, NULL, IF(emp_contract is NULL, 0, emp_contract)) AS emp_contract,
    IF(year = 1997, NULL, IF(est_death is NULL, 0, est_death)) AS est_death,
    IF(year = 1997, NULL, IF(emp_death is NULL, 0, emp_death)) AS emp_death
  )
FROM
  ({q_join})
WHERE
  year != 2017 AND fips is not NULL
ORDER BY
  fips, year, size
'''.format(q_join=query_join)

# prepare job
job_id = 'yts_agg_fips_%d' % time()
job_config = bigquery.QueryJobConfig()
job_config.destination = fips_table_ref

# start job
t = time()
query_job = client.query(query, job_config=job_config, job_id=job_id)
_ = query_job.result()

query_report(query_job, t)

Query finished in 20 seconds. Processed 13632 MB, billed 13633 MB, cost $0.07.
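
The timing convention for deaths is easy to get wrong: an establishment last observed in year t is counted as a death in year t + 1. The sketch below reproduces the counting on a few made-up rows. It is a simplification that tracks only `est`, `est_birth` and `est_death`, skips the `fips is not NULL` filter, and uses a hypothetical `aggregate` helper.

```python
from collections import defaultdict

FLOWS = ('est_birth', 'est_death')

def aggregate(rows, first_year, last_year):
    """Count establishments, births and deaths per (fips, year, size) cell."""
    cells = defaultdict(lambda: defaultdict(int))
    for r in rows:
        cell = cells[(r['fips'], r['year'], r['size'])]
        cell['est'] += 1
        if r['birth']:
            cell['est_birth'] += 1
        if r['death']:
            # last observation in year t is recorded as a death in year t + 1
            cells[(r['fips'], r['year'] + 1, r['size'])]['est_death'] += 1
    result = {}
    for (fips, year, size), cell in cells.items():
        if year > last_year:  # mirrors WHERE year != 2017
            continue
        out = {'est': cell.get('est')}
        for name in FLOWS:
            # flows are undefined in the first year and zero where no events occurred
            out[name] = None if year == first_year else cell.get(name, 0)
        result[(fips, year, size)] = out
    return result

rows = [
    {'fips': '55025', 'year': 1997, 'size': 2, 'birth': True, 'death': False},
    {'fips': '55025', 'year': 1998, 'size': 2, 'birth': False, 'death': True},
    {'fips': '55025', 'year': 1998, 'size': 2, 'birth': True, 'death': False},
    {'fips': '55025', 'year': 1999, 'size': 2, 'birth': False, 'death': True},
]
result = aggregate(rows, first_year=1997, last_year=1999)
for key in sorted(result):
    print(key, result[key])
```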


## Aggregate on state level

In [15]:
# get state codes
df = pd.read_csv('https://www2.census.gov/geo/docs/reference/state.txt', delimiter='|', dtype='object')
state_codes = {
    'numcode_to_strcode': dict(zip(df.STATE, df.STUSAB)),
    'numcode_to_name': dict(zip(df.STATE, df.STATE_NAME)),
    'strcode_to_name': dict(zip(df.STUSAB, df.STATE_NAME))
}

In [18]:
# aggregate from FIPS by 2-digit state code
query_state_code = '''
SELECT
  SUBSTR(fips, 1, 2) as state_code,
  year,
  size,
  SUM(est) AS est,
  SUM(emp) AS emp,
  SUM(sales) AS sales,
  SUM(est_birth) AS est_birth,
  SUM(est_expand) AS est_expand,
  SUM(est_contract) AS est_contract,
  SUM(est_death) AS est_death,
  SUM(emp_birth) AS emp_birth,
  SUM(emp_contract) AS emp_contract,
  SUM(emp_expand) AS emp_expand,
  SUM(emp_death) AS emp_death
FROM
  `{table}`
GROUP BY
  state_code, year, size
'''.format(table=fips_table_path)

# add 2-letter state code
query_code_expr = 'CASE state_code\n'
for code_pair in state_codes['numcode_to_strcode'].items():
    query_code_expr += '  WHEN "%s" THEN "%s"\n' % code_pair
query_code_expr += '  ELSE "N/A"\nEND'

query = '''
SELECT
  {code_expr} AS state,
  *
FROM
  ({q_state_code})
ORDER BY
  state, year, size
'''.format(code_expr=query_code_expr, q_state_code=query_state_code)

# prepare job
job_id = 'yts_agg_state_%d' % time()
job_config = bigquery.QueryJobConfig()
job_config.destination = state_table_ref

# start job
t = time()
query_job = client.query(query, job_config=job_config, job_id=job_id)
_ = query_job.result()

query_report(query_job, t)

Query finished in 1 seconds. Processed 31 MB, billed 32 MB, cost $0.00.
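
`SUBSTR(fips, 1, 2)` returns the state code only if every `fips` value is a zero-padded 5-character string. The sample rows in the appendix show values such as `9003`, which suggests that leading zeros may be missing for some states; whether this affects the current tables has not been verified here. A minimal sketch of a defensive conversion, with `state_code_from_fips` as a hypothetical helper:

```python
def state_code_from_fips(fips):
    """Return the 2-digit state code of a county FIPS code, restoring lost leading zeros."""
    return str(fips).zfill(5)[:2]

for fips in ['55025', '9003', 25017]:
    print(fips, '->', state_code_from_fips(fips))
```

In BigQuery the analogous expression would be `SUBSTR(LPAD(fips, 5, '0'), 1, 2)`.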


# Query results
![Results layout](results_layout.png)

In [4]:
df

| | state | state_code | year | size | est | emp | sales | est_birth | est_expand | est_contract | est_death | emp_birth | emp_contract | emp_expand | emp_death |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | WI | 55 | 1998 | 9999 | 218651 | 3002702 | 396076775 | 13780 | 19532 | 16581 | 26950 | 91969 | 106740 | 143443 | 192636 |
| 1 | WI | 55 | 1998 | 1 | 38378 | 38378 | 4812488 | 3796 | 0 | 3713 | 5389 | 3796 | 8680 | 0 | 5389 |
| 2 | WI | 55 | 1998 | 2 | 128572 | 495115 | 70877932 | 8368 | 10465 | 9343 | 18369 | 28409 | 29923 | 17843 | 65527 |
| 3 | WI | 55 | 1998 | 10 | 47010 | 1188288 | 153653201 | 1505 | 8167 | 3228 | 3005 | 34370 | 34127 | 64657 | 69267 |
| 4 | WI | 55 | 1998 | 100 | 4185 | 740391 | 103059933 | 102 | 808 | 265 | 164 | 18774 | 17667 | 37614 | 30896 |
| 5 | WI | 55 | 1998 | 500 | 506 | 540530 | 63673221 | 9 | 92 | 32 | 23 | 6620 | 16343 | 23329 | 21557 |


In [3]:
query_state_year = '''
SELECT *
FROM `{table}`
WHERE state = '{st}' and year = {y}
'''.format(table=state_table_path, st='WI', y=1998)

vars_total = ['est', 'emp', 'sales', 'est_birth', 'est_expand', 'est_contract', 'est_death', 'emp_birth', 'emp_contract', 'emp_expand', 'emp_death']
query_agg = []
for v in vars_total:
    query_agg.append('  SUM(%s) as %s' % (v, v))
query_agg = ',\n'.join(query_agg)


query_total = '''
SELECT
  state, state_code, year,
  9999 as size,
{q_agg}
FROM state_year
GROUP BY state, state_code, year
'''.format(q_agg=query_agg)

query = '''
WITH state_year AS ({q_state_year})
SELECT * FROM state_year
UNION ALL
({q_total})
'''.format(q_state_year=query_state_year, q_total=query_total)

df = pd.read_gbq(query, verbose=False, dialect='standard', project_id=client.project)
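
The `size = 9999` row should equal the sum over the five size classes. The sketch below checks this for three of the variables, using the WI 1998 values from the output above; `total_row` is a hypothetical helper.

```python
VARS = ['est', 'emp', 'est_birth']

def total_row(rows, variables):
    """Sum each variable over the size classes, as the size = 9999 row does."""
    return {v: sum(r[v] for r in rows) for v in variables}

# WI 1998 rows by size class (values copied from the output above)
size_rows = [
    {'size': 1,   'est': 38378,  'emp': 38378,   'est_birth': 3796},
    {'size': 2,   'est': 128572, 'emp': 495115,  'est_birth': 8368},
    {'size': 10,  'est': 47010,  'emp': 1188288, 'est_birth': 1505},
    {'size': 100, 'est': 4185,   'emp': 740391,  'est_birth': 102},
    {'size': 500, 'est': 506,    'emp': 540530,  'est_birth': 9},
]
reported_total = {'est': 218651, 'emp': 3002702, 'est_birth': 13780}

computed_total = total_row(size_rows, VARS)
print(computed_total == reported_total)
```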

# Appendix

## questions

What does `emp == 0` mean?

When employment is null, is employment_category also null?

### "Startup" variable

When a firm with the same ABI reappears after a break, is it a startup?

```
Row	abi	year	fips	emp	sales	naics	startup	emp_lag	birth	death	year_preceding	size	emp_change	emp_lag0	birth0	death0	 
11	612580	2007	9003	50	6950	33351711	null	50	false	false	2006	10	0	50	false	false	 
12	612580	2012	9003	45	6950	33351711	New	50	false	false	2007	10	null	null	false	false	 
13	612580	2013	9003	45	10615	33351711	null	45	false	false	2012	10	0	45	false	false	 

20	632026	2000	25017	297	null	33911203	null	297	false	false	1999	100	0	297	false	false	 
21	632026	2001	25017	275	null	33911203	null	297	false	false	2000	100	-22	297	false	false	 
22	632026	2003	25017	170	null	33451315	ExpStart	275	false	false	2001	100	null	null	false	false	 
23	632026	2004	25017	170	null	33451315	null	170	false	false	2003	100	0	170	false	false	 

46	929265	2007	9001	60	6600	33399920	null	60	false	false	2006	10	0	60	false	false	 
47	929265	2012	9001	60	12720	33211912	New	60	false	false	2007	10	null	null	false	false	 
48	929265	2013	9001	60	11054	33211912	null	60	false	false	2012	10	0	60	false	false	 
```
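
To find such cases systematically, one could flag every observation whose preceding observation for the same ABI is more than one year earlier, and then compare those years with the `startup` values. A pure-Python sketch using the years from the sample above; `reentries` is a hypothetical helper.

```python
def reentries(years):
    """Return (year, missing_years) for each observation that follows a gap."""
    years = sorted(years)
    return [(cur, cur - prev - 1)
            for prev, cur in zip(years, years[1:])
            if cur - prev > 1]

# observed years per ABI, taken from the year and year_preceding columns above
observed = {
    '612580': [2006, 2007, 2012, 2013],
    '632026': [1999, 2000, 2001, 2003, 2004],
    '929265': [2006, 2007, 2012, 2013],
}
gaps = {abi: reentries(years) for abi, years in observed.items()}
for abi, g in gaps.items():
    print(abi, g)
```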