In [7]:
from dataclasses import dataclass
from typing import Any, Union, LiteralString
import os

In [None]:
"""
Design ideas and how I want the API to behave

I want to build a python dictionary of lists separate from creating the dataframe, so I can switch dataframe libraries whenever I want.

# First, I need to parse the raw_file into entries.
entries.parse() internally might look something like this

# generator inside of a comprehension pulls all of the chunks of text for each entry
return [entry for entry in match(raw_file_data)]

"""

In [10]:
@dataclass
class Column:
    """A class to represent a column in a table, with the column name and the type
    of data it holds.
    
    Attributes:
        name (str): The name of the column
        data_type (Any): The type of data the column holds
        parser: A function 
        parsing_rules dict["string": Any]: A dict carrying the extractors keyword arguments to be fed into the function when used to extract data for that column

    Returns:
        An instance of the Column class

    Example:
        column = Column(
            name="name",
            data_type=str,
            parser=Parser.regex(),
            parsing_rules={"pattern": r"([A-Za-z]+)"}
        )
        
    """

    name: str
    data_type: Any
    parser: Parser
    extract_rules: dict[str: Any]
    

    def extract(self, raw_data):
        self.extractor(parse_rules=self.parse_rules, raw_data=raw_data)

    def _convert_to_type(self, extracted_data)

SyntaxError: invalid syntax (3294232417.py, line 32)

In [None]:
@dataclass
class Schema:
    """An object the represents the rules for building a extracting data and inserting it
    into a table that will eventually be part of an in-memory database.

    This object is meant to provide a more developer-friendly API from which 
    to process the structure and data to be put into a given table.
    
    Attributes:
        schema (dict[str, Any]): 
            A dictionary representing the schema of the table,
            with the column names as keys and the types as values.

            Psuedo example:
            schema = {
                'name': {
                    'cast_to_type': str,
                    '},
                'age': int,
                'is_student': bool
            }

    Returns:
        An instance of the schema class
    """

    schema: 

In [2]:
@dataclass
class Table:
    """A class to represent a table and help assemble tabkles in an in-memory
    database.
    
    Attributes:
        db_path (str): 
            The path to the database file
        schema (Schema):
        table (dict[str, list[Any]]): A dictionary representing the table, with the
            column names as keys and the values as lists.

    Returns:
        An instance of the Table class, that can be used to build a table in the database.
    """

    db_path: str
    schema: Schema
    table: dict[str, list[Any]]

    def __post_init__(self):
        self.columns = [key for key in self.schema.keys()]
        self._build_attrs(self.columns)

    def build(self, file_path: Union[os.PathLike, str]):
        """Build the a dictionary representing the table, using the attributes
        representing column names and their accompanying list of values as each key:
        value pair.

        From the schema, also recast the values in the table to the appropriate type, as
        defined in the values of self.schema.
        
        Args:
            file_path (Union[os.PathLike, str]): The path to the file to be read and
            processed.

        Returns:
            dict: The dictionary , with all list values appropriately recasted to the
            schema-declared types, to be used as an object assemble a dataframe
        """

        for column in self.columns:
            type_caster = self.schema[column]
            values = [type_caster(value) for value in getattr(self, column)]
            self.table[column] = values

    def _build_attrs(self, attr_names: list[str]):
        """Insert all column names from the schema as attributes to the class instance"""
        for attr in attr_names:
            setattr(self, attr, [])


In [3]:
schema = {
    "first_name": str,
    "last_name": str,
}

data =

In [14]:
# source: https://stackoverflow.com/questions/39443427/extract-multiple-lines-from-a-file-using-regular-expressions-in-python

s = """13:45:09 HEY HOW ARE YOU

     I AM FINE

13:50:10 OK THEN

     Bye"""

import re

m = re.search(r'^\d\d:\d\d:\d\d(.*?)(?=\d\d:\d\d:\d\d)', s, re.I + re.M + re.S)
print(m.groups()[0])

 HEY HOW ARE YOU

     I AM FINE




In [16]:
import re

m = re.search(r'(?<=\d\d:\d\d:\d\d)(.*?)(?=\d\d:\d\d:\d\d)', s, re.DOTALL).group()
print(m)

 HEY HOW ARE YOU

     I AM FINE




