Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve connection towards Azure #121

Merged
merged 3 commits into from
Jan 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 30 additions & 28 deletions datareservoirio/storage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@

log = logging.getLogger(__name__)

_BLOBSTORAGE_SESSION = requests.Session()
_BLOBSTORAGE_SESSION.mount(
"https://",
requests.adapters.HTTPAdapter(
max_retries=requests.adapters.Retry(total=5, backoff_factor=0.4, backoff_max=10)
),
)


def _encode_for_path_safety(value):
return str(base64.urlsafe_b64encode(str(value).encode()).decode())
Expand Down Expand Up @@ -276,7 +284,7 @@ def _evict_from_cache(self):
)


def _blob_to_df(blob_url):
def _blob_to_df(blob_url, session=_BLOBSTORAGE_SESSION):
"""
Download blob from remote storage and present as a Pandas Series.

Expand All @@ -285,6 +293,8 @@ def _blob_to_df(blob_url):
blob_url : str
Fully formated URL to the blob. Must contain all the required parameters
in the URL.
session : requests.Session, default _BLOBSTORAGE_SESSION
Session object to make HTTP calls.

Return
------
Expand All @@ -293,19 +303,15 @@ def _blob_to_df(blob_url):
(``Int64``) and column ``values`` are ``str`` or ``float64``.
"""

with requests.Session() as session:
retries = requests.adapters.Retry(total=5, backoff_factor=0.4, backoff_max=10)
session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
response = session.request(method="get", url=blob_url, timeout=30, stream=True)
response.raise_for_status()

response.encoding = "utf-8" # enforce encoding
response = session.request(method="get", url=blob_url, timeout=30, stream=True)
response.raise_for_status()
response.encoding = "utf-8" # enforce encoding

content = [
line.split(",", maxsplit=1)
for line in response.iter_lines(decode_unicode=True)
if line
]
content = [
line.split(",", maxsplit=1)
for line in response.iter_lines(decode_unicode=True)
if line
]

df = (
pd.DataFrame(content, columns=("index", "values"), copy=False)
Expand All @@ -316,7 +322,7 @@ def _blob_to_df(blob_url):
return df


def _df_to_blob(df, blob_url):
def _df_to_blob(df, blob_url, session=_BLOBSTORAGE_SESSION):
"""
Upload a Pandas Dataframe as blob to a remote storage.

Expand All @@ -328,6 +334,9 @@ def _df_to_blob(df, blob_url):
df : pandas.DataFrame
Pandas DataFrame where column ``index`` is nano-seconds since epoch
(``Int64``) and column ``values`` are ``str`` or ``float64``.
session : requests.Session, default _BLOBSTORAGE_SESSION
Session object to make HTTP calls.

"""
if not isinstance(df, pd.DataFrame):
raise ValueError
Expand All @@ -340,18 +349,11 @@ def _df_to_blob(df, blob_url):
df.to_csv(fp, line_terminator="\n", **kwargs)
fp.seek(0)

with requests.Session() as session:
retries = requests.adapters.Retry(
total=5, backoff_factor=0.4, backoff_max=10
)
session.mount(
"https://", requests.adapters.HTTPAdapter(max_retries=retries)
)
session.request(
method="put",
url=blob_url,
headers={"x-ms-blob-type": "BlockBlob"},
data=fp,
timeout=(30, None),
).raise_for_status()
session.request(
method="put",
url=blob_url,
headers={"x-ms-blob-type": "BlockBlob"},
data=fp,
timeout=(30, None),
).raise_for_status()
return