In [None]:
import json
import os
import re
from getpass import getpass

import pandas as pd
from elasticsearch import Elasticsearch, helpers
from tqdm import tqdm

In [None]:
# Template cell: connection details for the E5 index to ingest.
# Values are taken from the environment (ES_USERNAME / ES_PASSWORD / ES_HOST /
# ES_INDEX) so credentials never have to be typed into the notebook; the '*'
# placeholders are used when a variable is unset.
username = os.environ.get('ES_USERNAME', '*')
password = os.environ.get('ES_PASSWORD', '*')
host = os.environ.get('ES_HOST', '*')
index_name = os.environ.get('ES_INDEX', "*")

# Initialize Elasticsearch client
es = Elasticsearch([host], http_auth=(username, password))

# Test connection first; only probe the index when the cluster answered,
# because es.indices.exists() on an unreachable cluster would fail with a
# connection error instead of printing a readable message.
if es.ping():
    print("Connected to Elasticsearch.")
    if not es.indices.exists(index=index_name):
        print(f"Index {index_name} does not exist.")
    else:
        print(f"Index {index_name} exists.")
else:
    print("Elasticsearch cluster is not accessible!")

In [None]:
# Connection details for the THEIA indices. The password is never stored in
# the notebook: it is read from ES_PASSWORD or prompted for interactively.
# NOTE(review): a password was previously committed here in plain text and
# should be rotated.
username = os.environ.get('ES_USERNAME', 'elastic')
password = os.environ.get('ES_PASSWORD') or getpass('Elasticsearch password: ')
host = os.environ.get('ES_HOST', 'http://128.143.69.88:9200')
index_name = "theia*"

# Initialize Elasticsearch client
es = Elasticsearch([host], http_auth=(username, password))

# Fail fast with a clear message: every call below needs a live cluster.
if not es.ping():
    raise ConnectionError(f"Elasticsearch cluster at {host} is not accessible!")
print("Connected to Elasticsearch.")
if not es.indices.exists(index=index_name):
    print(f"Index {index_name} does not exist.")
else:
    print(f"Index {index_name} exists.")

# Define the query: every document of the index is scanned.
query = {"query": {"match_all": {}}}

# Get total count of documents to process (sizes the progress bar below).
total_docs = es.count(index=index_name, body=query)['count']
print(f"Total documents to process: {total_docs}")

# Event types kept as provenance edges; every other event type is ignored.
edge_types = {
    'EVENT_CLOSE',
    'EVENT_OPEN',
    'EVENT_READ',
    'EVENT_WRITE',
    'EVENT_EXECUTE',
    'EVENT_RECVFROM',
    'EVENT_RECVMSG',
    'EVENT_SENDMSG',
    'EVENT_SENDTO',
}

# JSON-lines output files (one JSON object per line) for the THEIA extraction.
id_to_type_file, net2prop_file, info_file = (
    'e5_data/theia_id_to_type.json',
    'e5_data/theia_net2prop.json',
    'e5_data/theia_info.json',
)

# In-memory buffers, flushed to the files above once they reach buffer_size items.
net2prop_buffer, id_to_type_buffer, info_buffer = [], [], []
buffer_size = 100_000  # Change this number to adjust the buffer size

def append_to_file(file_path, data):
    """Append every item of ``data`` to ``file_path`` as one JSON document per line.

    The file is created when missing; existing content is preserved.
    """
    serialized = (json.dumps(item) + '\n' for item in data)
    with open(file_path, 'a') as handle:
        handle.writelines(serialized)

def check_and_flush_buffer():
    """Flush each output buffer to its JSON-lines file once it holds ``buffer_size`` items.

    A full buffer is written with ``append_to_file`` and the module-level name
    is rebound to a fresh empty list; buffers below the threshold are untouched.
    """
    global net2prop_buffer, id_to_type_buffer, info_buffer

    def _drain(pending, destination):
        # Return the list the caller should keep: a new empty one after a flush.
        if len(pending) < buffer_size:
            return pending
        append_to_file(destination, pending)
        return []

    net2prop_buffer = _drain(net2prop_buffer, net2prop_file)
    id_to_type_buffer = _drain(id_to_type_buffer, id_to_type_file)
    info_buffer = _drain(info_buffer, info_file)
        
# Record-type keys under each document's 'datum' (CDM 2.0 names used by E5).
_CDM_PREFIX = 'com.bbn.tc.schema.avro.cdm20.'
_UUID_KEY = _CDM_PREFIX + 'UUID'
_MISSING = object()  # sentinel: distinguishes "key absent" from a stored None


def _dig(obj, *keys, default=''):
    """Follow ``keys`` through nested containers; return ``default`` if any step is missing or not indexable."""
    for key in keys:
        try:
            obj = obj[key]
        except (KeyError, TypeError, IndexError):
            return default
    return obj