In [46]:
s = """
2024-03-09 11:01:25 INFO i.a.w.t.TemporalAttemptExecution(get):126 - Cloud storage job log path: /workspace/10344/0/logs.log
2024-03-09 11:01:25 INFO i.a.w.t.TemporalAttemptExecution(get):129 - Executing worker wrapper. Airbyte version: 0.50.33
2024-03-09 11:01:25 INFO i.a.a.c.AirbyteApiClient(retryWithJitterThrows):290 - Attempt 0 to save workflow id for cancellation
2024-03-09 11:01:25 INFO i.a.w.g.BufferedReplicationWorker(run):152 - start sync worker. job id: 10344 attempt id: 0
2024-03-09 11:01:25 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2024-03-09 11:01:25 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- START REPLICATION -----
2024-03-09 11:01:25 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2024-03-09 11:01:25 INFO i.a.w.i.DefaultAirbyteDestination(start):92 - Running destination...
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable LAUNCHDARKLY_KEY: ''
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable FEATURE_FLAG_CLIENT: ''
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable FEATURE_FLAG_CLIENT: ''
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable OTEL_COLLECTOR_ENDPOINT: ''
2024-03-09 11:01:25 INFO i.a.c.EnvConfigs(getEnvOrDefault):1158 - Using default value for environment variable OTEL_COLLECTOR_ENDPOINT: ''
2024-03-09 11:01:25 INFO i.a.w.p.KubeProcessFactory(create):128 - Attempting to start pod = source-mssql-read-10344-0-mjxeg for airbyte/source-mssql:3.7.7 with resources ConnectorResourceRequirements[main=io.airbyte.config.ResourceRequirements@3e5a6e37[cpuRequest=1,cpuLimit=,memoryRequest=4Gi,memoryLimit=,additionalProperties={}], heartbeat=io.airbyte.config.ResourceRequirements@7bfce681[cpuRequest=0.05,cpuLimit=0.2,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}], stdErr=io.airbyte.config.ResourceRequirements@14352f7[cpuRequest=0.01,cpuLimit=0.5,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}], stdIn=null, stdOut=io.airbyte.config.ResourceRequirements@42a7a875[cpuRequest=0.5,cpuLimit=1,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}]] and allowedHosts io.airbyte.config.AllowedHosts@2fe15dc0[hosts=[vpce-0a5e7638383b5fa7a-tv2ean05.vpce-svc-06121b7af7f9312f6.us-west-2.vpce.amazonaws.com, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2024-03-09 11:01:25 INFO i.a.w.p.KubeProcessFactory(create):128 - Attempting to start pod = destination-s3-write-10344-0-rtjwz for airbyte/destination-s3:0.5.8 with resources ConnectorResourceRequirements[main=io.airbyte.config.ResourceRequirements@1105c12d[cpuRequest=1,cpuLimit=,memoryRequest=4Gi,memoryLimit=,additionalProperties={}], heartbeat=io.airbyte.config.ResourceRequirements@7bfce681[cpuRequest=0.05,cpuLimit=0.2,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}], stdErr=io.airbyte.config.ResourceRequirements@10d81c2e[cpuRequest=0.01,cpuLimit=0.5,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}], stdIn=io.airbyte.config.ResourceRequirements@6154e722[cpuRequest=0.5,cpuLimit=1,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}], stdOut=io.airbyte.config.ResourceRequirements@41707047[cpuRequest=0.01,cpuLimit=0.5,memoryRequest=25Mi,memoryLimit=50Mi,additionalProperties={}]] and allowedHosts null
2024-03-09 11:04:33 source > INFO debezium-sqlserverconnector-facppm1-change-event-source-coordinator i.d.p.s.AbstractSnapshotChangeEventSource(execute):104 Snapshot - Final stage
2024-03-09 11:04:33 source > WARN debezium-sqlserverconnector-facppm1-change-event-source-coordinator i.d.p.s.AbstractSnapshotChangeEventSource(execute):115 Snapshot was not completed successfully, it will be re-executed upon connector restart
2024-03-09 11:04:33 source > INFO debezium-sqlserverconnector-facppm1-change-event-source-coordinator i.d.c.s.SqlServerSnapshotChangeEventSource(close):263 Removing locking timeout
2024-03-09 11:04:33 source > ERROR debezium-sqlserverconnector-facppm1-change-event-source-coordinator i.d.p.ErrorHandler(setProducerThrowable):52 Producer failure io.debezium.DebeziumException: java.util.concurrent.ExecutionException: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:101) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:82) ~[debezium-connector-sqlserver-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.util.concurrent.ExecutionException: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:463) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	... 9 more
Caused by: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:551) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:515) ~[debezium-core-2.4.0.Final.jar:2.4.0.Final]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
	... 5 more

Stack Trace: io.debezium.DebeziumException: java.util.concurrent.ExecutionException: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:101)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
	at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:82)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.ExecutionException: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:463)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165)
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
	... 9 more
Caused by: java.lang.InterruptedException: Interrupted while snapshotting table facppm1.dbo.CMC_CDDL_CL_LINE
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:551)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	... 5 more
"""

import re
import rich

# Use re.findall to get all matches
# matches = re.findall(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [\w>]+ .*?)(?=\n\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [\w>]+ |\Z)", s, re.DOTALL)
matches = re.findall(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} .*?)(?=\n\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}|\Z)", s, re.DOTALL)

# rich.print(matches)

for match in matches:
    print("\n")
    rich.print(match)

print(f"There are {len(matches)} entries found")

















































































































There are 28 entries found
