In [None]:
! pip install boto3
! pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
! pip install azure-storage-blob
! pip install pyarrow
! pip install dotenv

Collecting dotenv
  Downloading dotenv-0.0.5.tar.gz (2.4 kB)
  Downloading dotenv-0.0.4.tar.gz (2.0 kB)
  Downloading dotenv-0.0.2.tar.gz (6.7 kB)
  Downloading dotenv-0.0.1.tar.gz (6.5 kB)
[31mERROR: Could not find a version that satisfies the requirement dotenv (from versions: 0.0.1, 0.0.2, 0.0.4, 0.0.5)[0m
[31mERROR: No matching distribution found for dotenv[0m


In [None]:
! pip freeze

absl-py==0.12.0
alabaster==0.7.12
albumentations==0.1.12
altair==4.1.0
appdirs==1.4.4
argcomplete==1.12.3
argon2-cffi==21.1.0
arviz==0.11.4
astor==0.8.1
astropy==4.3.1
astunparse==1.6.3
atari-py==0.2.9
atomicwrites==1.4.0
attrs==21.2.0
audioread==2.1.9
autograd==1.3
azure-common==1.1.27
azure-core==1.19.0
azure-nspkg==3.0.2
azure-storage-blob==12.9.0
Babel==2.9.1
backcall==0.2.0
beautifulsoup4==4.6.3
bleach==4.1.0
blis==0.4.1
bokeh==2.3.3
boto3==1.19.2
botocore==1.22.2
Bottleneck==1.3.2
branca==0.4.2
bs4==0.0.1
CacheControl==0.12.6
cached-property==1.5.2
cachetools==4.2.4
catalogue==1.0.0
certifi==2021.5.30
cffi==1.14.6
cftime==1.5.1
chardet==3.0.4
charset-normalizer==2.0.6
clang==5.0
click==7.1.2
cloudpickle==1.3.0
cmake==3.12.0
cmdstanpy==0.9.5
colorcet==2.0.6
colorlover==0.3.0
community==1.0.0b1
contextlib2==0.5.5
convertdate==2.3.2
coverage==3.7.1
coveralls==0.5
crcmod==1.7
cryptography==35.0.0
cufflinks==0.17.3
cvxopt==1.2.7
cvxpy==1.0.31
cycler==0.10.0
cymem==2.0.5
Cython==0.29.2

In [1]:
import pandas as pd
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from io import StringIO, BytesIO
from pprint import pprint
import csv
from datetime import datetime, timedelta
import pyarrow as pa
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import pyarrow.parquet as pq
from azure.core.exceptions import ResourceNotFoundError

In [None]:
# azure

def read_csv_from_blob(container, meta_file_name):
  meta_file = container.get_blob_client(meta_file_name)

  with open('meta.csv', "wb") as my_blob:
    blob_data = meta_file.download_blob()
    blob_data.readinto(my_blob)

  df = pd.read_csv('meta.csv')
  return df

In [None]:
# adaptive layer.

# aws
def read_csv_to_df(bucket, key, decoding='utf-8', sep=','):
    csv_obj = bucket.Object(key=key).get()['Body'].read().decode('utf-8')
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=sep)
    return df

# azure


def write_csv_blob(container, meta_file_name, df):
    blob_client = container.get_blob_client(blob=meta_file_name)
    blob_client.upload_blob(df.to_csv(index=False), overwrite=True)
    return True

# azure


def write_to_azure_blog(conn_str, container, df, key):
    parquet_file = BytesIO()
    df.to_parquet(parquet_file, engine='pyarrow')
    parquet_file.seek(0)

    blob_service_client = BlobServiceClient.from_connection_string(conn_str)
    blob_client = blob_service_client.get_blob_client(
        container='processed', blob=key)
    blob_client.upload_blob(parquet_file)
    return True


def write_to_blob(dataframe: pd.DataFrame, container, filename, fileformat):

    if dataframe.empty:
        print('Nothing to write, dataframe is empty')
        return None

    elif fileformat == 'csv':
        blob_client = container.get_blob_client(blob=filename)
        blob_client.upload_blob(dataframe.to_csv(index=False), overwrite=True)
        return True

    elif fileformat == 'parquet':

        parquet_file = BytesIO()
        dataframe.to_parquet(parquet_file, engine='pyarrow')
        parquet_file.seek(0)

        blob_client = container.get_blob_client(blob=filename)
        blob_client.upload_blob(parquet_file)
        return True


# aws
def list_files_in_prefix(bucket, prefix):
    files = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    return files

# azure


def get_date_list(container, arg_date, src_format, meta_file_name):

    min_date = datetime.strptime(
        arg_date, src_format).date() - timedelta(days=1)
    today_date = datetime.today().date()
    try:
        df_meta = read_csv_from_blob(container, meta_file_name)
        dates = [(min_date + timedelta(days=x))
                 for x in range(0, (today_date - min_date).days + 1)]
        print(dates)
        print('\n')

        src_dates = set(pd.to_datetime(df_meta['source_date']).dt.date)
        dates_missing = set(dates[1:]) - src_dates

        if dates_missing:
            min_date = min(set(dates[1:]) - src_dates) - timedelta(days=1)
            return_dates = [date.strftime('%Y-%m-%d')
                            for date in dates if date >= min_date]

            return_min_date = arg_date
        else:
            return_dates = []
            return_min_date = datetime(2200, 1, 1).date()

    except azure.core.exceptions.ResourceNotFoundError:
        return_dates = [(min_date + timedelta(days=x)).strftime(src_format)
                        for x in range(0, (today_date - min_date).days + 1)]
        return_min_date = arg_date

    return return_min_date, return_dates

# azure


def update_meta(extract_date_list, container, meta_file_name):
    df_new = pd.DataFrame(columns=['source_date', 'datetime_of_processing'])
    df_new['source_date'] = extract_date_list
    df_new['datetime_of_processing'] = datetime.today().strftime(
        '%Y-%m-%d %H:%M:%S')

    df_old = read_csv_from_blob(container, meta_file_name)

    df_all = pd.concat([df_old, df_new])
    write_csv_blob(container, meta_file_name, df_all)


In [None]:
# Application layer

def extract(bucket, date_list):
  print('-------------------------------------------------------EXTRACTING-------------------------------------------\n')

  files = [key for date in date_list for key in list_files_in_prefix(bucket, date)]

  df = pd.concat([read_csv_to_df(bucket, obj) for obj in files], ignore_index = True)
  print('-------------------------------------------------------EXTRACTION COMPLETE-------------------------------------------\n')
  return df

def transform_report1(df, columns, date):

  df = df.loc[:, columns]
  df.dropna(inplace=True)

  df['opening_price'] = df.sort_values(by =['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('first')
  df['closing_price'] = df.sort_values(by =['Time']).groupby(['ISIN','Date'])['StartPrice'].transform('last')


  df = df.groupby(['ISIN', 'Date'], as_index = False).agg(
        opening_price_eur = ('opening_price', 'min'), closing_price_eur = ('closing_price', 'min'),
        minimum_price = ('MinPrice', 'min'), maximum_price = ('MaxPrice', 'max'), daily_traded_volume = ('TradedVolume', 'sum'),
                         )
  df['prev_closing_price'] = df.sort_values(by=['Date']).groupby(['ISIN'])['closing_price_eur'].shift(1)

  df['change_prev_closing%'] = (df['closing_price_eur'] - df['prev_closing_price']) / df['prev_closing_price'] *100

  df = df.round(decimals=2)
  df.drop(columns=['prev_closing_price'], inplace = True)

  df = df[df.Date >= date]


  return df

def load(df, container, conn_str, extract_date_list, meta_file_name):
  key = 'xetra_daily_report' + datetime.today().strftime("%Y%m%d_%H%M%S") + '.parquet'
  write_to_azure_blog(conn_str, container, df, key)  
  update_meta(extract_date_list, container, meta_file_name)

  return True

def etl_report1(bucket, date_list, columns, container, conn_str, date, meta_file_name):
  df = extract(bucket, date_list)
  df =transform_report1(df, columns, date)

  extract_date_list = [dat for dat in date_list if dat >= date]
  load(df, container, conn_str, extract_date_list, meta_file_name)

  return True


In [None]:
def main():
  # parameters/configrations
  date = '2021-10-24'
  src_format = '%Y-%m-%d'
  src_bucket_name = 'deutsche-boerse-xetra-pds'
  target_container = 'processed'
  columns = ['ISIN', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
  conn_str = 'DefaultEndpointsProtocol=https;AccountName=xetratl;AccountKey=uVTwzEe5IA2+Pf8xlBR9gxDXvLrzNLc3bjh3oQnMXGFyfesQMK44f2UB9XVNbLiCTofAP8vMeBxjW+cO1GyRyA==;EndpointSuffix=core.windows.net'
    container = 'processed'
  meta_file_name = 'meta_file.csv'
  # init bucket connection
  s3 = boto3.resource(service_name = 's3',config = Config(signature_version = UNSIGNED))
  bucket = s3.Bucket(src_bucket_name)

  # init blob connection
  blob_client = BlobServiceClient.from_connection_string(conn_str=conn_str)
  container = blob_client.get_container_client(container)
  # run application

  min_dat, date_list = get_date_list(container, date, src_format, meta_file_name)
  etl_report1(bucket, date_list, columns, container, conn_str, date, meta_file_name)

In [None]:
# run--
main()


[datetime.date(2021, 10, 23), datetime.date(2021, 10, 24), datetime.date(2021, 10, 25)]


-------------------------------------------------------EXTRACTING-------------------------------------------

-------------------------------------------------------EXTRACTION COMPLETE-------------------------------------------

-------------------------------------------------------TRANSFORMING-------------------------------------------

-------------------------------------------------------TRANSFORMATION COMPLETE-------------------------------------------

-------------------------------------------------------LOADING-------------------------------------------

-------------------------------------------------------LOADING COMPLETE-------------------------------------------

-------------------------------------------------------ETL JOB COMPLETE-------------------------------------------



In [2]:
s3 = boto3.resource(service_name = 's3',config = Config(signature_version = UNSIGNED))

In [3]:
bucket = s3.Bucket('deutsche-boerse-xetra-pds')

In [13]:
[obj.key for obj in bucket.objects.filter(Prefix='2021-11-02 00:00:00')]

[]

AttributeError: 's3.Bucket.objectsCollection' object has no attribute 'key'