
Hard to read the pipeline #120

Open
2 of 4 tasks
fortuna opened this issue Feb 4, 2022 · 2 comments
Comments

@fortuna
Collaborator

fortuna commented Feb 4, 2022

I noticed that we are ingesting files of different types with the same functions in the pipeline. Furthermore, the types are not only different, but sometimes only slightly different, which causes even more confusion. Add to that the fact that all of the different data types are simply called "row", and it becomes a huge effort to make sense of how data is flowing.

We have to clean that up so we can make sense of what's going on, especially in Satellite, which has many types.

Split flows

First and foremost, this call has to change:

lines = _read_scan_text(p, new_filenames)

We should not create a single PCollection with different types. Instead, each file should be its own PCollection.

Then process_satellite_lines should be removed in favor of separate flows that process and join the different datasets. The partition logic can then go away.
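A minimal sketch of the direction, in plain Python standing in for Beam (the suffix constants and group names here are hypothetical): filename matching happens once, when deciding which files feed which read, so that each group can become its own single-type PCollection instead of one mixed collection dispatched on later.

```python
from typing import Dict, List

# Hypothetical filename suffixes; the real pipeline's constants may differ.
BLOCKPAGE_SUFFIX = "blockpages.json"
RESULTS_SUFFIX = "results.json"

def split_by_type(filenames: List[str]) -> Dict[str, List[str]]:
    """Route each input file to its own group at ingestion time, instead of
    reading everything into one mixed collection and sniffing later."""
    groups: Dict[str, List[str]] = {"blockpages": [], "results": [], "other": []}
    for name in filenames:
        if name.endswith(BLOCKPAGE_SUFFIX):
            groups["blockpages"].append(name)
        elif name.endswith(RESULTS_SUFFIX):
            groups["results"].append(name)
        else:
            groups["other"].append(name)
    return groups

# Each group would then back its own read (e.g. one ReadFromText per group),
# producing one PCollection per input type.
groups = split_by_type([
    "2022-02-01/blockpages.json",
    "2022-02-01/results.json",
])
```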

As a rule of thumb, consider any selection logic based on filenames, like the following, harmful:

if filename == SATELLITE_BLOCKPAGES_FILE:

Another similarly harmful practice is detecting the source file based on the presence of fields in the row.

Both practices can be replaced by creating a separate PCollection for each input type.
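To illustrate the field-presence anti-pattern and its replacement (all field names here are hypothetical, not the pipeline's real schema): instead of one shared function that guesses a row's origin, each per-type PCollection maps through its own function, so the origin is known from the flow the row is in.

```python
# Anti-pattern: one shared function sniffs the source from the fields.
def detect_source(row: dict) -> str:
    # Guesses where a row came from by inspecting its shape.
    return "results" if "received" in row else "tags"

# Replacement: each per-type collection has its own processing function;
# no runtime guessing is needed because the type is fixed by the flow.
def process_results_row(row: dict) -> tuple:
    return (row["domain"], len(row["received"]))

def process_tags_row(row: dict) -> tuple:
    return (row["ip"], row["asn"])

results_out = process_results_row(
    {"domain": "example.com", "received": [{"ip": "5.6.7.8"}]})
tags_out = process_tags_row({"ip": "5.6.7.8", "asn": 123})
```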

This cleanup can be incremental. For instance, it seems that extracting the blockpage logic is quite easy. Just call _process_satellite_blockpages on a PCollection with the blockpage files only.

Then you can extract each tag input file into its own flow.

I have the impression that this cleanup will speed up the pipeline, since you can use more lightweight workers for many parts of the flow instead of having to load all the data for all the flows. It will also shuffle less data for the joins (each flow can be sorted separately).

Define and clarify row types

Another significant improvement is to name each data type and create type aliases for the existing Row (e.g. SatelliteScan or ResolverTags). We should not see the Row type anywhere. Note that they can all still be a generic dictionary type underneath, but the type annotations will help readers understand what goes where. We should also rename the variables to reflect their types.
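A small sketch of what the aliases could look like (the alias names here are placeholders for whatever we settle on): even though every type stays a plain dict at runtime, the annotations document which kind of "row" each function expects.

```python
from typing import Any, Dict

# Hypothetical alias names; the real set would cover every distinct
# dictionary shape flowing through the pipeline.
SatelliteScan = Dict[str, Any]   # one parsed scan result
ResolverTags = Dict[str, Any]    # tags describing a resolver

def resolver_key(tags: ResolverTags) -> str:
    # The annotation tells the reader this expects resolver tags,
    # not a scan row, even though both are dicts at runtime.
    return tags["ip"]

key = resolver_key({"ip": "5.6.7.8", "asn": 123})
```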

/cc @ohnorobo @avirkud

TODOs

  • triple cogroup
  • add strict types
  • remove all filename selection in logic
  • switch to 'big cogroup' instead of doing individual joins
@fortuna
Collaborator Author

fortuna commented Feb 5, 2022

Reference for types: https://docs.censoredplanet.org/dns.html

For inputs, we can define some types like BlockpagesEntry or ResultsEntry for blockpages.json or results.json, for example.
Then we can think of names for the intermediate data.
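One way to make the input types concrete is TypedDict; the field subsets below are illustrative guesses only (the real schemas at https://docs.censoredplanet.org/dns.html carry more fields):

```python
from typing import List, TypedDict

# Hypothetical field subsets for the two input files; the actual schemas
# are documented at docs.censoredplanet.org and have more fields.
class BlockpagesEntry(TypedDict):
    ip: str
    url: str
    fetched: bool

class ResultsEntry(TypedDict):
    vp: str              # vantage point
    test_url: str
    response: List[dict]

bp: BlockpagesEntry = {"ip": "1.2.3.4", "url": "example.com", "fetched": True}
```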

@fortuna
Collaborator Author

fortuna commented Feb 8, 2022

I talked to @ohnorobo. We'll update the pipeline to remove the duplication of the observations, which should significantly improve the pipeline performance and address some of the modeling concerns. She described the change in a TODO here:

# TODO we want to change the structure here.
# Currently we're CoGrouping over two elements using a (date, received_ip) key:
# a row with flattened received ips like
# {"ip": "1.2.3.4",
#  "date": "2021-01-01",
#  "domain": "example.com",
#  "roundtrip_id": "abc",
#  "received": [{"ip": "5.6.7.8"}]
# }
# and metadata like
# {"ip": "5.6.7.8", "date": "2021-01-01", "asn": 123}
#
# But in the future we want to have three pieces:
# 1. The initial row (without flattening the received ips) with a roundtrip_id
# 2. Information on a single received ip like:
#    {"received_ip": "5.6.7.8", "date": "2021-01-01", "roundtrip_id": "abc"}
# 3. Metadata as above.
#
# Then we can CoGroup 2 and 3 together by (date, received_ip),
# then Group them with 1 by roundtrip_id,
# and add the metadata to the appropriate answers.
# This way we avoid having to multiply rows by their number of received ips.
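The two-stage join described in the TODO can be simulated in plain Python (dict lookups and loops standing in for CoGroupByKey/GroupByKey; the sample records reuse the values from the comment above):

```python
from collections import defaultdict

# 1. The unflattened roundtrip rows, each carrying a roundtrip_id.
roundtrips = [{"ip": "1.2.3.4", "date": "2021-01-01", "domain": "example.com",
               "roundtrip_id": "abc", "received": [{"ip": "5.6.7.8"}]}]

# 2. One lightweight record per received ip, carrying the roundtrip_id.
answers = [{"received_ip": "5.6.7.8", "date": "2021-01-01",
            "roundtrip_id": "abc"}]

# 3. Metadata, keyed by (date, ip).
metadata = [{"ip": "5.6.7.8", "date": "2021-01-01", "asn": 123}]

# Stage 1: CoGroup answers (2) with metadata (3) by (date, received_ip).
meta_by_key = {(m["date"], m["ip"]): m for m in metadata}
tagged = defaultdict(list)
for a in answers:
    m = meta_by_key.get((a["date"], a["received_ip"]), {})
    merged = {**a, **{k: v for k, v in m.items() if k not in a}}
    tagged[a["roundtrip_id"]].append(merged)

# Stage 2: Group the tagged answers with the roundtrip rows (1) by
# roundtrip_id, attaching metadata to the matching "received" entries.
for row in roundtrips:
    for ans in tagged[row["roundtrip_id"]]:
        for received in row["received"]:
            if received["ip"] == ans["received_ip"]:
                received["asn"] = ans["asn"]
```

Note that the roundtrip rows themselves are never replicated per received ip; only the small per-answer records flow through the first join.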