def _event_to_info(line, event):
    """Build the info record for one Event document, or return None if its type is not in ``edge_types``."""
    action = _dig(event, 'type')
    if action not in edge_types:
        return None
    timestampnano = _dig(event, 'timestampNanos', default=_MISSING)
    timestamp = line.get('@timestamp', _MISSING)
    if timestampnano is _MISSING or timestamp is _MISSING:
        # Keep the two timestamp fields consistent: blank both if either is absent.
        timestampnano = timestamp = ''
    return {
        'actorID': _dig(event, 'subject', _UUID_KEY),
        'objectID': _dig(event, 'predicateObject', _UUID_KEY),
        'action': action,
        'timestampNanos': timestampnano,
        'timestamp': timestamp,
        'exec': _dig(event, 'properties', 'map', 'exec'),
        'path': _dig(event, 'predicateObjectPath', 'string'),
        # Tolerate documents without hostId instead of aborting the whole scan.
        'hostid': line.get('hostId', ''),
    }


# Start from empty output files: append_to_file opens them in append mode, so
# re-running this cell would otherwise duplicate every record of the last run.
for _out_path in (net2prop_file, id_to_type_file, info_file):
    os.makedirs(os.path.dirname(_out_path) or '.', exist_ok=True)
    open(_out_path, 'w').close()

# Start processing documents
with tqdm(total=total_docs, desc="Processing Documents") as pbar:
    for doc in helpers.scan(es, query=query, index=index_name, size=1000):
        pbar.update(1)

        line = doc['_source']
        # Dispatch on the record-type key itself rather than substring-matching
        # json.dumps() of every document: that was slow, and a match outside
        # 'datum' made the direct lookup that followed raise KeyError.
        datum = line.get('datum') or {}

        net_flow_object = datum.get(_CDM_PREFIX + 'NetFlowObject')
        if net_flow_object is not None:
            try:
                nodeid = net_flow_object['uuid']
                srcaddr = net_flow_object['localAddress'].get('string')
                srcport = net_flow_object['localPort'].get('int')
                dstaddr = net_flow_object['remoteAddress'].get('string')
                dstport = net_flow_object['remotePort'].get('int')
            except (KeyError, TypeError, AttributeError):
                # Best effort: skip flows whose address/port fields are absent or null.
                pass
            else:
                net2prop_buffer.append({nodeid: [srcaddr, srcport, dstaddr, dstport]})
                id_to_type_buffer.append({nodeid: 'NETFLOW'})

        subject = datum.get(_CDM_PREFIX + 'Subject')
        if subject is not None:
            id_to_type_buffer.append({subject['uuid']: subject['type']})

        file_object = datum.get(_CDM_PREFIX + 'FileObject')
        if file_object is not None:
            id_to_type_buffer.append({file_object['uuid']: file_object['type']})

        event = datum.get(_CDM_PREFIX + 'Event')
        if event is not None:
            info_data = _event_to_info(line, event)
            if info_data is not None:
                info_buffer.append(info_data)

        check_and_flush_buffer()

append_to_file(net2prop_file, net2prop_buffer)
append_to_file(id_to_type_file, id_to_type_buffer)
append_to_file(info_file, info_buffer)
# The tails are on disk now; reset so a partial re-run cannot write them twice.
net2prop_buffer, id_to_type_buffer, info_buffer = [], [], []

In [None]:
def load_dict_from_jsonl(file_path):
    """Merge every JSON object of a JSON-lines file into a single dict.

    Each non-blank line must hold a JSON object; on duplicate keys the later
    line wins. Blank or whitespace-only lines are skipped instead of raising
    ``json.JSONDecodeError``.

    Args:
        file_path: path of the JSON-lines file.

    Returns:
        dict with the union of all per-line objects.
    """
    result = {}
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip():
                result.update(json.loads(line))
    return result

def load_list_from_jsonl(file_path):
    """Load a JSON-lines file into a list, one parsed JSON value per non-blank line.

    Blank or whitespace-only lines are skipped instead of raising
    ``json.JSONDecodeError``. Order of lines is preserved.

    Args:
        file_path: path of the JSON-lines file.

    Returns:
        list of the parsed values.
    """
    with open(file_path, 'r') as file:
        return [json.loads(line) for line in file if line.strip()]

In [None]:
# Point the stitching inputs at the CLEARSCOPE extraction outputs.
id_to_type_file, net2prop_file, info_file = (
    'e5_data/clearscope_id_to_type.json',
    'e5_data/clearscope_net2prop.json',
    'e5_data/clearscope_info.json',
)

In [None]:
def stitch():
    """Enrich the edge records of ``info_file`` with node types and return them as a DataFrame.

    Reads the module-level ``id_to_type_file``, ``net2prop_file`` and
    ``info_file`` paths. Each edge record gets ``object`` (type of the object
    node) and ``actor_type`` (type of the actor node). For NETFLOW objects,
    ``path`` is replaced by the space-joined net2prop entry. Records whose
    actor, object or NETFLOW properties are unknown get ``None`` types and are
    removed by ``dropna``.

    Returns:
        pandas.DataFrame of the resolved records.
    """
    id_to_type = load_dict_from_jsonl(id_to_type_file)
    net2prop = load_dict_from_jsonl(net2prop_file)
    info = load_list_from_jsonl(info_file)

    for record in info:
        try:
            obj_type = id_to_type[record['objectID']]
            record['object'] = obj_type
            record['actor_type'] = id_to_type[record['actorID']]
            if obj_type == 'NETFLOW':
                # Ports are ints and addresses may be None, so stringify each
                # part. Plain '+' concatenation raised TypeError, and the bare
                # except then dropped every NETFLOW edge.
                record['path'] = ' '.join(str(part) for part in net2prop[record['objectID']])
        except KeyError:
            record['object'] = None
            record['actor_type'] = None

    df = pd.DataFrame.from_records(info)
    df = df.dropna()
    return df

