In [None]:
import os
import paramiko
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from datetime import datetime
import re

# Spark session used by the rest of this script for catalog lookups, CSV
# reads and Delta writes; getOrCreate() returns the active session if any.
spark = (
    SparkSession.builder
    .appName("SFTP to Unity Catalog Bronze")
    .enableHiveSupport()
    .getOrCreate()
)

# --- SFTP source -------------------------------------------------------------
# Every connection setting can be overridden through an environment variable;
# the literals below are the previous hard-coded values and remain the
# defaults, so existing runs behave exactly as before.
# NOTE(review): a plaintext password is committed here. Supply SFTP_PASSWORD
# from a secret store instead, rotate this credential, and remove the default.
hostname = os.environ.get("SFTP_HOST", '147.175.106.193')
port = int(os.environ.get("SFTP_PORT", "2020"))
username = os.environ.get("SFTP_USERNAME", 'xbachorik')
password = os.environ.get("SFTP_PASSWORD", 'AzureDatabricks1523')
remote_dir = os.environ.get("SFTP_REMOTE_DIR", '/home/xbachorik/data/weeks')

# --- DBFS staging ------------------------------------------------------------
# One staging directory addressed two ways: downloads are written through the
# local /dbfs path, and Spark reads the same file names via the dbfs: URI, so
# these two values must keep pointing at the same location.
local_dir = '/dbfs/local_data/bronze'
spark_dir = 'dbfs:/local_data/bronze'

# --- Target Unity Catalog location -------------------------------------------
catalog = "model_workspace"
schema = "bronze"

os.makedirs(local_dir, exist_ok=True)

# Week files embed their start date as "..._YYYY-MM-DD_to_..."; the captured
# date is attached as a constant DateTime column on every row of the table.
WEEK_START_PATTERN = re.compile(r'_(\d{4}-\d{2}-\d{2})_to_')


def table_name_for(file_name):
    """Derive the bare table identifier for a remote ``.csv`` file name.

    Only the trailing ``.csv`` extension is removed, and every character other
    than an ASCII letter, digit or underscore is replaced by ``_`` (so hyphens
    become underscores as before). The result is the last part of a
    ``catalog.schema.table`` name, which embedded dots or spaces would break.
    """
    stem = file_name[: -len(".csv")] if file_name.endswith(".csv") else file_name
    return re.sub(r"[^0-9A-Za-z_]", "_", stem)


def clean_column_name(name):
    """Trim a CSV header, turn spaces and hyphens into ``_`` and drop ``:``."""
    return name.strip().replace(" ", "_").replace(":", "").replace("-", "_")


# connect
transport = paramiko.Transport((hostname, port))
sftp = None
try:
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.chdir(remote_dir)

    # Only .csv entries directly inside remote_dir are considered.
    for file in sftp.listdir():
        if not file.endswith(".csv"):
            continue

        table_name = table_name_for(file)
        full_table_name = f"{catalog}.{schema}.{table_name}"

        # Decide from the name alone before touching the catalog or network:
        # a file without a start date would be skipped after download anyway.
        match = WEEK_START_PATTERN.search(file)
        if match is None:
            print(f"Skipping {file}: cant parse date")
            continue

        # Public Catalog API (PySpark >= 3.3); the private _jsparkSession
        # handle is not available under Spark Connect.
        if spark.catalog.tableExists(full_table_name):
            print(f"Skipping:{full_table_name}")
            continue

        local_path = os.path.join(local_dir, file)
        spark_path = f"{spark_dir}/{file}"
        try:
            # Inside the handler: an impossible date such as 2023-02-30
            # raises ValueError and is reported like any other per-file error.
            parsed_datetime = datetime.strptime(match.group(1), "%Y-%m-%d")

            # Stage the raw file in DBFS, then read it back through Spark.
            sftp.get(file, local_path)
            print(f"Downloaded:{file}")

            df = spark.read.option("header", True).csv(spark_path)
            print(f"Loaded: {file}")

            df = df.withColumn("DateTime", lit(parsed_datetime))
            df = df.toDF(*[clean_column_name(c) for c in df.columns])

            # save to bronze schema
            df.write.format("delta").mode("overwrite").saveAsTable(full_table_name)
            print(f"written to schema:{full_table_name}")

        except Exception as e:
            # Best effort per file: report the failure and continue.
            print(f"err {file}: {e}")
finally:
    # Release the SFTP channel and the SSH transport even when connecting,
    # listing, or the catalog lookup raises outside the per-file handler.
    if sftp is not None:
        sftp.close()
    transport.close()
