Skip to content

Commit

Permalink
use better temp dir
Browse files Browse the repository at this point in the history
  • Loading branch information
pnadolny13 committed May 11, 2024
1 parent 4710d9e commit e266d87
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import csv
import pathlib
import tempfile
import uuid
from tempfile import mkstemp
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union, cast

import sqlalchemy as sa
Expand Down Expand Up @@ -147,27 +147,28 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
temp_dir = "./output"
columns_str = ','.join(
[
f'"{column.name}"' for column in columns
]
)
csv_fd, csv_file = mkstemp(suffix='.csv', prefix=f'{table}_', dir=temp_dir)
with open(csv_fd, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=[column.name for column in columns])
for record in records:
writer.writerow(record)

copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\')".format(
table.name,
columns_str,
)
self.logger.info("Inserting with SQL: %s", copy_sql)
with open(csv_file, "r") as f:
with connection.connection.cursor() as cur:
cur.copy_expert(copy_sql, f)
pathlib.Path.unlink(csv_file)
script_dir = pathlib.Path(__file__).parent
with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
csv_fd, csv_file = tempfile.mkstemp(suffix='.csv', prefix=f'{table}_', dir=temp_dir)
with open(csv_fd, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=[column.name for column in columns])
for record in records:
writer.writerow(record)

copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT CSV, ESCAPE '\\')".format(
table.name,
columns_str,
)
self.logger.info("Inserting with SQL: %s", copy_sql)
with open(csv_file, "r") as f:
with connection.connection.cursor() as cur:
cur.copy_expert(copy_sql, f)
pathlib.Path.unlink(csv_file)
return True

def upsert(
Expand Down

0 comments on commit e266d87

Please sign in to comment.