In [1]:
# SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import os
from pyspark.sql.functions import *
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("lakehouse") \
    .master("spark://spark-master:7077") \
    .config("spark.delta.columnMapping.mode", "name") \
    .config("spark.sql.catalogImplementation","hive") \
    .config("spark.sql.warehouse.dir","s3a://hive/") \
    .config("spark.sql.hive.metastore.version","3.1.3") \
    .config("spark.sql.hive.metastore.jars","path") \
    .config("spark.sql.hive.metastore.jars.path","file:///opt/spark/hive/jars/*") \
    .config("spark.sql.legacy.charVarcharAsString", True) \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config("hive.metastore.warehouse.dir","s3a://hive/") \
    .config("spark.hive.metastore.schema.verification","false") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

# Minimierung des LOGS
spark.sparkContext.setLogLevel("ERROR")
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger("LOGGER")
logger.setLevel(log4jLogger.Level.INFO)



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9d370e67-3fbc-499d-b6b4-63abc7e99ca4;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.0/delta-spark_2.12-3.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.3.0!delta-spark_2.12.jar (322ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/3.3.0/delta-storage-3.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;3.3.0!delta-storage.jar (62ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (44ms)
:: resolution report :: resolve 1359ms :: artifacts dl 43

In [3]:
# Datenaufnahme
directory = "data/"
ignore_file = ".DS_Store"

# Liste, um die Namen zu sammeln
names = []

# For-Loop, um durch den Verzeichnisinhalt zu iterieren und nur die Namen zu erfassen
for entry in os.scandir(directory):
    # Ignoriere die Datei ".DS_Store"
    if entry.name == ignore_file:
        continue
    # Überprüfen, ob der Dateiname mit ".db" endet
    if entry.name.endswith(".db"):
        # Endung entfernen
        name_without_extension = os.path.splitext(entry.name)[0]
        names.append((name_without_extension,))
    else:
        # Anderenfalls den Originalnamen verwenden
        names.append((entry.name,))

df_file_list = spark.createDataFrame(names, ["name"])

for row in df_file_list.collect():
    DB_NAME = row["name"]
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    spark.sql(f'USE {DB_NAME}')

    # Verwende einen relativen Pfad und konvertiere in einen absoluten Pfad:
    relative_path = f"data/{DB_NAME}.db"
    absolute_path = os.path.abspath(relative_path)
    jdbc_url = f"jdbc:sqlite:{absolute_path}"

    table_names = [row["name"] for row in df_tables.collect()]
    #print(f"{DB_NAME} Started!")
    for TABLE_NAME in table_names:
        #print(f"{TABLE_NAME} Started!")
        df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", TABLE_NAME).option("driver", "org.sqlite.JDBC").load()
        spark.sql(f"DROP DATABASE IF EXISTS {TABLE_NAME} cascade") # Löscht den MetaStore Eintrag
        df.write.format("delta").mode("overwrite").option("path", f's3a://bronze/fidus/{DB_NAME}/{TABLE_NAME}/').saveAsTable(TABLE_NAME)
        #print(f"{TABLE_NAME} Done!")

    print(f"{DB_NAME} SUCCESS!")
print("ALL DONE!")


ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 30)

== SQL ==
CREATE DATABASE IF NOT EXISTS ._.DS_Store
------------------------------^^^


In [4]:
# Join der Datenbanken

# Liste der Datenbanken
databases = [
    "fidus_and",
    "fidus_hbs",
    "fidus_hrs",
    "fidus_mhw",
    "fidus_oaf",
    "fidus_sub",
    "fidus_sue",
    "fidus_wrw"
]

# Liste der Tabellennamen
tables = [
    "behdatei",
    "praxarzt",
    "patdat",
    "adrver",
    "begrun",
    "arbehdat",
    "rechkopf",
    "rechpos",
    "ekktexte",
    "ckeytabs",
    "patinfo"
]

# Wir bauen die temporäre View "joined_view" schrittweise auf.
joined_view = None

# Verschachtelte Schleife: Für jede Datenbank und jede Tabelle
for db in databases:
    for tbl in tables:
        full_table_name = f"{db}.{tbl}"
        print(f"Verarbeite Tabelle: {full_table_name}")

        try:
            if joined_view is None:
                # Initialen Start: Lege die erste Tabelle als temporäre View ab
                spark.sql(f"""
                    CREATE OR REPLACE TEMP VIEW joined_view AS
                    SELECT *
                    FROM {full_table_name}
                """)
                joined_view = "joined_view"
            else:
                spark.sql(f"""
                    CREATE OR REPLACE TEMP VIEW joined_view AS
                    SELECT t1.*, t2.*
                    FROM joined_view t1
                    INNER JOIN {full_table_name} t2
                    ON t1.id = t2.bsnr
                """)
        except Exception as e:
            print(f"Fehler beim Laden/Joinen von {full_table_name}: {e}")

# Finale Abfrage aus der temporären View, die alle Spalten enthalten sollte
df_final = spark.sql("SELECT * FROM joined_view")

Verarbeite Tabelle: fidus_and.behdatei


Hive Session ID = d4fa367a-f18b-4392-8a36-cd3dc9dda52f


Verarbeite Tabelle: fidus_and.praxarzt
Fehler beim Laden/Joinen von fidus_and.praxarzt: [RECURSIVE_VIEW] Recursive view `joined_view` detected (cycle: `joined_view` -> `joined_view`).
Verarbeite Tabelle: fidus_and.patdat
Fehler beim Laden/Joinen von fidus_and.patdat: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `t2`.`bsnr` cannot be resolved. Did you mean one of the following? [`t2`.`kasnr`, `t2`.`stra`, `t2`.`akm`, `t2`.`dmp`, `t2`.`egk`].; line 6 pos 31;
'CreateViewCommand `joined_view`, SELECT t1.*, t2.*
                    FROM joined_view t1
                    INNER JOIN fidus_and.patdat t2
                    ON t1.id = t2.bsnr, false, true, LocalTempView, false
+- 'Project [t1.*, t2.*]
   +- 'Join Inner, (id#214 = 't2.bsnr)
      :- SubqueryAlias t1
      :  +- SubqueryAlias joined_view
      :     +- View (`joined_view`, [kspatnr#182,ksnam#183,ksgebdat#184,ksbehdat#185,kslz1#186,kslz2#187,kslz3#188,kslz4#189,kslz5#190,kslz6#191,kslz7#192,kslz8#1

In [2]:
# Join der Datenbanken

# Liste der Datenbanken
databases = [
    "fidus_and",
    "fidus_hbs",
    "fidus_hrs",
    "fidus_mhw",
    "fidus_oaf",
    "fidus_sub",
    "fidus_sue",
    "fidus_wrw"
]

# Liste der Tabellennamen
tables = ["ekktexte"]

# Wir bauen die temporäre View "joined_view" schrittweise auf.
joined_view = None

# Verschachtelte Schleife: Für jede Datenbank und jede Tabelle
for db in databases:
    for tbl in tables:
        full_table_name = f"{db}.{tbl}"
        print(f"Verarbeite Tabelle: {full_table_name}")

        try:
            if joined_view is None:
                # Initialen Start: Lege die erste Tabelle als temporäre View ab
                spark.sql(f"""
                    CREATE OR REPLACE TEMP VIEW joined_view AS
                    SELECT *
                    FROM {full_table_name}
                """)
                joined_view = "joined_view"
            else:
                spark.sql(f"""
                    CREATE OR REPLACE TEMP VIEW joined_view AS
                    SELECT t1.*, t2.*
                    FROM joined_view t1
                    INNER JOIN {full_table_name} t2
                    ON t1.id = t2.bsnr
                """)
        except Exception as e:
            print(f"Fehler beim Laden/Joinen von {full_table_name}: {e}")

# Finale Abfrage aus der temporären View, die alle Spalten enthalten sollte
df_final = spark.sql("SELECT * FROM joined_view")

Verarbeite Tabelle: fidus_and.ekktexte


Hive Session ID = fe080bee-99c7-452a-88a0-9ab5833c8479


Verarbeite Tabelle: fidus_hbs.ekktexte
Fehler beim Laden/Joinen von fidus_hbs.ekktexte: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `t1`.`id` cannot be resolved. Did you mean one of the following? [`t1`.`nr`, `t2`.`nr`, `t1`.`datum`, `t2`.`datum`, `t1`.`inhalt`].; line 6 pos 23;
'CreateViewCommand `joined_view`, SELECT t1.*, t2.*
                    FROM joined_view t1
                    INNER JOIN fidus_hbs.ekktexte t2
                    ON t1.id = t2.bsnr, false, true, LocalTempView, false
+- 'Project [t1.*, t2.*]
   +- 'Join Inner, ('t1.id = 't2.bsnr)
      :- SubqueryAlias t1
      :  +- SubqueryAlias joined_view
      :     +- View (`joined_view`, [patnr#6,datum#7,nr#8,fduschl#9,inhalt#10])
      :        +- Project [cast(patnr#11 as double) AS patnr#6, cast(datum#12 as string) AS datum#7, cast(nr#13 as double) AS nr#8, cast(fduschl#14 as string) AS fduschl#9, cast(inhalt#15 as string) AS inhalt#10]
      :           +- Project [patnr#11, datum#1

In [5]:
df_final.show(truncate=False)

                                                                                

+--------+--------+----------+----------+-----+-----+------+-----+-----+-----+-----+-----+-----+------+----------------------------+---------+-----+---------+------+---------+------+----------+----------+--------+---------+------------+-----+------+----+------+---------+----------+--------+--------+------+-------+--------+--------+-------+--------+--------+----------+--------+----------+---------------------+
|kspatnr |ksnam   |ksgebdat  |ksbehdat  |kslz1|kslz2|kslz3 |kslz4|kslz5|kslz6|kslz7|kslz8|kslz9|kslz10|kskas                       |ksgs     |ksver|kasnr    |kasgor|scheinart|behart|kvklesdatu|kvkgueltig|regkas  |versichnr|iknr        |verst|verste|ktgr|abrart|abrgebiet|kvkl_zulnr|id      |scheinid|praxis|arzt   |abrkennz|abrjjq  |uhrzeit|wop     |egk     |personengr|dmp     |vschutzvon|kartekas             |
+--------+--------+----------+----------+-----+-----+------+-----+-----+-----+-----+-----+-----+------+----------------------------+---------+-----+---------+------+---------

In [6]:
spark.sql("CREATE DATABASE IF NOT EXISTS fidus_joined")
df_final.write.format("delta").mode("overwrite").option("path", "s3a://silver/fidus/fidus_joined").saveAsTable("fidus_joined.fidus_joined")

                                                                                

In [7]:
spark.sql("show databases").show()

+------------+
|   namespace|
+------------+
|     default|
|   fidus_and|
|   fidus_hbs|
|   fidus_hrs|
|fidus_joined|
|   fidus_kpi|
|   fidus_mhw|
|   fidus_oaf|
|   fidus_sub|
|   fidus_sue|
|   fidus_wrw|
+------------+

