In [None]:
# default_exp analysis

# Analysis

> Analysis examples, using different formats and processors.

# pandas

In-memory processing of data internally stored in NumPy arrays.

[Scaling suggestions](https://pandas.pydata.org/pandas-docs/stable/user_guide/scale.html) in official pandas documentation.

# dask

[Website](https://dask.org/)
[Docs](https://docs.dask.org/en/latest/)
[Tutorial](https://tutorial.dask.org/)
[Examples](https://examples.dask.org/index.html)

## Distributed scheduler

[Docs](https://distributed.dask.org/en/latest/)

Dask's default port for the diagnostics dashboard is 8787, which is also the default port for RStudio Server (rserver).
A workaround is to start the client (or cluster) with the parameter `dashboard_address="localhost:8899"`.
Whether the default port can be set in a config file is [unknown](https://stackoverflow.com/questions/60535300/dask-distributed-configuration-file-for-dashboard-address).

In [None]:
# export
from ig_format import storage

# Repeated cross-section analysis

Every year can be processed one at a time.

## Simple aggregation

In [None]:
%%timeit?

In [None]:
%%time
# pandas: read a single year through the project storage helper, then total
# each column of interest. (Mean and std could be collected the same way.)
df = storage.read_csv(2000, full=True)
res = {}
for c in ('employees',):
    res[c] = df[c].sum()
print(res)

In [None]:
# standard library
from collections import namedtuple

# pandas
import pandas as pd

# dask
import dask
import dask.config
import dask.distributed
import dask.dataframe as dd

In [None]:
# Start a distributed client with its dashboard on port 8899 instead of the
# default 8787, which can clash with RStudio Server on the same machine.
client = dask.distributed.Client(dashboard_address='localhost:8899')
# Bare expression so the notebook renders the client summary.
client

In [None]:
# Shut down the client created above.
# NOTE(review): in a top-to-bottom run this executes before the dask
# aggregation below, so that computation presumably does not use this
# client -- confirm that is intended.
client.close()

In [None]:
%%time
# dask: same total as the pandas cell, but the CSV is read lazily and the
# sum is only evaluated when .compute() is called.
year = 2000
dt = storage.dtypes_from_schema(f'./data/csv/{year}_schema.json', False)
df = dd.read_csv(f'./data/csv/{year}.csv', dtype=dt)
c = 'employees'
res = {c: df[c].sum().compute()}
print(res)

In [None]:
# Names of the statistics reported for every column, in output order.
fields = 'count count_na count_notna count_unique min max sum mean std p1 p25 p50 p75 p99'.split()
# Every field defaults to NaN so that non-numeric columns can simply leave the
# numeric statistics unset. float('nan') is used instead of `pd.np.nan`: the
# `pandas.np` alias was deprecated in pandas 1.0 and is absent from pandas 2.x.
defaults = [float('nan')] * len(fields)
Stats = namedtuple('Stats', fields, defaults=defaults)

def comp_stats(s):
    """Return a `Stats` namedtuple of summary statistics computed from a Series.

    The four count fields are filled for any dtype. The remaining fields
    (min, max, sum, mean, std and the percentiles) are only computed when the
    Series has a numeric dtype; otherwise they keep their NaN defaults.
    `count_unique` comes from `Series.nunique()`, which ignores missing values.
    """
    count = len(s)
    na = s.isna().sum()
    if pd.api.types.is_numeric_dtype(s):
        q = s.quantile([0, 0.01, 0.25, 0.5, 0.75, 0.99, 1])
        # The quantile result is indexed by the requested levels (floats), so
        # look values up by explicit float label: 0.0 is the minimum and 1.0
        # the maximum. `.loc` makes clear these are labels, not positions.
        num_stats = dict(min=q.loc[0.0], max=q.loc[1.0], sum=s.sum(), mean=s.mean(), std=s.std(),
                         p1=q.loc[0.01], p25=q.loc[0.25], p50=q.loc[0.5], p75=q.loc[0.75], p99=q.loc[0.99])
    else:
        num_stats = dict()
    return Stats(count=count, count_na=na, count_notna=count - na, count_unique=s.nunique(), **num_stats)

In [None]:
# Columns to summarise; 'naics2' is derived inside the loop from 'naics'.
stat_cols = ['sales', 'employees', 'state', 'naics2']
stats_by_year = {}
for y in data_years:
    dt = dtypes_from_schema(schema_path)
    df = pd.read_csv(f'{data_dir}/{y}.csv', dtype=dt)
    # First two characters of the 'naics' string.
    df['naics2'] = df['naics'].str[:2]
    # One row of statistics per column for this year.
    s = pd.DataFrame([comp_stats(df[c]) for c in stat_cols], index=stat_cols)
    stats_by_year[y] = s
# Stack the yearly tables, then put the column name first and the year second
# in the index so that each column's years sit together after sorting.
stats_by_year = pd.concat(stats_by_year).reorder_levels([1, 0]).sort_index()

In [None]:
# Display the combined statistics table, indexed by (column, year).
stats_by_year

In [None]:
# NOTE: `df` is whatever the per-year loop above left behind, i.e. the data
# for the last year it processed.
dfg = df.groupby('naics2')
# ._asdict() labels every statistic with its field name. Passing the bare
# namedtuple to pd.Series treats it as a plain tuple, which yields positional
# labels (0, 1, 2, ...) and makes the resulting table unreadable.
dfg['sales'].apply(lambda c: pd.Series(comp_stats(c)._asdict())).to_frame().unstack()

# Dynamic analysis

Growth rates, which require data from two years simultaneously.

# Full panel analysis

Use all years together to run a regression.