In [None]:
# Build the enriched edge table with stitch() (the zero-argument version in the
# cell above, which reads the module-level *_file paths) and persist it.
df = stitch()
df.to_parquet("e5_data/clearscope_df.parquet", index=False)

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq

def load_dict_from_jsonl(file_path):
    """Merge every JSON object of a JSON-lines file into a single dict.

    Later lines override earlier ones on duplicate keys. Blank or
    whitespace-only lines are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        file_path: path of the JSON-lines file.

    Returns:
        dict with the union of all per-line objects.
    """
    result = {}
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip():
                result.update(json.loads(line))
    return result

def process_info_file(info_file_path, id_to_type, net2prop, parquet_file_path, batch_size=100_000):
    """Stream a JSON-lines edge file into a Parquet file, enriching every record.

    For each record, ``object`` and ``actor_type`` are looked up in
    ``id_to_type``; both become ``None`` when either node, or a NETFLOW
    object's properties, is unknown. For NETFLOW objects ``path`` becomes the
    space-joined ``net2prop`` entry. Schema fields missing from a record are
    written as ``''``. ``timestampNanos`` is coerced to int, with missing or
    blank values written as 0.

    Rows are buffered and written ``batch_size`` at a time. Writing one
    single-row batch per record, via a throwaway DataFrame, was very slow and
    produced one tiny row group per edge.

    Args:
        info_file_path: JSON-lines file of edge records.
        id_to_type: mapping node UUID -> node type.
        net2prop: mapping NETFLOW UUID -> list of address/port values.
        parquet_file_path: output path; created on the first written batch.
        batch_size: rows buffered per written row group.
    """
    # All fields are strings except timestampNanos (int64).
    schema = pa.schema([
        ('actorID', pa.string()),
        ('objectID', pa.string()),
        ('actor_type', pa.string()),
        ('object', pa.string()),
        ('action', pa.string()),
        ('timestampNanos', pa.int64()),
        ('timestamp', pa.string()),
        ('exec', pa.string()),
        ('path', pa.string()),
        ('hostid', pa.string()),
    ])

    def _to_nanos(value):
        # Ingestion stores '' when timestampNanos is absent; int('') would raise.
        if value is None or value == '':
            return 0
        return int(value)

    writer = None
    rows = []

    def _flush():
        # Write the buffered rows as one table (one row group) and clear the buffer.
        nonlocal writer
        if not rows:
            return
        columns = {name: [row[name] for row in rows] for name in schema.names}
        table = pa.Table.from_pydict(columns, schema=schema)
        if writer is None:
            writer = pq.ParquetWriter(parquet_file_path, schema, compression='snappy')
        writer.write_table(table)
        rows.clear()

    try:
        with open(info_file_path, 'r') as file:
            for line in file:
                if not line.strip():
                    continue
                data = json.loads(line)
                try:
                    typ = id_to_type[data['objectID']]
                    data['object'] = typ
                    data['actor_type'] = id_to_type[data['actorID']]
                    if typ == 'NETFLOW':
                        data['path'] = ' '.join(str(item) for item in net2prop[data['objectID']])
                except KeyError:
                    data['object'] = None
                    data['actor_type'] = None

                record = {key: data.get(key, '') for key in schema.names}
                record['timestampNanos'] = _to_nanos(data.get('timestampNanos'))
                rows.append(record)
                if len(rows) >= batch_size:
                    _flush()
        _flush()
    finally:
        # Always finalize the Parquet footer, even if parsing failed midway.
        if writer is not None:
            writer.close()

def stitch():
    """Stream the enriched edges of ``info_file`` into ``e5_data/clearscope_df.parquet``.

    Loads the module-level ``id_to_type_file`` / ``net2prop_file`` maps and
    delegates enrichment and writing to ``process_info_file``.
    """
    type_by_uuid = load_dict_from_jsonl(id_to_type_file)
    flow_props = load_dict_from_jsonl(net2prop_file)
    process_info_file(
        info_file,
        type_by_uuid,
        flow_props,
        "e5_data/clearscope_df.parquet",
    )

stitch()

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime

def load_dict_from_jsonl(file_path):
    """Merge every JSON object of a JSON-lines file into a single dict.

    Later lines override earlier ones on duplicate keys. Blank or
    whitespace-only lines are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        file_path: path of the JSON-lines file.

    Returns:
        dict with the union of all per-line objects.
    """
    merged = {}
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip():
                merged.update(json.loads(line))
    return merged

# Half-open [start, end) windows of interest, built once instead of on every call.
# NOTE(review): the first window used to be "anything before 2019-05-08" (its end
# bound was computed but never used); it now covers the 2019-05-08 day that the
# other cells use for benign data - confirm this is the intended selection.
_TIMESTAMP_WINDOWS = (
    (datetime(2019, 5, 8), datetime(2019, 5, 9)),
    (datetime(2019, 5, 15), datetime(2019, 5, 16)),
    (datetime(2019, 5, 17), datetime(2019, 5, 18)),
)


