Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve connection towards Azure #121

Merged
merged 3 commits into from
Jan 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 30 additions & 28 deletions datareservoirio/storage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@

log = logging.getLogger(__name__)

_AZURE_SESSION = requests.Session()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a blobstorage or blobstorageaccount session? All of our resources are in azure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should call it BLOBSTORAGE_SESSION to be more precise? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just keep the underscore prefix, and call it whatever you like :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to BLOBSTORAGE_SESSION now

_AZURE_SESSION.mount(
"https://",
requests.adapters.HTTPAdapter(
max_retries=requests.adapters.Retry(total=5, backoff_factor=0.4, backoff_max=10)
),
)


def _encode_for_path_safety(value):
return str(base64.urlsafe_b64encode(str(value).encode()).decode())
Expand Down Expand Up @@ -276,7 +284,7 @@ def _evict_from_cache(self):
)


def _blob_to_df(blob_url):
def _blob_to_df(blob_url, session=_AZURE_SESSION):
"""
Download blob from remote storage and present as a Pandas Series.

Expand All @@ -285,6 +293,8 @@ def _blob_to_df(blob_url):
blob_url : str
Fully formated URL to the blob. Must contain all the required parameters
in the URL.
session : requests.Session, default _AZURE_SESSION
Session object to make HTTP calls.

Return
------
Expand All @@ -293,19 +303,15 @@ def _blob_to_df(blob_url):
(``Int64``) and column ``values`` are ``str`` or ``float64``.
"""

with requests.Session() as session:
retries = requests.adapters.Retry(total=5, backoff_factor=0.4, backoff_max=10)
session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
response = session.request(method="get", url=blob_url, timeout=30, stream=True)
response.raise_for_status()

response.encoding = "utf-8" # enforce encoding
response = session.request(method="get", url=blob_url, timeout=30, stream=True)
response.raise_for_status()
response.encoding = "utf-8" # enforce encoding

content = [
line.split(",", maxsplit=1)
for line in response.iter_lines(decode_unicode=True)
if line
]
content = [
line.split(",", maxsplit=1)
for line in response.iter_lines(decode_unicode=True)
if line
]

df = (
pd.DataFrame(content, columns=("index", "values"), copy=False)
Expand All @@ -316,7 +322,7 @@ def _blob_to_df(blob_url):
return df


def _df_to_blob(df, blob_url):
def _df_to_blob(df, blob_url, session=_AZURE_SESSION):
"""
Upload a Pandas Dataframe as blob to a remote storage.

Expand All @@ -328,6 +334,9 @@ def _df_to_blob(df, blob_url):
df : pandas.DataFrame
Pandas DataFrame where column ``index`` is nano-seconds since epoch
(``Int64``) and column ``values`` are ``str`` or ``float64``.
session : requests.Session, default _AZURE_SESSION
Session object to make HTTP calls.

"""
if not isinstance(df, pd.DataFrame):
raise ValueError
Expand All @@ -340,18 +349,11 @@ def _df_to_blob(df, blob_url):
df.to_csv(fp, line_terminator="\n", **kwargs)
fp.seek(0)

with requests.Session() as session:
retries = requests.adapters.Retry(
total=5, backoff_factor=0.4, backoff_max=10
)
session.mount(
"https://", requests.adapters.HTTPAdapter(max_retries=retries)
)
session.request(
method="put",
url=blob_url,
headers={"x-ms-blob-type": "BlockBlob"},
data=fp,
timeout=(30, None),
).raise_for_status()
session.request(
method="put",
url=blob_url,
headers={"x-ms-blob-type": "BlockBlob"},
data=fp,
timeout=(30, None),
).raise_for_status()
return