Skip to content

Commit

Permalink
Merge 9b63605 into 29c0df7
Browse files Browse the repository at this point in the history
  • Loading branch information
mediuminvader committed Aug 18, 2023
2 parents 29c0df7 + 9b63605 commit a58356c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 48 deletions.
14 changes: 8 additions & 6 deletions protos/types.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

package dozer.types;

Expand Down Expand Up @@ -63,7 +64,7 @@ enum Type {
Decimal = 9; // Decimal number.
Timestamp = 10; // ISO 8601 combined date and time with time zone.
Date = 11; // ISO 8601 calendar date without timezone.
Bson = 12; // BSON data.
Json = 12; // JSON data.
Point = 13; // Geo Point type.
Duration = 14; // Duration type.
}
Expand Down Expand Up @@ -96,13 +97,13 @@ message DurationType {

// rust-decimal as a message
message RustDecimal {
// the sign of the Decimal value, 0 meaning positive and 1 meaning negative
uint32 flags = 1;
// the lo, mid, hi, and flags fields contain the representation of the Decimal
// value as a 96-bit integer
uint32 scale = 1;
uint32 lo = 2;
uint32 mid = 3;
uint32 hi = 4;
bool negative = 5;
}

// A field value.
Expand All @@ -119,8 +120,9 @@ message Value {
bytes bytes_value = 8; // Binary data.
RustDecimal decimal_value = 9; // Decimal value.
google.protobuf.Timestamp timestamp_value = 10; // DateTime & Timestamp.
string date_value = 11; // ISO 8601 calendar date without timezone.
PointType point_value = 12; // Point type.
DurationType duration_value = 13; // Duration type.
string date_value = 11; // ISO 8601 calendar date without timezone.
PointType point_value = 12; // Point type.
DurationType duration_value = 13; // Duration type.
google.protobuf.Value json_value = 14; // JSON type.
};
}
9 changes: 7 additions & 2 deletions pydozer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ class ApiClient:
url (str, optional): Dozer gRPC URL. Defaults to Env variable DOZER_API_URL or `0.0.0.0:50051`.
secure (bool, optional): Intialize a secure channel. Defaults to False.
"""
def __init__(self, endpoint, url=DOZER_API_URL, secure=False, token=None):
def __init__(self, endpoint, url=DOZER_API_URL, app_id=None, secure=False, token=None):

self.metadata = [('authorization', f'Bearer {token}')] if token else None
self.metadata = []

if app_id:
self.metadata = [('x-dozer-app-id', app_id)]
if token:
self.metadata.insert(0, ('authorization', f'Bearer {token}'))

if secure:
channel = grpc.secure_channel(url)
Expand Down
13 changes: 8 additions & 5 deletions pydozer/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ class IngestClient:
url (str, optional): Ingest Server URL. Defaults to DOZER_INGEST_URL or `0.0.0.0:8085`.
secure (bool, optional): Intialize a secure channel. Defaults to False.
"""
def __init__(self, url=DOZER_INGEST_URL, secure=False):
def __init__(self, url=DOZER_INGEST_URL, app_id=None, secure=False):

self.metadata = [('x-dozer-connector-app-id', app_id)] if app_id else None

channel = grpc.insecure_channel(url)
if secure:
channel = grpc.secure_channel(url)
Expand Down Expand Up @@ -46,14 +49,14 @@ def ingest_raw(self, request) -> IngestResponse:
Args:
request (IngestRequest):
"""
return self.ingestor.ingest(request)
return self.ingestor.ingest(request, metadata=self.metadata)

def ingest_raw_stream(self, generator) -> IngestResponse:
"""Ingest a stream in Common Format into Dozer
Args:
generator: Generator function that yields IngestRequest
"""
return self.ingestor.ingest(generator)
return self.ingestor.ingest(generator, metadata=self.metadata)

def ingest_df(self, schema_name, df, seq_no=1) -> IngestResponse:
"""Ingest a pandas dataframe into Dozer using ingest stream
Expand All @@ -69,7 +72,7 @@ def get_messages(seq_no):
yield rec
print("Ingesting via stream...")

return self.ingestor.ingest_stream(get_messages(seq_no))
return self.ingestor.ingest_stream(get_messages(seq_no), metadata=self.metadata)

def ingest_df_arrow(self, schema_name, df, batch_size=BATCH_SIZE, seq_no=1) -> IngestResponse:
"""Ingest a dataframe into Dozer in Arrow Format
Expand All @@ -94,4 +97,4 @@ def get_messages(seq_no):
pbar.close()
print("Ingesting via stream in Arrow Format...")

return self.ingestor.ingest_arrow_stream(get_messages(seq_no))
return self.ingestor.ingest_arrow_stream(get_messages(seq_no), metadata=self.metadata)
51 changes: 26 additions & 25 deletions pydozer/types_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions tests/dozer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,23 @@ sources:
- name: trips
table_name: trips
connection: !Ref ingest
columns:
- name: trips_arrow
table_name: trips_arrow
connection: !Ref ingest_arrow
columns:
- name: users
table_name: users
connection: !Ref ingest
columns:

endpoints:
- name: trips
path: /trips
table_name: trips
index:
primary_key:

- name: trips_arrow
path: /trips_arrow
table_name: trips_arrow
index:
primary_key:

- name: users
path: /users
table_name: users
index:
primary_key:

3 changes: 2 additions & 1 deletion tests/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def dozer_server():
p = subprocess.Popen([which_dozer, "-c", "tests/dozer-config.yaml"], stdout=subprocess.PIPE)
while True:
line = p.stdout.readline()
if b'[api] Serving' in line:
print(line)
if b'Starting Rest Api Server' in line:
break
if b'ERROR' in line:
exit(1)
Expand Down

0 comments on commit a58356c

Please sign in to comment.