def is_timestamp_in_range(timestamp_str, ranges=_TIMESTAMP_WINDOWS):
    """Return True if an ISO-8601 timestamp string falls inside one of ``ranges``.

    Before parsing, the string is normalised. A trailing 'Z', a '+HH:MM' offset
    and a '-HH:MM' offset after the 'T' are stripped, then fractional seconds
    are removed. The offset is discarded, not applied, so the comparison uses
    the wall-clock time as written.

    Args:
        timestamp_str: e.g. "2019-05-15T10:00:00.123Z".
        ranges: iterable of ``(start, end)`` naive datetimes, end exclusive.

    Returns:
        bool; False for empty or unparseable input.
    """
    if not timestamp_str:
        return False

    text = timestamp_str
    if text.endswith('Z'):
        text = text[:-1]
    if '+' in text:
        text = text.split('+')[0]
    else:
        t_pos = text.find('T')
        # Only a '-' after the date/time separator is a UTC offset.
        if t_pos != -1 and text.rfind('-') > t_pos:
            text = text.rsplit('-', 1)[0]
    text = text.split('.')[0]

    try:
        timestamp = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        return False

    return any(start <= timestamp < end for start, end in ranges)

def process_info_file(info_file_path, id_to_type, net2prop, parquet_file_path, batch_size=100_000):
    """Stream in-window edge records of a JSON-lines file into Parquet, enriching each.

    Records whose ``timestamp`` is empty or rejected by ``is_timestamp_in_range``
    are skipped. For kept records, ``object`` and ``actor_type`` are looked up
    in ``id_to_type``; both become ``None`` when either node, or a NETFLOW
    object's properties, is unknown. For NETFLOW objects ``path`` becomes the
    space-joined ``net2prop`` entry. Missing schema fields are written as
    ``''``. ``timestampNanos`` is coerced to int, with missing or blank values
    written as 0.

    Rows are written ``batch_size`` at a time. One single-row batch per record
    was very slow and produced one tiny row group per edge.

    Args:
        info_file_path: JSON-lines file of edge records.
        id_to_type: mapping node UUID -> node type.
        net2prop: mapping NETFLOW UUID -> list of address/port values.
        parquet_file_path: output path; created on the first written batch.
        batch_size: rows buffered per written row group.
    """
    schema = pa.schema([
        ('actorID', pa.string()),
        ('objectID', pa.string()),
        ('actor_type', pa.string()),
        ('object', pa.string()),
        ('action', pa.string()),
        ('timestampNanos', pa.int64()),
        ('timestamp', pa.string()),
        ('exec', pa.string()),
        ('path', pa.string()),
        ('hostid', pa.string()),
    ])

    def _to_nanos(value):
        # Ingestion stores '' when timestampNanos is absent; int('') would raise.
        if value is None or value == '':
            return 0
        return int(value)

    writer = None
    rows = []

    def _flush():
        # Write the buffered rows as one table (one row group) and clear the buffer.
        nonlocal writer
        if not rows:
            return
        columns = {name: [row[name] for row in rows] for name in schema.names}
        table = pa.Table.from_pydict(columns, schema=schema)
        if writer is None:
            writer = pq.ParquetWriter(parquet_file_path, schema, compression='snappy')
        writer.write_table(table)
        rows.clear()

    try:
        with open(info_file_path, 'r') as file:
            for line in file:
                if not line.strip():
                    continue
                data = json.loads(line)

                # Records without a timestamp can never be in range; skip them
                # here rather than letting the parser raise on ''.
                timestamp = data.get('timestamp') or ''
                if not timestamp or not is_timestamp_in_range(timestamp):
                    continue

                try:
                    typ = id_to_type[data['objectID']]
                    data['object'] = typ
                    data['actor_type'] = id_to_type[data['actorID']]
                    if typ == 'NETFLOW':
                        data['path'] = ' '.join(str(item) for item in net2prop[data['objectID']])
                except KeyError:
                    data['object'] = None
                    data['actor_type'] = None

                record = {key: data.get(key, '') for key in schema.names}
                record['timestampNanos'] = _to_nanos(data.get('timestampNanos'))
                rows.append(record)
                if len(rows) >= batch_size:
                    _flush()
        _flush()
    finally:
        # Always finalize the Parquet footer, even if parsing failed midway.
        if writer is not None:
            writer.close()

def stitch():
    """Write the time-filtered, enriched edges of ``info_file`` to ``e5_data/clearscope_df.parquet``.

    The node-type and NETFLOW maps come from the module-level
    ``id_to_type_file`` and ``net2prop_file`` paths; filtering, enrichment and
    writing are done by ``process_info_file``.
    """
    node_types, netflow_props = (load_dict_from_jsonl(path) for path in (id_to_type_file, net2prop_file))
    process_info_file(info_file, node_types, netflow_props, "e5_data/clearscope_df.parquet")

stitch()

In [None]:
def load_dict_from_jsonl(file_path):
    """Merge every JSON object of a JSON-lines file into a single dict.

    Later lines override earlier ones on duplicate keys. Blank or
    whitespace-only lines are skipped instead of raising ``json.JSONDecodeError``.

    Args:
        file_path: path of the JSON-lines file.

    Returns:
        dict with the union of all per-line objects.
    """
    result = {}
    with open(file_path, 'r') as file:
        for line in file:
            if line.strip():
                result.update(json.loads(line))
    return result

