Skip to content

Commit

Permalink
Merge pull request #4 from ContinuumIO/new_intake
Browse files: browse the repository at this point in the history
updates for intake 0.2
  • Loading branch information
martindurant committed Jul 23, 2018
2 parents 2d82cc3 + 7bd3dfe commit 5b8e5eb
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 37 deletions.
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@ language: generic

os:
- linux
- osx

git:
depth: false

env:
matrix:
- CONDA_PY=27
- CONDA_PY=35
- CONDA_PY=36
global:
secure: KZbjTZeluoWVb6piOBkXXc7EGWzGgi6P3emP4Vg4pNUNPLveY49pQF7vKIqiW7HQJgDpdbYFELJKTOtQAKWFNgWduvcISRvp0MMq4VPwRBimY0akXMYYqMi0KlnVGJJhgiviIXKYmuO71kMElQ6SKu9zxOg3C1tRBhOlLbDVnCTmmBAPSVHYhhYNFUSN8zgSneXeWuLcQ1XVt8eLj9GbMUH6hh0K7NsbUDKK+jpiBpa/phOkndRUifhRIuN1Fd/sQ4Pi4/N8Y5o7nY58AHMPzE3S2LY8ze0qMPNe527cDj2efOx0QvRIROKGveKu9MV5tyzUsto6/XF34GHV22AwRjcVN0mMoVrg5XsuLiNui3JBQST1SgTAr6z221QIz8IZATnFkMSlax1PD0Lu1XOcE1o4g8whGTqw0v5ltszu/Ym0rFu179+Cs3lC8WLBxE/CbqonZEfrYP75UwIQ54N3kJYBH7EHLxvjWktibbJAp/wWD7vCknaw9x3y8W5e6U9qXOA7a13pPASGNIbnbnkfAQ1mkDY0631jukVLbXnGZcNwmuOwdbD0Bry+uQHHFbwApfHoF/kJ3jkT7F4yTvvwhn4qzx7MEDe+go4kj0b+HBD7xi7yt52eRCY+c71L7dhw6CggQ8anCKof+pBN97sG0EJo5Q9KBYzIss7MsKsSXDg=
Expand Down
2 changes: 1 addition & 1 deletion conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ requirements:
- jinja2
run:
- attrs
- intake
- intake>=0.2
- python

test:
Expand Down
4 changes: 0 additions & 4 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ API Reference
.. currentmodule:: intake_netflow

.. autosummary::
Plugin
intake_netflow.source.NetflowSource
intake_netflow.v9.PacketStream
intake_netflow.v9.RecordStream

.. autoclass:: Plugin
:members:

.. autoclass:: intake_netflow.source.NetflowSource
:members:

Expand Down
2 changes: 2 additions & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Welcome to intake_netflow's documentation!
==========================================

This package enables Intake to read Netflow v9-format files.

.. toctree::
:maxdepth: 2
:caption: Contents:
Expand Down
19 changes: 1 addition & 18 deletions intake_netflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,6 @@
from intake.source import base

from ._version import get_versions

__version__ = get_versions()['version']
del get_versions


class Plugin(base.Plugin):
"""Cisco Netflow packets to sequence of Python dicts reader"""

def __init__(self):
super(Plugin, self).__init__(name='netflow', version='0.1', container='python', partition_access=False)

def open(self, urlpath, **kwargs):
"""
Parameters:
urlpath : str
Location of the data files; can include protocol and glob characters.
"""
from .source import NetflowSource
base_kwargs, source_kwargs = self.separate_base_kwargs(kwargs)
return NetflowSource(urlpath=urlpath, metadata=base_kwargs['metadata'])
from .source import NetflowSource
39 changes: 28 additions & 11 deletions intake_netflow/source.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,53 @@
from dask.bytes import open_files

from intake.source import base

from .v9 import RecordStream
from . import __version__


class NetflowSource(base.DataSource):
name = 'netflow'
version = __version__
container = 'python'
partition_access = True

def __init__(self, urlpath, metadata=None):
"""Source to load Cisco Netflow packets as sequence of Python dicts.
Parameters:
urlpath : str
Location of the data files; can include protocol and glob characters.
Location of the data files; can include protocol and glob
characters.
"""
self._urlpath = urlpath
self._streams = open_files(urlpath, mode='rb')

super(NetflowSource, self).__init__(container='python', metadata=metadata)
super(NetflowSource, self).__init__(metadata=metadata)

def _get_schema(self):
self._streams = open_files(self._urlpath, mode='rb')
self.npartitions = len(self._streams)
return base.Schema(datashape=None,
dtype=None,
shape=None,
npartitions=len(self._streams),
extra_metadata={})

def _get_partition(self, i):
with self._streams[i] as f:
return list(RecordStream(f))
return read_stream(self._streams[i])

def _close(self):
for stream in self._streams:
stream.close()
def read(self):
return self.to_dask().compute()

def to_dask(self):
import dask.delayed
import dask.bag as db
dpart = dask.delayed(read_stream)
parts = [dpart(stream) for stream in self._streams]
return db.from_delayed(parts)

def _close(self):
self._streams = None


def read_stream(stream):
from .v9 import RecordStream
with stream as f:
return list(RecordStream(f))

0 comments on commit 5b8e5eb

Please sign in to comment.