# Getting started using parquet via DuckDB with Wintap

## Workflow:
Parquet files -> DuckDB Tables/Views -> SQL EDA/Extraction -> Pandas -> Resume typical workflow

DuckDB is added to the workflow for initial EDA and subset extraction so that you can work with datasets larger than memory. Once SQL has identified the subset of interest, the result can easily be extracted into pandas.

### Map parquet into DuckDB
* Initialize an in-memory database with a view for each event type at the chosen aggregation level.
    * Views store only their query definition, essentially pointers to the Parquet files. Data is read from the files when a view is queried, so creating the views uses almost no memory.
* Present a summary of the current dataset
    * Tabular view with row counts and Parquet file sizes

In [None]:
# Import the packages used in this notebook
from wintap.datautils import stdviewutil as sv
from wintap.datautils import rawutil as ru
from wintap.datautils import stdview_duckdb as svd
#from wintap.notebookutils.datasetchooser import dataset_chooser
from wintap.notebookutils import dataset_chooser
import os
import altair as alt

In [None]:
# Choose the dataset to work with.
# dataset_chooser() reads a .env file in the top level of this project. The .env must define DATAPATH as the top-level directory containing your datasets.
# You can optionally define DEFAULT_PATH pointing to a specific dataset, so you don't have to reselect the dataset every time the notebook restarts.
# See .env-default for an example.
# If there is no .env, or the paths are invalid, dataset_chooser() defaults to the user's home directory.

# To enable logging output in Jupyter, uncomment the following three lines:
#import logging
#logger = logging.getLogger()
#logger.setLevel(logging.DEBUG)
#from wintap.notebookutils.datasetchooser import dataset_chooser
#%run notebookutil.py

w_datasets=dataset_chooser()
display(w_datasets)
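`dataset_chooser()` is part of Wintap and its internals are not shown here. The following is a hypothetical sketch of the fallback logic the comments describe: read `DATAPATH` and the optional `DEFAULT_PATH` from a `.env` file, and fall back to the user's home directory when the file is missing or a path is invalid. The names `read_env_file` and `resolve_dataset_paths` are illustrative, and the parser is much simpler than a library such as python-dotenv.

```python
import os
import tempfile
from pathlib import Path


def read_env_file(path):
    # Minimal KEY=VALUE parser: skips blank lines and '#' comments and strips
    # surrounding quotes. python-dotenv handles many more cases.
    values = {}
    if not os.path.isfile(path):
        return values
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            values[key.strip()] = value.strip().strip("\"'")
    return values


def resolve_dataset_paths(env_path):
    # Returns (directory to browse, preselected dataset or None), falling back
    # to the user's home directory when DATAPATH is missing or invalid.
    env = read_env_file(env_path)
    root = env.get("DATAPATH")
    if not root or not os.path.isdir(root):
        return str(Path.home()), None
    default = env.get("DEFAULT_PATH")
    if not default or not os.path.isdir(default):
        default = None
    return root, default


# Demo with a throwaway .env and data directory.
demo = tempfile.mkdtemp()
data_root = os.path.join(demo, "data")
os.makedirs(os.path.join(data_root, "dataset1"))
env_file = os.path.join(demo, ".env")
with open(env_file, "w") as f:
    f.write(f"DATAPATH={data_root}\nDEFAULT_PATH={os.path.join(data_root, 'dataset1')}\n")
print(resolve_dataset_paths(env_file))
```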

In [None]:
# Initialize an in-memory DuckDB database, keep the connection in a variable, and register it with magic-duckdb (%dql -co con).
# The same DB instance is then usable from Python code and from the %dql/%%dql magics.
# init_db() also creates a view for every top-level event type found in the selected dataset.
con=ru.init_db(w_datasets.selected) # ,agg_level='rolling')
%dql -co con
# Display the list of tables/views
%dql show tables

In [None]:
# Datasets may include annotations: discrete values of interest or sample data found within them.
# Load them if they exist for the current dataset.
# To do: move this to notebookutil.py once it's stable.
if os.path.exists(os.path.join(w_datasets.selected, 'annotations.py')):
    %run $w_datasets.selected/annotations.py
    %whos
    display(SIMPLE)
else:
    print('No annotations defined for this dataset.')
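Later cells reference `SIMPLE.PID_HASH`, `SIMPLE.DAYPK`, `MIN_DAYPK`, `MAX_DAYPK` and `SUMMARY_INTERVAL`, none of which are defined elsewhere in this notebook, so presumably a dataset's `annotations.py` provides them. A hypothetical sketch of such a file follows. Every value is a placeholder, and the format of `SUMMARY_INTERVAL` in particular is an assumption. A `SimpleNamespace` supports the attribute access used in templates like `{{SIMPLE.PID_HASH}}`.

```python
from types import SimpleNamespace

# Hypothetical annotations.py: all values are placeholders to replace with
# identifiers taken from your own dataset.
SIMPLE = SimpleNamespace(
    PID_HASH="<pid_hash of a process of interest>",
    DAYPK=20240101,  # day partition key in YYYYMMDD form
)
MIN_DAYPK = 20240101  # first day included in the demo queries
MAX_DAYPK = 20240107  # last day included in the demo queries
SUMMARY_INTERVAL = "1 hour"  # chart bucket size; the expected format is an assumption
```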

### Summarize event data and chart it to show the event distribution over time

In [None]:
# Tabular summary
display(svd.table_summary(con,w_datasets.selected))

In [None]:
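# Events over time.
# SUMMARY_INTERVAL sets the bucket size and must already be defined (for example, by the dataset's annotations.py).
# To do: dynamically adjust the bucket size based on the dataset duration for the best resolution/performance.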
svd.init_db(con,SUMMARY_INTERVAL)
eventdf=svd.fetch_summary_data(con)
svd.display_event_chart(eventdf)

### EDA
* Summarize: display table schema and some statistics about its contents
* Head: list a small set of rows
* Group By: aggregate on 1-N columns
* Time partitions: filter or group by day using DayPK
* Joining tables
    * Within a single day: joins work directly
    * Over multiple days: PROCESS and HOST both need to be deduped, since the same process or host can appear in more than one day
* Specific events: highlight events of interest (putty/notepad++/etc)

In [None]:
# Summarize process to get a high level view of the columns and values
# Create a file with sample values per dataset.
%dql -j summarize SELECT * FROM process where daypk BETWEEN {{MIN_DAYPK}} AND {{MAX_DAYPK}}

In [None]:
# Get all columns for 10 rows (without ORDER BY, which 10 rows are returned is not guaranteed)
%dql select * from process limit 10

In [None]:
# Find every execution of a specific process by name: one row per process instance (pid_hash), with the earliest day it was seen and its row count.
%dql select pid_hash, first(process_name), min(daypk) daypk, count(*) from process where process_name = 'putty.exe' group by pid_hash order by daypk

In [None]:
%%dql -j
-- Use GROUP BY to find the most and least common process_name values. Jupyter helps by displaying the first and last sets of rows.
-- Also calculate distinct counts for some common fields.
-- To keep it fast for demos, limit to a subset of DayPKs. Try commenting out the WHERE clause to see results over all the data.
-- Note: the cell magic (%%dql) treats the entire cell as SQL, so Python (#) comments do not work.
SELECT process_name, count(distinct hostname) num_hostname, count(distinct file_md5) num_file_md5, count(distinct user_name) num_user_name, count(distinct pid_hash) num_pid_hash, count(*) num_rows
FROM process
WHERE daypk BETWEEN {{MIN_DAYPK}} AND {{MAX_DAYPK}}
GROUP BY ALL
ORDER BY num_rows

In [None]:
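# Simple count of processes per day, with the result assigned to a pandas DataFrame.
# Convert dayPK (YYYYMMDD) to a timestamp so Altair renders a proper time axis. The VARCHAR cast lets strptime accept dayPK whether it is stored as an integer or a string.
processes_per_day = %dql select strptime(dayPK::VARCHAR,'%Y%m%d') dayPK, count(*) num_rows from process group by all order by daypk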
# Chart that using Altair
chart = alt.Chart(processes_per_day).mark_line().encode(
        x='dayPK:T',
        y='num_rows',
        tooltip=['dayPK:T','num_rows']
    ).properties(
        width=1200,
        height=400,
        title='Processes Per Day'
    ).interactive()
display(chart)

In [None]:
# Display a single process and its network connections.
# Adding the daypk filter restricts the search to a single day instead of every day in the dataset (~180 in this one).
proc = %dql -j select * from process where pid_hash='{{SIMPLE.PID_HASH}}' and daypk={{SIMPLE.DAYPK}}
net = %dql -j select * from process_net_conn where pid_hash='{{SIMPLE.PID_HASH}}' and daypk={{SIMPLE.DAYPK}} order by first_seen
display(proc)
display(net)

### Extraction

In [None]:
# Assign the query result to a pandas DataFrame.
# This demonstrates the single-line magic, so the SQL is kept as short as practical. It gets all process_names that used the network on one day.
%dql -j -o net_sum_df select p.process_name, count(distinct pnc.conn_id) num_conn_ids, count(*) num_rows from process p join process_net_conn pnc on pnc.pid_hash=p.pid_hash where p.dayPK={{SIMPLE.DAYPK}} and pnc.dayPK={{SIMPLE.DAYPK}} group by all order by all
net_sum_df.info()

In [None]:
%%dql -j -o net_sum2_df
-- Assign the query result to a pandas DataFrame when using the cell magic.
-- With multiline SQL, the query can be formatted to be more readable. Get all process_names that used the network on one day, with additional features.
select 
  p.process_name,
  count(distinct p.hostname) num_hosts,
  count(distinct p.user_name) num_users,
  count(distinct pnc.conn_id) num_conn_ids,
  count(distinct pnc.remote_port) num_remote_ports,
  sum(tcp_recv_size) tcp_recv_size,
  sum(tcp_send_size) tcp_send_size,
  sum(udp_recv_size) udp_recv_size,
  sum(udp_send_size) udp_send_size,
  count(*) num_rows 
from process p 
join process_net_conn pnc on pnc.pid_hash=p.pid_hash 
-- Note: filtering both tables by dayPK dramatically increases speed at the cost of reducing the data scope.
where p.dayPK={{SIMPLE.DAYPK}} and pnc.dayPK={{SIMPLE.DAYPK}}
group by all 
order by all

In [None]:
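# Create a file-based database (rolling.db in the current working directory) with views over the current Parquet data. Useful for opening directly as a DuckDB database from other tools.
rollingdb=ru.init_db(w_datasets.selected,database='rolling.db')
# Close the connection so other tools can open the file (DuckDB allows only one process to open a database file in read-write mode).
rollingdb.close()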

In [None]:
# Generate SQL that will map all event types into views. Does not execute the SQL.
# Intended for generating SQL that will be executed in another context, such as the CLI or DBeaver.
globs=ru.get_glob_paths_for_dataset(w_datasets.selected,'rolling')
stmts=ru.generate_view_sql(globs)
for sql in stmts:
    print(sql.strip()+';')
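A hypothetical sketch of what generating view SQL can look like; it is not the implementation of `ru.generate_view_sql`. It maps each event type to a `CREATE OR REPLACE VIEW` statement over a Parquet glob, quoting identifiers and string literals. The glob paths and the `hive_partitioning = true` option are assumptions.

```python
def quote_ident(name):
    # Double-quote an SQL identifier, escaping embedded double quotes.
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value):
    # Single-quote an SQL string literal, escaping embedded single quotes.
    return "'" + value.replace("'", "''") + "'"


def sketch_view_sql(globs):
    # globs maps an event type (the view name) to a Parquet glob pattern.
    return [
        f"CREATE OR REPLACE VIEW {quote_ident(name)} AS "
        f"SELECT * FROM read_parquet({quote_literal(pattern)}, hive_partitioning = true)"
        for name, pattern in globs.items()
    ]


# Hypothetical paths; substitute the globs for your own dataset.
example_globs = {
    "process": "/data/wintap/rolling/process/**/*.parquet",
    "process_net_conn": "/data/wintap/rolling/process_net_conn/**/*.parquet",
}
for stmt in sketch_view_sql(example_globs):
    print(stmt + ";")
```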