def stitch(data_buffer, file_path,
           id_to_type_file='e5_data/theia_id_to_type.json',
           net2prop_file='e5_data/theia_net2prop.json'):
    """Enrich in-memory edge records with node types and save them as Parquet.

    Args:
        data_buffer: list of edge-record dicts (with ``actorID``/``objectID``).
            The dicts are updated in place with ``object`` and ``actor_type``,
            plus ``path`` for NETFLOW objects; they are not copied, which keeps
            memory flat for large buffers.
        file_path: output stem; the frame is written to ``e5_data/{file_path}.parquet``.
        id_to_type_file: JSON-lines node-type map (defaults to the THEIA extraction).
        net2prop_file: JSON-lines NETFLOW property map (defaults to the THEIA extraction).

    Records whose actor, object or NETFLOW properties cannot be resolved get
    ``None`` types and are dropped by ``dropna`` before saving.
    """
    id_to_type = load_dict_from_jsonl(id_to_type_file)
    net2prop = load_dict_from_jsonl(net2prop_file)

    for record in data_buffer:
        try:
            obj_type = id_to_type[record['objectID']]
            record['object'] = obj_type
            record['actor_type'] = id_to_type[record['actorID']]
            if obj_type == 'NETFLOW':
                # Ports are ints and addresses may be None: stringify each part.
                # '+' concatenation raised TypeError, and the bare except then
                # dropped every NETFLOW edge.
                record['path'] = ' '.join(str(part) for part in net2prop[record['objectID']])
        except KeyError:
            record['object'] = None
            record['actor_type'] = None

    df = pd.DataFrame.from_records(data_buffer)
    df = df.dropna()
    df.to_parquet(f"e5_data/{file_path}.parquet", index=False)

In [None]:
def query_elastic(query, file_path, index_name="clearscope*"):
    """Scan an E5 index for provenance edge events and pass them to ``stitch``.

    Credentials are read from ES_USERNAME / ES_PASSWORD / ES_HOST. The password
    is required and is never stored in the notebook.

    Args:
        query: Elasticsearch query body used for both the count and the scan.
        file_path: output stem forwarded as ``stitch(info_buffer, file_path)``.
        index_name: index pattern to scan (defaults to the CLEARSCOPE indices).

    Raises:
        RuntimeError: if ES_PASSWORD is not set.
        ConnectionError: if the cluster does not answer a ping.

    NOTE(review): the two-argument ``stitch`` in this notebook loads the
    THEIA id/netflow maps by default, while this function scans CLEARSCOPE by
    default. Confirm that the maps match the index being scanned.
    """
    username = os.environ.get('ES_USERNAME', 'elastic')
    password = os.environ.get('ES_PASSWORD')
    host = os.environ.get('ES_HOST', 'http://128.143.69.88:9200')
    if not password:
        raise RuntimeError("Set the ES_PASSWORD environment variable before calling query_elastic().")

    # Initialize Elasticsearch client
    es = Elasticsearch([host], http_auth=(username, password))

    # Fail fast: count/scan below need a live cluster.
    if not es.ping():
        raise ConnectionError(f"Elasticsearch cluster at {host} is not accessible!")
    print("Connected to Elasticsearch.")
    if not es.indices.exists(index=index_name):
        print(f"Index {index_name} does not exist.")
    else:
        print(f"Index {index_name} exists.")

    total_docs = es.count(index=index_name, body=query)['count']

    edge_types = {
        'EVENT_CLOSE',
        'EVENT_OPEN',
        'EVENT_READ',
        'EVENT_WRITE',
        'EVENT_EXECUTE',
        'EVENT_RECVFROM',
        'EVENT_RECVMSG',
        'EVENT_SENDMSG',
        'EVENT_SENDTO',
    }
    cdm = 'com.bbn.tc.schema.avro.cdm20.'
    uuid_key = cdm + 'UUID'
    missing = object()  # sentinel: "key absent" as opposed to a stored None

    def dig(obj, *keys, default=''):
        # Follow keys through nested containers; default if any step is missing/not indexable.
        for key in keys:
            try:
                obj = obj[key]
            except (KeyError, TypeError, IndexError):
                return default
        return obj

    info_buffer = []

    # Start processing documents
    with tqdm(total=total_docs, desc="Processing Documents") as pbar:
        for doc in helpers.scan(es, query=query, index=index_name, size=5000):
            pbar.update(1)

            line = doc['_source']
            event = dig(line, 'datum', cdm + 'Event', default=None)
            if event is None:
                continue  # not an Event record: cannot produce an edge
            action = dig(event, 'type')
            if action not in edge_types:
                continue

            timestampnano = dig(event, 'timestampNanos', default=missing)
            timestamp = line.get('@timestamp', missing)
            if timestampnano is missing or timestamp is missing:
                # Keep both timestamp fields consistent: blank both if either is absent.
                timestampnano = timestamp = ''

            info_buffer.append({
                'actorID': dig(event, 'subject', uuid_key),
                'objectID': dig(event, 'predicateObject', uuid_key),
                'action': action,
                'timestampNanos': timestampnano,
                'timestamp': timestamp,
                'exec': dig(event, 'properties', 'map', 'exec'),
                'path': dig(event, 'predicateObjectPath', 'string'),
                'hostid': line.get('hostId', ''),
            })

    stitch(info_buffer, file_path)

