In [3]:
import logging
import os

from bs4 import BeautifulSoup
import urllib3

import requests, zipfile
import pandas
import sqlalchemy

#####
# Notebook configuration: download source, local XML directory, file filter
# and database connection parameters.

MASTR_URL = "https://www.marktstammdatenregister.de/MaStR/Datendownload"
XML_DUMMY_PATH = r"/uba/mastr/MaStR/Vollauszüge/recent/"
XML_FILTER = ["Netzanschlusspunkte"] # More can be added...

# Connection parameters are taken from the environment when the MASTR_DB_*
# variables are set, so credentials do not have to live in the notebook.
# SECURITY: the literal fallbacks (including the password) are kept only for
# backward compatibility with existing runs -- rotate the password and remove
# the fallback once the environment variables are provisioned.
CONN_PARAMS_DIC = {
    "host": os.environ.get("MASTR_DB_HOST", "10.0.0.102"),
    "dbname": os.environ.get("MASTR_DB_NAME", "mastr"),
    "user": os.environ.get("MASTR_DB_USER", "uba_user"),
    "password": os.environ.get("MASTR_DB_PASSWORD", "UBAit2021!"),
    "port": os.environ.get("MASTR_DB_PORT", "5432")
}

In [7]:
class MastrDBUpdate():
    """Load XML export files from a directory into a Postgres database.

    Usage: call ``xml_file_check()`` to collect the ``.xml`` files found in
    ``xmlPath`` into ``xmlList`` (one list of paths per table), then call
    ``update_mastr_postgres()`` to write every group into the ``mastr_raw``
    schema, replacing existing tables of the same name.

    Attributes:
        xmlPath: Directory that is scanned for ``.xml`` files.
        xmlFilter: Substrings; a file whose name contains one of them is
            skipped when filtering is enabled.
        xmlList: List of lists of file paths, one inner list per table.
    """

    def __init__(self, xmlPath:str=XML_DUMMY_PATH, dbParameterDic:dict=CONN_PARAMS_DIC, xmlFilter:list=XML_FILTER):
        """Store the configuration and pre-build the database URL.

        Args:
            xmlPath: Directory containing the XML files.
            dbParameterDic: Mapping with the keys ``user``, ``password``,
                ``host``, ``port`` and ``dbname``.
            xmlFilter: Substrings identifying files to exclude.
        """
        self.xmlPath = xmlPath
        self.xmlFilter = xmlFilter
        self.xmlList = list()
        self.__db_parameter_dic = dbParameterDic
        self.__postgres_conn_string = self.__build_postgres_conn_string(param=self.__db_parameter_dic)

    def __build_postgres_conn_string (self, param:dict) -> str:
        """Return a ``postgresql+psycopg2`` URL built from ``param``.

        User name and password are percent-encoded so that characters with a
        meaning inside a URL (``@``, ``/``, ``:`` ...) cannot corrupt it.
        """
        from urllib.parse import quote

        user = quote(str(param["user"]), safe="")
        password = quote(str(param["password"]), safe="")
        return f'postgresql+psycopg2://{user}:{password}@{param["host"]}:{param["port"]}/{param["dbname"]}'

    def __create_postgres_engine (self):
        """Create a SQLAlchemy engine for the configured database URL."""
        return sqlalchemy.create_engine(self.__postgres_conn_string, pool_recycle=3600) # , poolclass=NullPool)

    def __list_xml_files(self) -> list:
        """Return the sorted names of all ``.xml`` files in ``xmlPath``."""
        return sorted(
            name for name in os.listdir(self.xmlPath)
            if os.path.splitext(name)[1].lower() == ".xml"
        )

    def xml_file_check(self, filter:bool=True, downloadMissing:bool=False):
        """Collect the XML files in ``xmlPath`` and group them per table.

        The table name of a file is the part of its name before the first
        ``.`` and the first ``_``; all files sharing that exact name form one
        group. The result is stored in ``self.xmlList``.

        Args:
            filter: When true, skip every file whose name contains one of the
                substrings in ``self.xmlFilter``.
            downloadMissing: When true and no XML file is present, announce a
                download and re-scan the directory. The download itself is
                not wired up in this class (see TODO below).

        Returns:
            None. When no XML file is found and ``downloadMissing`` is false,
            ``self.xmlList`` is left unchanged.
        """
        files = self.__list_xml_files()
        if not files:
            print(f"No files or .xml files in directory: {self.xmlPath}")
            if not downloadMissing:
                return

            print("Downloading and extracting todays 'Vollauszug' from MaStR Homepage into the mentioned directory.")
            # TODO: the download step is disabled; it was sketched as
            #   mastrDownloader = MastrDownloader()
            #   mastrDownloader.clear_directory()
            #   mastrDownloader.get_mastr_download_link()
            #   mastrDownloader.download_mastr_files()
            #   mastrDownloader.extract_mastr_files()
            files = self.__list_xml_files()

        # Group by exact table name. A dict keeps the (sorted) insertion order,
        # so the resulting group order is deterministic.
        grouped = {}
        for name in files:
            # Match filter terms against the file name only, never against
            # the directory part of the path.
            if filter and any(term in name for term in self.xmlFilter):
                continue
            tableName = name.split(".")[0].split("_")[0]
            grouped.setdefault(tableName, []).append(os.path.join(self.xmlPath, name))

        if filter:
            print(".xml choice for upload to DB filtered for listed invalid .xml's")

        self.xmlList = list(grouped.values())

    def xml_to_DataFrame(self, XMLpathList:list) -> pandas.DataFrame:
        """Read every UTF-16 XML file in ``XMLpathList`` and concatenate them.

        Raises:
            ValueError: If ``XMLpathList`` is empty (nothing to concatenate).
        """
        listDfs = [pandas.read_xml(path_or_buffer=file, encoding="utf-16") for file in XMLpathList]
        return pandas.concat(listDfs,ignore_index=True)

    @staticmethod
    def __change_dtype_datetime(df:pandas.DataFrame) -> pandas.DataFrame:
        """Convert, in place, every column whose name contains "datum" to datetime.

        The name match is case-insensitive. A column that cannot be parsed is
        left untouched. The same frame is returned for convenience.
        """
        # The inline flag has to be at the start of the pattern.
        listOfDateCols = list(df.filter(regex="(?i)datum").columns)

        for col in listOfDateCols:
            try:
                df[col] = pandas.to_datetime(df[col])
            except (ValueError, TypeError):
                # Not parseable as dates: keep the original values.
                pass
        return df

    def update_mastr_postgres(self):
        """Write every file group in ``self.xmlList`` to the database.

        Each group is read, concatenated, date-converted and written to the
        table named after the group in schema ``mastr_raw``; an existing table
        is replaced. The engine is disposed even when a step fails.
        """
        if not self.xmlList:
            print("no xml data found")
            return

        engine = self.__create_postgres_engine()
        try:
            for files in self.xmlList:
                tableName = os.path.basename(files[0]).split(".")[0].split("_")[0]

                print(f"{tableName} start reading into dataFrame")
                listDfs = [pandas.read_xml(path_or_buffer=file, encoding="utf-16") for file in files]

                print(f"{tableName} start concating dataFrames")
                df = pandas.concat(listDfs,ignore_index=True)

                print("changing dtypes to dateTime")
                self.__change_dtype_datetime(df)

                print(f"{tableName} load dataFrames into Postgres-DB")
                df.to_sql(
                    name=tableName,
                    schema="mastr_raw",
                    con=engine,
                    if_exists="replace",
                    index=False
                )
                # Release the frames before the next group is loaded.
                del df, listDfs
                print(".")
        finally:
            engine.dispose()
            print("connection to DB disposed")

# Instantiate the updater with the module-level defaults (XML_DUMMY_PATH,
# CONN_PARAMS_DIC, XML_FILTER). The constructor only builds the connection
# string; an engine is first created inside update_mastr_postgres().
updateDB = MastrDBUpdate()