In [None]:
# Documents whose @timestamp falls in either the 2019-05-15 or the 2019-05-17
# window (bool/should with minimum_should_match = 1 acts as an OR).
query = {
    "query": {
        "bool": {
            "should": [
                {"range": {"@timestamp": {"gte": start, "lte": end, "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"}}}
                for start, end in (
                    ("2019-05-15T00:00:00.360Z", "2019-05-16T00:00:00.360Z"),
                    ("2019-05-17T00:00:00.360Z", "2019-05-18T00:00:00.360Z"),
                )
            ],
            "minimum_should_match": 1,
        }
    }
}

query_elastic(query, "mal_clearscope1")

In [None]:
import concurrent.futures
import json
from tqdm import tqdm

# (query body, output stem) pairs: the benign 2019-05-08 day, and the
# 2019-05-15 / 2019-05-17 days combined as the malicious set.
tasks = [
    (
        {"query": {"range": {"@timestamp": {
            "gte": "2019-05-08T00:00:00.360Z",
            "lte": "2019-05-09T00:00:00.360Z",
            "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
        }}}},
        "ben_clearscope1",
    ),
    (
        {"query": {"bool": {
            "should": [
                {"range": {"@timestamp": {
                    "gte": "2019-05-15T00:00:00.360Z",
                    "lte": "2019-05-16T00:00:00.360Z",
                    "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
                }}},
                {"range": {"@timestamp": {
                    "gte": "2019-05-17T00:00:00.360Z",
                    "lte": "2019-05-18T00:00:00.360Z",
                    "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
                }}},
            ],
            "minimum_should_match": 1,
        }}},
        "mal_clearscope1",
    ),
]

# Run query_elastic for every task on a thread pool and report each outcome
# as soon as it finishes (completion order, not submission order).
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_query = {}
    for task in tasks:
        future_to_query[executor.submit(query_elastic, *task)] = task

    for future in concurrent.futures.as_completed(future_to_query):
        task = future_to_query[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f'{task[1]} generated an exception: {exc}')
        else:
            print(f'{task[1]} has completed')

In [None]:
import json
from datetime import datetime
from tqdm import tqdm

# Inclusive analysis windows, written as ISO strings and parsed below.
time_ranges = [
    {"gte": "2019-05-15T00:00:00", "lte": "2019-05-16T00:00:00"},
    {"gte": "2019-05-17T00:00:00", "lte": "2019-05-18T00:00:00"},
]

# Parse both bounds of every window in place so later checks compare datetimes.
for range_ in time_ranges:
    range_.update({bound: datetime.strptime(text, "%Y-%m-%dT%H:%M:%S") for bound, text in range_.items()})

def record_in_range(record_timestamp, time_ranges):
    """Return True if an ISO-8601 timestamp string falls inside any of ``time_ranges``.

    Only the first 19 characters ("YYYY-MM-DDTHH:MM:SS") are parsed, so
    fractional seconds and any UTC offset are ignored; no timezone conversion
    is done. Windows are inclusive on both ends (``gte <= t <= lte``).

    Args:
        record_timestamp: e.g. "2019-05-15T10:00:00.123Z".
        time_ranges: iterable of dicts with datetime "gte"/"lte" bounds.

    Returns:
        bool; False for empty, missing or unparseable timestamps, so a single
        bad record cannot abort a scan over millions of lines.
    """
    try:
        record_dt = datetime.strptime(record_timestamp[:19], "%Y-%m-%dT%H:%M:%S")
    except (TypeError, ValueError):
        return False

    return any(range_["gte"] <= record_dt <= range_["lte"] for range_ in time_ranges)


info_buffer = []
malformed = 0  # lines that could not be parsed or lack a usable timestamp

# Progress is tracked in bytes so the bar works without knowing the record count.
total_size = os.path.getsize('e5_data/clearscope_info.json')

with open('e5_data/clearscope_info.json', 'r') as file:
    with tqdm(total=total_size, unit='B', unit_scale=True, desc="Processing records") as pbar:
        for line in file:
            pbar.update(len(line.encode('utf-8')))
            if not line.strip():
                continue

            # Lines were written with json.dumps, so parse them as-is. Do not
            # rewrite quotes: values such as shell commands contain apostrophes,
            # and turning them into '"' makes the line invalid JSON.
            try:
                record = json.loads(line)
                keep = record_in_range(record['timestamp'], time_ranges)
            except (ValueError, KeyError, TypeError):  # JSONDecodeError is a ValueError
                malformed += 1
                continue

            if keep:
                info_buffer.append(record)

print(f"Filtered records count: {len(info_buffer)}")
if malformed:
    print(f"Skipped malformed records: {malformed}")

In [None]:
import json
from datetime import datetime
from tqdm import tqdm

# Inclusive analysis windows, written as ISO strings and parsed below.
time_ranges = [
    {"gte": "2019-05-15T00:00:00", "lte": "2019-05-16T00:00:00"},
    {"gte": "2019-05-17T00:00:00", "lte": "2019-05-18T00:00:00"},
]

# Turn each window's string bounds into datetimes, in place.
for range_ in time_ranges:
    range_.update({bound: datetime.strptime(text, "%Y-%m-%dT%H:%M:%S") for bound, text in range_.items()})

def record_in_range(record_timestamp, time_ranges):
    """Return True if the first 19 chars of an ISO timestamp fall in any inclusive window.

    Fractional seconds and offsets are ignored. Empty, missing or unparseable
    timestamps return False instead of raising mid-scan.
    """
    try:
        record_dt = datetime.strptime(record_timestamp[:19], "%Y-%m-%dT%H:%M:%S")
    except (TypeError, ValueError):
        return False
    return any(range_["gte"] <= record_dt <= range_["lte"] for range_ in time_ranges)

info_buffer = []
malformed = 0  # lines that could not be parsed or lack a usable timestamp

# Hard-coded record count of clearscope_info.json; only sizes the progress bar.
total_records = 152489742

with open('e5_data/clearscope_info.json', 'r') as file:
    with tqdm(total=total_records, unit='records', desc="Processing records") as pbar:
        for line in file:
            pbar.update(1)
            if not line.strip():
                continue
            try:
                record = json.loads(line)
                keep = record_in_range(record['timestamp'], time_ranges)
            except (ValueError, KeyError, TypeError):  # JSONDecodeError is a ValueError
                # Count instead of printing per record: the file has ~1.5e8 lines.
                malformed += 1
                continue
            if keep:
                info_buffer.append(record)

print(f"Filtered records count: {len(info_buffer)}")
if malformed:
    print(f"Skipped malformed records: {malformed}")

In [None]:
import dask.dataframe as dd
from datetime import datetime
import pandas as pd
from dask.diagnostics import ProgressBar

# Inclusive windows for the malicious days, parsed to datetimes below.
time_ranges = [
    {"gte": "2019-05-15T00:00:00", "lte": "2019-05-16T00:00:00"},
    {"gte": "2019-05-17T00:00:00", "lte": "2019-05-18T00:00:00"},
]

for range_ in time_ranges:
    range_.update({bound: datetime.strptime(text, "%Y-%m-%dT%H:%M:%S") for bound, text in range_.items()})

def record_in_range(record_timestamp, time_ranges):
    """Return True if a timestamp falls inside any of ``time_ranges`` (inclusive bounds).

    Args:
        record_timestamp: a ``datetime`` (pandas ``Timestamp`` subclasses it)
            or an ISO-8601 string. For strings only the first 19 characters
            are parsed, so fractions and offsets are ignored.
        time_ranges: iterable of dicts with naive-datetime "gte"/"lte" bounds.

    Returns:
        bool. Timezone-aware datetimes are first converted to naive UTC. The
        tzinfo used to be simply dropped, which shifted non-UTC values. Missing
        or unparseable values (NaN, '', bad strings) return False.
    """
    if isinstance(record_timestamp, datetime):
        record_dt = record_timestamp
    else:
        try:
            record_dt = datetime.strptime(record_timestamp[:19], "%Y-%m-%dT%H:%M:%S")
        except (TypeError, ValueError):
            return False

    if record_dt.tzinfo is not None:
        # Shift to UTC, then drop tzinfo so it compares with the naive bounds.
        offset = record_dt.utcoffset()
        record_dt = record_dt.replace(tzinfo=None)
        if offset is not None:
            record_dt -= offset

    return any(range_["gte"] <= record_dt <= range_["lte"] for range_ in time_ranges)

def filter_partition(partition, time_ranges=time_ranges):
    """Return the rows of one pandas partition whose 'timestamp' lies in ``time_ranges``.

    The per-row result is coerced to bool before indexing. On an empty
    partition, ``Series.apply`` may not return a bool-dtype Series, and
    ``partition[non_bool_series]`` would then be read as a column selection
    rather than a row filter.
    """
    mask = partition['timestamp'].apply(lambda ts: record_in_range(ts, time_ranges))
    return partition.loc[mask.astype(bool)]

# Read the JSON-lines edge file lazily, in partitions of about 1 GB of input.
ddf = dd.read_json('e5_data/clearscope_info.json', blocksize="1GB")

# Filter each partition independently. Filtering keeps ddf's columns, so ddf
# itself serves as the output meta.
filtered_ddf = ddf.map_partitions(filter_partition, meta=ddf, time_ranges=time_ranges)

with ProgressBar():  # console progress while dask computes
    filtered_result = filtered_ddf.compute()

# Materialise as a list of per-record dicts.
info_buffer = filtered_result.to_dict(orient='records')
print(f"Filtered records count: {len(info_buffer)}")

In [None]:
import dask.dataframe as dd
from datetime import datetime
import pandas as pd
from dask.diagnostics import ProgressBar

# Single inclusive window (2019-05-15), parsed to datetimes below.
time_ranges = [
    {"gte": "2019-05-15T00:00:00", "lte": "2019-05-16T00:00:00"},
]

for range_ in time_ranges:
    range_.update({bound: datetime.strptime(text, "%Y-%m-%dT%H:%M:%S") for bound, text in range_.items()})

def record_in_range(record_timestamp, time_ranges):
    """Return True if a timestamp falls inside any of ``time_ranges`` (inclusive bounds).

    Args:
        record_timestamp: a ``datetime`` (pandas ``Timestamp`` subclasses it)
            or an ISO-8601 string. For strings only the first 19 characters
            are parsed, so fractions and offsets are ignored.
        time_ranges: iterable of dicts with naive-datetime "gte"/"lte" bounds.

    Returns:
        bool. Timezone-aware datetimes are converted to naive UTC rather than
        having their tzinfo dropped. Missing or unparseable values (NaN, '',
        bad strings) return False.
    """
    if isinstance(record_timestamp, datetime):
        record_dt = record_timestamp
    else:
        try:
            record_dt = datetime.strptime(record_timestamp[:19], "%Y-%m-%dT%H:%M:%S")
        except (TypeError, ValueError):
            return False

    if record_dt.tzinfo is not None:
        # Shift to UTC, then drop tzinfo so it compares with the naive bounds.
        offset = record_dt.utcoffset()
        record_dt = record_dt.replace(tzinfo=None)
        if offset is not None:
            record_dt -= offset

    return any(range_["gte"] <= record_dt <= range_["lte"] for range_ in time_ranges)

def filter_partition(partition, time_ranges=time_ranges):
    """Return the rows of one pandas partition whose 'timestamp' lies in ``time_ranges``.

    The per-row result is coerced to bool so that an empty partition, whose
    ``apply`` result may not be bool-dtype, is row-filtered rather than
    treated as a column selection.
    """
    mask = partition['timestamp'].apply(lambda ts: record_in_range(ts, time_ranges))
    return partition.loc[mask.astype(bool)]

# Read the THEIA JSON-lines edge file lazily, in partitions of about 1 GB of input.
ddf = dd.read_json('e5_data/theia_info.json', blocksize="1GB")

# Filter each partition independently; the output keeps ddf's columns, so ddf is the meta.
filtered_ddf = ddf.map_partitions(filter_partition, meta=ddf, time_ranges=time_ranges)

with ProgressBar():  # console progress while dask computes
    filtered_result = filtered_ddf.compute()

# Materialise as a list of per-record dicts for stitch(data_buffer, ...).
info_buffer = filtered_result.to_dict(orient='records')
print(f"Filtered records count: {len(info_buffer)}")

In [None]:
stitch(data_buffer=info_buffer, file_path="theia_mal")  # enrich filtered THEIA edges, write theia_mal parquet

In [None]:
import dask.dataframe as dd
from datetime import datetime
import pandas as pd
from dask.diagnostics import ProgressBar

# Single inclusive benign window (2019-05-08). Day literals are zero-padded
# ISO-8601, consistent with the other cells ("2019-05-8" only parsed because
# strptime is lenient).
time_ranges = [
    {"gte": "2019-05-08T00:00:00", "lte": "2019-05-09T00:00:00"},
]

# Convert the time range strings to datetime objects for comparison
for range_ in time_ranges:
    range_["gte"] = datetime.strptime(range_["gte"], "%Y-%m-%dT%H:%M:%S")
    range_["lte"] = datetime.strptime(range_["lte"], "%Y-%m-%dT%H:%M:%S")

def record_in_range(record_timestamp, time_ranges):
    """Return True if a timestamp falls inside any of ``time_ranges`` (inclusive bounds).

    Args:
        record_timestamp: a ``datetime`` (pandas ``Timestamp`` subclasses it)
            or an ISO-8601 string. For strings only the first 19 characters
            are parsed, so fractions and offsets are ignored.
        time_ranges: iterable of dicts with naive-datetime "gte"/"lte" bounds.

    Returns:
        bool. Timezone-aware datetimes are converted to naive UTC rather than
        having their tzinfo dropped. Missing or unparseable values (NaN, '',
        bad strings) return False.
    """
    if isinstance(record_timestamp, datetime):
        record_dt = record_timestamp
    else:
        try:
            record_dt = datetime.strptime(record_timestamp[:19], "%Y-%m-%dT%H:%M:%S")
        except (TypeError, ValueError):
            return False

    if record_dt.tzinfo is not None:
        # Shift to UTC, then drop tzinfo so it compares with the naive bounds.
        offset = record_dt.utcoffset()
        record_dt = record_dt.replace(tzinfo=None)
        if offset is not None:
            record_dt -= offset

    return any(range_["gte"] <= record_dt <= range_["lte"] for range_ in time_ranges)

def filter_partition(partition, time_ranges=time_ranges):
    """Return the rows of one pandas partition whose 'timestamp' lies in ``time_ranges``.

    The per-row result is coerced to bool so that an empty partition, whose
    ``apply`` result may not be bool-dtype, is row-filtered rather than
    treated as a column selection.
    """
    mask = partition['timestamp'].apply(lambda ts: record_in_range(ts, time_ranges))
    return partition.loc[mask.astype(bool)]

# Read the THEIA JSON-lines edge file lazily, in partitions of about 1 GB of input.
ddf = dd.read_json('e5_data/theia_info.json', blocksize="1GB")

# Keep only rows inside the benign window; ddf doubles as the output meta.
filtered_ddf = ddf.map_partitions(filter_partition, meta=ddf, time_ranges=time_ranges)

with ProgressBar():  # console progress while dask computes
    filtered_result = filtered_ddf.compute()

# Materialise as a list of per-record dicts for stitch(data_buffer, ...).
info_buffer = filtered_result.to_dict(orient='records')
print(f"Filtered records count: {len(info_buffer)}")

In [None]:
stitch(data_buffer=info_buffer, file_path="theia_ben")  # enrich filtered THEIA edges, write theia_ben parquet